diff --git a/find.js b/find.js new file mode 100644 index 00000000..695558b4 --- /dev/null +++ b/find.js @@ -0,0 +1,23 @@ +import fs from 'fs'; + +const files = fs.readdirSync('lib/', { + recursive: true +}); + +const q = process.argv[2]; + +var total = 0; + +for (const f of files) { + try { + const b = fs.readFileSync(`lib/${f}`, 'utf-8'); + if (b.includes(q) || f.includes(q)) { + total ++; + console.log(f); + } + } catch (error) { + + } +} + +console.log(`${total} files in total`); \ No newline at end of file diff --git a/lib/matrix.dart b/lib/matrix.dart index 8ad93add..461d7fb3 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -49,7 +49,10 @@ export 'src/voip/utils/famedly_call_extension.dart'; export 'src/voip/utils/types.dart'; export 'src/voip/utils/wrapped_media_stream.dart'; export 'src/room.dart'; +export 'src/thread.dart'; export 'src/timeline.dart'; +export 'src/room_timeline.dart'; +export 'src/thread_timeline.dart'; export 'src/user.dart'; export 'src/utils/cached_profile_information.dart'; export 'src/utils/commands_extension.dart'; diff --git a/lib/msc_extensions/extension_timeline_export/timeline_export.dart b/lib/msc_extensions/extension_timeline_export/timeline_export.dart index 47d7751f..6f829258 100644 --- a/lib/msc_extensions/extension_timeline_export/timeline_export.dart +++ b/lib/msc_extensions/extension_timeline_export/timeline_export.dart @@ -2,9 +2,9 @@ import 'dart:convert'; import 'package:matrix/matrix_api_lite.dart'; import 'package:matrix/src/event.dart'; -import 'package:matrix/src/timeline.dart'; +import 'package:matrix/src/room_timeline.dart'; -extension TimelineExportExtension on Timeline { +extension TimelineExportExtension on RoomTimeline { /// Exports timeline events from a Matrix room within a specified date range. /// /// The export process provides progress updates through the returned stream with the following information: diff --git a/lib/src/client.dart b/lib/src/client.dart index de3213eb..b4f6c7bc 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -25,6 +25,7 @@ import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:collection/collection.dart' show IterableExtension; import 'package:http/http.dart' as http; +import 'package:matrix/src/room_timeline.dart'; import 'package:mime/mime.dart'; import 'package:random_string/random_string.dart'; import 'package:vodozemac/vodozemac.dart' as vod; @@ -1213,7 +1214,7 @@ class Client extends MatrixApi { // Set membership of room to leave, in the case we got a left room passed, otherwise // the left room would have still membership join, which would be wrong for the setState later archivedRoom.membership = Membership.leave; - final timeline = Timeline( + final timeline = RoomTimeline( room: archivedRoom, chunk: TimelineChunk( events: roomUpdate.timeline?.events?.reversed @@ -2775,13 +2776,13 @@ class Client extends MatrixApi { final List receipts = []; for (final event in events) { + room.setEphemeral(event); // Receipt events are deltas between two states. We will create a // fake room account data event for this and store the difference // there. if (event.type != 'm.receipt') continue; - receipts.add(ReceiptEventContent.fromJson(event.content)); } @@ -2796,6 +2797,7 @@ class Client extends MatrixApi { type: LatestReceiptState.eventType, content: receiptStateContent.toJson(), ); + await database.storeRoomAccountData(room.id, event); room.roomAccountData[event.type] = event; } diff --git a/lib/src/database/database_api.dart b/lib/src/database/database_api.dart index 644ecfff..5a56d22f 100644 --- a/lib/src/database/database_api.dart +++ b/lib/src/database/database_api.dart @@ -59,6 +59,21 @@ abstract class DatabaseApi { Future> getRoomList(Client client); + Future> getThreadList(String roomId, Client client); + + Future getThread(String roomId, String threadRootEventId, Client client); + + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int? notificationCount, + int? highlightCount, + int count, + Client client, + ); + Future getSingleRoom( Client client, String roomId, { @@ -78,6 +93,8 @@ abstract class DatabaseApi { Future deleteTimelineForRoom(String roomId); + Future deleteTimelineForThread(String roomId, String threadRootEventId); + /// Stores an EventUpdate object in the database. Must be called inside of /// [transaction]. Future storeEventUpdate( @@ -115,6 +132,13 @@ abstract class DatabaseApi { int? limit, }); + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }); + Future> getEventIdList( Room room, { int start = 0, @@ -265,6 +289,13 @@ abstract class DatabaseApi { Future removeEvent(String eventId, String roomId); + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ); + Future setRoomPrevBatch( String? prevBatch, String roomId, diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index fc2ddeaf..3f0263c6 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -59,6 +59,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { late Box _clientBox; late Box _accountDataBox; late Box _roomsBox; + late Box _threadsBox; late Box _toDeviceQueueBox; /// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be @@ -122,6 +123,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { static const String _roomsBoxName = 'box_rooms'; + static const String _threadsBoxName = 'box_threads'; + static const String _toDeviceQueueBoxName = 'box_to_device_queue'; static const String _preloadRoomStateBoxName = 'box_preload_room_states'; @@ -218,6 +221,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBoxName, _accountDataBoxName, _roomsBoxName, + _threadsBoxName, _toDeviceQueueBoxName, _preloadRoomStateBoxName, _nonPreloadRoomStateBoxName, @@ -252,6 +256,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _roomsBox = _collection.openBox( _roomsBoxName, ); + _threadsBox = _collection.openBox(_threadsBoxName); _preloadRoomStateBox = _collection.openBox( _preloadRoomStateBoxName, ); @@ -357,6 +362,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBox.clearQuickAccessCache(); _accountDataBox.clearQuickAccessCache(); _roomsBox.clearQuickAccessCache(); + _threadsBox.clearQuickAccessCache(); _preloadRoomStateBox.clearQuickAccessCache(); _nonPreloadRoomStateBox.clearQuickAccessCache(); _roomMembersBox.clearQuickAccessCache(); @@ -383,6 +389,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { @override Future clearCache() => transaction(() async { await _roomsBox.clear(); + await _threadsBox.clear(); await _accountDataBox.clear(); await _roomAccountDataBox.clear(); await _preloadRoomStateBox.clear(); @@ -532,6 +539,46 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return await _getEventsByIds(eventIds.cast(), room); }); + @override + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }) => + runBenchmarked>('Get event list', () async { + // Get the synced event IDs from the store + final timelineKey = + TupleKey(thread.room.id, '', thread.rootEvent.eventId).toString(); + final timelineEventIds = + (await _timelineFragmentsBox.get(timelineKey) ?? []); + + // Get the local stored SENDING events from the store + late final List sendingEventIds; + if (start != 0) { + sendingEventIds = []; + } else { + final sendingTimelineKey = + TupleKey(thread.room.id, 'SENDING', thread.rootEvent.eventId) + .toString(); + sendingEventIds = + (await _timelineFragmentsBox.get(sendingTimelineKey) ?? []); + } + + // Combine those two lists while respecting the start and limit parameters. + final end = min( + timelineEventIds.length, + start + (limit ?? timelineEventIds.length), + ); + final eventIds = [ + ...sendingEventIds, + if (!onlySending && start < timelineEventIds.length) + ...timelineEventIds.getRange(start, end), + ]; + + return await _getEventsByIds(eventIds.cast(), thread.room); + }); + @override Future getInboundGroupSession( String roomId, @@ -1053,6 +1100,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ) async { + final raw = + await _threadsBox.get(TupleKey(roomId, threadRootEventId).toString()); + if (raw == null) return; + final thread = Thread.fromJson(copyMap(raw), client); + thread.prev_batch = prevBatch; + await _threadsBox.put(roomId, thread.toJson()); + return; + } + @override Future setVerifiedUserCrossSigningKey( bool verified, @@ -1302,6 +1365,64 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future> getThreadList(String roomId, Client client) async { + final allThreadsKeys = await _threadsBox.getAllKeys(); + final threads = {}; + + // TERRIBLE implementation. Better to create another box (String[roomId]->List[event ids]) + for (final key in allThreadsKeys) { + if (key.startsWith('$roomId|')) { + final thread = await getThread(roomId, key.split('|')[1], client); + if (thread != null) { + threads.add(thread); + } + } + } + + return threads.toList(); + } + + @override + Future getThread( + String roomId, + String threadRootEventId, + Client client, + ) async { + final key = TupleKey(roomId, threadRootEventId).toString(); + final thread = await _threadsBox.get(key); + if (thread == null) return null; + return Thread.fromJson(thread.cast(), client); + } + + @override + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int? notificationCount, + int? highlightCount, + int count, + Client client, + ) async { + final key = TupleKey(roomId, threadRootEvent.eventId).toString(); + // final currentRawThread = await _threadsBox.get(key); + await _threadsBox.put( + key, + Thread( + room: Room(id: roomId, client: client), + rootEvent: threadRootEvent, + lastEvent: lastEvent, + client: client, + currentUserParticipated: currentUserParticipated, + count: count, + notificationCount: notificationCount ?? 0, + highlightCount: highlightCount ?? 0, + ).toJson(), + ); + } + @override Future storeRoomUpdate( String roomId, @@ -1314,6 +1435,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { await forgetRoom(roomId); return; } + final membership = roomUpdate is LeftRoomUpdate ? Membership.leave : roomUpdate is InvitedRoomUpdate @@ -1376,6 +1498,12 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { Future deleteTimelineForRoom(String roomId) => _timelineFragmentsBox.delete(TupleKey(roomId, '').toString()); + @override + Future deleteTimelineForThread( + String roomId, String threadRootEventId) => + _timelineFragmentsBox + .delete(TupleKey(roomId, '', threadRootEventId).toString()); + @override Future storeSSSSCache( String type, diff --git a/lib/src/room.dart b/lib/src/room.dart index 76d72a40..1ae4c40f 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -129,9 +129,71 @@ class Room { for (final state in allStates) { setState(state); } + + await loadThreadsFromServer(); + partial = false; } + Map threads = {}; + String? getThreadRootsBatch; + bool loadedAllThreads = false; + + Future loadThreadsFromServer() async { + try { + if (loadedAllThreads) return; + final response = + await client.getThreadRoots(id, from: getThreadRootsBatch); + + for (final threadEvent in response.chunk) { + final event = Event.fromMatrixEvent(threadEvent, this); + final thread = Thread.fromJson(threadEvent.toJson(), client); + // Store thread in database + await client.database.storeThread( + id, + event, + thread.lastEvent, // lastEvent + thread.currentUserParticipated ?? false, // currentUserParticipated + 0, 0, + thread.count ?? 1, // count + client, + ); + threads[event.eventId] = thread; + } + + if (response.nextBatch == null) { + loadedAllThreads = true; + } else { + getThreadRootsBatch = response.nextBatch; + } + } catch (e) { + Logs().w('Failed to load threads from server', e); + } + } + + Future handleThreadSync(Event event) async { + // This should be called from the client's sync handling + // when a thread-related event is received + + // if (event.relationshipType == RelationshipTypes.thread && + // event.relationshipEventId != null) { + // Update thread metadata in database + final root = await getEventById(event.relationshipEventId!); + if (root == null) return; + final thread = await client.database.getThread(id, event.relationshipEventId!, client); + await client.database.storeThread( + id, + root, + event, // update last event + event.senderId == client.userID || (thread?.currentUserParticipated ?? false), // currentUserParticipated + (thread?.count ?? 0) + 1, // increment count - should be calculated properly + 0, 0, + client, + ); + threads[event.relationshipEventId!] = (await client.database.getThread(id, event.relationshipEventId!, client))!; + //} + } + /// Returns the [Event] for the given [typeKey] and optional [stateKey]. /// If no [stateKey] is provided, it defaults to an empty string. /// This returns either a `StrippedStateEvent` for rooms with membership @@ -173,6 +235,31 @@ class Room { client.onRoomState.add((roomId: id, state: state)); } + Future> getThreads() async { + final dict = {}; + final list = await client.database.getThreadList(id, client); + for (final thread in list) { + dict[thread.rootEvent.eventId] = thread; + } + return dict; + } + + Future getThread(Event rootEvent) async { + final threads = await getThreads(); + if (threads.containsKey(rootEvent.eventId)) { + return threads[rootEvent.eventId]!; + } + return Thread( + room: this, + rootEvent: rootEvent, + client: client, + currentUserParticipated: false, + count: 0, + highlightCount: 0, + notificationCount: 0, + ); + } + /// ID of the fully read marker event. String get fullyRead => roomAccountData['m.fully_read']?.content.tryGet('event_id') ?? ''; @@ -1442,6 +1529,8 @@ class Room { direction = Direction.b, StateFilter? filter, }) async { + unawaited(loadThreadsFromServer()); + final prev_batch = this.prev_batch; final storeInDatabase = !isArchived; @@ -1648,7 +1737,7 @@ class Room { /// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks. /// This method can also retrieve the timeline at a specific point by setting /// the [eventContextId] - Future getTimeline({ + Future getTimeline({ void Function(int index)? onChange, void Function(int index)? onRemove, void Function(int insertID)? onInsert, @@ -1690,7 +1779,7 @@ class Room { } } - final timeline = Timeline( + final timeline = RoomTimeline( room: this, chunk: chunk, onChange: onChange, diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart new file mode 100644 index 00000000..b0306a87 --- /dev/null +++ b/lib/src/room_timeline.dart @@ -0,0 +1,660 @@ +/* + * Famedly Matrix SDK + * Copyright (C) 2019, 2020, 2021 Famedly GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import 'dart:async'; +import 'dart:convert'; + +import 'package:collection/collection.dart'; +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; + +/// Represents the main timeline of a room. +class RoomTimeline extends Timeline { + final Room room; + @override + List get events => chunk.events; + + TimelineChunk chunk; + + StreamSubscription? timelineSub; + StreamSubscription? historySub; + StreamSubscription? roomSub; + StreamSubscription? sessionIdReceivedSub; + StreamSubscription? cancelSendEventSub; + + @override + bool isRequestingHistory = false; + @override + bool isRequestingFuture = false; + @override + bool allowNewEvent = true; + @override + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + // When fetching history, we will collect them into the `_historyUpdates` set + // first, and then only process all events at once, once we have the full history. + // This ensures that the entire history fetching only triggers `onUpdate` only *once*, + // even if /sync's complete while history is being proccessed. + bool _collectHistoryUpdates = false; + + // We confirmed, that there are no more events to load from the database. + bool _fetchedAllDatabaseEvents = false; + + @override + bool get canRequestHistory { + if (!{Membership.join, Membership.leave}.contains(room.membership)) { + return false; + } + if (events.isEmpty) return true; + return !_fetchedAllDatabaseEvents || + (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); + } + + @override + bool get canRequestFuture => !allowNewEvent; + + RoomTimeline({ + required this.room, + required this.chunk, + super.onUpdate, + super.onChange, + super.onInsert, + super.onRemove, + super.onNewEvent, + }) { + timelineSub = room.client.onTimelineEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.timeline), + ); + historySub = room.client.onHistoryEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.history), + ); + + // If the timeline is limited we want to clear our events cache + roomSub = room.client.onSync.stream + .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) + .listen(_removeEventsNotInThisSync); + + sessionIdReceivedSub = + room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); + cancelSendEventSub = + room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + // fragmented timelines never read from the database. + _fetchedAllDatabaseEvents = true; + } + } + + @override + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; + } + + @override + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; + } + + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (allowNewEvent) return; + if (isRequestingFuture) return; + isRequestingFuture = true; + await _requestEvents( + direction: Direction.f, + historyCount: historyCount, + filter: filter, + ); + isRequestingFuture = false; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await room.client.database.getEventList( + room, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (room.getState(EventTypes.RoomMember, event.senderId) != null) { + continue; + } + final dbUser = + await room.client.database.getUser(event.senderId, room); + if (dbUser != null) room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + _fetchedAllDatabaseEvents = true; + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getRoomEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (room.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await room.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () { + _collectHistoryUpdates = true; + }, + filter: filter, + ); + } + } + } + } finally { + _collectHistoryUpdates = false; + isRequestingHistory = false; + onUpdate?.call(); + } + } + + /// Request more previous events from the server. + Future getRoomEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await room.client.getRoomEvents( + room.id, + direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + filter: jsonEncode(filter.toJson()), + ); + + if (resp.end == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = direction == Direction.b ? resp.start : resp.end; + final newPrevBatch = direction == Direction.b ? resp.end : resp.start; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + if ((resp.state?.length ?? 0) == 0 && + resp.start != resp.end && + newPrevBatch != null && + newNextBatch != null) { + if (type == EventUpdateType.history) { + Logs().w( + '[nav] we can still request history prevBatch: $type $newPrevBatch', + ); + } else { + Logs().w( + '[nav] we can still request timeline nextBatch: $type $newNextBatch', + ); + } + } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); + + if (!allowNewEvent) { + if (resp.start == resp.end || + (resp.end == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await room.client.database.getEventList(room, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (room.encrypted && room.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await room.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + return resp.chunk.length; + } + + void _cleanUpCancelledEvent(String eventId) { + final i = _findEvent(event_id: eventId); + if (i < events.length) { + removeAggregatedEvent(events[i]); + events.removeAt(i); + onRemove?.call(i); + onUpdate?.call(); + } + } + + /// Removes all entries from [events] which are not in this SyncUpdate. + void _removeEventsNotInThisSync(SyncUpdate sync) { + final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; + final keepEventIds = newSyncEvents.map((e) => e.eventId); + events.removeWhere((e) => !keepEventIds.contains(e.eventId)); + } + + @override + void cancelSubscriptions() { + timelineSub?.cancel(); + historySub?.cancel(); + roomSub?.cancel(); + sessionIdReceivedSub?.cancel(); + cancelSendEventSub?.cancel(); + } + + void _sessionKeyReceived(String sessionId) async { + var decryptAtLeastOneEvent = false; + Future decryptFn() async { + final encryption = room.client.encryption; + if (!room.client.encryptionEnabled || encryption == null) { + return; + } + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].messageType == MessageTypes.BadEncrypted && + events[i].content['session_id'] == sessionId) { + events[i] = await encryption.decryptRoomEvent( + events[i], + store: true, + updateType: EventUpdateType.history, + ); + addAggregatedEvent(events[i]); + onChange?.call(i); + if (events[i].type != EventTypes.Encrypted) { + decryptAtLeastOneEvent = true; + } + } + } + } + + await room.client.database.transaction(decryptFn); + if (decryptAtLeastOneEvent) onUpdate?.call(); + } + + @override + void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + room.client.encryption?.keyManager.maybeAutoRequest( + room.id, + sessionId, + senderKey, + tryOnlineBackup: tryOnlineBackup, + onlineKeyBackupOnly: onlineKeyBackupOnly, + ); + } + } + } + } + + @override + Future setReadMarker({String? eventId, bool? public}) async { + eventId ??= + events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; + if (eventId == null) return; + return room.setReadMarker(eventId, mRead: eventId, public: public); + } + + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; + } + return i; + } + + void _handleEventUpdate( + Event event, + EventUpdateType type, { + bool update = true, + }) { + try { + if (event.roomId != room.id) return; + + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { + return; + } + + // Skip thread events in main timeline - THEY SHOULD ONLY APPEAR IN THREAD TIMELINES + if (event.relationshipType == RelationshipTypes.thread && + event.relationshipEventId != null) { + unawaited(room.handleThreadSync(event)); + } + + if (type == EventUpdateType.timeline) { + onNewEvent?.call(); + } + + if (!allowNewEvent) return; + + final status = event.status; + + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); + } else { + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); + + addAggregatedEvent(event); + } + + if (event.relationshipEventId != null && + (event.relationshipType == RelationshipTypes.edit || + event.relationshipType == RelationshipTypes.reaction || + event.relationshipType == RelationshipTypes.reference)) { + final parentEventIndex = + _findEvent(event_id: event.relationshipEventId); + unawaited(room.handleThreadSync(events[parentEventIndex])); + } + + // Handle redaction events + if (event.type == EventTypes.Redaction) { + final index = _findEvent(event_id: event.redacts); + if (index < events.length) { + removeAggregatedEvent(events[index]); + + // Is the redacted event a reaction? Then update the event this + // belongs to: + if (onChange != null) { + final relationshipEventId = events[index].relationshipEventId; + if (relationshipEventId != null) { + onChange?.call(_findEvent(event_id: relationshipEventId)); + return; + } + } + + events[index].setRedactionEvent(event); + onChange?.call(index); + } + } + + if (update && !_collectHistoryUpdates) { + onUpdate?.call(); + } + } catch (e, s) { + Logs().w('Handle event update failed', e, s); + } + } + + @override + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }) async* { + assert(searchTerm != null || searchFunc != null); + searchFunc ??= (event) => + event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); + final found = []; + + if (sinceEventId == null) { + // Search locally + for (final event in events) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + + // Search in database + var start = events.length; + while (true) { + final eventsFromStore = await room.client.database.getEventList( + room, + start: start, + limit: requestHistoryCount, + ); + if (eventsFromStore.isEmpty) break; + start += eventsFromStore.length; + for (final event in eventsFromStore) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + } + } + + // Search on the server + prevBatch ??= room.prev_batch; + if (sinceEventId != null) { + prevBatch = + (await room.client.getEventContext(room.id, sinceEventId)).end; + } + final encryption = room.client.encryption; + for (var i = 0; i < maxHistoryRequests; i++) { + if (prevBatch == null) break; + if (limit != null && found.length >= limit) break; + try { + final resp = await room.client.getRoomEvents( + room.id, + Direction.b, + from: prevBatch, + limit: requestHistoryCount, + filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + for (final matrixEvent in resp.chunk) { + var event = Event.fromMatrixEvent(matrixEvent, room); + if (event.type == EventTypes.Encrypted && encryption != null) { + event = await encryption.decryptRoomEvent(event); + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + // Await requestKey() here to ensure decrypted message bodies + await event.requestKey(); + } + } + if (searchFunc(event)) { + yield (found..add(event), resp.end); + if (limit != null && found.length >= limit) break; + } + } + prevBatch = resp.end; + // We are at the beginning of the room + if (resp.chunk.length < requestHistoryCount) break; + } on MatrixException catch (e) { + // We have no permission anymore to request the history + if (e.error == MatrixError.M_FORBIDDEN) { + break; + } + rethrow; + } + } + return; + } + + /// Add an event to the aggregation tree + void addAggregatedEvent(Event event) { + final relationshipType = event.relationshipType; + final relationshipEventId = event.relationshipEventId; + if (relationshipType == null || relationshipEventId == null) { + return; + } + final e = (aggregatedEvents[relationshipEventId] ??= + >{})[relationshipType] ??= {}; + _removeEventFromSet(e, event); + e.add(event); + if (onChange != null) { + final index = _findEvent(event_id: relationshipEventId); + onChange?.call(index); + } + } + + /// Remove an event from aggregation + void removeAggregatedEvent(Event event) { + aggregatedEvents.remove(event.eventId); + if (event.transactionId != null) { + aggregatedEvents.remove(event.transactionId); + } + for (final types in aggregatedEvents.values) { + for (final e in types.values) { + _removeEventFromSet(e, event); + } + } + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } +} diff --git a/lib/src/thread.dart b/lib/src/thread.dart new file mode 100644 index 00000000..8552392d --- /dev/null +++ b/lib/src/thread.dart @@ -0,0 +1,431 @@ +import 'dart:async'; + +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; + +class Thread { + final Room room; + final Event rootEvent; + Event? lastEvent; + String? prev_batch; + bool? currentUserParticipated; + int? count; + final Client client; + + /// The count of unread notifications. + int notificationCount = 0; + + /// The count of highlighted notifications. + int highlightCount = 0; + + Thread({ + required this.room, + required this.rootEvent, + required this.client, + required this.currentUserParticipated, + required this.count, + required this.notificationCount, + required this.highlightCount, + this.prev_batch, + this.lastEvent, + }); + + /// Returns true if this room is unread. To check if there are new messages + /// in muted rooms, use [hasNewMessages]. + bool get isUnread => notificationCount > 0; + + Map toJson() => { + ...rootEvent.toJson(), + 'unsigned': { + 'm.thread': { + 'latest_event': lastEvent?.toJson(), + 'count': count, + 'current_user_participated': currentUserParticipated, + }, + }, + }; + + factory Thread.fromJson(Map json, Client client) { + final room = client.getRoomById(json['room_id']); + if (room == null) throw Error(); + Event? lastEvent; + if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] != + null) { + lastEvent = Event.fromMatrixEvent( + MatrixEvent.fromJson( + json['unsigned']?['m.relations']?['m.thread']?['latest_event'], + ), + room, + ); + } + if (json['unsigned']?['m.thread']?['latest_event'] != null) { + lastEvent = Event.fromMatrixEvent( + MatrixEvent.fromJson( + json['unsigned']?['m.thread']?['latest_event'], + ), + room, + ); + } + // Although I was making this part according to specification, it's a bit off + // I have no clue why + final thread = Thread( + room: room, + client: client, + rootEvent: Event.fromMatrixEvent( + MatrixEvent.fromJson(json), + room, + ), + lastEvent: lastEvent, + count: json['unsigned']?['m.relations']?['m.thread']?['count'], + currentUserParticipated: json['unsigned']?['m.relations']?['m.thread'] + ?['current_user_participated'], + highlightCount: 0, + notificationCount: 0, + ); + return thread; + } + + Future refreshLastEvent({ + timeout = const Duration(seconds: 30), + }) async { + final lastEvent = _refreshingLastEvent ??= _refreshLastEvent(); + _refreshingLastEvent = null; + return lastEvent; + } + + Future? _refreshingLastEvent; + + Future _refreshLastEvent({ + timeout = const Duration(seconds: 30), + }) async { + if (room.membership != Membership.join) return null; + + final result = await client + .getRelatingEventsWithRelType( + room.id, + rootEvent.eventId, + 'm.thread', + recurse: true, + ) + .timeout(timeout); + final matrixEvent = result.chunk.firstOrNull; + if (matrixEvent == null) { + if (lastEvent?.type == EventTypes.refreshingLastEvent) { + lastEvent = null; + } + Logs().d( + 'No last event found for thread ${rootEvent.eventId} in ${rootEvent.roomId}', + ); + return null; + } + var event = Event.fromMatrixEvent( + matrixEvent, + room, + status: EventStatus.synced, + ); + if (event.type == EventTypes.Encrypted) { + final encryption = client.encryption; + if (encryption != null) { + event = await encryption.decryptRoomEvent(event); + } + } + lastEvent = event; + + return event; + } + + /// When was the last event received. + DateTime get latestEventReceivedTime { + final lastEventTime = lastEvent?.originServerTs; + if (lastEventTime != null) return lastEventTime; + + if (room.membership == Membership.invite) return DateTime.now(); + + return rootEvent.originServerTs; + } + + bool get hasNewMessages { + final lastEvent = this.lastEvent; + + // There is no known event or the last event is only a state fallback event, + // we assume there is no new messages. + if (lastEvent == null || + !client.roomPreviewLastEvents.contains(lastEvent.type)) { + return false; + } + + // Read marker is on the last event so no new messages. + if (lastEvent.receipts + .any((receipt) => receipt.user.senderId == client.userID!)) { + return false; + } + + // If the last event is sent, we mark the room as read. + if (lastEvent.senderId == client.userID) return false; + + // Get the timestamp of read marker and compare + final readAtMilliseconds = room.receiptState.byThread[rootEvent.eventId]?.latestOwnReceipt?.ts ?? 0; + return readAtMilliseconds < lastEvent.originServerTs.millisecondsSinceEpoch; + } + + Future getEventContext(String eventId) async { + // TODO: probably find events with relationship + final resp = await client.getEventContext( + room.id, eventId, + limit: Room.defaultHistoryCount, + + // filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + + + + final events = [ + if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed, + if (resp.event != null) resp.event!, + if (resp.eventsBefore != null) ...resp.eventsBefore!, + ].map((e) => Event.fromMatrixEvent(e, room)).where((e) => e.relationshipType == RelationshipTypes.thread && e.relationshipEventId == rootEvent.eventId).toList(); + + // Try again to decrypt encrypted events but don't update the database. + if (room.encrypted && client.encryptionEnabled) { + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].content['can_request_session'] == true) { + events[i] = await client.encryption!.decryptRoomEvent(events[i]); + } + } + } + + final chunk = TimelineChunk( + nextBatch: resp.end ?? '', + prevBatch: resp.start ?? '', + events: events, + ); + + return chunk; + } + + Future getTimeline({ + void Function(int index)? onChange, + void Function(int index)? onRemove, + void Function(int insertID)? onInsert, + void Function()? onNewEvent, + void Function()? onUpdate, + String? eventContextId, + int? limit = Room.defaultHistoryCount, + }) async { + // await postLoad(); + + var events = []; + + await client.database.transaction(() async { + events = await client.database.getThreadEventList( + this, + limit: limit, + ); + }); + + var chunk = TimelineChunk(events: events); + // Load the timeline arround eventContextId if set + if (eventContextId != null) { + if (!events.any((Event event) => event.eventId == eventContextId)) { + chunk = + await getEventContext(eventContextId) ?? TimelineChunk(events: []); + } + } + + final timeline = ThreadTimeline( + thread: this, + chunk: chunk, + onChange: onChange, + onRemove: onRemove, + onInsert: onInsert, + onNewEvent: onNewEvent, + onUpdate: onUpdate, + ); + + // Fetch all users from database we have got here. + if (eventContextId == null) { + final userIds = events.map((event) => event.senderId).toSet(); + for (final userId in userIds) { + if (room.getState(EventTypes.RoomMember, userId) != null) continue; + final dbUser = await client.database.getUser(userId, room); + if (dbUser != null) room.setState(dbUser); + } + } + + // Try again to decrypt encrypted events and update the database. + if (room.encrypted && client.encryptionEnabled) { + // decrypt messages + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].type == EventTypes.Encrypted) { + if (eventContextId != null) { + // for the fragmented timeline, we don't cache the decrypted + //message in the database + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + ); + } else { + // else, we need the database + await client.database.transaction(() async { + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].content['can_request_session'] == true) { + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + store: !room.isArchived, + updateType: EventUpdateType.history, + ); + } + } + }); + } + } + } + } + + return timeline; + } + + Future sendTextEvent( + String message, { + String? txid, + Event? inReplyTo, + String? editEventId, + bool parseMarkdown = true, + bool parseCommands = true, + String msgtype = MessageTypes.Text, + StringBuffer? commandStdout, + bool addMentions = true, + + /// Displays an event in the timeline with the transaction ID as the event + /// ID and a status of SENDING, SENT or ERROR until it gets replaced by + /// the sync event. Using this can display a different sort order of events + /// as the sync event does replace but not relocate the pending event. + bool displayPendingEvent = true, + }) { + return room.sendTextEvent( + message, + txid: txid, + inReplyTo: inReplyTo, + editEventId: editEventId, + parseCommands: parseCommands, + parseMarkdown: parseMarkdown, + msgtype: msgtype, + commandStdout: commandStdout, + addMentions: addMentions, + displayPendingEvent: displayPendingEvent, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } + + Future sendLocation(String body, String geoUri, {String? txid}) { + final event = { + 'msgtype': 'm.location', + 'body': body, + 'geo_uri': geoUri, + }; + return room.sendEvent( + event, + txid: txid, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } + + Future sendFileEvent( + MatrixFile file, { + String? txid, + Event? inReplyTo, + String? editEventId, + int? shrinkImageMaxDimension, + MatrixImageFile? thumbnail, + Map? extraContent, + + /// Displays an event in the timeline with the transaction ID as the event + /// ID and a status of SENDING, SENT or ERROR until it gets replaced by + /// the sync event. Using this can display a different sort order of events + /// as the sync event does replace but not relocate the pending event. + bool displayPendingEvent = true, + }) async { + return await room.sendFileEvent( + file, + txid: txid, + inReplyTo: inReplyTo, + editEventId: editEventId, + shrinkImageMaxDimension: shrinkImageMaxDimension, + thumbnail: thumbnail, + extraContent: extraContent, + displayPendingEvent: displayPendingEvent, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } + + Future setReadMarker({String? eventId, bool? public}) async { + if (eventId == null) return null; + return await client.postReceipt( + room.id, + (public ?? client.receiptsPublicByDefault) + ? ReceiptType.mRead + : ReceiptType.mReadPrivate, + eventId, + threadId: rootEvent.eventId, + ); + } + + Future setLastEvent(Event event) async { + lastEvent = event; + final thread = await client.database.getThread(room.id, rootEvent.eventId, client); + Logs().v('Set lastEvent to ${room.id}:${rootEvent.eventId} (${event.senderId})'); + await client.database.storeThread( + room.id, + rootEvent, + lastEvent, + currentUserParticipated ?? false, + notificationCount, + highlightCount, + (thread?.count ?? 0) + 1, + client, + ); + } + + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + void Function()? onHistoryReceived, + direction = Direction.b, + StateFilter? filter, + }) async { + final prev_batch = this.prev_batch; + + final storeInDatabase = !room.isArchived; + + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + if (prev_batch == null) { + throw 'Tried to request history without a prev_batch token'; + } + + final resp = await client.getRelatingEvents( + room.id, + rootEvent.eventId, + from: prev_batch, + limit: historyCount, + dir: direction, + recurse: true, + ); + + if (onHistoryReceived != null) onHistoryReceived(); + + await client.database.transaction(() async { + if (storeInDatabase && direction == Direction.b) { + this.prev_batch = resp.prevBatch; + await client.database.setThreadPrevBatch( + resp.prevBatch, room.id, rootEvent.eventId, client); + } + }); + + return resp.chunk.length; + } +} diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart new file mode 100644 index 00000000..7a73c1fc --- /dev/null +++ b/lib/src/thread_timeline.dart @@ -0,0 +1,500 @@ +import 'dart:async'; + +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; +import 'package:matrix/src/thread.dart'; + +// ThreadTimeline: hey RoomTimeline can i copy your homework? +// RoomTimeline: sure just don't make it too obvious +// ThreadTimeline: + +class ThreadTimeline extends Timeline { + final Thread thread; + + @override + List get events => chunk.events; + + TimelineChunk chunk; + + StreamSubscription? timelineSub; + StreamSubscription? historySub; + StreamSubscription? roomSub; + StreamSubscription? sessionIdReceivedSub; + StreamSubscription? cancelSendEventSub; + + @override + bool isRequestingHistory = false; + + @override + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + @override + bool allowNewEvent = true; + + @override + bool isRequestingFuture = false; + + ThreadTimeline({ + required this.thread, + required this.chunk, + super.onUpdate, + super.onChange, + super.onInsert, + super.onRemove, + super.onNewEvent, + }) { + final room = thread.room; + timelineSub = room.client.onTimelineEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.timeline), + ); + historySub = room.client.onHistoryEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.history), + ); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + } + } + + void _handleEventUpdate( + Event event, + EventUpdateType type, { + bool update = true, + }) { + try { + if (event.roomId != thread.room.id) return; + // Ignore events outside of this thread + if (event.relationshipType == RelationshipTypes.thread && + event.relationshipEventId != thread.rootEvent.eventId) { + return; + } + + if (event.relationshipType == null) { + return; + } + + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { + return; + } + + if (type == EventUpdateType.timeline) { + onNewEvent?.call(); + } + + final status = event.status; + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); + } else { + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); + + addAggregatedEvent(event); + } + + unawaited(thread.setLastEvent(events[events.length - 1])); + + // Handle redaction events + if (event.type == EventTypes.Redaction) { + final index = _findEvent(event_id: event.redacts); + if (index < events.length) { + removeAggregatedEvent(events[index]); + + // Is the redacted event a reaction? Then update the event this + // belongs to: + if (onChange != null) { + final relationshipEventId = events[index].relationshipEventId; + if (relationshipEventId != null) { + onChange?.call(_findEvent(event_id: relationshipEventId)); + return; + } + } + + events[index].setRedactionEvent(event); + onChange?.call(index); + } + } + + if (update) { + onUpdate?.call(); + } + } catch (e, s) { + Logs().w('Handle event update failed', e, s); + } + } + + /// Request more previous events from the server. + Future getThreadEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await thread.client.getRelatingEvents( + thread.room.id, + thread.rootEvent.eventId, + dir: direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + recurse: true, + ); + + Logs().w( + 'Loading thread events from server ${resp.chunk.length} ${resp.prevBatch}', + ); + + if (resp.nextBatch == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = + direction == Direction.b ? resp.prevBatch : resp.nextBatch; + final newPrevBatch = + direction == Direction.b ? resp.nextBatch : resp.prevBatch; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + // I dont know what this piece of code does + // if ((resp.state?.length ?? 0) == 0 && + // resp.start != resp.end && + // newPrevBatch != null && + // newNextBatch != null) { + // if (type == EventUpdateType.history) { + // Logs().w( + // '[nav] we can still request history prevBatch: $type $newPrevBatch', + // ); + // } else { + // Logs().w( + // '[nav] we can still request timeline nextBatch: $type $newNextBatch', + // ); + // } + // } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, thread.room)).toList(); + + if (!allowNewEvent) { + if (resp.prevBatch == resp.nextBatch || + (resp.nextBatch == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await thread.client.database + .getThreadEventList(thread, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (thread.room.encrypted && thread.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await thread.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + + for (final e in events) { + addAggregatedEvent(e); + } + + return resp.chunk.length; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await thread.client.database.getThreadEventList( + thread, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (thread.room.getState(EventTypes.RoomMember, event.senderId) != + null) { + continue; + } + final dbUser = + await thread.client.database.getUser(event.senderId, thread.room); + if (dbUser != null) thread.room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getThreadEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (thread.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await thread.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () {}, + filter: filter, + ); + } + } + } + } finally { + isRequestingHistory = false; + onUpdate?.call(); + } + } + + /// Add an event to the aggregation tree + void addAggregatedEvent(Event event) { + final relationshipType = event.relationshipType; + final relationshipEventId = event.relationshipEventId; + if (relationshipType == null || + relationshipType == RelationshipTypes.thread || + relationshipEventId == null) { + return; + } + // Logs().w( + // 'Adding aggregated event ${event.type} ${event.eventId} to $relationshipEventId ($relationshipType)'); + final e = (aggregatedEvents[relationshipEventId] ??= + >{})[relationshipType] ??= {}; + _removeEventFromSet(e, event); + e.add(event); + if (onChange != null) { + final index = _findEvent(event_id: relationshipEventId); + onChange?.call(index); + } + } + + /// Remove an event from aggregation + void removeAggregatedEvent(Event event) { + aggregatedEvents.remove(event.eventId); + if (event.transactionId != null) { + aggregatedEvents.remove(event.transactionId); + } + for (final types in aggregatedEvents.values) { + for (final e in types.values) { + _removeEventFromSet(e, event); + } + } + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; + } + return i; + } + + @override + void cancelSubscriptions() { + // TODO: implement cancelSubscriptions + } + + @override + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await thread.room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; + } + + @override + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; + } + + @override + Future setReadMarker({String? eventId, bool? public}) { + return thread.setReadMarker( + eventId: eventId, + public: public, + ); + } + + @override + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + String? sinceEventId, + int? limit, + bool Function(Event p1)? searchFunc, + }) { + // TODO: implement startSearch + throw UnimplementedError(); + } + + @override + bool get canRequestFuture => chunk.nextBatch.isNotEmpty; + + @override + bool get canRequestHistory => chunk.prevBatch.isNotEmpty; + + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingFuture || !canRequestFuture) return; + isRequestingFuture = true; + + try { + await getThreadEvents( + historyCount: historyCount, + direction: Direction.f, + filter: filter, + ); + } finally { + isRequestingFuture = false; + } + } + + @override + Future requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) async { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + await thread.room.requestSessionKey(sessionId, senderKey); + } + } + } + } +} diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 5ad7758d..11e50670 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -17,598 +17,87 @@ */ import 'dart:async'; -import 'dart:convert'; - -import 'package:collection/collection.dart'; - import 'package:matrix/matrix.dart'; -import 'package:matrix/src/models/timeline_chunk.dart'; -/// Represents the timeline of a room. The callback [onUpdate] will be triggered -/// automatically. The initial -/// event list will be retreived when created by the `room.getTimeline()` method. - -class Timeline { - final Room room; - List get events => chunk.events; +/// Abstract base class for all timeline implementations. +/// Provides common functionality for event management, aggregation, and search. +abstract class Timeline { + /// The list of events in this timeline + List get events; /// Map of event ID to map of type to set of aggregated events final Map>> aggregatedEvents = {}; + /// Called when the timeline is updated final void Function()? onUpdate; + + /// Called when an event at specific index changes final void Function(int index)? onChange; + + /// Called when an event is inserted at specific index final void Function(int index)? onInsert; + + /// Called when an event is removed from specific index final void Function(int index)? onRemove; + + /// Called when a new event is added to the timeline final void Function()? onNewEvent; - StreamSubscription? timelineSub; - StreamSubscription? historySub; - StreamSubscription? roomSub; - StreamSubscription? sessionIdReceivedSub; - StreamSubscription? cancelSendEventSub; - bool isRequestingHistory = false; - bool isRequestingFuture = false; - - bool allowNewEvent = true; - bool isFragmentedTimeline = false; - - final Map _eventCache = {}; - - TimelineChunk chunk; - - /// Searches for the event in this timeline. If not - /// found, requests from the server. Requested events - /// are cached. - Future getEventById(String id) async { - for (final event in events) { - if (event.eventId == id) return event; - } - if (_eventCache.containsKey(id)) return _eventCache[id]; - final requestedEvent = await room.getEventById(id); - if (requestedEvent == null) return null; - _eventCache[id] = requestedEvent; - return _eventCache[id]; - } - - // When fetching history, we will collect them into the `_historyUpdates` set - // first, and then only process all events at once, once we have the full history. - // This ensures that the entire history fetching only triggers `onUpdate` only *once*, - // even if /sync's complete while history is being proccessed. - bool _collectHistoryUpdates = false; - - // We confirmed, that there are no more events to load from the database. - bool _fetchedAllDatabaseEvents = false; - - bool get canRequestHistory { - if (!{Membership.join, Membership.leave}.contains(room.membership)) { - return false; - } - if (events.isEmpty) return true; - return !_fetchedAllDatabaseEvents || - (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); - } - - /// Request more previous events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestHistory({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (isRequestingHistory) { - return; - } - - isRequestingHistory = true; - await _requestEvents( - direction: Direction.b, - historyCount: historyCount, - filter: filter, - ); - isRequestingHistory = false; - } - - bool get canRequestFuture => !allowNewEvent; - - /// Request more future events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestFuture({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (allowNewEvent) { - return; // we shouldn't force to add new events if they will autatically be added - } - - if (isRequestingFuture) return; - isRequestingFuture = true; - await _requestEvents( - direction: Direction.f, - historyCount: historyCount, - filter: filter, - ); - isRequestingFuture = false; - } - - Future _requestEvents({ - int historyCount = Room.defaultHistoryCount, - required Direction direction, - StateFilter? filter, - }) async { - onUpdate?.call(); - - try { - // Look up for events in the database first. With fragmented view, we should delete the database cache - final eventsFromStore = isFragmentedTimeline - ? null - : await room.client.database.getEventList( - room, - start: events.length, - limit: historyCount, - ); - - if (eventsFromStore != null && eventsFromStore.isNotEmpty) { - for (final e in eventsFromStore) { - addAggregatedEvent(e); - } - // Fetch all users from database we have got here. - for (final event in events) { - if (room.getState(EventTypes.RoomMember, event.senderId) != null) { - continue; - } - final dbUser = - await room.client.database.getUser(event.senderId, room); - if (dbUser != null) room.setState(dbUser); - } - - if (direction == Direction.b) { - events.addAll(eventsFromStore); - final startIndex = events.length - eventsFromStore.length; - final endIndex = events.length; - for (var i = startIndex; i < endIndex; i++) { - onInsert?.call(i); - } - } else { - events.insertAll(0, eventsFromStore); - final startIndex = eventsFromStore.length; - final endIndex = 0; - for (var i = startIndex; i > endIndex; i--) { - onInsert?.call(i); - } - } - } else { - _fetchedAllDatabaseEvents = true; - Logs().i('No more events found in the store. Request from server...'); - - if (isFragmentedTimeline) { - await getRoomEvents( - historyCount: historyCount, - direction: direction, - filter: filter, - ); - } else { - if (room.prev_batch == null) { - Logs().i('No more events to request from server...'); - } else { - await room.requestHistory( - historyCount: historyCount, - direction: direction, - onHistoryReceived: () { - _collectHistoryUpdates = true; - }, - filter: filter, - ); - } - } - } - } finally { - _collectHistoryUpdates = false; - isRequestingHistory = false; - onUpdate?.call(); - } - } - - /// Request more previous events from the server. [historyCount] defines how much events should - /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before** - /// the historical events will be published in the onEvent stream. [filter] allows you to specify a - /// [StateFilter] object to filter the events, which can include various criteria such as - /// event types (e.g., [EventTypes.Message]) and other state-related filters. - /// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden. - /// Returns the actual count of received timeline events. - Future getRoomEvents({ - int historyCount = Room.defaultHistoryCount, - direction = Direction.b, - StateFilter? filter, - }) async { - // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set - filter ??= StateFilter(lazyLoadMembers: true); - filter.lazyLoadMembers ??= true; - - final resp = await room.client.getRoomEvents( - room.id, - direction, - from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, - limit: historyCount, - filter: jsonEncode(filter.toJson()), - ); - - if (resp.end == null) { - Logs().w('We reached the end of the timeline'); - } - - final newNextBatch = direction == Direction.b ? resp.start : resp.end; - final newPrevBatch = direction == Direction.b ? resp.end : resp.start; - - final type = direction == Direction.b - ? EventUpdateType.history - : EventUpdateType.timeline; - - if ((resp.state?.length ?? 0) == 0 && - resp.start != resp.end && - newPrevBatch != null && - newNextBatch != null) { - if (type == EventUpdateType.history) { - Logs().w( - '[nav] we can still request history prevBatch: $type $newPrevBatch', - ); - } else { - Logs().w( - '[nav] we can still request timeline nextBatch: $type $newNextBatch', - ); - } - } - - final newEvents = - resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); - - if (!allowNewEvent) { - if (resp.start == resp.end || - (resp.end == null && direction == Direction.f)) { - allowNewEvent = true; - } - - if (allowNewEvent) { - Logs().d('We now allow sync update into the timeline.'); - newEvents.addAll( - await room.client.database.getEventList(room, onlySending: true), - ); - } - } - - // Try to decrypt encrypted events but don't update the database. - if (room.encrypted && room.client.encryptionEnabled) { - for (var i = 0; i < newEvents.length; i++) { - if (newEvents[i].type == EventTypes.Encrypted) { - newEvents[i] = await room.client.encryption!.decryptRoomEvent( - newEvents[i], - ); - } - } - } - - // update chunk anchors - if (type == EventUpdateType.history) { - chunk.prevBatch = newPrevBatch ?? ''; - - final offset = chunk.events.length; - - chunk.events.addAll(newEvents); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i + offset); - } - } else { - chunk.nextBatch = newNextBatch ?? ''; - chunk.events.insertAll(0, newEvents.reversed); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i); - } - } - - if (onUpdate != null) { - onUpdate!(); - } - return resp.chunk.length; - } + bool get canRequestHistory; + bool get canRequestFuture; + bool get allowNewEvent; + bool get isRequestingFuture; + bool get isRequestingHistory; + bool get isFragmentedTimeline; Timeline({ - required this.room, this.onUpdate, this.onChange, this.onInsert, this.onRemove, this.onNewEvent, - required this.chunk, - }) { - timelineSub = room.client.onTimelineEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.timeline, - ), - ); - historySub = room.client.onHistoryEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.history, - ), - ); + }); - // If the timeline is limited we want to clear our events cache - roomSub = room.client.onSync.stream - .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) - .listen(_removeEventsNotInThisSync); + /// Searches for the event in this timeline. If not found, requests from server. + Future getEventById(String id); - sessionIdReceivedSub = - room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); - cancelSendEventSub = - room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + /// Request more previous events + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we want to populate our aggregated events - for (final e in events) { - addAggregatedEvent(e); - } + /// Request more future events + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we are using a fragmented timeline - if (chunk.nextBatch != '') { - allowNewEvent = false; - isFragmentedTimeline = true; - // fragmented timelines never read from the database. - _fetchedAllDatabaseEvents = true; - } - } + /// Set the read marker to an event in this timeline + Future setReadMarker({String? eventId, bool? public}); - void _cleanUpCancelledEvent(String eventId) { - final i = _findEvent(event_id: eventId); - if (i < events.length) { - removeAggregatedEvent(events[i]); - events.removeAt(i); - onRemove?.call(i); - onUpdate?.call(); - } - } - - /// Removes all entries from [events] which are not in this SyncUpdate. - void _removeEventsNotInThisSync(SyncUpdate sync) { - final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; - final keepEventIds = newSyncEvents.map((e) => e.eventId); - events.removeWhere((e) => !keepEventIds.contains(e.eventId)); - } - - /// Don't forget to call this before you dismiss this object! - void cancelSubscriptions() { - // ignore: discarded_futures - timelineSub?.cancel(); - // ignore: discarded_futures - historySub?.cancel(); - // ignore: discarded_futures - roomSub?.cancel(); - // ignore: discarded_futures - sessionIdReceivedSub?.cancel(); - // ignore: discarded_futures - cancelSendEventSub?.cancel(); - } - - void _sessionKeyReceived(String sessionId) async { - var decryptAtLeastOneEvent = false; - Future decryptFn() async { - final encryption = room.client.encryption; - if (!room.client.encryptionEnabled || encryption == null) { - return; - } - for (var i = 0; i < events.length; i++) { - if (events[i].type == EventTypes.Encrypted && - events[i].messageType == MessageTypes.BadEncrypted && - events[i].content['session_id'] == sessionId) { - events[i] = await encryption.decryptRoomEvent( - events[i], - store: true, - updateType: EventUpdateType.history, - ); - addAggregatedEvent(events[i]); - onChange?.call(i); - if (events[i].type != EventTypes.Encrypted) { - decryptAtLeastOneEvent = true; - } - } - } - } - - await room.client.database.transaction(decryptFn); - if (decryptAtLeastOneEvent) onUpdate?.call(); - } - - /// Request the keys for undecryptable events of this timeline + /// Request keys for undecryptable events void requestKeys({ bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true, - }) { - for (final event in events) { - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - final sessionId = event.content.tryGet('session_id'); - final senderKey = event.content.tryGet('sender_key'); - if (sessionId != null && senderKey != null) { - room.client.encryption?.keyManager.maybeAutoRequest( - room.id, - sessionId, - senderKey, - tryOnlineBackup: tryOnlineBackup, - onlineKeyBackupOnly: onlineKeyBackupOnly, - ); - } - } - } - } + }); - /// Set the read marker to the last synced event in this timeline. - Future setReadMarker({String? eventId, bool? public}) async { - eventId ??= - events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; - if (eventId == null) return; - return room.setReadMarker(eventId, mRead: eventId, public: public); - } + /// Search events in this timeline + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }); - int _findEvent({String? event_id, String? unsigned_txid}) { - // we want to find any existing event where either the passed event_id or the passed unsigned_txid - // matches either the event_id or transaction_id of the existing event. - // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match. - // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair, - // thus meaning we found our element. - final searchNeedle = {}; - if (event_id != null) { - searchNeedle.add(event_id); - } - if (unsigned_txid != null) { - searchNeedle.add(unsigned_txid); - } - int i; - for (i = 0; i < events.length; i++) { - final searchHaystack = {events[i].eventId}; + /// Handle event updates (to be implemented by subclasses) + void _handleEventUpdate(Event event, EventUpdateType type, {bool update = true}); - final txnid = events[i].transactionId; - if (txnid != null) { - searchHaystack.add(txnid); - } - if (searchNeedle.intersection(searchHaystack).isNotEmpty) { - break; - } - } - return i; - } - - void _removeEventFromSet(Set eventSet, Event event) { - eventSet.removeWhere( - (e) => - e.matchesEventOrTransactionId(event.eventId) || - event.unsigned != null && - e.matchesEventOrTransactionId(event.transactionId), - ); - } - - void addAggregatedEvent(Event event) { - // we want to add an event to the aggregation tree - final relationshipType = event.relationshipType; - final relationshipEventId = event.relationshipEventId; - if (relationshipType == null || relationshipEventId == null) { - return; // nothing to do - } - final e = (aggregatedEvents[relationshipEventId] ??= - >{})[relationshipType] ??= {}; - // remove a potential old event - _removeEventFromSet(e, event); - // add the new one - e.add(event); - if (onChange != null) { - final index = _findEvent(event_id: relationshipEventId); - onChange?.call(index); - } - } - - void removeAggregatedEvent(Event event) { - aggregatedEvents.remove(event.eventId); - if (event.transactionId != null) { - aggregatedEvents.remove(event.transactionId); - } - for (final types in aggregatedEvents.values) { - for (final e in types.values) { - _removeEventFromSet(e, event); - } - } - } - - void _handleEventUpdate( - Event event, - EventUpdateType type, { - bool update = true, - }) { - try { - if (event.roomId != room.id) return; - - if (type != EventUpdateType.timeline && type != EventUpdateType.history) { - return; - } - - if (type == EventUpdateType.timeline) { - onNewEvent?.call(); - } - - if (!allowNewEvent) return; - - final status = event.status; - - final i = _findEvent( - event_id: event.eventId, - unsigned_txid: event.transactionId, - ); - - if (i < events.length) { - // if the old status is larger than the new one, we also want to preserve the old status - final oldStatus = events[i].status; - events[i] = event; - // do we preserve the status? we should allow 0 -> -1 updates and status increases - if ((latestEventStatus(status, oldStatus) == oldStatus) && - !(status.isError && oldStatus.isSending)) { - events[i].status = oldStatus; - } - addAggregatedEvent(events[i]); - onChange?.call(i); - } else { - if (type == EventUpdateType.history && - events.indexWhere( - (e) => e.eventId == event.eventId, - ) != - -1) { - return; - } - var index = events.length; - if (type == EventUpdateType.history) { - events.add(event); - } else { - index = events.firstIndexWhereNotError; - events.insert(index, event); - } - onInsert?.call(index); - - addAggregatedEvent(event); - } - - // Handle redaction events - if (event.type == EventTypes.Redaction) { - final index = _findEvent(event_id: event.redacts); - if (index < events.length) { - removeAggregatedEvent(events[index]); - - // Is the redacted event a reaction? Then update the event this - // belongs to: - if (onChange != null) { - final relationshipEventId = events[index].relationshipEventId; - if (relationshipEventId != null) { - onChange?.call(_findEvent(event_id: relationshipEventId)); - return; - } - } - - events[index].setRedactionEvent(event); - onChange?.call(index); - } - } - - if (update && !_collectHistoryUpdates) { - onUpdate?.call(); - } - } catch (e, s) { - Logs().w('Handle event update failed', e, s); - } - } + /// Cancel all subscriptions + void cancelSubscriptions(); @Deprecated('Use [startSearch] instead.') Stream> searchEvent({ @@ -623,114 +112,18 @@ class Timeline { searchTerm: searchTerm, requestHistoryCount: requestHistoryCount, maxHistoryRequests: maxHistoryRequests, - // ignore: deprecated_member_use_from_same_package sinceEventId: sinceEventId, limit: limit, searchFunc: searchFunc, ).map((result) => result.$1); - - /// Searches [searchTerm] in this timeline. It first searches in the - /// cache, then in the database and then on the server. The search can - /// take a while, which is why this returns a stream so the already found - /// events can already be displayed. - /// Override the [searchFunc] if you need another search. This will then - /// ignore [searchTerm]. - /// Returns the List of Events and the next prevBatch at the end of the - /// search. - Stream<(List, String?)> startSearch({ - String? searchTerm, - int requestHistoryCount = 100, - int maxHistoryRequests = 10, - String? prevBatch, - @Deprecated('Use [prevBatch] instead.') String? sinceEventId, - int? limit, - bool Function(Event)? searchFunc, - }) async* { - assert(searchTerm != null || searchFunc != null); - searchFunc ??= (event) => - event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); - final found = []; - - if (sinceEventId == null) { - // Search locally - for (final event in events) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - - // Search in database - var start = events.length; - while (true) { - final eventsFromStore = await room.client.database.getEventList( - room, - start: start, - limit: requestHistoryCount, - ); - if (eventsFromStore.isEmpty) break; - start += eventsFromStore.length; - for (final event in eventsFromStore) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - } - } - - // Search on the server - prevBatch ??= room.prev_batch; - if (sinceEventId != null) { - prevBatch = - (await room.client.getEventContext(room.id, sinceEventId)).end; - } - final encryption = room.client.encryption; - for (var i = 0; i < maxHistoryRequests; i++) { - if (prevBatch == null) break; - if (limit != null && found.length >= limit) break; - try { - final resp = await room.client.getRoomEvents( - room.id, - Direction.b, - from: prevBatch, - limit: requestHistoryCount, - filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), - ); - for (final matrixEvent in resp.chunk) { - var event = Event.fromMatrixEvent(matrixEvent, room); - if (event.type == EventTypes.Encrypted && encryption != null) { - event = await encryption.decryptRoomEvent(event); - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - // Await requestKey() here to ensure decrypted message bodies - await event.requestKey(); - } - } - if (searchFunc(event)) { - yield (found..add(event), resp.end); - if (limit != null && found.length >= limit) break; - } - } - prevBatch = resp.end; - // We are at the beginning of the room - if (resp.chunk.length < requestHistoryCount) break; - } on MatrixException catch (e) { - // We have no permission anymore to request the history - if (e.error == MatrixError.M_FORBIDDEN) { - break; - } - rethrow; - } - } - return; - } } -extension on List { +// TODO: make up a better name +extension TimelineExtension on List { int get firstIndexWhereNotError { if (isEmpty) return 0; final index = indexWhere((event) => !event.status.isError); if (index == -1) return length; return index; } -} +} \ No newline at end of file