feat: Implement a queue for events pending decryption
fixes https://gitlab.com/famedly/company/frontend/famedly-web/-/issues/535
This commit is contained in:
parent
c3ec0ffa2a
commit
c6a8f5a162
|
|
@ -1486,6 +1486,7 @@ class Client extends MatrixApi {
|
||||||
_id = accessToken = syncFilterId =
|
_id = accessToken = syncFilterId =
|
||||||
homeserver = _userID = _deviceID = _deviceName = prevBatch = null;
|
homeserver = _userID = _deviceID = _deviceName = prevBatch = null;
|
||||||
_rooms = [];
|
_rooms = [];
|
||||||
|
_eventsPendingDecryption.clear();
|
||||||
await encryption?.dispose();
|
await encryption?.dispose();
|
||||||
encryption = null;
|
encryption = null;
|
||||||
onLoginStateChanged.add(LoginState.loggedOut);
|
onLoginStateChanged.add(LoginState.loggedOut);
|
||||||
|
|
@ -1700,18 +1701,60 @@ class Client extends MatrixApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _handleToDeviceEvents(List<BasicEventWithSender> events) async {
|
Future<void> _handleToDeviceEvents(List<BasicEventWithSender> events) async {
|
||||||
|
final Map<String, List<String>> roomsWithNewKeyToSessionId = {};
|
||||||
for (final event in events) {
|
for (final event in events) {
|
||||||
var toDeviceEvent = ToDeviceEvent.fromJson(event.toJson());
|
var toDeviceEvent = ToDeviceEvent.fromJson(event.toJson());
|
||||||
Logs().v('Got to_device event of type ${toDeviceEvent.type}');
|
Logs().v('Got to_device event of type ${toDeviceEvent.type}');
|
||||||
if (toDeviceEvent.type == EventTypes.Encrypted && encryptionEnabled) {
|
|
||||||
toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent);
|
|
||||||
Logs().v('Decrypted type is: ${toDeviceEvent.type}');
|
|
||||||
}
|
|
||||||
if (encryptionEnabled) {
|
if (encryptionEnabled) {
|
||||||
|
if (toDeviceEvent.type == EventTypes.Encrypted) {
|
||||||
|
toDeviceEvent = await encryption!.decryptToDeviceEvent(toDeviceEvent);
|
||||||
|
Logs().v('Decrypted type is: ${toDeviceEvent.type}');
|
||||||
|
|
||||||
|
/// collect new keys so that we can find those events in the decryption queue
|
||||||
|
if (toDeviceEvent.type == EventTypes.ForwardedRoomKey ||
|
||||||
|
toDeviceEvent.type == EventTypes.RoomKey) {
|
||||||
|
final roomId = event.content['room_id'];
|
||||||
|
final sessionId = event.content['session_id'];
|
||||||
|
if (roomId is String && sessionId is String) {
|
||||||
|
(roomsWithNewKeyToSessionId[roomId] ??= []).add(sessionId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
await encryption?.handleToDeviceEvent(toDeviceEvent);
|
await encryption?.handleToDeviceEvent(toDeviceEvent);
|
||||||
}
|
}
|
||||||
onToDeviceEvent.add(toDeviceEvent);
|
onToDeviceEvent.add(toDeviceEvent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// emit updates for all events in the queue
|
||||||
|
for (final entry in roomsWithNewKeyToSessionId.entries) {
|
||||||
|
final roomId = entry.key;
|
||||||
|
final sessionIds = entry.value;
|
||||||
|
|
||||||
|
final room = getRoomById(roomId);
|
||||||
|
if (room != null) {
|
||||||
|
final List<BasicEvent> events = [];
|
||||||
|
for (final event in _eventsPendingDecryption) {
|
||||||
|
if (event.event.roomID != roomId) continue;
|
||||||
|
if (!sessionIds.contains(
|
||||||
|
event.event.content['content']?['session_id'])) continue;
|
||||||
|
|
||||||
|
final decryptedEvent = await event.event.decrypt(room);
|
||||||
|
if (decryptedEvent.content.tryGet<String>('type') !=
|
||||||
|
EventTypes.Encrypted) {
|
||||||
|
events.add(BasicEvent.fromJson(decryptedEvent.content));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await _handleRoomEvents(
|
||||||
|
room, events, EventUpdateType.decryptedTimelineQueue);
|
||||||
|
|
||||||
|
_eventsPendingDecryption.removeWhere((e) => events.any(
|
||||||
|
(decryptedEvent) =>
|
||||||
|
decryptedEvent.content['event_id'] ==
|
||||||
|
e.event.content['event_id']));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_eventsPendingDecryption.removeWhere((e) => e.timedOut);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> _handleRooms(Map<String, SyncRoomUpdate> rooms,
|
Future<void> _handleRooms(Map<String, SyncRoomUpdate> rooms,
|
||||||
|
|
@ -1859,6 +1902,9 @@ class Client extends MatrixApi {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stores event that came down /sync but didn't get decrypted because of missing keys yet.
|
||||||
|
final List<_EventPendingDecryption> _eventsPendingDecryption = [];
|
||||||
|
|
||||||
Future<void> _handleRoomEvents(
|
Future<void> _handleRoomEvents(
|
||||||
Room room, List<BasicEvent> events, EventUpdateType type,
|
Room room, List<BasicEvent> events, EventUpdateType type,
|
||||||
{bool store = true}) async {
|
{bool store = true}) async {
|
||||||
|
|
@ -1883,6 +1929,14 @@ class Client extends MatrixApi {
|
||||||
EventUpdate(roomID: room.id, type: type, content: event.toJson());
|
EventUpdate(roomID: room.id, type: type, content: event.toJson());
|
||||||
if (event.type == EventTypes.Encrypted && encryptionEnabled) {
|
if (event.type == EventTypes.Encrypted && encryptionEnabled) {
|
||||||
update = await update.decrypt(room);
|
update = await update.decrypt(room);
|
||||||
|
|
||||||
|
// if the event failed to decrypt, add it to the queue
|
||||||
|
if (update.content.tryGet<String>('type') == EventTypes.Encrypted) {
|
||||||
|
_eventsPendingDecryption.add(_EventPendingDecryption(EventUpdate(
|
||||||
|
roomID: update.roomID,
|
||||||
|
type: EventUpdateType.decryptedTimelineQueue,
|
||||||
|
content: update.content)));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (event.type == EventTypes.Message &&
|
if (event.type == EventTypes.Message &&
|
||||||
!room.isDirectChat &&
|
!room.isDirectChat &&
|
||||||
|
|
@ -1904,7 +1958,9 @@ class Client extends MatrixApi {
|
||||||
}
|
}
|
||||||
onEvent.add(update);
|
onEvent.add(update);
|
||||||
|
|
||||||
if (prevBatch != null && type == EventUpdateType.timeline) {
|
if (prevBatch != null &&
|
||||||
|
(type == EventUpdateType.timeline ||
|
||||||
|
type == EventUpdateType.decryptedTimelineQueue)) {
|
||||||
if (update.content.tryGet<String>('type')?.startsWith('m.call.') ??
|
if (update.content.tryGet<String>('type')?.startsWith('m.call.') ??
|
||||||
false) {
|
false) {
|
||||||
final callEvent = Event.fromJson(update.content, room);
|
final callEvent = Event.fromJson(update.content, room);
|
||||||
|
|
@ -2106,6 +2162,7 @@ class Client extends MatrixApi {
|
||||||
BasicRoomEvent.fromJson(eventUpdate.content);
|
BasicRoomEvent.fromJson(eventUpdate.content);
|
||||||
break;
|
break;
|
||||||
case EventUpdateType.history:
|
case EventUpdateType.history:
|
||||||
|
case EventUpdateType.decryptedTimelineQueue:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
room.onUpdate.add(room.id);
|
room.onUpdate.add(room.id);
|
||||||
|
|
@ -2681,6 +2738,7 @@ class Client extends MatrixApi {
|
||||||
rooms.clear();
|
rooms.clear();
|
||||||
await database?.clearCache();
|
await database?.clearCache();
|
||||||
encryption?.keyManager.clearOutboundGroupSessions();
|
encryption?.keyManager.clearOutboundGroupSessions();
|
||||||
|
_eventsPendingDecryption.clear();
|
||||||
onCacheCleared.add(true);
|
onCacheCleared.add(true);
|
||||||
// Restart the syncloop
|
// Restart the syncloop
|
||||||
backgroundSync = true;
|
backgroundSync = true;
|
||||||
|
|
@ -2978,3 +3036,15 @@ class ArchivedRoom {
|
||||||
|
|
||||||
ArchivedRoom({required this.room, required this.timeline});
|
ArchivedRoom({required this.room, required this.timeline});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An event that is waiting for a key to arrive to decrypt. Times out after some time.
|
||||||
|
class _EventPendingDecryption {
|
||||||
|
DateTime addedAt = DateTime.now();
|
||||||
|
|
||||||
|
EventUpdate event;
|
||||||
|
|
||||||
|
bool get timedOut =>
|
||||||
|
addedAt.add(Duration(minutes: 5)).isBefore(DateTime.now());
|
||||||
|
|
||||||
|
_EventPendingDecryption(this.event);
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -975,8 +975,11 @@ class HiveCollectionsDatabase extends DatabaseApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store a common message event
|
// Store a common message event
|
||||||
if ({EventUpdateType.timeline, EventUpdateType.history}
|
if ({
|
||||||
.contains(eventUpdate.type)) {
|
EventUpdateType.timeline,
|
||||||
|
EventUpdateType.history,
|
||||||
|
EventUpdateType.decryptedTimelineQueue
|
||||||
|
}.contains(eventUpdate.type)) {
|
||||||
final eventId = eventUpdate.content['event_id'];
|
final eventId = eventUpdate.content['event_id'];
|
||||||
// Is this ID already in the store?
|
// Is this ID already in the store?
|
||||||
final prevEvent = await _eventsBox
|
final prevEvent = await _eventsBox
|
||||||
|
|
|
||||||
|
|
@ -922,8 +922,11 @@ class FamedlySdkHiveDatabase extends DatabaseApi {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store a common message event
|
// Store a common message event
|
||||||
if ({EventUpdateType.timeline, EventUpdateType.history}
|
if ({
|
||||||
.contains(eventUpdate.type)) {
|
EventUpdateType.timeline,
|
||||||
|
EventUpdateType.history,
|
||||||
|
EventUpdateType.decryptedTimelineQueue
|
||||||
|
}.contains(eventUpdate.type)) {
|
||||||
final eventId = eventUpdate.content['event_id'];
|
final eventId = eventUpdate.content['event_id'];
|
||||||
// Is this ID already in the store?
|
// Is this ID already in the store?
|
||||||
final Map? prevEvent = await _eventsBox
|
final Map? prevEvent = await _eventsBox
|
||||||
|
|
|
||||||
|
|
@ -19,12 +19,26 @@
|
||||||
import 'package:matrix/matrix.dart';
|
import 'package:matrix/matrix.dart';
|
||||||
|
|
||||||
enum EventUpdateType {
|
enum EventUpdateType {
|
||||||
|
/// Newly received events from /sync
|
||||||
timeline,
|
timeline,
|
||||||
|
|
||||||
|
/// A state update not visible in the timeline currently
|
||||||
state,
|
state,
|
||||||
|
|
||||||
|
/// Messages that have been fetched when requesting past history
|
||||||
history,
|
history,
|
||||||
|
|
||||||
|
/// Updates to account data
|
||||||
accountData,
|
accountData,
|
||||||
|
|
||||||
|
/// Ephemeral events like receipts
|
||||||
ephemeral,
|
ephemeral,
|
||||||
inviteState
|
|
||||||
|
/// The state of an invite
|
||||||
|
inviteState,
|
||||||
|
|
||||||
|
/// Events that came down timeline, but we only received the keys for it later so we send a second update for them in the decrypted state
|
||||||
|
decryptedTimelineQueue,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Represents a new event (e.g. a message in a room) or an update for an
|
/// Represents a new event (e.g. a message in a room) or an update for an
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue