Merge branch 'nico/decryption-queue' into 'main'
feat: Implement a queue for events pending decryption Closes famedly-web#535 See merge request famedly/company/frontend/famedlysdk!1143
This commit is contained in:
commit
99c77b9276
|
|
@ -1498,6 +1498,7 @@ class Client extends MatrixApi {
|
|||
_id = accessToken = syncFilterId =
|
||||
homeserver = _userID = _deviceID = _deviceName = prevBatch = null;
|
||||
_rooms = [];
|
||||
_eventsPendingDecryption.clear();
|
||||
await encryption?.dispose();
|
||||
encryption = null;
|
||||
onLoginStateChanged.add(LoginState.loggedOut);
|
||||
|
|
@ -1716,18 +1717,60 @@ class Client extends MatrixApi {
|
|||
}
|
||||
|
||||
Future<void> _handleToDeviceEvents(List<BasicEventWithSender> events) async {
|
||||
final Map<String, List<String>> roomsWithNewKeyToSessionId = {};
|
||||
for (final event in events) {
|
||||
var toDeviceEvent = ToDeviceEvent.fromJson(event.toJson());
|
||||
Logs().v('Got to_device event of type ${toDeviceEvent.type}');
|
||||
if (toDeviceEvent.type == EventTypes.Encrypted && encryptionEnabled) {
|
||||
toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent);
|
||||
Logs().v('Decrypted type is: ${toDeviceEvent.type}');
|
||||
}
|
||||
if (encryptionEnabled) {
|
||||
if (toDeviceEvent.type == EventTypes.Encrypted) {
|
||||
toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent);
|
||||
Logs().v('Decrypted type is: ${toDeviceEvent.type}');
|
||||
|
||||
/// collect new keys so that we can find those events in the decryption queue
|
||||
if (toDeviceEvent.type == EventTypes.ForwardedRoomKey ||
|
||||
toDeviceEvent.type == EventTypes.RoomKey) {
|
||||
final roomId = event.content['room_id'];
|
||||
final sessionId = event.content['session_id'];
|
||||
if (roomId is String && sessionId is String) {
|
||||
(roomsWithNewKeyToSessionId[roomId] ??= []).add(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
await encryption?.handleToDeviceEvent(toDeviceEvent);
|
||||
}
|
||||
onToDeviceEvent.add(toDeviceEvent);
|
||||
}
|
||||
|
||||
// emit updates for all events in the queue
|
||||
for (final entry in roomsWithNewKeyToSessionId.entries) {
|
||||
final roomId = entry.key;
|
||||
final sessionIds = entry.value;
|
||||
|
||||
final room = getRoomById(roomId);
|
||||
if (room != null) {
|
||||
final List<BasicEvent> events = [];
|
||||
for (final event in _eventsPendingDecryption) {
|
||||
if (event.event.roomID != roomId) continue;
|
||||
if (!sessionIds.contains(
|
||||
event.event.content['content']?['session_id'])) continue;
|
||||
|
||||
final decryptedEvent = await event.event.decrypt(room);
|
||||
if (decryptedEvent.content.tryGet<String>('type') !=
|
||||
EventTypes.Encrypted) {
|
||||
events.add(BasicEvent.fromJson(decryptedEvent.content));
|
||||
}
|
||||
}
|
||||
|
||||
await _handleRoomEvents(
|
||||
room, events, EventUpdateType.decryptedTimelineQueue);
|
||||
|
||||
_eventsPendingDecryption.removeWhere((e) => events.any(
|
||||
(decryptedEvent) =>
|
||||
decryptedEvent.content['event_id'] ==
|
||||
e.event.content['event_id']));
|
||||
}
|
||||
}
|
||||
_eventsPendingDecryption.removeWhere((e) => e.timedOut);
|
||||
}
|
||||
|
||||
Future<void> _handleRooms(Map<String, SyncRoomUpdate> rooms,
|
||||
|
|
@ -1875,6 +1918,9 @@ class Client extends MatrixApi {
|
|||
}
|
||||
}
|
||||
|
||||
/// Stores event that came down /sync but didn't get decrypted because of missing keys yet.
|
||||
final List<_EventPendingDecryption> _eventsPendingDecryption = [];
|
||||
|
||||
Future<void> _handleRoomEvents(
|
||||
Room room, List<BasicEvent> events, EventUpdateType type,
|
||||
{bool store = true}) async {
|
||||
|
|
@ -1892,13 +1938,21 @@ class Client extends MatrixApi {
|
|||
.getState(EventTypes.Encryption)
|
||||
?.content
|
||||
.tryGet<String>('algorithm'))) {
|
||||
return;
|
||||
continue;
|
||||
}
|
||||
|
||||
var update =
|
||||
EventUpdate(roomID: room.id, type: type, content: event.toJson());
|
||||
if (event.type == EventTypes.Encrypted && encryptionEnabled) {
|
||||
update = await update.decrypt(room);
|
||||
|
||||
// if the event failed to decrypt, add it to the queue
|
||||
if (update.content.tryGet<String>('type') == EventTypes.Encrypted) {
|
||||
_eventsPendingDecryption.add(_EventPendingDecryption(EventUpdate(
|
||||
roomID: update.roomID,
|
||||
type: EventUpdateType.decryptedTimelineQueue,
|
||||
content: update.content)));
|
||||
}
|
||||
}
|
||||
if (event.type == EventTypes.Message &&
|
||||
!room.isDirectChat &&
|
||||
|
|
@ -1920,7 +1974,9 @@ class Client extends MatrixApi {
|
|||
}
|
||||
onEvent.add(update);
|
||||
|
||||
if (prevBatch != null && type == EventUpdateType.timeline) {
|
||||
if (prevBatch != null &&
|
||||
(type == EventUpdateType.timeline ||
|
||||
type == EventUpdateType.decryptedTimelineQueue)) {
|
||||
if (update.content.tryGet<String>('type')?.startsWith('m.call.') ??
|
||||
false) {
|
||||
final callEvent = Event.fromJson(update.content, room);
|
||||
|
|
@ -2122,6 +2178,7 @@ class Client extends MatrixApi {
|
|||
BasicRoomEvent.fromJson(eventUpdate.content);
|
||||
break;
|
||||
case EventUpdateType.history:
|
||||
case EventUpdateType.decryptedTimelineQueue:
|
||||
break;
|
||||
}
|
||||
room.onUpdate.add(room.id);
|
||||
|
|
@ -2697,6 +2754,7 @@ class Client extends MatrixApi {
|
|||
rooms.clear();
|
||||
await database?.clearCache();
|
||||
encryption?.keyManager.clearOutboundGroupSessions();
|
||||
_eventsPendingDecryption.clear();
|
||||
onCacheCleared.add(true);
|
||||
// Restart the syncloop
|
||||
backgroundSync = true;
|
||||
|
|
@ -2994,3 +3052,15 @@ class ArchivedRoom {
|
|||
|
||||
ArchivedRoom({required this.room, required this.timeline});
|
||||
}
|
||||
|
||||
/// An event that is waiting for a key to arrive to decrypt. Times out after some time.
|
||||
class _EventPendingDecryption {
|
||||
DateTime addedAt = DateTime.now();
|
||||
|
||||
EventUpdate event;
|
||||
|
||||
bool get timedOut =>
|
||||
addedAt.add(Duration(minutes: 5)).isBefore(DateTime.now());
|
||||
|
||||
_EventPendingDecryption(this.event);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -975,8 +975,11 @@ class HiveCollectionsDatabase extends DatabaseApi {
|
|||
}
|
||||
|
||||
// Store a common message event
|
||||
if ({EventUpdateType.timeline, EventUpdateType.history}
|
||||
.contains(eventUpdate.type)) {
|
||||
if ({
|
||||
EventUpdateType.timeline,
|
||||
EventUpdateType.history,
|
||||
EventUpdateType.decryptedTimelineQueue
|
||||
}.contains(eventUpdate.type)) {
|
||||
final eventId = eventUpdate.content['event_id'];
|
||||
// Is this ID already in the store?
|
||||
final prevEvent = await _eventsBox
|
||||
|
|
|
|||
|
|
@ -922,8 +922,11 @@ class FamedlySdkHiveDatabase extends DatabaseApi {
|
|||
}
|
||||
|
||||
// Store a common message event
|
||||
if ({EventUpdateType.timeline, EventUpdateType.history}
|
||||
.contains(eventUpdate.type)) {
|
||||
if ({
|
||||
EventUpdateType.timeline,
|
||||
EventUpdateType.history,
|
||||
EventUpdateType.decryptedTimelineQueue
|
||||
}.contains(eventUpdate.type)) {
|
||||
final eventId = eventUpdate.content['event_id'];
|
||||
// Is this ID already in the store?
|
||||
final Map? prevEvent = await _eventsBox
|
||||
|
|
|
|||
|
|
@ -19,12 +19,26 @@
|
|||
import 'package:matrix/matrix.dart';
|
||||
|
||||
enum EventUpdateType {
|
||||
/// Newly received events from /sync
|
||||
timeline,
|
||||
|
||||
/// A state update not visible in the timeline currently
|
||||
state,
|
||||
|
||||
/// Messages that have been fetched when requesting past history
|
||||
history,
|
||||
|
||||
/// Updates to account data
|
||||
accountData,
|
||||
|
||||
/// Ephemeral events like receipts
|
||||
ephemeral,
|
||||
inviteState
|
||||
|
||||
/// The state of an invite
|
||||
inviteState,
|
||||
|
||||
/// Events that came down timeline, but we only received the keys for it later so we send a second update for them in the decrypted state
|
||||
decryptedTimelineQueue,
|
||||
}
|
||||
|
||||
/// Represents a new event (e.g. a message in a room) or an update for an
|
||||
|
|
|
|||
Loading…
Reference in New Issue