diff --git a/lib/src/client.dart b/lib/src/client.dart index b9d876a9..1c1118dc 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -1498,6 +1498,7 @@ class Client extends MatrixApi { _id = accessToken = syncFilterId = homeserver = _userID = _deviceID = _deviceName = prevBatch = null; _rooms = []; + _eventsPendingDecryption.clear(); await encryption?.dispose(); encryption = null; onLoginStateChanged.add(LoginState.loggedOut); @@ -1716,18 +1717,60 @@ class Client extends MatrixApi { } Future _handleToDeviceEvents(List events) async { + final Map> roomsWithNewKeyToSessionId = {}; for (final event in events) { var toDeviceEvent = ToDeviceEvent.fromJson(event.toJson()); Logs().v('Got to_device event of type ${toDeviceEvent.type}'); - if (toDeviceEvent.type == EventTypes.Encrypted && encryptionEnabled) { - toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent); - Logs().v('Decrypted type is: ${toDeviceEvent.type}'); - } if (encryptionEnabled) { + if (toDeviceEvent.type == EventTypes.Encrypted) { + toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent); + Logs().v('Decrypted type is: ${toDeviceEvent.type}'); + + /// collect new keys so that we can find those events in the decryption queue + if (toDeviceEvent.type == EventTypes.ForwardedRoomKey || + toDeviceEvent.type == EventTypes.RoomKey) { + final roomId = event.content['room_id']; + final sessionId = event.content['session_id']; + if (roomId is String && sessionId is String) { + (roomsWithNewKeyToSessionId[roomId] ??= []).add(sessionId); + } + } + } await encryption?.handleToDeviceEvent(toDeviceEvent); } onToDeviceEvent.add(toDeviceEvent); } + + // emit updates for all events in the queue + for (final entry in roomsWithNewKeyToSessionId.entries) { + final roomId = entry.key; + final sessionIds = entry.value; + + final room = getRoomById(roomId); + if (room != null) { + final List events = []; + for (final event in _eventsPendingDecryption) { + if (event.event.roomID != roomId) continue; + if (!sessionIds.contains( + event.event.content['content']?['session_id'])) continue; + + final decryptedEvent = await event.event.decrypt(room); + if (decryptedEvent.content.tryGet('type') != + EventTypes.Encrypted) { + events.add(BasicEvent.fromJson(decryptedEvent.content)); + } + } + + await _handleRoomEvents( + room, events, EventUpdateType.decryptedTimelineQueue); + + _eventsPendingDecryption.removeWhere((e) => events.any( + (decryptedEvent) => + decryptedEvent.content['event_id'] == + e.event.content['event_id'])); + } + } + _eventsPendingDecryption.removeWhere((e) => e.timedOut); } Future _handleRooms(Map rooms, @@ -1875,6 +1918,9 @@ class Client extends MatrixApi { } } + /// Stores event that came down /sync but didn't get decrypted because of missing keys yet. + final List<_EventPendingDecryption> _eventsPendingDecryption = []; + Future _handleRoomEvents( Room room, List events, EventUpdateType type, {bool store = true}) async { @@ -1892,13 +1938,21 @@ class Client extends MatrixApi { .getState(EventTypes.Encryption) ?.content .tryGet('algorithm'))) { - return; + continue; } var update = EventUpdate(roomID: room.id, type: type, content: event.toJson()); if (event.type == EventTypes.Encrypted && encryptionEnabled) { update = await update.decrypt(room); + + // if the event failed to decrypt, add it to the queue + if (update.content.tryGet('type') == EventTypes.Encrypted) { + _eventsPendingDecryption.add(_EventPendingDecryption(EventUpdate( + roomID: update.roomID, + type: EventUpdateType.decryptedTimelineQueue, + content: update.content))); + } } if (event.type == EventTypes.Message && !room.isDirectChat && @@ -1920,7 +1974,9 @@ class Client extends MatrixApi { } onEvent.add(update); - if (prevBatch != null && type == EventUpdateType.timeline) { + if (prevBatch != null && + (type == EventUpdateType.timeline || + type == EventUpdateType.decryptedTimelineQueue)) { if (update.content.tryGet('type')?.startsWith('m.call.') ?? false) { final callEvent = Event.fromJson(update.content, room); @@ -2122,6 +2178,7 @@ class Client extends MatrixApi { BasicRoomEvent.fromJson(eventUpdate.content); break; case EventUpdateType.history: + case EventUpdateType.decryptedTimelineQueue: break; } room.onUpdate.add(room.id); @@ -2697,6 +2754,7 @@ class Client extends MatrixApi { rooms.clear(); await database?.clearCache(); encryption?.keyManager.clearOutboundGroupSessions(); + _eventsPendingDecryption.clear(); onCacheCleared.add(true); // Restart the syncloop backgroundSync = true; @@ -2994,3 +3052,15 @@ class ArchivedRoom { ArchivedRoom({required this.room, required this.timeline}); } + +/// An event that is waiting for a key to arrive to decrypt. Times out after some time. +class _EventPendingDecryption { + DateTime addedAt = DateTime.now(); + + EventUpdate event; + + bool get timedOut => + addedAt.add(Duration(minutes: 5)).isBefore(DateTime.now()); + + _EventPendingDecryption(this.event); +} diff --git a/lib/src/database/hive_collections_database.dart b/lib/src/database/hive_collections_database.dart index 94427453..a096268d 100644 --- a/lib/src/database/hive_collections_database.dart +++ b/lib/src/database/hive_collections_database.dart @@ -975,8 +975,11 @@ class HiveCollectionsDatabase extends DatabaseApi { } // Store a common message event - if ({EventUpdateType.timeline, EventUpdateType.history} - .contains(eventUpdate.type)) { + if ({ + EventUpdateType.timeline, + EventUpdateType.history, + EventUpdateType.decryptedTimelineQueue + }.contains(eventUpdate.type)) { final eventId = eventUpdate.content['event_id']; // Is this ID already in the store? final prevEvent = await _eventsBox diff --git a/lib/src/database/hive_database.dart b/lib/src/database/hive_database.dart index 16549bc8..8cda667f 100644 --- a/lib/src/database/hive_database.dart +++ b/lib/src/database/hive_database.dart @@ -922,8 +922,11 @@ class FamedlySdkHiveDatabase extends DatabaseApi { } // Store a common message event - if ({EventUpdateType.timeline, EventUpdateType.history} - .contains(eventUpdate.type)) { + if ({ + EventUpdateType.timeline, + EventUpdateType.history, + EventUpdateType.decryptedTimelineQueue + }.contains(eventUpdate.type)) { final eventId = eventUpdate.content['event_id']; // Is this ID already in the store? final Map? prevEvent = await _eventsBox diff --git a/lib/src/utils/event_update.dart b/lib/src/utils/event_update.dart index 2f36687e..fb953871 100644 --- a/lib/src/utils/event_update.dart +++ b/lib/src/utils/event_update.dart @@ -19,12 +19,26 @@ import 'package:matrix/matrix.dart'; enum EventUpdateType { + /// Newly received events from /sync timeline, + + /// A state update not visible in the timeline currently state, + + /// Messages that have been fetched when requesting past history history, + + /// Updates to account data accountData, + + /// Ephemeral events like receipts ephemeral, - inviteState + + /// The state of an invite + inviteState, + + /// Events that came down timeline, but we only received the keys for it later so we send a second update for them in the decrypted state + decryptedTimelineQueue, } /// Represents a new event (e.g. a message in a room) or an update for an