From 6723c01a31807bb145c9528bddf2d6ceba9f5d63 Mon Sep 17 00:00:00 2001 From: Sorunome Date: Tue, 22 Dec 2020 17:12:38 +0100 Subject: [PATCH] feat: Lazy-send room keys, chunked and sorted by importance --- lib/encryption/key_manager.dart | 4 +- lib/encryption/olm_manager.dart | 23 +++++-- lib/src/client.dart | 42 +++++++++++- lib/src/database/database.dart | 7 +- lib/src/database/database.g.dart | 91 +++++++++++++++++++++---- lib/src/database/database.moor | 4 +- lib/src/utils/device_keys_list.dart | 7 +- test/client_test.dart | 101 ++++++++++++++++++++++------ 8 files changed, 234 insertions(+), 45 deletions(-) diff --git a/lib/encryption/key_manager.dart b/lib/encryption/key_manager.dart index a8038cf7..d4000651 100644 --- a/lib/encryption/key_manager.dart +++ b/lib/encryption/key_manager.dart @@ -384,7 +384,7 @@ class KeyManager { sess.outboundGroupSession.session_id()); } // send out the key - await client.sendToDeviceEncrypted( + await client.sendToDeviceEncryptedChunked( devicesToReceive, EventTypes.RoomKey, rawSession); } } catch (e, s) { @@ -484,7 +484,7 @@ class KeyManager { key: client.userID, ); try { - await client.sendToDeviceEncrypted( + await client.sendToDeviceEncryptedChunked( deviceKeys, EventTypes.RoomKey, rawSession); await storeOutboundGroupSession(roomId, sess); _outboundGroupSessions[roomId] = sess; diff --git a/lib/encryption/olm_manager.dart b/lib/encryption/olm_manager.dart index b3130381..cdae41b4 100644 --- a/lib/encryption/olm_manager.dart +++ b/lib/encryption/olm_manager.dart @@ -227,7 +227,7 @@ class OlmManager { if (client.database == null) { return; } - _olmSessions[session.identityKey] ??= []; + _olmSessions[session.identityKey] ??= []; final ix = _olmSessions[session.identityKey] .indexWhere((s) => s.sessionId == session.sessionId); if (ix == -1) { @@ -262,10 +262,22 @@ class OlmManager { if (type != 0 && type != 1) { throw ('Unknown message type'); } + final device = client.userDeviceKeys[event.sender]?.deviceKeys?.values + ?.firstWhere((d) => d.curve25519Key == senderKey, orElse: () => null); final existingSessions = olmSessions[senderKey]; - final updateSessionUsage = (OlmSession session) => runInRoot(() { - session.lastReceived = DateTime.now(); - storeOlmSession(session); + final updateSessionUsage = ([OlmSession session]) => runInRoot(() { + if (session != null) { + session.lastReceived = DateTime.now(); + storeOlmSession(session); + } + if (device != null) { + device.lastActive = DateTime.now(); + client.database?.setLastActiveUserDeviceKey( + device.lastActive.millisecondsSinceEpoch, + client.id, + device.userId, + device.deviceId); + } }); if (existingSessions != null) { for (var session in existingSessions) { @@ -302,6 +314,7 @@ class OlmManager { session: newSession, lastReceived: DateTime.now(), ))); + updateSessionUsage(); } catch (_) { newSession?.free(); rethrow; @@ -416,7 +429,6 @@ class OlmManager { Future startOutgoingOlmSessions(List deviceKeys) async { Logs().v( '[OlmManager] Starting session with ${deviceKeys.length} devices...'); - var requestingKeysFrom = >{}; for (var device in deviceKeys) { if (requestingKeysFrom[device.userId] == null) { @@ -468,7 +480,6 @@ class OlmManager { if (sess.isEmpty) { throw ('No olm session found for ${device.userId}:${device.deviceId}'); } - final fullPayload = { 'type': type, 'content': payload, diff --git a/lib/src/client.dart b/lib/src/client.dart index 14dad06d..3df64343 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -1426,8 +1426,8 @@ sort order of ${prevState.sortOrder}. This should never happen...'''); final deviceId = rawDeviceKeyEntry.key; // Set the new device key for this device - final entry = - DeviceKeys.fromMatrixDeviceKeys(rawDeviceKeyEntry.value, this); + final entry = DeviceKeys.fromMatrixDeviceKeys( + rawDeviceKeyEntry.value, this, oldKeys[deviceId]?.lastActive); if (entry.isValid) { // is this a new key or the same one as an old one? // better store an update - the signatures might have changed! @@ -1453,6 +1453,7 @@ sort order of ${prevState.sortOrder}. This should never happen...'''); json.encode(entry.toJson()), entry.directVerified, entry.blocked, + entry.lastActive.millisecondsSinceEpoch, )); } } else if (oldKeys.containsKey(deviceId)) { @@ -1621,6 +1622,43 @@ sort order of ${prevState.sortOrder}. This should never happen...'''); eventType, messageId ?? generateUniqueTransactionId(), data); } + /// Sends an encrypted [message] of this [type] to these [deviceKeys]. This request happens + /// partly in the background and partly in the foreground. It automatically chunks sending + /// to device keys based on activity + Future sendToDeviceEncryptedChunked( + List deviceKeys, + String eventType, + Map message, + ) async { + if (!encryptionEnabled) return; + deviceKeys.removeWhere((DeviceKeys k) => + k.blocked || (k.userId == userID && k.deviceId == deviceID)); + if (deviceKeys.isEmpty) return; + message = + json.decode(json.encode(message)); // make sure we deep-copy the message + // make sure all the olm sessions are loaded from database + Logs().v('Sending to device chunked... (${deviceKeys.length} devices)'); + // sort so that devices we last received messages from get our message first + deviceKeys.sort((keyA, keyB) => keyB.lastActive.compareTo(keyA.lastActive)); + Logs().v(deviceKeys.map((k) => '${k.userId}:${k.deviceId}').toList()); + // and now send out in chunks of 20 + const chunkSize = 20; + for (var i = 0; i < deviceKeys.length; i += chunkSize) { + Logs().v('Sending chunk $i...'); + final chunk = deviceKeys.sublist( + i, + i + chunkSize > deviceKeys.length + ? deviceKeys.length + : i + chunkSize); + // and send + Logs().v(chunk.map((k) => '${k.userId}:${k.deviceId}').toList()); + final future = sendToDeviceEncrypted(chunk, eventType, message); + if (i == 0) { + await future; + } + } + } + /// Whether all push notifications are muted using the [.m.rule.master] /// rule of the push rules: https://matrix.org/docs/spec/client_server/r0.6.0#m-rule-master bool get allPushNotificationsMuted { diff --git a/lib/src/database/database.dart b/lib/src/database/database.dart index 2099ca8c..fc2a58df 100644 --- a/lib/src/database/database.dart +++ b/lib/src/database/database.dart @@ -54,7 +54,7 @@ class Database extends _$Database { Database.connect(DatabaseConnection connection) : super.connect(connection); @override - int get schemaVersion => 8; + int get schemaVersion => 9; int get maxFileSize => 1 * 1024 * 1024; @@ -143,6 +143,11 @@ class Database extends _$Database { inboundGroupSessions, inboundGroupSessions.allowedAtIndex); from++; } + if (from == 8) { + await m.addColumnIfNotExists( + userDeviceKeysKey, userDeviceKeysKey.lastActive); + from++; + } } catch (e, s) { Logs().e('Database migration failed', e, s); onError.add(SdkError(exception: e, stackTrace: s)); diff --git a/lib/src/database/database.g.dart b/lib/src/database/database.g.dart index 85cc7538..5eecc674 100644 --- a/lib/src/database/database.g.dart +++ b/lib/src/database/database.g.dart @@ -768,13 +768,15 @@ class DbUserDeviceKeysKey extends DataClass final String content; final bool verified; final bool blocked; + final int lastActive; DbUserDeviceKeysKey( {@required this.clientId, @required this.userId, @required this.deviceId, @required this.content, this.verified, - this.blocked}); + this.blocked, + this.lastActive}); factory DbUserDeviceKeysKey.fromData( Map data, GeneratedDatabase db, {String prefix}) { @@ -795,6 +797,8 @@ class DbUserDeviceKeysKey extends DataClass boolType.mapFromDatabaseResponse(data['${effectivePrefix}verified']), blocked: boolType.mapFromDatabaseResponse(data['${effectivePrefix}blocked']), + lastActive: intType + .mapFromDatabaseResponse(data['${effectivePrefix}last_active']), ); } @override @@ -818,6 +822,9 @@ class DbUserDeviceKeysKey extends DataClass if (!nullToAbsent || blocked != null) { map['blocked'] = Variable(blocked); } + if (!nullToAbsent || lastActive != null) { + map['last_active'] = Variable(lastActive); + } return map; } @@ -840,6 +847,9 @@ class DbUserDeviceKeysKey extends DataClass blocked: blocked == null && nullToAbsent ? const Value.absent() : Value(blocked), + lastActive: lastActive == null && nullToAbsent + ? const Value.absent() + : Value(lastActive), ); } @@ -853,6 +863,7 @@ class DbUserDeviceKeysKey extends DataClass content: serializer.fromJson(json['content']), verified: serializer.fromJson(json['verified']), blocked: serializer.fromJson(json['blocked']), + lastActive: serializer.fromJson(json['last_active']), ); } @override @@ -865,6 +876,7 @@ class DbUserDeviceKeysKey extends DataClass 'content': serializer.toJson(content), 'verified': serializer.toJson(verified), 'blocked': serializer.toJson(blocked), + 'last_active': serializer.toJson(lastActive), }; } @@ -874,7 +886,8 @@ class DbUserDeviceKeysKey extends DataClass String deviceId, String content, bool verified, - bool blocked}) => + bool blocked, + int lastActive}) => DbUserDeviceKeysKey( clientId: clientId ?? this.clientId, userId: userId ?? this.userId, @@ -882,6 +895,7 @@ class DbUserDeviceKeysKey extends DataClass content: content ?? this.content, verified: verified ?? this.verified, blocked: blocked ?? this.blocked, + lastActive: lastActive ?? this.lastActive, ); @override String toString() { @@ -891,7 +905,8 @@ class DbUserDeviceKeysKey extends DataClass ..write('deviceId: $deviceId, ') ..write('content: $content, ') ..write('verified: $verified, ') - ..write('blocked: $blocked') + ..write('blocked: $blocked, ') + ..write('lastActive: $lastActive') ..write(')')) .toString(); } @@ -903,8 +918,10 @@ class DbUserDeviceKeysKey extends DataClass userId.hashCode, $mrjc( deviceId.hashCode, - $mrjc(content.hashCode, - $mrjc(verified.hashCode, blocked.hashCode)))))); + $mrjc( + content.hashCode, + $mrjc(verified.hashCode, + $mrjc(blocked.hashCode, lastActive.hashCode))))))); @override bool operator ==(dynamic other) => identical(this, other) || @@ -914,7 +931,8 @@ class DbUserDeviceKeysKey extends DataClass other.deviceId == this.deviceId && other.content == this.content && other.verified == this.verified && - other.blocked == this.blocked); + other.blocked == this.blocked && + other.lastActive == this.lastActive); } class UserDeviceKeysKeyCompanion extends UpdateCompanion { @@ -924,6 +942,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { final Value content; final Value verified; final Value blocked; + final Value lastActive; const UserDeviceKeysKeyCompanion({ this.clientId = const Value.absent(), this.userId = const Value.absent(), @@ -931,6 +950,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { this.content = const Value.absent(), this.verified = const Value.absent(), this.blocked = const Value.absent(), + this.lastActive = const Value.absent(), }); UserDeviceKeysKeyCompanion.insert({ @required int clientId, @@ -939,6 +959,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { @required String content, this.verified = const Value.absent(), this.blocked = const Value.absent(), + this.lastActive = const Value.absent(), }) : clientId = Value(clientId), userId = Value(userId), deviceId = Value(deviceId), @@ -950,6 +971,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { Expression content, Expression verified, Expression blocked, + Expression lastActive, }) { return RawValuesInsertable({ if (clientId != null) 'client_id': clientId, @@ -958,6 +980,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { if (content != null) 'content': content, if (verified != null) 'verified': verified, if (blocked != null) 'blocked': blocked, + if (lastActive != null) 'last_active': lastActive, }); } @@ -967,7 +990,8 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { Value deviceId, Value content, Value verified, - Value blocked}) { + Value blocked, + Value lastActive}) { return UserDeviceKeysKeyCompanion( clientId: clientId ?? this.clientId, userId: userId ?? this.userId, @@ -975,6 +999,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { content: content ?? this.content, verified: verified ?? this.verified, blocked: blocked ?? this.blocked, + lastActive: lastActive ?? this.lastActive, ); } @@ -999,6 +1024,9 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { if (blocked.present) { map['blocked'] = Variable(blocked.value); } + if (lastActive.present) { + map['last_active'] = Variable(lastActive.value); + } return map; } @@ -1010,7 +1038,8 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion { ..write('deviceId: $deviceId, ') ..write('content: $content, ') ..write('verified: $verified, ') - ..write('blocked: $blocked') + ..write('blocked: $blocked, ') + ..write('lastActive: $lastActive') ..write(')')) .toString(); } @@ -1071,9 +1100,17 @@ class UserDeviceKeysKey extends Table defaultValue: const CustomExpression('false')); } + final VerificationMeta _lastActiveMeta = const VerificationMeta('lastActive'); + GeneratedIntColumn _lastActive; + GeneratedIntColumn get lastActive => _lastActive ??= _constructLastActive(); + GeneratedIntColumn _constructLastActive() { + return GeneratedIntColumn('last_active', $tableName, true, + $customConstraints: ''); + } + @override List get $columns => - [clientId, userId, deviceId, content, verified, blocked]; + [clientId, userId, deviceId, content, verified, blocked, lastActive]; @override UserDeviceKeysKey get asDslTable => this; @override @@ -1118,6 +1155,12 @@ class UserDeviceKeysKey extends Table context.handle(_blockedMeta, blocked.isAcceptableOrUnknown(data['blocked'], _blockedMeta)); } + if (data.containsKey('last_active')) { + context.handle( + _lastActiveMeta, + lastActive.isAcceptableOrUnknown( + data['last_active'], _lastActiveMeta)); + } return context; } @@ -6462,17 +6505,24 @@ abstract class _$Database extends GeneratedDatabase { ); } - Future storeUserDeviceKey(int client_id, String user_id, - String device_id, String content, bool verified, bool blocked) { + Future storeUserDeviceKey( + int client_id, + String user_id, + String device_id, + String content, + bool verified, + bool blocked, + int last_active) { return customInsert( - 'INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked)', + 'INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked, last_active) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked, :last_active)', variables: [ Variable.withInt(client_id), Variable.withString(user_id), Variable.withString(device_id), Variable.withString(content), Variable.withBool(verified), - Variable.withBool(blocked) + Variable.withBool(blocked), + Variable.withInt(last_active) ], updates: {userDeviceKeysKey}, ); @@ -6492,6 +6542,21 @@ abstract class _$Database extends GeneratedDatabase { ); } + Future setLastActiveUserDeviceKey( + int last_active, int client_id, String user_id, String device_id) { + return customUpdate( + 'UPDATE user_device_keys_key SET last_active = :last_active WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id', + variables: [ + Variable.withInt(last_active), + Variable.withInt(client_id), + Variable.withString(user_id), + Variable.withString(device_id) + ], + updates: {userDeviceKeysKey}, + updateKind: UpdateKind.update, + ); + } + Future setVerifiedUserCrossSigningKey( bool verified, int client_id, String user_id, String public_key) { return customUpdate( diff --git a/lib/src/database/database.moor b/lib/src/database/database.moor index 427d7b60..4bddbc90 100644 --- a/lib/src/database/database.moor +++ b/lib/src/database/database.moor @@ -28,6 +28,7 @@ CREATE TABLE user_device_keys_key ( content TEXT NOT NULL, verified BOOLEAN DEFAULT false, blocked BOOLEAN DEFAULT false, + last_active BIGINT, UNIQUE(client_id, user_id, device_id) ) as DbUserDeviceKeysKey; CREATE INDEX user_device_keys_key_index ON user_device_keys_key(client_id); @@ -199,8 +200,9 @@ markInboundGroupSessionsAsNeedingUpload: UPDATE inbound_group_sessions SET uploa storeUserDeviceKeysInfo: INSERT OR REPLACE INTO user_device_keys (client_id, user_id, outdated) VALUES (:client_id, :user_id, :outdated); setVerifiedUserDeviceKey: UPDATE user_device_keys_key SET verified = :verified WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id; setBlockedUserDeviceKey: UPDATE user_device_keys_key SET blocked = :blocked WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id; -storeUserDeviceKey: INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked); +storeUserDeviceKey: INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked, last_active) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked, :last_active); removeUserDeviceKey: DELETE FROM user_device_keys_key WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id; +setLastActiveUserDeviceKey: UPDATE user_device_keys_key SET last_active = :last_active WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id; setVerifiedUserCrossSigningKey: UPDATE user_cross_signing_keys SET verified = :verified WHERE client_id = :client_id AND user_id = :user_id AND public_key = :public_key; setBlockedUserCrossSigningKey: UPDATE user_cross_signing_keys SET blocked = :blocked WHERE client_id = :client_id AND user_id = :user_id AND public_key = :public_key; storeUserCrossSigningKey: INSERT OR REPLACE INTO user_cross_signing_keys (client_id, user_id, public_key, content, verified, blocked) VALUES (:client_id, :user_id, :public_key, :content, :verified, :blocked); diff --git a/lib/src/utils/device_keys_list.dart b/lib/src/utils/device_keys_list.dart index 96402b5b..31c2e368 100644 --- a/lib/src/utils/device_keys_list.dart +++ b/lib/src/utils/device_keys_list.dart @@ -363,6 +363,7 @@ class CrossSigningKey extends SignableKey { class DeviceKeys extends SignableKey { String get deviceId => identifier; List algorithms; + DateTime lastActive; String get curve25519Key => keys['curve25519:$deviceId']; String get deviceDisplayName => @@ -407,11 +408,13 @@ class DeviceKeys extends SignableKey { ?.setBlockedUserDeviceKey(newBlocked, client.id, userId, deviceId); } - DeviceKeys.fromMatrixDeviceKeys(MatrixDeviceKeys k, Client cl) + DeviceKeys.fromMatrixDeviceKeys(MatrixDeviceKeys k, Client cl, + [DateTime lastActiveTs]) : super.fromJson(k.toJson().copy(), cl) { final json = toJson(); identifier = k.deviceId; algorithms = json['algorithms'].cast(); + lastActive = lastActiveTs ?? DateTime.now(); } DeviceKeys.fromDb(DbUserDeviceKeysKey dbEntry, Client cl) @@ -421,6 +424,7 @@ class DeviceKeys extends SignableKey { algorithms = json['algorithms'].cast(); _verified = dbEntry.verified; blocked = dbEntry.blocked; + lastActive = DateTime.fromMillisecondsSinceEpoch(dbEntry.lastActive ?? 0); } DeviceKeys.fromJson(Map json, Client cl) @@ -428,6 +432,7 @@ class DeviceKeys extends SignableKey { final json = toJson(); identifier = json['device_id']; algorithms = json['algorithms'].cast(); + lastActive = DateTime.fromMillisecondsSinceEpoch(0); } KeyVerification startVerification() { diff --git a/test/client_test.dart b/test/client_test.dart index 72c8f3d2..ba49f55a 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -17,6 +17,7 @@ */ import 'dart:async'; +import 'dart:convert'; import 'dart:typed_data'; import 'package:famedlysdk/famedlysdk.dart'; @@ -29,6 +30,7 @@ import 'package:logger/logger.dart'; import 'package:famedlysdk/src/utils/room_update.dart'; import 'package:olm/olm.dart' as olm; import 'package:test/test.dart'; +import 'package:canonical_json/canonical_json.dart'; import 'fake_client.dart'; import 'fake_database.dart'; @@ -359,32 +361,93 @@ void main() { 'mxc://example.org/SEsfnsuifSDFSSEF'); expect(aliceProfile.displayname, 'Alice Margatroid'); }); - var deviceKeys = DeviceKeys.fromJson({ - 'user_id': '@alice:example.com', - 'device_id': 'JLAFKJWSCS', - 'algorithms': [ - AlgorithmTypes.olmV1Curve25519AesSha2, - AlgorithmTypes.megolmV1AesSha2 - ], - 'keys': { - 'curve25519:JLAFKJWSCS': '3C5BFWi2Y8MaVvjM8M22DBmh24PmgR0nPvJOIArzgyI', - 'ed25519:JLAFKJWSCS': 'lEuiRJBit0IG6nUf5pUzWTUEsRVVe/HJkoKuEww9ULI' - }, - 'signatures': { - '@alice:example.com': { - 'ed25519:JLAFKJWSCS': - 'dSO80A01XiigH3uBiDVx/EjzaoycHcjq9lfQX0uWsqxl2giMIiSPR8a4d291W1ihKJL/a+myXS367WT6NAIcBA' - } - } - }, matrix); test('sendToDeviceEncrypted', () async { + if (!olmEnabled) { + return; + } + FakeMatrixApi.calledEndpoints.clear(); await matrix.sendToDeviceEncrypted( - [deviceKeys], + matrix.userDeviceKeys['@alice:example.com'].deviceKeys.values + .toList(), 'm.message', { 'msgtype': 'm.text', 'body': 'Hello world', }); + expect( + FakeMatrixApi.calledEndpoints.keys.any( + (k) => k.startsWith('/client/r0/sendToDevice/m.room.encrypted')), + true); + }); + test('sendToDeviceEncryptedChunked', () async { + if (!olmEnabled) { + return; + } + FakeMatrixApi.calledEndpoints.clear(); + await matrix.sendToDeviceEncryptedChunked( + matrix.userDeviceKeys['@alice:example.com'].deviceKeys.values + .toList(), + 'm.message', + { + 'msgtype': 'm.text', + 'body': 'Hello world', + }); + await Future.delayed(Duration(milliseconds: 50)); + expect( + FakeMatrixApi.calledEndpoints.keys + .where((k) => + k.startsWith('/client/r0/sendToDevice/m.room.encrypted')) + .length, + 1); + + final deviceKeys = []; + for (var i = 0; i < 30; i++) { + final account = olm.Account(); + account.create(); + final keys = json.decode(account.identity_keys()); + final userId = '@testuser:example.org'; + final deviceId = 'DEVICE$i'; + final keyObj = { + 'user_id': userId, + 'device_id': deviceId, + 'algorithms': [ + 'm.olm.v1.curve25519-aes-sha2', + 'm.megolm.v1.aes-sha2', + ], + 'keys': { + 'curve25519:$deviceId': keys['curve25519'], + 'ed25519:$deviceId': keys['ed25519'], + }, + }; + final signature = + account.sign(String.fromCharCodes(canonicalJson.encode(keyObj))); + keyObj['signatures'] = { + userId: { + 'ed25519:$deviceId': signature, + }, + }; + account.free(); + deviceKeys.add(DeviceKeys.fromJson(keyObj, matrix)); + } + FakeMatrixApi.calledEndpoints.clear(); + await matrix.sendToDeviceEncryptedChunked(deviceKeys, 'm.message', { + 'msgtype': 'm.text', + 'body': 'Hello world', + }); + // it should send the first chunk right away + expect( + FakeMatrixApi.calledEndpoints.keys + .where((k) => + k.startsWith('/client/r0/sendToDevice/m.room.encrypted')) + .length, + 1); + await Future.delayed(Duration(milliseconds: 50)); + expect( + FakeMatrixApi.calledEndpoints.keys + .where((k) => + k.startsWith('/client/r0/sendToDevice/m.room.encrypted')) + .length, + 2); }); test('Test the fake store api', () async { final database = await getDatabase(null);