diff --git a/lib/encryption/olm_manager.dart b/lib/encryption/olm_manager.dart index 38db9325..4153e1d0 100644 --- a/lib/encryption/olm_manager.dart +++ b/lib/encryption/olm_manager.dart @@ -242,10 +242,7 @@ class OlmManager { } } - void storeOlmSession(OlmSession session) { - if (client.database == null) { - return; - } + Future storeOlmSession(OlmSession session) async { _olmSessions[session.identityKey] ??= []; final ix = _olmSessions[session.identityKey] .indexWhere((s) => s.sessionId == session.sessionId); @@ -256,7 +253,10 @@ class OlmManager { // update an existing session _olmSessions[session.identityKey][ix] = session; } - client.database.storeOlmSession( + if (client.database == null) { + return; + } + await client.database.storeOlmSession( client.id, session.identityKey, session.sessionId, @@ -285,14 +285,14 @@ class OlmManager { final device = client.userDeviceKeys[event.sender]?.deviceKeys?.values ?.firstWhere((d) => d.curve25519Key == senderKey, orElse: () => null); final existingSessions = olmSessions[senderKey]; - final updateSessionUsage = ([OlmSession session]) => runInRoot(() { + final updateSessionUsage = ([OlmSession session]) => runInRoot(() async { if (session != null) { session.lastReceived = DateTime.now(); - storeOlmSession(session); + await storeOlmSession(session); } if (device != null) { device.lastActive = DateTime.now(); - client.database?.setLastActiveUserDeviceKey( + await client.database?.setLastActiveUserDeviceKey( device.lastActive.millisecondsSinceEpoch, client.id, device.userId, @@ -542,9 +542,10 @@ class OlmManager { 'recipient_keys': {'ed25519': device.ed25519Key}, }; final encryptResult = sess.first.session.encrypt(json.encode(fullPayload)); - storeOlmSession(sess.first); + await storeOlmSession(sess.first); if (client.database != null) { - unawaited(client.database.setLastSentMessageUserDeviceKey( + // ignore: unawaited_futures + runInRoot(() => client.database.setLastSentMessageUserDeviceKey( json.encode({ 'type': type, 'content': payload, diff --git a/lib/src/client.dart b/lib/src/client.dart index 79068e35..fc885b17 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -978,6 +978,12 @@ class Client extends MatrixApi { if (encryptionEnabled) { encryption.onSync(); } + + // try to process the to_device queue + try { + await processToDeviceQueue(); + } catch (_) {} // we want to dispose any errors this throws + _retryDelay = Future.value(); } on MatrixException catch (e, s) { onSyncError.add(SdkError(exception: e, stackTrace: s)); @@ -1654,6 +1660,51 @@ sort order of ${prevState.sortOrder}. This should never happen...'''); } } + /// Processes the to_device queue and tries to send every entry. + /// This function MAY throw an error, which just means the to_device queue wasn't + /// proccessed all the way. + Future processToDeviceQueue() async { + if (database == null) { + return; + } + final entries = await database.getToDeviceQueue(id).get(); + for (final entry in entries) { + // ohgod what is this... + final data = (json.decode(entry.content) as Map).map((k, v) => + MapEntry>>( + k, + (v as Map).map((k, v) => MapEntry>( + k, Map.from(v))))); + await super.sendToDevice(entry.type, entry.txnId, data); + await database.deleteFromToDeviceQueue(id, entry.id); + } + } + + /// Sends a raw to_device event with a [eventType], a [txnId] and a content [data]. + /// Before sending, it tries to re-send potentially queued to_device events and adds + /// the current one to the queue, should it fail. + @override + Future sendToDevice( + String eventType, + String txnId, + Map>> messages, + ) async { + try { + await processToDeviceQueue(); + await super.sendToDevice(eventType, txnId, messages); + } catch (e, s) { + Logs().w( + '[Client] Problem while sending to_device event, retrying later...', + e, + s); + if (database != null) { + await database.insertIntoToDeviceQueue( + id, eventType, txnId, json.encode(messages)); + } + rethrow; + } + } + /// Send an (unencrypted) to device [message] of a specific [eventType] to all /// devices of a set of [users]. Future sendToDevicesOfUserIds( diff --git a/lib/src/database/database.dart b/lib/src/database/database.dart index 9bbaacba..f99d5576 100644 --- a/lib/src/database/database.dart +++ b/lib/src/database/database.dart @@ -70,7 +70,7 @@ class Database extends _$Database { Database.connect(DatabaseConnection connection) : super.connect(connection); @override - int get schemaVersion => 10; + int get schemaVersion => 11; int get maxFileSize => 1 * 1024 * 1024; @@ -170,6 +170,11 @@ class Database extends _$Database { await m.createIndexIfNotExists(olmSessionsIdentityIndex); from++; } + if (from == 10) { + await m.createTableIfNotExists(toDeviceQueue); + await m.createIndexIfNotExists(toDeviceQueueIndex); + from++; + } } catch (e, s) { api.Logs().e('Database migration failed', e, s); onError.add(SdkError(exception: e, stackTrace: s)); @@ -658,6 +663,8 @@ class Database extends _$Database { .go(); await (delete(ssssCache)..where((r) => r.clientId.equals(clientId))).go(); await (delete(clients)..where((r) => r.clientId.equals(clientId))).go(); + await (delete(toDeviceQueue)..where((r) => r.clientId.equals(clientId))) + .go(); } Future getUser(int clientId, String userId, sdk.Room room) async { diff --git a/lib/src/database/database.g.dart b/lib/src/database/database.g.dart index 621b0d15..25db0b21 100644 --- a/lib/src/database/database.g.dart +++ b/lib/src/database/database.g.dart @@ -5595,6 +5595,326 @@ class Presences extends Table with TableInfo { bool get dontWriteConstraints => true; } +class DbToDeviceQueue extends DataClass implements Insertable { + final int clientId; + final int id; + final String type; + final String txnId; + final String content; + DbToDeviceQueue( + {@required this.clientId, + @required this.id, + @required this.type, + @required this.txnId, + @required this.content}); + factory DbToDeviceQueue.fromData( + Map data, GeneratedDatabase db, + {String prefix}) { + final effectivePrefix = prefix ?? ''; + final intType = db.typeSystem.forDartType(); + final stringType = db.typeSystem.forDartType(); + return DbToDeviceQueue( + clientId: + intType.mapFromDatabaseResponse(data['${effectivePrefix}client_id']), + id: intType.mapFromDatabaseResponse(data['${effectivePrefix}id']), + type: stringType.mapFromDatabaseResponse(data['${effectivePrefix}type']), + txnId: + stringType.mapFromDatabaseResponse(data['${effectivePrefix}txn_id']), + content: + stringType.mapFromDatabaseResponse(data['${effectivePrefix}content']), + ); + } + @override + Map toColumns(bool nullToAbsent) { + final map = {}; + if (!nullToAbsent || clientId != null) { + map['client_id'] = Variable(clientId); + } + if (!nullToAbsent || id != null) { + map['id'] = Variable(id); + } + if (!nullToAbsent || type != null) { + map['type'] = Variable(type); + } + if (!nullToAbsent || txnId != null) { + map['txn_id'] = Variable(txnId); + } + if (!nullToAbsent || content != null) { + map['content'] = Variable(content); + } + return map; + } + + ToDeviceQueueCompanion toCompanion(bool nullToAbsent) { + return ToDeviceQueueCompanion( + clientId: clientId == null && nullToAbsent + ? const Value.absent() + : Value(clientId), + id: id == null && nullToAbsent ? const Value.absent() : Value(id), + type: type == null && nullToAbsent ? const Value.absent() : Value(type), + txnId: + txnId == null && nullToAbsent ? const Value.absent() : Value(txnId), + content: content == null && nullToAbsent + ? const Value.absent() + : Value(content), + ); + } + + factory DbToDeviceQueue.fromJson(Map json, + {ValueSerializer serializer}) { + serializer ??= moorRuntimeOptions.defaultSerializer; + return DbToDeviceQueue( + clientId: serializer.fromJson(json['client_id']), + id: serializer.fromJson(json['id']), + type: serializer.fromJson(json['type']), + txnId: serializer.fromJson(json['txn_id']), + content: serializer.fromJson(json['content']), + ); + } + @override + Map toJson({ValueSerializer serializer}) { + serializer ??= moorRuntimeOptions.defaultSerializer; + return { + 'client_id': serializer.toJson(clientId), + 'id': serializer.toJson(id), + 'type': serializer.toJson(type), + 'txn_id': serializer.toJson(txnId), + 'content': serializer.toJson(content), + }; + } + + DbToDeviceQueue copyWith( + {int clientId, int id, String type, String txnId, String content}) => + DbToDeviceQueue( + clientId: clientId ?? this.clientId, + id: id ?? this.id, + type: type ?? this.type, + txnId: txnId ?? this.txnId, + content: content ?? this.content, + ); + @override + String toString() { + return (StringBuffer('DbToDeviceQueue(') + ..write('clientId: $clientId, ') + ..write('id: $id, ') + ..write('type: $type, ') + ..write('txnId: $txnId, ') + ..write('content: $content') + ..write(')')) + .toString(); + } + + @override + int get hashCode => $mrjf($mrjc( + clientId.hashCode, + $mrjc(id.hashCode, + $mrjc(type.hashCode, $mrjc(txnId.hashCode, content.hashCode))))); + @override + bool operator ==(dynamic other) => + identical(this, other) || + (other is DbToDeviceQueue && + other.clientId == this.clientId && + other.id == this.id && + other.type == this.type && + other.txnId == this.txnId && + other.content == this.content); +} + +class ToDeviceQueueCompanion extends UpdateCompanion { + final Value clientId; + final Value id; + final Value type; + final Value txnId; + final Value content; + const ToDeviceQueueCompanion({ + this.clientId = const Value.absent(), + this.id = const Value.absent(), + this.type = const Value.absent(), + this.txnId = const Value.absent(), + this.content = const Value.absent(), + }); + ToDeviceQueueCompanion.insert({ + @required int clientId, + this.id = const Value.absent(), + @required String type, + @required String txnId, + @required String content, + }) : clientId = Value(clientId), + type = Value(type), + txnId = Value(txnId), + content = Value(content); + static Insertable custom({ + Expression clientId, + Expression id, + Expression type, + Expression txnId, + Expression content, + }) { + return RawValuesInsertable({ + if (clientId != null) 'client_id': clientId, + if (id != null) 'id': id, + if (type != null) 'type': type, + if (txnId != null) 'txn_id': txnId, + if (content != null) 'content': content, + }); + } + + ToDeviceQueueCompanion copyWith( + {Value clientId, + Value id, + Value type, + Value txnId, + Value content}) { + return ToDeviceQueueCompanion( + clientId: clientId ?? this.clientId, + id: id ?? this.id, + type: type ?? this.type, + txnId: txnId ?? this.txnId, + content: content ?? this.content, + ); + } + + @override + Map toColumns(bool nullToAbsent) { + final map = {}; + if (clientId.present) { + map['client_id'] = Variable(clientId.value); + } + if (id.present) { + map['id'] = Variable(id.value); + } + if (type.present) { + map['type'] = Variable(type.value); + } + if (txnId.present) { + map['txn_id'] = Variable(txnId.value); + } + if (content.present) { + map['content'] = Variable(content.value); + } + return map; + } + + @override + String toString() { + return (StringBuffer('ToDeviceQueueCompanion(') + ..write('clientId: $clientId, ') + ..write('id: $id, ') + ..write('type: $type, ') + ..write('txnId: $txnId, ') + ..write('content: $content') + ..write(')')) + .toString(); + } +} + +class ToDeviceQueue extends Table + with TableInfo { + final GeneratedDatabase _db; + final String _alias; + ToDeviceQueue(this._db, [this._alias]); + final VerificationMeta _clientIdMeta = const VerificationMeta('clientId'); + GeneratedIntColumn _clientId; + GeneratedIntColumn get clientId => _clientId ??= _constructClientId(); + GeneratedIntColumn _constructClientId() { + return GeneratedIntColumn('client_id', $tableName, false, + $customConstraints: 'NOT NULL REFERENCES clients(client_id)'); + } + + final VerificationMeta _idMeta = const VerificationMeta('id'); + GeneratedIntColumn _id; + GeneratedIntColumn get id => _id ??= _constructId(); + GeneratedIntColumn _constructId() { + return GeneratedIntColumn('id', $tableName, false, + declaredAsPrimaryKey: true, + hasAutoIncrement: true, + $customConstraints: 'NOT NULL PRIMARY KEY AUTOINCREMENT'); + } + + final VerificationMeta _typeMeta = const VerificationMeta('type'); + GeneratedTextColumn _type; + GeneratedTextColumn get type => _type ??= _constructType(); + GeneratedTextColumn _constructType() { + return GeneratedTextColumn('type', $tableName, false, + $customConstraints: 'NOT NULL'); + } + + final VerificationMeta _txnIdMeta = const VerificationMeta('txnId'); + GeneratedTextColumn _txnId; + GeneratedTextColumn get txnId => _txnId ??= _constructTxnId(); + GeneratedTextColumn _constructTxnId() { + return GeneratedTextColumn('txn_id', $tableName, false, + $customConstraints: 'NOT NULL'); + } + + final VerificationMeta _contentMeta = const VerificationMeta('content'); + GeneratedTextColumn _content; + GeneratedTextColumn get content => _content ??= _constructContent(); + GeneratedTextColumn _constructContent() { + return GeneratedTextColumn('content', $tableName, false, + $customConstraints: 'NOT NULL'); + } + + @override + List get $columns => [clientId, id, type, txnId, content]; + @override + ToDeviceQueue get asDslTable => this; + @override + String get $tableName => _alias ?? 'to_device_queue'; + @override + final String actualTableName = 'to_device_queue'; + @override + VerificationContext validateIntegrity(Insertable instance, + {bool isInserting = false}) { + final context = VerificationContext(); + final data = instance.toColumns(true); + if (data.containsKey('client_id')) { + context.handle(_clientIdMeta, + clientId.isAcceptableOrUnknown(data['client_id'], _clientIdMeta)); + } else if (isInserting) { + context.missing(_clientIdMeta); + } + if (data.containsKey('id')) { + context.handle(_idMeta, id.isAcceptableOrUnknown(data['id'], _idMeta)); + } + if (data.containsKey('type')) { + context.handle( + _typeMeta, type.isAcceptableOrUnknown(data['type'], _typeMeta)); + } else if (isInserting) { + context.missing(_typeMeta); + } + if (data.containsKey('txn_id')) { + context.handle( + _txnIdMeta, txnId.isAcceptableOrUnknown(data['txn_id'], _txnIdMeta)); + } else if (isInserting) { + context.missing(_txnIdMeta); + } + if (data.containsKey('content')) { + context.handle(_contentMeta, + content.isAcceptableOrUnknown(data['content'], _contentMeta)); + } else if (isInserting) { + context.missing(_contentMeta); + } + return context; + } + + @override + Set get $primaryKey => {id}; + @override + DbToDeviceQueue map(Map data, {String tablePrefix}) { + final effectivePrefix = tablePrefix != null ? '$tablePrefix.' : null; + return DbToDeviceQueue.fromData(data, _db, prefix: effectivePrefix); + } + + @override + ToDeviceQueue createAlias(String alias) { + return ToDeviceQueue(_db, alias); + } + + @override + bool get dontWriteConstraints => true; +} + class DbSSSSCache extends DataClass implements Insertable { final int clientId; final String type; @@ -6242,6 +6562,12 @@ abstract class _$Database extends GeneratedDatabase { Index _presencesIndex; Index get presencesIndex => _presencesIndex ??= Index('presences_index', 'CREATE INDEX presences_index ON presences(client_id);'); + ToDeviceQueue _toDeviceQueue; + ToDeviceQueue get toDeviceQueue => _toDeviceQueue ??= ToDeviceQueue(this); + Index _toDeviceQueueIndex; + Index get toDeviceQueueIndex => _toDeviceQueueIndex ??= Index( + 'to_device_queue_index', + 'CREATE INDEX to_device_queue_index ON to_device_queue(client_id);'); SsssCache _ssssCache; SsssCache get ssssCache => _ssssCache ??= SsssCache(this); Files _files; @@ -7143,6 +7469,36 @@ abstract class _$Database extends GeneratedDatabase { ); } + Future insertIntoToDeviceQueue( + int client_id, String type, String txn_id, String content) { + return customInsert( + 'INSERT INTO to_device_queue (client_id, type, txn_id, content) VALUES (:client_id, :type, :txn_id, :content)', + variables: [ + Variable.withInt(client_id), + Variable.withString(type), + Variable.withString(txn_id), + Variable.withString(content) + ], + updates: {toDeviceQueue}, + ); + } + + Selectable getToDeviceQueue(int client_id) { + return customSelect( + 'SELECT * FROM to_device_queue WHERE client_id = :client_id', + variables: [Variable.withInt(client_id)], + readsFrom: {toDeviceQueue}).map(toDeviceQueue.mapFromRow); + } + + Future deleteFromToDeviceQueue(int client_id, int id) { + return customUpdate( + 'DELETE FROM to_device_queue WHERE client_id = :client_id AND id = :id', + variables: [Variable.withInt(client_id), Variable.withInt(id)], + updates: {toDeviceQueue}, + updateKind: UpdateKind.delete, + ); + } + @override Iterable get allTables => allSchemaEntities.whereType(); @override @@ -7173,6 +7529,8 @@ abstract class _$Database extends GeneratedDatabase { roomAccountDataIndex, presences, presencesIndex, + toDeviceQueue, + toDeviceQueueIndex, ssssCache, files ]; diff --git a/lib/src/database/database.moor b/lib/src/database/database.moor index 695a384d..6b34f3c3 100644 --- a/lib/src/database/database.moor +++ b/lib/src/database/database.moor @@ -174,6 +174,15 @@ CREATE TABLE files ( UNIQUE(mxc_uri) ) AS DbFile; +CREATE TABLE to_device_queue ( + client_id INTEGER NOT NULL REFERENCES clients(client_id), + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + type TEXT NOT NULL, + txn_id TEXT NOT NULL, + content TEXT NOT NULL +) as DbToDeviceQueue; +CREATE INDEX to_device_queue_index ON to_device_queue(client_id); + -- named queries dbGetClient: SELECT * FROM clients WHERE name = :name; @@ -245,3 +254,6 @@ storeFile: INSERT OR REPLACE INTO files (mxc_uri, bytes, saved_at) VALUES (:mxc_ dbGetFile: SELECT * FROM files WHERE mxc_uri = :mxc_uri; markPendingEventsAsError: UPDATE events SET status = -1 WHERE client_id = :client_id AND status = 0; deleteOldFiles: DELETE FROM files WHERE saved_at < :saved_at; +insertIntoToDeviceQueue: INSERT INTO to_device_queue (client_id, type, txn_id, content) VALUES (:client_id, :type, :txn_id, :content); +getToDeviceQueue: SELECT * FROM to_device_queue WHERE client_id = :client_id; +deleteFromToDeviceQueue: DELETE FROM to_device_queue WHERE client_id = :client_id AND id = :id; diff --git a/test/client_test.dart b/test/client_test.dart index 4eca7c27..65ab1199 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -450,6 +450,66 @@ void main() { .length, 2); }); + test('send to_device queue', () async { + // we test: + // send fox --> fail + // send raccoon --> fox & raccoon sent + // send bunny --> only bunny sent + final client = await getClient(); + FakeMatrixApi.failToDevice = true; + final foxContent = { + '@fox:example.org': { + '*': { + 'fox': 'hole', + }, + }, + }; + final raccoonContent = { + '@fox:example.org': { + '*': { + 'raccoon': 'mask', + }, + }, + }; + final bunnyContent = { + '@fox:example.org': { + '*': { + 'bunny': 'burrow', + }, + }, + }; + await client + .sendToDevice('foxies', 'floof_txnid', foxContent) + .catchError((e) => null); // ignore the error + FakeMatrixApi.failToDevice = false; + FakeMatrixApi.calledEndpoints.clear(); + await client.sendToDevice('raccoon', 'raccoon_txnid', raccoonContent); + expect( + json.decode(FakeMatrixApi + .calledEndpoints['/client/r0/sendToDevice/foxies/floof_txnid'] + [0])['messages'], + foxContent); + expect( + json.decode(FakeMatrixApi.calledEndpoints[ + '/client/r0/sendToDevice/raccoon/raccoon_txnid'][0])['messages'], + raccoonContent); + FakeMatrixApi.calledEndpoints.clear(); + await client.sendToDevice('bunny', 'bunny_txnid', bunnyContent); + expect( + FakeMatrixApi + .calledEndpoints['/client/r0/sendToDevice/foxies/floof_txnid'], + null); + expect( + FakeMatrixApi + .calledEndpoints['/client/r0/sendToDevice/raccoon/raccoon_txnid'], + null); + expect( + json.decode(FakeMatrixApi + .calledEndpoints['/client/r0/sendToDevice/bunny/bunny_txnid'] + [0])['messages'], + bunnyContent); + await client.dispose(closeDatabase: true); + }); test('Test the fake store api', () async { final database = await getDatabase(null); var client1 = Client( diff --git a/test/fake_matrix_api.dart b/test/fake_matrix_api.dart index 428c15a8..d6e2dc0d 100644 --- a/test/fake_matrix_api.dart +++ b/test/fake_matrix_api.dart @@ -40,6 +40,7 @@ class FakeMatrixApi extends MockClient { static final calledEndpoints = >{}; static int eventCounter = 0; static sdk.Client client; + static bool failToDevice = false; FakeMatrixApi() : super((request) async { @@ -92,6 +93,9 @@ class FakeMatrixApi extends MockClient { } else if (method == 'PUT' && action.contains('/client/r0/sendToDevice/')) { res = {}; + if (failToDevice) { + statusCode = 500; + } } else if (method == 'GET' && action.contains('/client/r0/rooms/') && action.contains('/state/m.room.member/')) {