diff --git a/lib/src/room.dart b/lib/src/room.dart index bf244d34..2e1fecff 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -902,6 +902,7 @@ class Room { '$id': (JoinedRoomUpdate() ..state = resp.state ..timeline = (TimelineUpdate() + ..limited = false ..events = resp.chunk ..prevBatch = resp.end)), }), diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index a89f386e..004d49dc 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -33,10 +33,10 @@ typedef onTimelineInsertCallback = void Function(int insertID); /// event list will be retreived when created by the [room.getTimeline] method. class Timeline { final Room room; - List events = []; + final List events; /// Map of event ID to map of type to set of aggregated events - Map>> aggregatedEvents = {}; + final Map>> aggregatedEvents = {}; final onTimelineUpdateCallback onUpdate; final onTimelineInsertCallback onInsert; @@ -62,6 +62,13 @@ class Timeline { return _eventCache[id]; } + // When fetching history, we will collect them into the `_historyUpdates` set + // first, and then only process all events at once, once we have the full history. + // This ensures that the entire history fetching only triggers `onUpdate` only *once*, + // even if /sync's complete while history is being proccessed. + bool _collectHistoryUpdates = false; + final Set _historyUpdates = {}; + Future requestHistory( {int historyCount = Room.DefaultHistoryCount}) async { if (!_requestingHistoryLock) { @@ -69,18 +76,32 @@ class Timeline { await room.requestHistory( historyCount: historyCount, onHistoryReceived: () { - if (room.prev_batch.isEmpty || room.prev_batch == null) { - events.clear(); - aggregatedEvents.clear(); - } + _collectHistoryUpdates = true; }, ); - await Future.delayed(const Duration(seconds: 2)); - _requestingHistoryLock = false; + try { + await Future.delayed(const Duration(seconds: 2)); + _proccessHistoryUpdates(); + } finally { + _collectHistoryUpdates = false; + _requestingHistoryLock = false; + } } } - Timeline({this.room, this.events, this.onUpdate, this.onInsert}) { + void _proccessHistoryUpdates() async { + _collectHistoryUpdates = false; + for (final update in _historyUpdates) { + _handleEventUpdate(await update.decrypt(room, store: true), + sortAndUpdate: false); + } + _historyUpdates.clear(); + _sort(); + if (onUpdate != null) onUpdate(); + } + + Timeline({this.room, List events, this.onUpdate, this.onInsert}) + : events = events ?? [] { sub ??= room.client.onEvent.stream.listen(_handleEventUpdate); // if the timeline is limited we want to clear our events cache // as r.limitedTimeline can be "null" sometimes, we need to check for == true @@ -222,65 +243,75 @@ class Timeline { } } - void _handleEventUpdate(EventUpdate eventUpdate) async { + void _handleEventUpdate(EventUpdate eventUpdate, + {bool sortAndUpdate = true}) { try { if (eventUpdate.roomID != room.id) return; - if (eventUpdate.type == EventUpdateType.timeline || - eventUpdate.type == EventUpdateType.history) { - var status = eventUpdate.content['status'] ?? - (eventUpdate.content['unsigned'] is Map - ? eventUpdate.content['unsigned'][MessageSendingStatusKey] - : null) ?? - 2; - // Redaction events are handled as modification for existing events. - if (eventUpdate.eventType == EventTypes.Redaction) { - final eventId = _findEvent(event_id: eventUpdate.content['redacts']); - if (eventId < events.length) { - removeAggregatedEvent(events[eventId]); - events[eventId].setRedactionEvent(Event.fromJson( - eventUpdate.content, room, eventUpdate.sortOrder)); - } - } else if (status == -2) { - var i = _findEvent(event_id: eventUpdate.content['event_id']); - if (i < events.length) { - removeAggregatedEvent(events[i]); - events.removeAt(i); + if (eventUpdate.type == EventUpdateType.history && + _collectHistoryUpdates) { + _historyUpdates.add(eventUpdate); + return; + } + + if (eventUpdate.type != EventUpdateType.timeline && + eventUpdate.type != EventUpdateType.history) { + return; + } + var status = eventUpdate.content['status'] ?? + (eventUpdate.content['unsigned'] is Map + ? eventUpdate.content['unsigned'][MessageSendingStatusKey] + : null) ?? + 2; + // Redaction events are handled as modification for existing events. + if (eventUpdate.eventType == EventTypes.Redaction) { + final eventId = _findEvent(event_id: eventUpdate.content['redacts']); + if (eventId < events.length) { + removeAggregatedEvent(events[eventId]); + events[eventId].setRedactionEvent( + Event.fromJson(eventUpdate.content, room, eventUpdate.sortOrder)); + } + } else if (status == -2) { + var i = _findEvent(event_id: eventUpdate.content['event_id']); + if (i < events.length) { + removeAggregatedEvent(events[i]); + events.removeAt(i); + } + } else { + var i = _findEvent( + event_id: eventUpdate.content['event_id'], + unsigned_txid: eventUpdate.content['unsigned'] is Map + ? eventUpdate.content['unsigned']['transaction_id'] + : null); + + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = + Event.fromJson(eventUpdate.content, room, eventUpdate.sortOrder); + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if (status < oldStatus && !(status == -1 && oldStatus == 0)) { + events[i].status = oldStatus; } + addAggregatedEvent(events[i]); } else { - var i = _findEvent( - event_id: eventUpdate.content['event_id'], - unsigned_txid: eventUpdate.content['unsigned'] is Map - ? eventUpdate.content['unsigned']['transaction_id'] - : null); + var newEvent = + Event.fromJson(eventUpdate.content, room, eventUpdate.sortOrder); - if (i < events.length) { - // if the old status is larger than the new one, we also want to preserve the old status - final oldStatus = events[i].status; - events[i] = Event.fromJson( - eventUpdate.content, room, eventUpdate.sortOrder); - // do we preserve the status? we should allow 0 -> -1 updates and status increases - if (status < oldStatus && !(status == -1 && oldStatus == 0)) { - events[i].status = oldStatus; - } - addAggregatedEvent(events[i]); - } else { - var newEvent = Event.fromJson( - eventUpdate.content, room, eventUpdate.sortOrder); + if (eventUpdate.type == EventUpdateType.history && + events.indexWhere( + (e) => e.eventId == eventUpdate.content['event_id']) != + -1) return; - if (eventUpdate.type == EventUpdateType.history && - events.indexWhere( - (e) => e.eventId == eventUpdate.content['event_id']) != - -1) return; - - events.insert(0, newEvent); - addAggregatedEvent(newEvent); - if (onInsert != null) onInsert(0); - } + events.insert(0, newEvent); + addAggregatedEvent(newEvent); + if (onInsert != null) onInsert(0); } } - _sort(); - if (onUpdate != null) onUpdate(); + if (sortAndUpdate) { + _sort(); + if (onUpdate != null) onUpdate(); + } } catch (e, s) { Logs.warning('Handle event update failed: ${e.toString()}', s); }