From 8f4da40dec027ca581296489fa9477eed82777e2 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sun, 19 Oct 2025 17:44:46 +0500 Subject: [PATCH] split Timeline into abstract and RoomTimeline --- lib/src/client.dart | 3 +- lib/src/room_timeline.dart | 622 ++++++++++++++++++++++++++++++++ lib/src/thread.dart | 4 +- lib/src/timeline.dart | 713 ++++--------------------------------- 4 files changed, 702 insertions(+), 640 deletions(-) create mode 100644 lib/src/room_timeline.dart diff --git a/lib/src/client.dart b/lib/src/client.dart index de3213eb..38a0195d 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -25,6 +25,7 @@ import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:collection/collection.dart' show IterableExtension; import 'package:http/http.dart' as http; +import 'package:matrix/src/room_timeline.dart'; import 'package:mime/mime.dart'; import 'package:random_string/random_string.dart'; import 'package:vodozemac/vodozemac.dart' as vod; @@ -1213,7 +1214,7 @@ class Client extends MatrixApi { // Set membership of room to leave, in the case we got a left room passed, otherwise // the left room would have still membership join, which would be wrong for the setState later archivedRoom.membership = Membership.leave; - final timeline = Timeline( + final timeline = RoomTimeline( room: archivedRoom, chunk: TimelineChunk( events: roomUpdate.timeline?.events?.reversed diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart new file mode 100644 index 00000000..46cf9551 --- /dev/null +++ b/lib/src/room_timeline.dart @@ -0,0 +1,622 @@ +/* + * Famedly Matrix SDK + * Copyright (C) 2019, 2020, 2021 Famedly GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import 'dart:async'; +import 'dart:convert'; + +import 'package:collection/collection.dart'; +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; + +/// Represents the main timeline of a room. +class RoomTimeline extends Timeline { + @override + final Room room; + @override + List get events => chunk.events; + + TimelineChunk chunk; + + StreamSubscription? timelineSub; + StreamSubscription? historySub; + StreamSubscription? roomSub; + StreamSubscription? sessionIdReceivedSub; + StreamSubscription? cancelSendEventSub; + + bool isRequestingHistory = false; + bool isRequestingFuture = false; + bool allowNewEvent = true; + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + // When fetching history, we will collect them into the `_historyUpdates` set + // first, and then only process all events at once, once we have the full history. + // This ensures that the entire history fetching only triggers `onUpdate` only *once*, + // even if /sync's complete while history is being proccessed. + bool _collectHistoryUpdates = false; + + // We confirmed, that there are no more events to load from the database. + bool _fetchedAllDatabaseEvents = false; + + @override + bool get canRequestHistory { + if (!{Membership.join, Membership.leave}.contains(room.membership)) { + return false; + } + if (events.isEmpty) return true; + return !_fetchedAllDatabaseEvents || + (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); + } + + @override + bool get canRequestFuture => !allowNewEvent; + + RoomTimeline({ + required this.room, + required this.chunk, + super.onUpdate, + super.onChange, + super.onInsert, + super.onRemove, + super.onNewEvent, + }) { + timelineSub = room.client.onTimelineEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.timeline), + ); + historySub = room.client.onHistoryEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.history), + ); + + // If the timeline is limited we want to clear our events cache + roomSub = room.client.onSync.stream + .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) + .listen(_removeEventsNotInThisSync); + + sessionIdReceivedSub = + room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); + cancelSendEventSub = + room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + // fragmented timelines never read from the database. + _fetchedAllDatabaseEvents = true; + } + } + + @override + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; + } + + @override + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; + } + + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (allowNewEvent) return; + if (isRequestingFuture) return; + isRequestingFuture = true; + await _requestEvents( + direction: Direction.f, + historyCount: historyCount, + filter: filter, + ); + isRequestingFuture = false; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await room.client.database.getEventList( + room, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (room.getState(EventTypes.RoomMember, event.senderId) != null) { + continue; + } + final dbUser = + await room.client.database.getUser(event.senderId, room); + if (dbUser != null) room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + _fetchedAllDatabaseEvents = true; + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getRoomEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (room.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await room.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () { + _collectHistoryUpdates = true; + }, + filter: filter, + ); + } + } + } + } finally { + _collectHistoryUpdates = false; + isRequestingHistory = false; + onUpdate?.call(); + } + } + + /// Request more previous events from the server. + Future getRoomEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await room.client.getRoomEvents( + room.id, + direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + filter: jsonEncode(filter.toJson()), + ); + + if (resp.end == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = direction == Direction.b ? resp.start : resp.end; + final newPrevBatch = direction == Direction.b ? resp.end : resp.start; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + if ((resp.state?.length ?? 0) == 0 && + resp.start != resp.end && + newPrevBatch != null && + newNextBatch != null) { + if (type == EventUpdateType.history) { + Logs().w( + '[nav] we can still request history prevBatch: $type $newPrevBatch', + ); + } else { + Logs().w( + '[nav] we can still request timeline nextBatch: $type $newNextBatch', + ); + } + } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); + + if (!allowNewEvent) { + if (resp.start == resp.end || + (resp.end == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await room.client.database.getEventList(room, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (room.encrypted && room.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await room.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + return resp.chunk.length; + } + + void _cleanUpCancelledEvent(String eventId) { + final i = _findEvent(event_id: eventId); + if (i < events.length) { + removeAggregatedEvent(events[i]); + events.removeAt(i); + onRemove?.call(i); + onUpdate?.call(); + } + } + + /// Removes all entries from [events] which are not in this SyncUpdate. + void _removeEventsNotInThisSync(SyncUpdate sync) { + final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; + final keepEventIds = newSyncEvents.map((e) => e.eventId); + events.removeWhere((e) => !keepEventIds.contains(e.eventId)); + } + + @override + void cancelSubscriptions() { + timelineSub?.cancel(); + historySub?.cancel(); + roomSub?.cancel(); + sessionIdReceivedSub?.cancel(); + cancelSendEventSub?.cancel(); + } + + void _sessionKeyReceived(String sessionId) async { + var decryptAtLeastOneEvent = false; + Future decryptFn() async { + final encryption = room.client.encryption; + if (!room.client.encryptionEnabled || encryption == null) { + return; + } + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].messageType == MessageTypes.BadEncrypted && + events[i].content['session_id'] == sessionId) { + events[i] = await encryption.decryptRoomEvent( + events[i], + store: true, + updateType: EventUpdateType.history, + ); + addAggregatedEvent(events[i]); + onChange?.call(i); + if (events[i].type != EventTypes.Encrypted) { + decryptAtLeastOneEvent = true; + } + } + } + } + + await room.client.database.transaction(decryptFn); + if (decryptAtLeastOneEvent) onUpdate?.call(); + } + + @override + void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + room.client.encryption?.keyManager.maybeAutoRequest( + room.id, + sessionId, + senderKey, + tryOnlineBackup: tryOnlineBackup, + onlineKeyBackupOnly: onlineKeyBackupOnly, + ); + } + } + } + } + + @override + Future setReadMarker({String? eventId, bool? public}) async { + eventId ??= + events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; + if (eventId == null) return; + return room.setReadMarker(eventId, mRead: eventId, public: public); + } + + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; + } + return i; + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + @override + void _handleEventUpdate( + Event event, + EventUpdateType type, { + bool update = true, + }) { + try { + if (event.roomId != room.id) return; + + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { + return; + } + + if (type == EventUpdateType.timeline) { + onNewEvent?.call(); + } + + if (!allowNewEvent) return; + + final status = event.status; + + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); + } else { + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); + + addAggregatedEvent(event); + } + + // Handle redaction events + if (event.type == EventTypes.Redaction) { + final index = _findEvent(event_id: event.redacts); + if (index < events.length) { + removeAggregatedEvent(events[index]); + + // Is the redacted event a reaction? Then update the event this + // belongs to: + if (onChange != null) { + final relationshipEventId = events[index].relationshipEventId; + if (relationshipEventId != null) { + onChange?.call(_findEvent(event_id: relationshipEventId)); + return; + } + } + + events[index].setRedactionEvent(event); + onChange?.call(index); + } + } + + if (update && !_collectHistoryUpdates) { + onUpdate?.call(); + } + } catch (e, s) { + Logs().w('Handle event update failed', e, s); + } + } + + @override + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }) async* { + assert(searchTerm != null || searchFunc != null); + searchFunc ??= (event) => + event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); + final found = []; + + if (sinceEventId == null) { + // Search locally + for (final event in events) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + + // Search in database + var start = events.length; + while (true) { + final eventsFromStore = await room.client.database.getEventList( + room, + start: start, + limit: requestHistoryCount, + ); + if (eventsFromStore.isEmpty) break; + start += eventsFromStore.length; + for (final event in eventsFromStore) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + } + } + + // Search on the server + prevBatch ??= room.prev_batch; + if (sinceEventId != null) { + prevBatch = + (await room.client.getEventContext(room.id, sinceEventId)).end; + } + final encryption = room.client.encryption; + for (var i = 0; i < maxHistoryRequests; i++) { + if (prevBatch == null) break; + if (limit != null && found.length >= limit) break; + try { + final resp = await room.client.getRoomEvents( + room.id, + Direction.b, + from: prevBatch, + limit: requestHistoryCount, + filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + for (final matrixEvent in resp.chunk) { + var event = Event.fromMatrixEvent(matrixEvent, room); + if (event.type == EventTypes.Encrypted && encryption != null) { + event = await encryption.decryptRoomEvent(event); + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + // Await requestKey() here to ensure decrypted message bodies + await event.requestKey(); + } + } + if (searchFunc(event)) { + yield (found..add(event), resp.end); + if (limit != null && found.length >= limit) break; + } + } + prevBatch = resp.end; + // We are at the beginning of the room + if (resp.chunk.length < requestHistoryCount) break; + } on MatrixException catch (e) { + // We have no permission anymore to request the history + if (e.error == MatrixError.M_FORBIDDEN) { + break; + } + rethrow; + } + } + return; + } +} + +extension on List { + int get firstIndexWhereNotError { + if (isEmpty) return 0; + final index = indexWhere((event) => !event.status.isError); + if (index == -1) return length; + return index; + } +} \ No newline at end of file diff --git a/lib/src/thread.dart b/lib/src/thread.dart index ed9f8216..aa707a8e 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -16,11 +16,11 @@ import 'package:matrix/src/utils/space_child.dart'; class Thread { final Room room; - final String threadRootId; + final MatrixEvent rootEvent; Thread({ required Room this.room, - required String this.threadRootId + required MatrixEvent this.rootEvent }) { } diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 5ad7758d..3decd3cb 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -20,492 +20,89 @@ import 'dart:async'; import 'dart:convert'; import 'package:collection/collection.dart'; - import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; -/// Represents the timeline of a room. The callback [onUpdate] will be triggered -/// automatically. The initial -/// event list will be retreived when created by the `room.getTimeline()` method. - -class Timeline { - final Room room; - List get events => chunk.events; +/// Abstract base class for all timeline implementations. +/// Provides common functionality for event management, aggregation, and search. +abstract class Timeline { + /// The list of events in this timeline + List get events; /// Map of event ID to map of type to set of aggregated events final Map>> aggregatedEvents = {}; + /// Called when the timeline is updated final void Function()? onUpdate; + + /// Called when an event at specific index changes final void Function(int index)? onChange; + + /// Called when an event is inserted at specific index final void Function(int index)? onInsert; + + /// Called when an event is removed from specific index final void Function(int index)? onRemove; + + /// Called when a new event is added to the timeline final void Function()? onNewEvent; - StreamSubscription? timelineSub; - StreamSubscription? historySub; - StreamSubscription? roomSub; - StreamSubscription? sessionIdReceivedSub; - StreamSubscription? cancelSendEventSub; - bool isRequestingHistory = false; - bool isRequestingFuture = false; - - bool allowNewEvent = true; - bool isFragmentedTimeline = false; - - final Map _eventCache = {}; - - TimelineChunk chunk; - - /// Searches for the event in this timeline. If not - /// found, requests from the server. Requested events - /// are cached. - Future getEventById(String id) async { - for (final event in events) { - if (event.eventId == id) return event; - } - if (_eventCache.containsKey(id)) return _eventCache[id]; - final requestedEvent = await room.getEventById(id); - if (requestedEvent == null) return null; - _eventCache[id] = requestedEvent; - return _eventCache[id]; - } - - // When fetching history, we will collect them into the `_historyUpdates` set - // first, and then only process all events at once, once we have the full history. - // This ensures that the entire history fetching only triggers `onUpdate` only *once*, - // even if /sync's complete while history is being proccessed. - bool _collectHistoryUpdates = false; - - // We confirmed, that there are no more events to load from the database. - bool _fetchedAllDatabaseEvents = false; - - bool get canRequestHistory { - if (!{Membership.join, Membership.leave}.contains(room.membership)) { - return false; - } - if (events.isEmpty) return true; - return !_fetchedAllDatabaseEvents || - (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); - } - - /// Request more previous events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestHistory({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (isRequestingHistory) { - return; - } - - isRequestingHistory = true; - await _requestEvents( - direction: Direction.b, - historyCount: historyCount, - filter: filter, - ); - isRequestingHistory = false; - } - - bool get canRequestFuture => !allowNewEvent; - - /// Request more future events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestFuture({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (allowNewEvent) { - return; // we shouldn't force to add new events if they will autatically be added - } - - if (isRequestingFuture) return; - isRequestingFuture = true; - await _requestEvents( - direction: Direction.f, - historyCount: historyCount, - filter: filter, - ); - isRequestingFuture = false; - } - - Future _requestEvents({ - int historyCount = Room.defaultHistoryCount, - required Direction direction, - StateFilter? filter, - }) async { - onUpdate?.call(); - - try { - // Look up for events in the database first. With fragmented view, we should delete the database cache - final eventsFromStore = isFragmentedTimeline - ? null - : await room.client.database.getEventList( - room, - start: events.length, - limit: historyCount, - ); - - if (eventsFromStore != null && eventsFromStore.isNotEmpty) { - for (final e in eventsFromStore) { - addAggregatedEvent(e); - } - // Fetch all users from database we have got here. - for (final event in events) { - if (room.getState(EventTypes.RoomMember, event.senderId) != null) { - continue; - } - final dbUser = - await room.client.database.getUser(event.senderId, room); - if (dbUser != null) room.setState(dbUser); - } - - if (direction == Direction.b) { - events.addAll(eventsFromStore); - final startIndex = events.length - eventsFromStore.length; - final endIndex = events.length; - for (var i = startIndex; i < endIndex; i++) { - onInsert?.call(i); - } - } else { - events.insertAll(0, eventsFromStore); - final startIndex = eventsFromStore.length; - final endIndex = 0; - for (var i = startIndex; i > endIndex; i--) { - onInsert?.call(i); - } - } - } else { - _fetchedAllDatabaseEvents = true; - Logs().i('No more events found in the store. Request from server...'); - - if (isFragmentedTimeline) { - await getRoomEvents( - historyCount: historyCount, - direction: direction, - filter: filter, - ); - } else { - if (room.prev_batch == null) { - Logs().i('No more events to request from server...'); - } else { - await room.requestHistory( - historyCount: historyCount, - direction: direction, - onHistoryReceived: () { - _collectHistoryUpdates = true; - }, - filter: filter, - ); - } - } - } - } finally { - _collectHistoryUpdates = false; - isRequestingHistory = false; - onUpdate?.call(); - } - } - - /// Request more previous events from the server. [historyCount] defines how much events should - /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before** - /// the historical events will be published in the onEvent stream. [filter] allows you to specify a - /// [StateFilter] object to filter the events, which can include various criteria such as - /// event types (e.g., [EventTypes.Message]) and other state-related filters. - /// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden. - /// Returns the actual count of received timeline events. - Future getRoomEvents({ - int historyCount = Room.defaultHistoryCount, - direction = Direction.b, - StateFilter? filter, - }) async { - // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set - filter ??= StateFilter(lazyLoadMembers: true); - filter.lazyLoadMembers ??= true; - - final resp = await room.client.getRoomEvents( - room.id, - direction, - from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, - limit: historyCount, - filter: jsonEncode(filter.toJson()), - ); - - if (resp.end == null) { - Logs().w('We reached the end of the timeline'); - } - - final newNextBatch = direction == Direction.b ? resp.start : resp.end; - final newPrevBatch = direction == Direction.b ? resp.end : resp.start; - - final type = direction == Direction.b - ? EventUpdateType.history - : EventUpdateType.timeline; - - if ((resp.state?.length ?? 0) == 0 && - resp.start != resp.end && - newPrevBatch != null && - newNextBatch != null) { - if (type == EventUpdateType.history) { - Logs().w( - '[nav] we can still request history prevBatch: $type $newPrevBatch', - ); - } else { - Logs().w( - '[nav] we can still request timeline nextBatch: $type $newNextBatch', - ); - } - } - - final newEvents = - resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); - - if (!allowNewEvent) { - if (resp.start == resp.end || - (resp.end == null && direction == Direction.f)) { - allowNewEvent = true; - } - - if (allowNewEvent) { - Logs().d('We now allow sync update into the timeline.'); - newEvents.addAll( - await room.client.database.getEventList(room, onlySending: true), - ); - } - } - - // Try to decrypt encrypted events but don't update the database. - if (room.encrypted && room.client.encryptionEnabled) { - for (var i = 0; i < newEvents.length; i++) { - if (newEvents[i].type == EventTypes.Encrypted) { - newEvents[i] = await room.client.encryption!.decryptRoomEvent( - newEvents[i], - ); - } - } - } - - // update chunk anchors - if (type == EventUpdateType.history) { - chunk.prevBatch = newPrevBatch ?? ''; - - final offset = chunk.events.length; - - chunk.events.addAll(newEvents); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i + offset); - } - } else { - chunk.nextBatch = newNextBatch ?? ''; - chunk.events.insertAll(0, newEvents.reversed); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i); - } - } - - if (onUpdate != null) { - onUpdate!(); - } - return resp.chunk.length; - } + bool get canRequestHistory; + bool get canRequestFuture; Timeline({ - required this.room, this.onUpdate, this.onChange, this.onInsert, this.onRemove, this.onNewEvent, - required this.chunk, - }) { - timelineSub = room.client.onTimelineEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.timeline, - ), - ); - historySub = room.client.onHistoryEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.history, - ), - ); + }); - // If the timeline is limited we want to clear our events cache - roomSub = room.client.onSync.stream - .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) - .listen(_removeEventsNotInThisSync); + /// Searches for the event in this timeline. If not found, requests from server. + Future getEventById(String id); - sessionIdReceivedSub = - room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); - cancelSendEventSub = - room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + /// Request more previous events + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we want to populate our aggregated events - for (final e in events) { - addAggregatedEvent(e); - } + /// Request more future events + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we are using a fragmented timeline - if (chunk.nextBatch != '') { - allowNewEvent = false; - isFragmentedTimeline = true; - // fragmented timelines never read from the database. - _fetchedAllDatabaseEvents = true; - } - } + /// Set the read marker to an event in this timeline + Future setReadMarker({String? eventId, bool? public}); - void _cleanUpCancelledEvent(String eventId) { - final i = _findEvent(event_id: eventId); - if (i < events.length) { - removeAggregatedEvent(events[i]); - events.removeAt(i); - onRemove?.call(i); - onUpdate?.call(); - } - } - - /// Removes all entries from [events] which are not in this SyncUpdate. - void _removeEventsNotInThisSync(SyncUpdate sync) { - final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; - final keepEventIds = newSyncEvents.map((e) => e.eventId); - events.removeWhere((e) => !keepEventIds.contains(e.eventId)); - } - - /// Don't forget to call this before you dismiss this object! - void cancelSubscriptions() { - // ignore: discarded_futures - timelineSub?.cancel(); - // ignore: discarded_futures - historySub?.cancel(); - // ignore: discarded_futures - roomSub?.cancel(); - // ignore: discarded_futures - sessionIdReceivedSub?.cancel(); - // ignore: discarded_futures - cancelSendEventSub?.cancel(); - } - - void _sessionKeyReceived(String sessionId) async { - var decryptAtLeastOneEvent = false; - Future decryptFn() async { - final encryption = room.client.encryption; - if (!room.client.encryptionEnabled || encryption == null) { - return; - } - for (var i = 0; i < events.length; i++) { - if (events[i].type == EventTypes.Encrypted && - events[i].messageType == MessageTypes.BadEncrypted && - events[i].content['session_id'] == sessionId) { - events[i] = await encryption.decryptRoomEvent( - events[i], - store: true, - updateType: EventUpdateType.history, - ); - addAggregatedEvent(events[i]); - onChange?.call(i); - if (events[i].type != EventTypes.Encrypted) { - decryptAtLeastOneEvent = true; - } - } - } - } - - await room.client.database.transaction(decryptFn); - if (decryptAtLeastOneEvent) onUpdate?.call(); - } - - /// Request the keys for undecryptable events of this timeline + /// Request keys for undecryptable events void requestKeys({ bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true, - }) { - for (final event in events) { - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - final sessionId = event.content.tryGet('session_id'); - final senderKey = event.content.tryGet('sender_key'); - if (sessionId != null && senderKey != null) { - room.client.encryption?.keyManager.maybeAutoRequest( - room.id, - sessionId, - senderKey, - tryOnlineBackup: tryOnlineBackup, - onlineKeyBackupOnly: onlineKeyBackupOnly, - ); - } - } - } - } + }); - /// Set the read marker to the last synced event in this timeline. - Future setReadMarker({String? eventId, bool? public}) async { - eventId ??= - events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; - if (eventId == null) return; - return room.setReadMarker(eventId, mRead: eventId, public: public); - } - - int _findEvent({String? event_id, String? unsigned_txid}) { - // we want to find any existing event where either the passed event_id or the passed unsigned_txid - // matches either the event_id or transaction_id of the existing event. - // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match. - // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair, - // thus meaning we found our element. - final searchNeedle = {}; - if (event_id != null) { - searchNeedle.add(event_id); - } - if (unsigned_txid != null) { - searchNeedle.add(unsigned_txid); - } - int i; - for (i = 0; i < events.length; i++) { - final searchHaystack = {events[i].eventId}; - - final txnid = events[i].transactionId; - if (txnid != null) { - searchHaystack.add(txnid); - } - if (searchNeedle.intersection(searchHaystack).isNotEmpty) { - break; - } - } - return i; - } - - void _removeEventFromSet(Set eventSet, Event event) { - eventSet.removeWhere( - (e) => - e.matchesEventOrTransactionId(event.eventId) || - event.unsigned != null && - e.matchesEventOrTransactionId(event.transactionId), - ); - } + /// Search events in this timeline + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }); + /// Add an event to the aggregation tree void addAggregatedEvent(Event event) { - // we want to add an event to the aggregation tree final relationshipType = event.relationshipType; final relationshipEventId = event.relationshipEventId; if (relationshipType == null || relationshipEventId == null) { - return; // nothing to do + return; } final e = (aggregatedEvents[relationshipEventId] ??= >{})[relationshipType] ??= {}; - // remove a potential old event _removeEventFromSet(e, event); - // add the new one e.add(event); if (onChange != null) { final index = _findEvent(event_id: relationshipEventId); @@ -513,6 +110,7 @@ class Timeline { } } + /// Remove an event from aggregation void removeAggregatedEvent(Event event) { aggregatedEvents.remove(event.eventId); if (event.transactionId != null) { @@ -525,91 +123,38 @@ class Timeline { } } - void _handleEventUpdate( - Event event, - EventUpdateType type, { - bool update = true, - }) { - try { - if (event.roomId != room.id) return; - - if (type != EventUpdateType.timeline && type != EventUpdateType.history) { - return; - } - - if (type == EventUpdateType.timeline) { - onNewEvent?.call(); - } - - if (!allowNewEvent) return; - - final status = event.status; - - final i = _findEvent( - event_id: event.eventId, - unsigned_txid: event.transactionId, - ); - - if (i < events.length) { - // if the old status is larger than the new one, we also want to preserve the old status - final oldStatus = events[i].status; - events[i] = event; - // do we preserve the status? we should allow 0 -> -1 updates and status increases - if ((latestEventStatus(status, oldStatus) == oldStatus) && - !(status.isError && oldStatus.isSending)) { - events[i].status = oldStatus; - } - addAggregatedEvent(events[i]); - onChange?.call(i); - } else { - if (type == EventUpdateType.history && - events.indexWhere( - (e) => e.eventId == event.eventId, - ) != - -1) { - return; - } - var index = events.length; - if (type == EventUpdateType.history) { - events.add(event); - } else { - index = events.firstIndexWhereNotError; - events.insert(index, event); - } - onInsert?.call(index); - - addAggregatedEvent(event); - } - - // Handle redaction events - if (event.type == EventTypes.Redaction) { - final index = _findEvent(event_id: event.redacts); - if (index < events.length) { - removeAggregatedEvent(events[index]); - - // Is the redacted event a reaction? Then update the event this - // belongs to: - if (onChange != null) { - final relationshipEventId = events[index].relationshipEventId; - if (relationshipEventId != null) { - onChange?.call(_findEvent(event_id: relationshipEventId)); - return; - } - } - - events[index].setRedactionEvent(event); - onChange?.call(index); - } - } - - if (update && !_collectHistoryUpdates) { - onUpdate?.call(); - } - } catch (e, s) { - Logs().w('Handle event update failed', e, s); + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; } + return i; } + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + /// Handle event updates (to be implemented by subclasses) + void _handleEventUpdate(Event event, EventUpdateType type, {bool update = true}); + + /// Cancel all subscriptions + void cancelSubscriptions(); + @Deprecated('Use [startSearch] instead.') Stream> searchEvent({ String? searchTerm, @@ -623,114 +168,8 @@ class Timeline { searchTerm: searchTerm, requestHistoryCount: requestHistoryCount, maxHistoryRequests: maxHistoryRequests, - // ignore: deprecated_member_use_from_same_package sinceEventId: sinceEventId, limit: limit, searchFunc: searchFunc, ).map((result) => result.$1); - - /// Searches [searchTerm] in this timeline. It first searches in the - /// cache, then in the database and then on the server. The search can - /// take a while, which is why this returns a stream so the already found - /// events can already be displayed. - /// Override the [searchFunc] if you need another search. This will then - /// ignore [searchTerm]. - /// Returns the List of Events and the next prevBatch at the end of the - /// search. - Stream<(List, String?)> startSearch({ - String? searchTerm, - int requestHistoryCount = 100, - int maxHistoryRequests = 10, - String? prevBatch, - @Deprecated('Use [prevBatch] instead.') String? sinceEventId, - int? limit, - bool Function(Event)? searchFunc, - }) async* { - assert(searchTerm != null || searchFunc != null); - searchFunc ??= (event) => - event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); - final found = []; - - if (sinceEventId == null) { - // Search locally - for (final event in events) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - - // Search in database - var start = events.length; - while (true) { - final eventsFromStore = await room.client.database.getEventList( - room, - start: start, - limit: requestHistoryCount, - ); - if (eventsFromStore.isEmpty) break; - start += eventsFromStore.length; - for (final event in eventsFromStore) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - } - } - - // Search on the server - prevBatch ??= room.prev_batch; - if (sinceEventId != null) { - prevBatch = - (await room.client.getEventContext(room.id, sinceEventId)).end; - } - final encryption = room.client.encryption; - for (var i = 0; i < maxHistoryRequests; i++) { - if (prevBatch == null) break; - if (limit != null && found.length >= limit) break; - try { - final resp = await room.client.getRoomEvents( - room.id, - Direction.b, - from: prevBatch, - limit: requestHistoryCount, - filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), - ); - for (final matrixEvent in resp.chunk) { - var event = Event.fromMatrixEvent(matrixEvent, room); - if (event.type == EventTypes.Encrypted && encryption != null) { - event = await encryption.decryptRoomEvent(event); - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - // Await requestKey() here to ensure decrypted message bodies - await event.requestKey(); - } - } - if (searchFunc(event)) { - yield (found..add(event), resp.end); - if (limit != null && found.length >= limit) break; - } - } - prevBatch = resp.end; - // We are at the beginning of the room - if (resp.chunk.length < requestHistoryCount) break; - } on MatrixException catch (e) { - // We have no permission anymore to request the history - if (e.error == MatrixError.M_FORBIDDEN) { - break; - } - rethrow; - } - } - return; - } -} - -extension on List { - int get firstIndexWhereNotError { - if (isEmpty) return 0; - final index = indexWhere((event) => !event.status.isError); - if (index == -1) return length; - return index; - } -} +} \ No newline at end of file