fix: Add to_device queue to prevent olm session corruptions

This commit is contained in:
Sorunome 2021-02-08 14:57:19 +01:00
parent 771b552801
commit d373a06aa2
No known key found for this signature in database
GPG Key ID: B19471D07FC9BE9C
7 changed files with 504 additions and 11 deletions

View File

@ -242,10 +242,7 @@ class OlmManager {
}
}
void storeOlmSession(OlmSession session) {
if (client.database == null) {
return;
}
Future<void> storeOlmSession(OlmSession session) async {
_olmSessions[session.identityKey] ??= <OlmSession>[];
final ix = _olmSessions[session.identityKey]
.indexWhere((s) => s.sessionId == session.sessionId);
@ -256,7 +253,10 @@ class OlmManager {
// update an existing session
_olmSessions[session.identityKey][ix] = session;
}
client.database.storeOlmSession(
if (client.database == null) {
return;
}
await client.database.storeOlmSession(
client.id,
session.identityKey,
session.sessionId,
@ -285,14 +285,14 @@ class OlmManager {
final device = client.userDeviceKeys[event.sender]?.deviceKeys?.values
?.firstWhere((d) => d.curve25519Key == senderKey, orElse: () => null);
final existingSessions = olmSessions[senderKey];
final updateSessionUsage = ([OlmSession session]) => runInRoot(() {
final updateSessionUsage = ([OlmSession session]) => runInRoot(() async {
if (session != null) {
session.lastReceived = DateTime.now();
storeOlmSession(session);
await storeOlmSession(session);
}
if (device != null) {
device.lastActive = DateTime.now();
client.database?.setLastActiveUserDeviceKey(
await client.database?.setLastActiveUserDeviceKey(
device.lastActive.millisecondsSinceEpoch,
client.id,
device.userId,
@ -542,9 +542,10 @@ class OlmManager {
'recipient_keys': {'ed25519': device.ed25519Key},
};
final encryptResult = sess.first.session.encrypt(json.encode(fullPayload));
storeOlmSession(sess.first);
await storeOlmSession(sess.first);
if (client.database != null) {
unawaited(client.database.setLastSentMessageUserDeviceKey(
// ignore: unawaited_futures
runInRoot(() => client.database.setLastSentMessageUserDeviceKey(
json.encode({
'type': type,
'content': payload,

View File

@ -957,6 +957,12 @@ class Client extends MatrixApi {
if (encryptionEnabled) {
encryption.onSync();
}
// try to process the to_device queue
try {
await processToDeviceQueue();
} catch (_) {} // we want to dispose any errors this throws
_retryDelay = Future.value();
} on MatrixException catch (e, s) {
onSyncError.add(SdkError(exception: e, stackTrace: s));
@ -1633,6 +1639,51 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
}
}
/// Processes the to_device queue and tries to send every entry.
/// This function MAY throw an error, which just means the to_device queue wasn't
/// proccessed all the way.
Future<void> processToDeviceQueue() async {
if (database == null) {
return;
}
final entries = await database.getToDeviceQueue(id).get();
for (final entry in entries) {
// ohgod what is this...
final data = (json.decode(entry.content) as Map).map((k, v) =>
MapEntry<String, Map<String, Map<String, dynamic>>>(
k,
(v as Map).map((k, v) => MapEntry<String, Map<String, dynamic>>(
k, Map<String, dynamic>.from(v)))));
await super.sendToDevice(entry.type, entry.txnId, data);
await database.deleteFromToDeviceQueue(id, entry.id);
}
}
/// Sends a raw to_device event with a [eventType], a [txnId] and a content [data].
/// Before sending, it tries to re-send potentially queued to_device events and adds
/// the current one to the queue, should it fail.
@override
Future<void> sendToDevice(
String eventType,
String txnId,
Map<String, Map<String, Map<String, dynamic>>> messages,
) async {
try {
await processToDeviceQueue();
await super.sendToDevice(eventType, txnId, messages);
} catch (e, s) {
Logs().w(
'[Client] Problem while sending to_device event, retrying later...',
e,
s);
if (database != null) {
await database.insertIntoToDeviceQueue(
id, eventType, txnId, json.encode(messages));
}
rethrow;
}
}
/// Send an (unencrypted) to device [message] of a specific [eventType] to all
/// devices of a set of [users].
Future<void> sendToDevicesOfUserIds(

View File

@ -70,7 +70,7 @@ class Database extends _$Database {
Database.connect(DatabaseConnection connection) : super.connect(connection);
@override
int get schemaVersion => 10;
int get schemaVersion => 11;
int get maxFileSize => 1 * 1024 * 1024;
@ -170,6 +170,11 @@ class Database extends _$Database {
await m.createIndexIfNotExists(olmSessionsIdentityIndex);
from++;
}
if (from == 10) {
await m.createTableIfNotExists(toDeviceQueue);
await m.createIndexIfNotExists(toDeviceQueueIndex);
from++;
}
} catch (e, s) {
api.Logs().e('Database migration failed', e, s);
onError.add(SdkError(exception: e, stackTrace: s));
@ -658,6 +663,8 @@ class Database extends _$Database {
.go();
await (delete(ssssCache)..where((r) => r.clientId.equals(clientId))).go();
await (delete(clients)..where((r) => r.clientId.equals(clientId))).go();
await (delete(toDeviceQueue)..where((r) => r.clientId.equals(clientId)))
.go();
}
Future<sdk.User> getUser(int clientId, String userId, sdk.Room room) async {

View File

@ -5595,6 +5595,326 @@ class Presences extends Table with TableInfo<Presences, DbPresence> {
bool get dontWriteConstraints => true;
}
class DbToDeviceQueue extends DataClass implements Insertable<DbToDeviceQueue> {
final int clientId;
final int id;
final String type;
final String txnId;
final String content;
DbToDeviceQueue(
{@required this.clientId,
@required this.id,
@required this.type,
@required this.txnId,
@required this.content});
factory DbToDeviceQueue.fromData(
Map<String, dynamic> data, GeneratedDatabase db,
{String prefix}) {
final effectivePrefix = prefix ?? '';
final intType = db.typeSystem.forDartType<int>();
final stringType = db.typeSystem.forDartType<String>();
return DbToDeviceQueue(
clientId:
intType.mapFromDatabaseResponse(data['${effectivePrefix}client_id']),
id: intType.mapFromDatabaseResponse(data['${effectivePrefix}id']),
type: stringType.mapFromDatabaseResponse(data['${effectivePrefix}type']),
txnId:
stringType.mapFromDatabaseResponse(data['${effectivePrefix}txn_id']),
content:
stringType.mapFromDatabaseResponse(data['${effectivePrefix}content']),
);
}
@override
Map<String, Expression> toColumns(bool nullToAbsent) {
final map = <String, Expression>{};
if (!nullToAbsent || clientId != null) {
map['client_id'] = Variable<int>(clientId);
}
if (!nullToAbsent || id != null) {
map['id'] = Variable<int>(id);
}
if (!nullToAbsent || type != null) {
map['type'] = Variable<String>(type);
}
if (!nullToAbsent || txnId != null) {
map['txn_id'] = Variable<String>(txnId);
}
if (!nullToAbsent || content != null) {
map['content'] = Variable<String>(content);
}
return map;
}
ToDeviceQueueCompanion toCompanion(bool nullToAbsent) {
return ToDeviceQueueCompanion(
clientId: clientId == null && nullToAbsent
? const Value.absent()
: Value(clientId),
id: id == null && nullToAbsent ? const Value.absent() : Value(id),
type: type == null && nullToAbsent ? const Value.absent() : Value(type),
txnId:
txnId == null && nullToAbsent ? const Value.absent() : Value(txnId),
content: content == null && nullToAbsent
? const Value.absent()
: Value(content),
);
}
factory DbToDeviceQueue.fromJson(Map<String, dynamic> json,
{ValueSerializer serializer}) {
serializer ??= moorRuntimeOptions.defaultSerializer;
return DbToDeviceQueue(
clientId: serializer.fromJson<int>(json['client_id']),
id: serializer.fromJson<int>(json['id']),
type: serializer.fromJson<String>(json['type']),
txnId: serializer.fromJson<String>(json['txn_id']),
content: serializer.fromJson<String>(json['content']),
);
}
@override
Map<String, dynamic> toJson({ValueSerializer serializer}) {
serializer ??= moorRuntimeOptions.defaultSerializer;
return <String, dynamic>{
'client_id': serializer.toJson<int>(clientId),
'id': serializer.toJson<int>(id),
'type': serializer.toJson<String>(type),
'txn_id': serializer.toJson<String>(txnId),
'content': serializer.toJson<String>(content),
};
}
DbToDeviceQueue copyWith(
{int clientId, int id, String type, String txnId, String content}) =>
DbToDeviceQueue(
clientId: clientId ?? this.clientId,
id: id ?? this.id,
type: type ?? this.type,
txnId: txnId ?? this.txnId,
content: content ?? this.content,
);
@override
String toString() {
return (StringBuffer('DbToDeviceQueue(')
..write('clientId: $clientId, ')
..write('id: $id, ')
..write('type: $type, ')
..write('txnId: $txnId, ')
..write('content: $content')
..write(')'))
.toString();
}
@override
int get hashCode => $mrjf($mrjc(
clientId.hashCode,
$mrjc(id.hashCode,
$mrjc(type.hashCode, $mrjc(txnId.hashCode, content.hashCode)))));
@override
bool operator ==(dynamic other) =>
identical(this, other) ||
(other is DbToDeviceQueue &&
other.clientId == this.clientId &&
other.id == this.id &&
other.type == this.type &&
other.txnId == this.txnId &&
other.content == this.content);
}
class ToDeviceQueueCompanion extends UpdateCompanion<DbToDeviceQueue> {
final Value<int> clientId;
final Value<int> id;
final Value<String> type;
final Value<String> txnId;
final Value<String> content;
const ToDeviceQueueCompanion({
this.clientId = const Value.absent(),
this.id = const Value.absent(),
this.type = const Value.absent(),
this.txnId = const Value.absent(),
this.content = const Value.absent(),
});
ToDeviceQueueCompanion.insert({
@required int clientId,
this.id = const Value.absent(),
@required String type,
@required String txnId,
@required String content,
}) : clientId = Value(clientId),
type = Value(type),
txnId = Value(txnId),
content = Value(content);
static Insertable<DbToDeviceQueue> custom({
Expression<int> clientId,
Expression<int> id,
Expression<String> type,
Expression<String> txnId,
Expression<String> content,
}) {
return RawValuesInsertable({
if (clientId != null) 'client_id': clientId,
if (id != null) 'id': id,
if (type != null) 'type': type,
if (txnId != null) 'txn_id': txnId,
if (content != null) 'content': content,
});
}
ToDeviceQueueCompanion copyWith(
{Value<int> clientId,
Value<int> id,
Value<String> type,
Value<String> txnId,
Value<String> content}) {
return ToDeviceQueueCompanion(
clientId: clientId ?? this.clientId,
id: id ?? this.id,
type: type ?? this.type,
txnId: txnId ?? this.txnId,
content: content ?? this.content,
);
}
@override
Map<String, Expression> toColumns(bool nullToAbsent) {
final map = <String, Expression>{};
if (clientId.present) {
map['client_id'] = Variable<int>(clientId.value);
}
if (id.present) {
map['id'] = Variable<int>(id.value);
}
if (type.present) {
map['type'] = Variable<String>(type.value);
}
if (txnId.present) {
map['txn_id'] = Variable<String>(txnId.value);
}
if (content.present) {
map['content'] = Variable<String>(content.value);
}
return map;
}
@override
String toString() {
return (StringBuffer('ToDeviceQueueCompanion(')
..write('clientId: $clientId, ')
..write('id: $id, ')
..write('type: $type, ')
..write('txnId: $txnId, ')
..write('content: $content')
..write(')'))
.toString();
}
}
class ToDeviceQueue extends Table
with TableInfo<ToDeviceQueue, DbToDeviceQueue> {
final GeneratedDatabase _db;
final String _alias;
ToDeviceQueue(this._db, [this._alias]);
final VerificationMeta _clientIdMeta = const VerificationMeta('clientId');
GeneratedIntColumn _clientId;
GeneratedIntColumn get clientId => _clientId ??= _constructClientId();
GeneratedIntColumn _constructClientId() {
return GeneratedIntColumn('client_id', $tableName, false,
$customConstraints: 'NOT NULL REFERENCES clients(client_id)');
}
final VerificationMeta _idMeta = const VerificationMeta('id');
GeneratedIntColumn _id;
GeneratedIntColumn get id => _id ??= _constructId();
GeneratedIntColumn _constructId() {
return GeneratedIntColumn('id', $tableName, false,
declaredAsPrimaryKey: true,
hasAutoIncrement: true,
$customConstraints: 'NOT NULL PRIMARY KEY AUTOINCREMENT');
}
final VerificationMeta _typeMeta = const VerificationMeta('type');
GeneratedTextColumn _type;
GeneratedTextColumn get type => _type ??= _constructType();
GeneratedTextColumn _constructType() {
return GeneratedTextColumn('type', $tableName, false,
$customConstraints: 'NOT NULL');
}
final VerificationMeta _txnIdMeta = const VerificationMeta('txnId');
GeneratedTextColumn _txnId;
GeneratedTextColumn get txnId => _txnId ??= _constructTxnId();
GeneratedTextColumn _constructTxnId() {
return GeneratedTextColumn('txn_id', $tableName, false,
$customConstraints: 'NOT NULL');
}
final VerificationMeta _contentMeta = const VerificationMeta('content');
GeneratedTextColumn _content;
GeneratedTextColumn get content => _content ??= _constructContent();
GeneratedTextColumn _constructContent() {
return GeneratedTextColumn('content', $tableName, false,
$customConstraints: 'NOT NULL');
}
@override
List<GeneratedColumn> get $columns => [clientId, id, type, txnId, content];
@override
ToDeviceQueue get asDslTable => this;
@override
String get $tableName => _alias ?? 'to_device_queue';
@override
final String actualTableName = 'to_device_queue';
@override
VerificationContext validateIntegrity(Insertable<DbToDeviceQueue> instance,
{bool isInserting = false}) {
final context = VerificationContext();
final data = instance.toColumns(true);
if (data.containsKey('client_id')) {
context.handle(_clientIdMeta,
clientId.isAcceptableOrUnknown(data['client_id'], _clientIdMeta));
} else if (isInserting) {
context.missing(_clientIdMeta);
}
if (data.containsKey('id')) {
context.handle(_idMeta, id.isAcceptableOrUnknown(data['id'], _idMeta));
}
if (data.containsKey('type')) {
context.handle(
_typeMeta, type.isAcceptableOrUnknown(data['type'], _typeMeta));
} else if (isInserting) {
context.missing(_typeMeta);
}
if (data.containsKey('txn_id')) {
context.handle(
_txnIdMeta, txnId.isAcceptableOrUnknown(data['txn_id'], _txnIdMeta));
} else if (isInserting) {
context.missing(_txnIdMeta);
}
if (data.containsKey('content')) {
context.handle(_contentMeta,
content.isAcceptableOrUnknown(data['content'], _contentMeta));
} else if (isInserting) {
context.missing(_contentMeta);
}
return context;
}
@override
Set<GeneratedColumn> get $primaryKey => {id};
@override
DbToDeviceQueue map(Map<String, dynamic> data, {String tablePrefix}) {
final effectivePrefix = tablePrefix != null ? '$tablePrefix.' : null;
return DbToDeviceQueue.fromData(data, _db, prefix: effectivePrefix);
}
@override
ToDeviceQueue createAlias(String alias) {
return ToDeviceQueue(_db, alias);
}
@override
bool get dontWriteConstraints => true;
}
class DbSSSSCache extends DataClass implements Insertable<DbSSSSCache> {
final int clientId;
final String type;
@ -6242,6 +6562,12 @@ abstract class _$Database extends GeneratedDatabase {
Index _presencesIndex;
Index get presencesIndex => _presencesIndex ??= Index('presences_index',
'CREATE INDEX presences_index ON presences(client_id);');
ToDeviceQueue _toDeviceQueue;
ToDeviceQueue get toDeviceQueue => _toDeviceQueue ??= ToDeviceQueue(this);
Index _toDeviceQueueIndex;
Index get toDeviceQueueIndex => _toDeviceQueueIndex ??= Index(
'to_device_queue_index',
'CREATE INDEX to_device_queue_index ON to_device_queue(client_id);');
SsssCache _ssssCache;
SsssCache get ssssCache => _ssssCache ??= SsssCache(this);
Files _files;
@ -7143,6 +7469,36 @@ abstract class _$Database extends GeneratedDatabase {
);
}
Future<int> insertIntoToDeviceQueue(
int client_id, String type, String txn_id, String content) {
return customInsert(
'INSERT INTO to_device_queue (client_id, type, txn_id, content) VALUES (:client_id, :type, :txn_id, :content)',
variables: [
Variable.withInt(client_id),
Variable.withString(type),
Variable.withString(txn_id),
Variable.withString(content)
],
updates: {toDeviceQueue},
);
}
Selectable<DbToDeviceQueue> getToDeviceQueue(int client_id) {
return customSelect(
'SELECT * FROM to_device_queue WHERE client_id = :client_id',
variables: [Variable.withInt(client_id)],
readsFrom: {toDeviceQueue}).map(toDeviceQueue.mapFromRow);
}
Future<int> deleteFromToDeviceQueue(int client_id, int id) {
return customUpdate(
'DELETE FROM to_device_queue WHERE client_id = :client_id AND id = :id',
variables: [Variable.withInt(client_id), Variable.withInt(id)],
updates: {toDeviceQueue},
updateKind: UpdateKind.delete,
);
}
@override
Iterable<TableInfo> get allTables => allSchemaEntities.whereType<TableInfo>();
@override
@ -7173,6 +7529,8 @@ abstract class _$Database extends GeneratedDatabase {
roomAccountDataIndex,
presences,
presencesIndex,
toDeviceQueue,
toDeviceQueueIndex,
ssssCache,
files
];

View File

@ -174,6 +174,15 @@ CREATE TABLE files (
UNIQUE(mxc_uri)
) AS DbFile;
CREATE TABLE to_device_queue (
client_id INTEGER NOT NULL REFERENCES clients(client_id),
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
txn_id TEXT NOT NULL,
content TEXT NOT NULL
) as DbToDeviceQueue;
CREATE INDEX to_device_queue_index ON to_device_queue(client_id);
-- named queries
dbGetClient: SELECT * FROM clients WHERE name = :name;
@ -245,3 +254,6 @@ storeFile: INSERT OR REPLACE INTO files (mxc_uri, bytes, saved_at) VALUES (:mxc_
dbGetFile: SELECT * FROM files WHERE mxc_uri = :mxc_uri;
markPendingEventsAsError: UPDATE events SET status = -1 WHERE client_id = :client_id AND status = 0;
deleteOldFiles: DELETE FROM files WHERE saved_at < :saved_at;
insertIntoToDeviceQueue: INSERT INTO to_device_queue (client_id, type, txn_id, content) VALUES (:client_id, :type, :txn_id, :content);
getToDeviceQueue: SELECT * FROM to_device_queue WHERE client_id = :client_id;
deleteFromToDeviceQueue: DELETE FROM to_device_queue WHERE client_id = :client_id AND id = :id;

View File

@ -450,6 +450,66 @@ void main() {
.length,
2);
});
test('send to_device queue', () async {
// we test:
// send fox --> fail
// send raccoon --> fox & raccoon sent
// send bunny --> only bunny sent
final client = await getClient();
FakeMatrixApi.failToDevice = true;
final foxContent = {
'@fox:example.org': {
'*': {
'fox': 'hole',
},
},
};
final raccoonContent = {
'@fox:example.org': {
'*': {
'raccoon': 'mask',
},
},
};
final bunnyContent = {
'@fox:example.org': {
'*': {
'bunny': 'burrow',
},
},
};
await client
.sendToDevice('foxies', 'floof_txnid', foxContent)
.catchError((e) => null); // ignore the error
FakeMatrixApi.failToDevice = false;
FakeMatrixApi.calledEndpoints.clear();
await client.sendToDevice('raccoon', 'raccoon_txnid', raccoonContent);
expect(
json.decode(FakeMatrixApi
.calledEndpoints['/client/r0/sendToDevice/foxies/floof_txnid']
[0])['messages'],
foxContent);
expect(
json.decode(FakeMatrixApi.calledEndpoints[
'/client/r0/sendToDevice/raccoon/raccoon_txnid'][0])['messages'],
raccoonContent);
FakeMatrixApi.calledEndpoints.clear();
await client.sendToDevice('bunny', 'bunny_txnid', bunnyContent);
expect(
FakeMatrixApi
.calledEndpoints['/client/r0/sendToDevice/foxies/floof_txnid'],
null);
expect(
FakeMatrixApi
.calledEndpoints['/client/r0/sendToDevice/raccoon/raccoon_txnid'],
null);
expect(
json.decode(FakeMatrixApi
.calledEndpoints['/client/r0/sendToDevice/bunny/bunny_txnid']
[0])['messages'],
bunnyContent);
await client.dispose(closeDatabase: true);
});
test('Test the fake store api', () async {
final database = await getDatabase(null);
var client1 = Client(

View File

@ -40,6 +40,7 @@ class FakeMatrixApi extends MockClient {
static final calledEndpoints = <String, List<dynamic>>{};
static int eventCounter = 0;
static sdk.Client client;
static bool failToDevice = false;
FakeMatrixApi()
: super((request) async {
@ -92,6 +93,9 @@ class FakeMatrixApi extends MockClient {
} else if (method == 'PUT' &&
action.contains('/client/r0/sendToDevice/')) {
res = {};
if (failToDevice) {
statusCode = 500;
}
} else if (method == 'GET' &&
action.contains('/client/r0/rooms/') &&
action.contains('/state/m.room.member/')) {