Merge pull request #1814 from famedly/krille/store-upload-inbound-sessions-in-que

refactor: Store not uploaded group sessions in its own database queue
This commit is contained in:
td 2024-05-20 18:40:39 +05:30 committed by GitHub
commit f9fe6b17a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 70 additions and 38 deletions

View File

@ -23,7 +23,8 @@ class StoredInboundGroupSession {
final String content;
final String indexes;
final String allowedAtIndex;
final bool uploaded;
@Deprecated('Only in use in legacy databases!')
final bool? uploaded;
final String senderKey;
final String senderClaimedKeys;
@ -34,7 +35,7 @@ class StoredInboundGroupSession {
required this.content,
required this.indexes,
required this.allowedAtIndex,
required this.uploaded,
@Deprecated('Only in use in legacy databases!') this.uploaded,
required this.senderKey,
required this.senderClaimedKeys,
});
@ -47,6 +48,7 @@ class StoredInboundGroupSession {
content: json['content'],
indexes: json['indexes'],
allowedAtIndex: json['allowed_at_index'],
// ignore: deprecated_member_use_from_same_package
uploaded: json['uploaded'],
senderKey: json['sender_key'],
senderClaimedKeys: json['sender_claimed_keys'],
@ -59,7 +61,8 @@ class StoredInboundGroupSession {
'content': content,
'indexes': indexes,
'allowed_at_index': allowedAtIndex,
'uploaded': uploaded,
// ignore: deprecated_member_use_from_same_package
if (uploaded != null) 'uploaded': uploaded,
'sender_key': senderKey,
'sender_claimed_keys': senderClaimedKeys,
};

View File

@ -52,7 +52,7 @@ import 'package:matrix/src/database/database_file_storage_stub.dart'
/// Learn more at:
/// https://github.com/famedly/matrix-dart-sdk/issues/1642#issuecomment-1865827227
class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
static const int version = 7;
static const int version = 8;
final String name;
late BoxCollection _collection;
late Box<String> _clientBox;
@ -76,6 +76,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
/// Key is a tuple as TupleKey(roomId, type)
late Box<Map> _roomAccountDataBox;
late Box<Map> _inboundGroupSessionsBox;
late Box<String> _inboundGroupSessionsUploadQueueBox;
late Box<Map> _outboundGroupSessionsBox;
late Box<Map> _olmSessionsBox;
@ -131,6 +132,9 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
static const String _inboundGroupSessionsBoxName =
'box_inbound_group_session';
static const String _inboundGroupSessionsUploadQueueBoxName =
'box_inbound_group_sessions_upload_queue';
static const String _outboundGroupSessionsBoxName =
'box_outbound_group_session';
@ -198,6 +202,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
_roomMembersBoxName,
_roomAccountDataBoxName,
_inboundGroupSessionsBoxName,
_inboundGroupSessionsUploadQueueBoxName,
_outboundGroupSessionsBoxName,
_olmSessionsBoxName,
_userDeviceKeysBoxName,
@ -241,6 +246,9 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
_inboundGroupSessionsBox = _collection.openBox(
_inboundGroupSessionsBoxName,
);
_inboundGroupSessionsUploadQueueBox = _collection.openBox(
_inboundGroupSessionsUploadQueueBoxName,
);
_outboundGroupSessionsBox = _collection.openBox(
_outboundGroupSessionsBoxName,
);
@ -288,6 +296,30 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
Future<void> _migrateFromVersion(int currentVersion) async {
Logs().i('Migrate store database from version $currentVersion to $version');
if (version == 8) {
// Migrate to inbound group sessions upload queue:
final allInboundGroupSessions = await getAllInboundGroupSessions();
final sessionsToUpload = allInboundGroupSessions
// ignore: deprecated_member_use_from_same_package
.where((session) => session.uploaded == false)
.toList();
Logs().i(
'Move ${allInboundGroupSessions.length} inbound group sessions to upload to their own queue...');
await transaction(() async {
for (final session in sessionsToUpload) {
await _inboundGroupSessionsUploadQueueBox.put(
session.sessionId,
session.roomId,
);
}
});
if (currentVersion == 7) {
await _clientBox.put('version', version.toString());
return;
}
}
// The default version upgrade:
await clearCache();
await _clientBox.put('version', version.toString());
}
@ -457,17 +489,13 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
@override
Future<List<StoredInboundGroupSession>>
getInboundGroupSessionsToUpload() async {
final sessions = (await _inboundGroupSessionsBox.getAllValues())
.values
.where((rawSession) => rawSession['uploaded'] == false)
final uploadQueue =
await _inboundGroupSessionsUploadQueueBox.getAllValues();
final sessionFutures = uploadQueue.entries
.take(50)
.map(
(json) => StoredInboundGroupSession.fromJson(
copyMap(json),
),
)
.toList();
return sessions;
.map((entry) => getInboundGroupSession(entry.value, entry.key));
final sessions = await Future.wait(sessionFutures);
return sessions.whereType<StoredInboundGroupSession>().toList();
}
@override
@ -786,16 +814,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
@override
Future<void> markInboundGroupSessionAsUploaded(
String roomId, String sessionId) async {
final raw = copyMap(
await _inboundGroupSessionsBox.get(sessionId) ?? {},
);
if (raw.isEmpty) {
Logs().w(
'Tried to mark inbound group session as uploaded which was not found in the database!');
return;
}
raw['uploaded'] = true;
await _inboundGroupSessionsBox.put(sessionId, raw);
await _inboundGroupSessionsUploadQueueBox.delete(sessionId);
return;
}
@ -807,8 +826,9 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
await _inboundGroupSessionsBox.get(sessionId) ?? {},
);
if (raw.isEmpty) continue;
raw['uploaded'] = false;
await _inboundGroupSessionsBox.put(sessionId, raw);
final roomId = raw.tryGet<String>('room_id');
if (roomId == null) continue;
await _inboundGroupSessionsUploadQueueBox.put(sessionId, roomId);
}
return;
}
@ -1129,19 +1149,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
String allowedAtIndex,
String senderKey,
String senderClaimedKey) async {
final json = StoredInboundGroupSession(
roomId: roomId,
sessionId: sessionId,
pickle: pickle,
content: content,
indexes: indexes,
allowedAtIndex: allowedAtIndex,
senderKey: senderKey,
senderClaimedKeys: senderClaimedKey,
).toJson();
await _inboundGroupSessionsBox.put(
sessionId,
StoredInboundGroupSession(
roomId: roomId,
sessionId: sessionId,
pickle: pickle,
content: content,
indexes: indexes,
allowedAtIndex: allowedAtIndex,
senderKey: senderKey,
senderClaimedKeys: senderClaimedKey,
uploaded: false,
).toJson());
sessionId,
json,
);
// Mark this session as needing upload too
await _inboundGroupSessionsUploadQueueBox.put(sessionId, roomId);
return;
}
@ -1432,6 +1455,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
_roomAccountDataBoxName: await _roomAccountDataBox.getAllValues(),
_inboundGroupSessionsBoxName:
await _inboundGroupSessionsBox.getAllValues(),
_inboundGroupSessionsUploadQueueBoxName:
await _inboundGroupSessionsUploadQueueBox.getAllValues(),
_outboundGroupSessionsBoxName:
await _outboundGroupSessionsBox.getAllValues(),
_olmSessionsBoxName: await _olmSessionsBox.getAllValues(),
@ -1488,6 +1513,10 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
await _inboundGroupSessionsBox.put(
key, json[_inboundGroupSessionsBoxName]![key]);
}
for (final key in json[_inboundGroupSessionsUploadQueueBoxName]!.keys) {
await _inboundGroupSessionsUploadQueueBox.put(
key, json[_inboundGroupSessionsUploadQueueBoxName]![key]);
}
for (final key in json[_outboundGroupSessionsBoxName]!.keys) {
await _outboundGroupSessionsBox.put(
key, json[_outboundGroupSessionsBoxName]![key]);