From c3e596653de51ee873c62e49ba5c282bf3fa100f Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sun, 19 Oct 2025 16:37:49 +0500 Subject: [PATCH 01/18] boilerplate for threads --- find.js | 23 +++++++++++++++++++++++ lib/src/thread.dart | 27 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 find.js create mode 100644 lib/src/thread.dart diff --git a/find.js b/find.js new file mode 100644 index 00000000..695558b4 --- /dev/null +++ b/find.js @@ -0,0 +1,23 @@ +import fs from 'fs'; + +const files = fs.readdirSync('lib/', { + recursive: true +}); + +const q = process.argv[2]; + +var total = 0; + +for (const f of files) { + try { + const b = fs.readFileSync(`lib/${f}`, 'utf-8'); + if (b.includes(q) || f.includes(q)) { + total ++; + console.log(f); + } + } catch (error) { + + } +} + +console.log(`${total} files in total`); \ No newline at end of file diff --git a/lib/src/thread.dart b/lib/src/thread.dart new file mode 100644 index 00000000..ed9f8216 --- /dev/null +++ b/lib/src/thread.dart @@ -0,0 +1,27 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:math'; + +import 'package:async/async.dart'; +import 'package:collection/collection.dart'; +import 'package:html_unescape/html_unescape.dart'; + +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; +import 'package:matrix/src/utils/cached_stream_controller.dart'; +import 'package:matrix/src/utils/file_send_request_credentials.dart'; +import 'package:matrix/src/utils/markdown.dart'; +import 'package:matrix/src/utils/marked_unread.dart'; +import 'package:matrix/src/utils/space_child.dart'; + +class Thread { + final Room room; + final String threadRootId; + + Thread({ + required Room this.room, + required String this.threadRootId + }) { + + } +} \ No newline at end of file From 8f4da40dec027ca581296489fa9477eed82777e2 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sun, 19 Oct 2025 17:44:46 +0500 Subject: [PATCH 02/18] split Timeline into abstract and RoomTimeline --- lib/src/client.dart | 3 +- lib/src/room_timeline.dart | 622 ++++++++++++++++++++++++++++++++ lib/src/thread.dart | 4 +- lib/src/timeline.dart | 713 ++++--------------------------------- 4 files changed, 702 insertions(+), 640 deletions(-) create mode 100644 lib/src/room_timeline.dart diff --git a/lib/src/client.dart b/lib/src/client.dart index de3213eb..38a0195d 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -25,6 +25,7 @@ import 'dart:typed_data'; import 'package:async/async.dart'; import 'package:collection/collection.dart' show IterableExtension; import 'package:http/http.dart' as http; +import 'package:matrix/src/room_timeline.dart'; import 'package:mime/mime.dart'; import 'package:random_string/random_string.dart'; import 'package:vodozemac/vodozemac.dart' as vod; @@ -1213,7 +1214,7 @@ class Client extends MatrixApi { // Set membership of room to leave, in the case we got a left room passed, otherwise // the left room would have still membership join, which would be wrong for the setState later archivedRoom.membership = Membership.leave; - final timeline = Timeline( + final timeline = RoomTimeline( room: archivedRoom, chunk: TimelineChunk( events: roomUpdate.timeline?.events?.reversed diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart new file mode 100644 index 00000000..46cf9551 --- /dev/null +++ b/lib/src/room_timeline.dart @@ -0,0 +1,622 @@ +/* + * Famedly Matrix SDK + * Copyright (C) 2019, 2020, 2021 Famedly GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import 'dart:async'; +import 'dart:convert'; + +import 'package:collection/collection.dart'; +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; + +/// Represents the main timeline of a room. +class RoomTimeline extends Timeline { + @override + final Room room; + @override + List get events => chunk.events; + + TimelineChunk chunk; + + StreamSubscription? timelineSub; + StreamSubscription? historySub; + StreamSubscription? roomSub; + StreamSubscription? sessionIdReceivedSub; + StreamSubscription? cancelSendEventSub; + + bool isRequestingHistory = false; + bool isRequestingFuture = false; + bool allowNewEvent = true; + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + // When fetching history, we will collect them into the `_historyUpdates` set + // first, and then only process all events at once, once we have the full history. + // This ensures that the entire history fetching only triggers `onUpdate` only *once*, + // even if /sync's complete while history is being proccessed. + bool _collectHistoryUpdates = false; + + // We confirmed, that there are no more events to load from the database. + bool _fetchedAllDatabaseEvents = false; + + @override + bool get canRequestHistory { + if (!{Membership.join, Membership.leave}.contains(room.membership)) { + return false; + } + if (events.isEmpty) return true; + return !_fetchedAllDatabaseEvents || + (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); + } + + @override + bool get canRequestFuture => !allowNewEvent; + + RoomTimeline({ + required this.room, + required this.chunk, + super.onUpdate, + super.onChange, + super.onInsert, + super.onRemove, + super.onNewEvent, + }) { + timelineSub = room.client.onTimelineEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.timeline), + ); + historySub = room.client.onHistoryEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.history), + ); + + // If the timeline is limited we want to clear our events cache + roomSub = room.client.onSync.stream + .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) + .listen(_removeEventsNotInThisSync); + + sessionIdReceivedSub = + room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); + cancelSendEventSub = + room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + // fragmented timelines never read from the database. + _fetchedAllDatabaseEvents = true; + } + } + + @override + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; + } + + @override + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; + } + + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (allowNewEvent) return; + if (isRequestingFuture) return; + isRequestingFuture = true; + await _requestEvents( + direction: Direction.f, + historyCount: historyCount, + filter: filter, + ); + isRequestingFuture = false; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await room.client.database.getEventList( + room, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (room.getState(EventTypes.RoomMember, event.senderId) != null) { + continue; + } + final dbUser = + await room.client.database.getUser(event.senderId, room); + if (dbUser != null) room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + _fetchedAllDatabaseEvents = true; + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getRoomEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (room.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await room.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () { + _collectHistoryUpdates = true; + }, + filter: filter, + ); + } + } + } + } finally { + _collectHistoryUpdates = false; + isRequestingHistory = false; + onUpdate?.call(); + } + } + + /// Request more previous events from the server. + Future getRoomEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await room.client.getRoomEvents( + room.id, + direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + filter: jsonEncode(filter.toJson()), + ); + + if (resp.end == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = direction == Direction.b ? resp.start : resp.end; + final newPrevBatch = direction == Direction.b ? resp.end : resp.start; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + if ((resp.state?.length ?? 0) == 0 && + resp.start != resp.end && + newPrevBatch != null && + newNextBatch != null) { + if (type == EventUpdateType.history) { + Logs().w( + '[nav] we can still request history prevBatch: $type $newPrevBatch', + ); + } else { + Logs().w( + '[nav] we can still request timeline nextBatch: $type $newNextBatch', + ); + } + } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); + + if (!allowNewEvent) { + if (resp.start == resp.end || + (resp.end == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await room.client.database.getEventList(room, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (room.encrypted && room.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await room.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + return resp.chunk.length; + } + + void _cleanUpCancelledEvent(String eventId) { + final i = _findEvent(event_id: eventId); + if (i < events.length) { + removeAggregatedEvent(events[i]); + events.removeAt(i); + onRemove?.call(i); + onUpdate?.call(); + } + } + + /// Removes all entries from [events] which are not in this SyncUpdate. + void _removeEventsNotInThisSync(SyncUpdate sync) { + final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; + final keepEventIds = newSyncEvents.map((e) => e.eventId); + events.removeWhere((e) => !keepEventIds.contains(e.eventId)); + } + + @override + void cancelSubscriptions() { + timelineSub?.cancel(); + historySub?.cancel(); + roomSub?.cancel(); + sessionIdReceivedSub?.cancel(); + cancelSendEventSub?.cancel(); + } + + void _sessionKeyReceived(String sessionId) async { + var decryptAtLeastOneEvent = false; + Future decryptFn() async { + final encryption = room.client.encryption; + if (!room.client.encryptionEnabled || encryption == null) { + return; + } + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].messageType == MessageTypes.BadEncrypted && + events[i].content['session_id'] == sessionId) { + events[i] = await encryption.decryptRoomEvent( + events[i], + store: true, + updateType: EventUpdateType.history, + ); + addAggregatedEvent(events[i]); + onChange?.call(i); + if (events[i].type != EventTypes.Encrypted) { + decryptAtLeastOneEvent = true; + } + } + } + } + + await room.client.database.transaction(decryptFn); + if (decryptAtLeastOneEvent) onUpdate?.call(); + } + + @override + void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + room.client.encryption?.keyManager.maybeAutoRequest( + room.id, + sessionId, + senderKey, + tryOnlineBackup: tryOnlineBackup, + onlineKeyBackupOnly: onlineKeyBackupOnly, + ); + } + } + } + } + + @override + Future setReadMarker({String? eventId, bool? public}) async { + eventId ??= + events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; + if (eventId == null) return; + return room.setReadMarker(eventId, mRead: eventId, public: public); + } + + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; + } + return i; + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + @override + void _handleEventUpdate( + Event event, + EventUpdateType type, { + bool update = true, + }) { + try { + if (event.roomId != room.id) return; + + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { + return; + } + + if (type == EventUpdateType.timeline) { + onNewEvent?.call(); + } + + if (!allowNewEvent) return; + + final status = event.status; + + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); + } else { + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); + + addAggregatedEvent(event); + } + + // Handle redaction events + if (event.type == EventTypes.Redaction) { + final index = _findEvent(event_id: event.redacts); + if (index < events.length) { + removeAggregatedEvent(events[index]); + + // Is the redacted event a reaction? Then update the event this + // belongs to: + if (onChange != null) { + final relationshipEventId = events[index].relationshipEventId; + if (relationshipEventId != null) { + onChange?.call(_findEvent(event_id: relationshipEventId)); + return; + } + } + + events[index].setRedactionEvent(event); + onChange?.call(index); + } + } + + if (update && !_collectHistoryUpdates) { + onUpdate?.call(); + } + } catch (e, s) { + Logs().w('Handle event update failed', e, s); + } + } + + @override + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }) async* { + assert(searchTerm != null || searchFunc != null); + searchFunc ??= (event) => + event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); + final found = []; + + if (sinceEventId == null) { + // Search locally + for (final event in events) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + + // Search in database + var start = events.length; + while (true) { + final eventsFromStore = await room.client.database.getEventList( + room, + start: start, + limit: requestHistoryCount, + ); + if (eventsFromStore.isEmpty) break; + start += eventsFromStore.length; + for (final event in eventsFromStore) { + if (searchFunc(event)) { + yield (found..add(event), null); + } + } + } + } + + // Search on the server + prevBatch ??= room.prev_batch; + if (sinceEventId != null) { + prevBatch = + (await room.client.getEventContext(room.id, sinceEventId)).end; + } + final encryption = room.client.encryption; + for (var i = 0; i < maxHistoryRequests; i++) { + if (prevBatch == null) break; + if (limit != null && found.length >= limit) break; + try { + final resp = await room.client.getRoomEvents( + room.id, + Direction.b, + from: prevBatch, + limit: requestHistoryCount, + filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + for (final matrixEvent in resp.chunk) { + var event = Event.fromMatrixEvent(matrixEvent, room); + if (event.type == EventTypes.Encrypted && encryption != null) { + event = await encryption.decryptRoomEvent(event); + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + // Await requestKey() here to ensure decrypted message bodies + await event.requestKey(); + } + } + if (searchFunc(event)) { + yield (found..add(event), resp.end); + if (limit != null && found.length >= limit) break; + } + } + prevBatch = resp.end; + // We are at the beginning of the room + if (resp.chunk.length < requestHistoryCount) break; + } on MatrixException catch (e) { + // We have no permission anymore to request the history + if (e.error == MatrixError.M_FORBIDDEN) { + break; + } + rethrow; + } + } + return; + } +} + +extension on List { + int get firstIndexWhereNotError { + if (isEmpty) return 0; + final index = indexWhere((event) => !event.status.isError); + if (index == -1) return length; + return index; + } +} \ No newline at end of file diff --git a/lib/src/thread.dart b/lib/src/thread.dart index ed9f8216..aa707a8e 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -16,11 +16,11 @@ import 'package:matrix/src/utils/space_child.dart'; class Thread { final Room room; - final String threadRootId; + final MatrixEvent rootEvent; Thread({ required Room this.room, - required String this.threadRootId + required MatrixEvent this.rootEvent }) { } diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 5ad7758d..3decd3cb 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -20,492 +20,89 @@ import 'dart:async'; import 'dart:convert'; import 'package:collection/collection.dart'; - import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; -/// Represents the timeline of a room. The callback [onUpdate] will be triggered -/// automatically. The initial -/// event list will be retreived when created by the `room.getTimeline()` method. - -class Timeline { - final Room room; - List get events => chunk.events; +/// Abstract base class for all timeline implementations. +/// Provides common functionality for event management, aggregation, and search. +abstract class Timeline { + /// The list of events in this timeline + List get events; /// Map of event ID to map of type to set of aggregated events final Map>> aggregatedEvents = {}; + /// Called when the timeline is updated final void Function()? onUpdate; + + /// Called when an event at specific index changes final void Function(int index)? onChange; + + /// Called when an event is inserted at specific index final void Function(int index)? onInsert; + + /// Called when an event is removed from specific index final void Function(int index)? onRemove; + + /// Called when a new event is added to the timeline final void Function()? onNewEvent; - StreamSubscription? timelineSub; - StreamSubscription? historySub; - StreamSubscription? roomSub; - StreamSubscription? sessionIdReceivedSub; - StreamSubscription? cancelSendEventSub; - bool isRequestingHistory = false; - bool isRequestingFuture = false; - - bool allowNewEvent = true; - bool isFragmentedTimeline = false; - - final Map _eventCache = {}; - - TimelineChunk chunk; - - /// Searches for the event in this timeline. If not - /// found, requests from the server. Requested events - /// are cached. - Future getEventById(String id) async { - for (final event in events) { - if (event.eventId == id) return event; - } - if (_eventCache.containsKey(id)) return _eventCache[id]; - final requestedEvent = await room.getEventById(id); - if (requestedEvent == null) return null; - _eventCache[id] = requestedEvent; - return _eventCache[id]; - } - - // When fetching history, we will collect them into the `_historyUpdates` set - // first, and then only process all events at once, once we have the full history. - // This ensures that the entire history fetching only triggers `onUpdate` only *once*, - // even if /sync's complete while history is being proccessed. - bool _collectHistoryUpdates = false; - - // We confirmed, that there are no more events to load from the database. - bool _fetchedAllDatabaseEvents = false; - - bool get canRequestHistory { - if (!{Membership.join, Membership.leave}.contains(room.membership)) { - return false; - } - if (events.isEmpty) return true; - return !_fetchedAllDatabaseEvents || - (room.prev_batch != null && events.last.type != EventTypes.RoomCreate); - } - - /// Request more previous events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestHistory({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (isRequestingHistory) { - return; - } - - isRequestingHistory = true; - await _requestEvents( - direction: Direction.b, - historyCount: historyCount, - filter: filter, - ); - isRequestingHistory = false; - } - - bool get canRequestFuture => !allowNewEvent; - - /// Request more future events from the server. [historyCount] defines how many events should - /// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the - /// events, which can include various criteria such as event types (e.g., [EventTypes.Message]) - /// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to - /// true by default, but this can be overridden. - /// This method does not return a value. - Future requestFuture({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, - }) async { - if (allowNewEvent) { - return; // we shouldn't force to add new events if they will autatically be added - } - - if (isRequestingFuture) return; - isRequestingFuture = true; - await _requestEvents( - direction: Direction.f, - historyCount: historyCount, - filter: filter, - ); - isRequestingFuture = false; - } - - Future _requestEvents({ - int historyCount = Room.defaultHistoryCount, - required Direction direction, - StateFilter? filter, - }) async { - onUpdate?.call(); - - try { - // Look up for events in the database first. With fragmented view, we should delete the database cache - final eventsFromStore = isFragmentedTimeline - ? null - : await room.client.database.getEventList( - room, - start: events.length, - limit: historyCount, - ); - - if (eventsFromStore != null && eventsFromStore.isNotEmpty) { - for (final e in eventsFromStore) { - addAggregatedEvent(e); - } - // Fetch all users from database we have got here. - for (final event in events) { - if (room.getState(EventTypes.RoomMember, event.senderId) != null) { - continue; - } - final dbUser = - await room.client.database.getUser(event.senderId, room); - if (dbUser != null) room.setState(dbUser); - } - - if (direction == Direction.b) { - events.addAll(eventsFromStore); - final startIndex = events.length - eventsFromStore.length; - final endIndex = events.length; - for (var i = startIndex; i < endIndex; i++) { - onInsert?.call(i); - } - } else { - events.insertAll(0, eventsFromStore); - final startIndex = eventsFromStore.length; - final endIndex = 0; - for (var i = startIndex; i > endIndex; i--) { - onInsert?.call(i); - } - } - } else { - _fetchedAllDatabaseEvents = true; - Logs().i('No more events found in the store. Request from server...'); - - if (isFragmentedTimeline) { - await getRoomEvents( - historyCount: historyCount, - direction: direction, - filter: filter, - ); - } else { - if (room.prev_batch == null) { - Logs().i('No more events to request from server...'); - } else { - await room.requestHistory( - historyCount: historyCount, - direction: direction, - onHistoryReceived: () { - _collectHistoryUpdates = true; - }, - filter: filter, - ); - } - } - } - } finally { - _collectHistoryUpdates = false; - isRequestingHistory = false; - onUpdate?.call(); - } - } - - /// Request more previous events from the server. [historyCount] defines how much events should - /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before** - /// the historical events will be published in the onEvent stream. [filter] allows you to specify a - /// [StateFilter] object to filter the events, which can include various criteria such as - /// event types (e.g., [EventTypes.Message]) and other state-related filters. - /// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden. - /// Returns the actual count of received timeline events. - Future getRoomEvents({ - int historyCount = Room.defaultHistoryCount, - direction = Direction.b, - StateFilter? filter, - }) async { - // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set - filter ??= StateFilter(lazyLoadMembers: true); - filter.lazyLoadMembers ??= true; - - final resp = await room.client.getRoomEvents( - room.id, - direction, - from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, - limit: historyCount, - filter: jsonEncode(filter.toJson()), - ); - - if (resp.end == null) { - Logs().w('We reached the end of the timeline'); - } - - final newNextBatch = direction == Direction.b ? resp.start : resp.end; - final newPrevBatch = direction == Direction.b ? resp.end : resp.start; - - final type = direction == Direction.b - ? EventUpdateType.history - : EventUpdateType.timeline; - - if ((resp.state?.length ?? 0) == 0 && - resp.start != resp.end && - newPrevBatch != null && - newNextBatch != null) { - if (type == EventUpdateType.history) { - Logs().w( - '[nav] we can still request history prevBatch: $type $newPrevBatch', - ); - } else { - Logs().w( - '[nav] we can still request timeline nextBatch: $type $newNextBatch', - ); - } - } - - final newEvents = - resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList(); - - if (!allowNewEvent) { - if (resp.start == resp.end || - (resp.end == null && direction == Direction.f)) { - allowNewEvent = true; - } - - if (allowNewEvent) { - Logs().d('We now allow sync update into the timeline.'); - newEvents.addAll( - await room.client.database.getEventList(room, onlySending: true), - ); - } - } - - // Try to decrypt encrypted events but don't update the database. - if (room.encrypted && room.client.encryptionEnabled) { - for (var i = 0; i < newEvents.length; i++) { - if (newEvents[i].type == EventTypes.Encrypted) { - newEvents[i] = await room.client.encryption!.decryptRoomEvent( - newEvents[i], - ); - } - } - } - - // update chunk anchors - if (type == EventUpdateType.history) { - chunk.prevBatch = newPrevBatch ?? ''; - - final offset = chunk.events.length; - - chunk.events.addAll(newEvents); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i + offset); - } - } else { - chunk.nextBatch = newNextBatch ?? ''; - chunk.events.insertAll(0, newEvents.reversed); - - for (var i = 0; i < newEvents.length; i++) { - onInsert?.call(i); - } - } - - if (onUpdate != null) { - onUpdate!(); - } - return resp.chunk.length; - } + bool get canRequestHistory; + bool get canRequestFuture; Timeline({ - required this.room, this.onUpdate, this.onChange, this.onInsert, this.onRemove, this.onNewEvent, - required this.chunk, - }) { - timelineSub = room.client.onTimelineEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.timeline, - ), - ); - historySub = room.client.onHistoryEvent.stream.listen( - (event) => _handleEventUpdate( - event, - EventUpdateType.history, - ), - ); + }); - // If the timeline is limited we want to clear our events cache - roomSub = room.client.onSync.stream - .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true) - .listen(_removeEventsNotInThisSync); + /// Searches for the event in this timeline. If not found, requests from server. + Future getEventById(String id); - sessionIdReceivedSub = - room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); - cancelSendEventSub = - room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent); + /// Request more previous events + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we want to populate our aggregated events - for (final e in events) { - addAggregatedEvent(e); - } + /// Request more future events + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }); - // we are using a fragmented timeline - if (chunk.nextBatch != '') { - allowNewEvent = false; - isFragmentedTimeline = true; - // fragmented timelines never read from the database. - _fetchedAllDatabaseEvents = true; - } - } + /// Set the read marker to an event in this timeline + Future setReadMarker({String? eventId, bool? public}); - void _cleanUpCancelledEvent(String eventId) { - final i = _findEvent(event_id: eventId); - if (i < events.length) { - removeAggregatedEvent(events[i]); - events.removeAt(i); - onRemove?.call(i); - onUpdate?.call(); - } - } - - /// Removes all entries from [events] which are not in this SyncUpdate. - void _removeEventsNotInThisSync(SyncUpdate sync) { - final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; - final keepEventIds = newSyncEvents.map((e) => e.eventId); - events.removeWhere((e) => !keepEventIds.contains(e.eventId)); - } - - /// Don't forget to call this before you dismiss this object! - void cancelSubscriptions() { - // ignore: discarded_futures - timelineSub?.cancel(); - // ignore: discarded_futures - historySub?.cancel(); - // ignore: discarded_futures - roomSub?.cancel(); - // ignore: discarded_futures - sessionIdReceivedSub?.cancel(); - // ignore: discarded_futures - cancelSendEventSub?.cancel(); - } - - void _sessionKeyReceived(String sessionId) async { - var decryptAtLeastOneEvent = false; - Future decryptFn() async { - final encryption = room.client.encryption; - if (!room.client.encryptionEnabled || encryption == null) { - return; - } - for (var i = 0; i < events.length; i++) { - if (events[i].type == EventTypes.Encrypted && - events[i].messageType == MessageTypes.BadEncrypted && - events[i].content['session_id'] == sessionId) { - events[i] = await encryption.decryptRoomEvent( - events[i], - store: true, - updateType: EventUpdateType.history, - ); - addAggregatedEvent(events[i]); - onChange?.call(i); - if (events[i].type != EventTypes.Encrypted) { - decryptAtLeastOneEvent = true; - } - } - } - } - - await room.client.database.transaction(decryptFn); - if (decryptAtLeastOneEvent) onUpdate?.call(); - } - - /// Request the keys for undecryptable events of this timeline + /// Request keys for undecryptable events void requestKeys({ bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true, - }) { - for (final event in events) { - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - final sessionId = event.content.tryGet('session_id'); - final senderKey = event.content.tryGet('sender_key'); - if (sessionId != null && senderKey != null) { - room.client.encryption?.keyManager.maybeAutoRequest( - room.id, - sessionId, - senderKey, - tryOnlineBackup: tryOnlineBackup, - onlineKeyBackupOnly: onlineKeyBackupOnly, - ); - } - } - } - } + }); - /// Set the read marker to the last synced event in this timeline. - Future setReadMarker({String? eventId, bool? public}) async { - eventId ??= - events.firstWhereOrNull((event) => event.status.isSynced)?.eventId; - if (eventId == null) return; - return room.setReadMarker(eventId, mRead: eventId, public: public); - } - - int _findEvent({String? event_id, String? unsigned_txid}) { - // we want to find any existing event where either the passed event_id or the passed unsigned_txid - // matches either the event_id or transaction_id of the existing event. - // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match. - // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair, - // thus meaning we found our element. - final searchNeedle = {}; - if (event_id != null) { - searchNeedle.add(event_id); - } - if (unsigned_txid != null) { - searchNeedle.add(unsigned_txid); - } - int i; - for (i = 0; i < events.length; i++) { - final searchHaystack = {events[i].eventId}; - - final txnid = events[i].transactionId; - if (txnid != null) { - searchHaystack.add(txnid); - } - if (searchNeedle.intersection(searchHaystack).isNotEmpty) { - break; - } - } - return i; - } - - void _removeEventFromSet(Set eventSet, Event event) { - eventSet.removeWhere( - (e) => - e.matchesEventOrTransactionId(event.eventId) || - event.unsigned != null && - e.matchesEventOrTransactionId(event.transactionId), - ); - } + /// Search events in this timeline + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + @Deprecated('Use [prevBatch] instead.') String? sinceEventId, + int? limit, + bool Function(Event)? searchFunc, + }); + /// Add an event to the aggregation tree void addAggregatedEvent(Event event) { - // we want to add an event to the aggregation tree final relationshipType = event.relationshipType; final relationshipEventId = event.relationshipEventId; if (relationshipType == null || relationshipEventId == null) { - return; // nothing to do + return; } final e = (aggregatedEvents[relationshipEventId] ??= >{})[relationshipType] ??= {}; - // remove a potential old event _removeEventFromSet(e, event); - // add the new one e.add(event); if (onChange != null) { final index = _findEvent(event_id: relationshipEventId); @@ -513,6 +110,7 @@ class Timeline { } } + /// Remove an event from aggregation void removeAggregatedEvent(Event event) { aggregatedEvents.remove(event.eventId); if (event.transactionId != null) { @@ -525,91 +123,38 @@ class Timeline { } } - void _handleEventUpdate( - Event event, - EventUpdateType type, { - bool update = true, - }) { - try { - if (event.roomId != room.id) return; - - if (type != EventUpdateType.timeline && type != EventUpdateType.history) { - return; - } - - if (type == EventUpdateType.timeline) { - onNewEvent?.call(); - } - - if (!allowNewEvent) return; - - final status = event.status; - - final i = _findEvent( - event_id: event.eventId, - unsigned_txid: event.transactionId, - ); - - if (i < events.length) { - // if the old status is larger than the new one, we also want to preserve the old status - final oldStatus = events[i].status; - events[i] = event; - // do we preserve the status? we should allow 0 -> -1 updates and status increases - if ((latestEventStatus(status, oldStatus) == oldStatus) && - !(status.isError && oldStatus.isSending)) { - events[i].status = oldStatus; - } - addAggregatedEvent(events[i]); - onChange?.call(i); - } else { - if (type == EventUpdateType.history && - events.indexWhere( - (e) => e.eventId == event.eventId, - ) != - -1) { - return; - } - var index = events.length; - if (type == EventUpdateType.history) { - events.add(event); - } else { - index = events.firstIndexWhereNotError; - events.insert(index, event); - } - onInsert?.call(index); - - addAggregatedEvent(event); - } - - // Handle redaction events - if (event.type == EventTypes.Redaction) { - final index = _findEvent(event_id: event.redacts); - if (index < events.length) { - removeAggregatedEvent(events[index]); - - // Is the redacted event a reaction? Then update the event this - // belongs to: - if (onChange != null) { - final relationshipEventId = events[index].relationshipEventId; - if (relationshipEventId != null) { - onChange?.call(_findEvent(event_id: relationshipEventId)); - return; - } - } - - events[index].setRedactionEvent(event); - onChange?.call(index); - } - } - - if (update && !_collectHistoryUpdates) { - onUpdate?.call(); - } - } catch (e, s) { - Logs().w('Handle event update failed', e, s); + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; } + return i; } + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + /// Handle event updates (to be implemented by subclasses) + void _handleEventUpdate(Event event, EventUpdateType type, {bool update = true}); + + /// Cancel all subscriptions + void cancelSubscriptions(); + @Deprecated('Use [startSearch] instead.') Stream> searchEvent({ String? searchTerm, @@ -623,114 +168,8 @@ class Timeline { searchTerm: searchTerm, requestHistoryCount: requestHistoryCount, maxHistoryRequests: maxHistoryRequests, - // ignore: deprecated_member_use_from_same_package sinceEventId: sinceEventId, limit: limit, searchFunc: searchFunc, ).map((result) => result.$1); - - /// Searches [searchTerm] in this timeline. It first searches in the - /// cache, then in the database and then on the server. The search can - /// take a while, which is why this returns a stream so the already found - /// events can already be displayed. - /// Override the [searchFunc] if you need another search. This will then - /// ignore [searchTerm]. - /// Returns the List of Events and the next prevBatch at the end of the - /// search. - Stream<(List, String?)> startSearch({ - String? searchTerm, - int requestHistoryCount = 100, - int maxHistoryRequests = 10, - String? prevBatch, - @Deprecated('Use [prevBatch] instead.') String? sinceEventId, - int? limit, - bool Function(Event)? searchFunc, - }) async* { - assert(searchTerm != null || searchFunc != null); - searchFunc ??= (event) => - event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? ''); - final found = []; - - if (sinceEventId == null) { - // Search locally - for (final event in events) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - - // Search in database - var start = events.length; - while (true) { - final eventsFromStore = await room.client.database.getEventList( - room, - start: start, - limit: requestHistoryCount, - ); - if (eventsFromStore.isEmpty) break; - start += eventsFromStore.length; - for (final event in eventsFromStore) { - if (searchFunc(event)) { - yield (found..add(event), null); - } - } - } - } - - // Search on the server - prevBatch ??= room.prev_batch; - if (sinceEventId != null) { - prevBatch = - (await room.client.getEventContext(room.id, sinceEventId)).end; - } - final encryption = room.client.encryption; - for (var i = 0; i < maxHistoryRequests; i++) { - if (prevBatch == null) break; - if (limit != null && found.length >= limit) break; - try { - final resp = await room.client.getRoomEvents( - room.id, - Direction.b, - from: prevBatch, - limit: requestHistoryCount, - filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), - ); - for (final matrixEvent in resp.chunk) { - var event = Event.fromMatrixEvent(matrixEvent, room); - if (event.type == EventTypes.Encrypted && encryption != null) { - event = await encryption.decryptRoomEvent(event); - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - // Await requestKey() here to ensure decrypted message bodies - await event.requestKey(); - } - } - if (searchFunc(event)) { - yield (found..add(event), resp.end); - if (limit != null && found.length >= limit) break; - } - } - prevBatch = resp.end; - // We are at the beginning of the room - if (resp.chunk.length < requestHistoryCount) break; - } on MatrixException catch (e) { - // We have no permission anymore to request the history - if (e.error == MatrixError.M_FORBIDDEN) { - break; - } - rethrow; - } - } - return; - } -} - -extension on List { - int get firstIndexWhereNotError { - if (isEmpty) return 0; - final index = indexWhere((event) => !event.status.isError); - if (index == -1) return length; - return index; - } -} +} \ No newline at end of file From 4ca2a5eee350a33371aec23326a94e4150d9c9df Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:03:17 +0500 Subject: [PATCH 03/18] implement Thread.fromJson --- lib/src/room.dart | 3 +- lib/src/thread.dart | 37 +++++++++---------- lib/src/thread_timeline.dart | 69 ++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 19 deletions(-) create mode 100644 lib/src/thread_timeline.dart diff --git a/lib/src/room.dart b/lib/src/room.dart index 76d72a40..a6d1f83a 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -26,6 +26,7 @@ import 'package:html_unescape/html_unescape.dart'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; +import 'package:matrix/src/room_timeline.dart'; import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/file_send_request_credentials.dart'; import 'package:matrix/src/utils/markdown.dart'; @@ -1690,7 +1691,7 @@ class Room { } } - final timeline = Timeline( + final timeline = RoomTimeline( room: this, chunk: chunk, onChange: onChange, diff --git a/lib/src/thread.dart b/lib/src/thread.dart index aa707a8e..eb7842ea 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -1,27 +1,28 @@ -import 'dart:async'; -import 'dart:convert'; -import 'dart:math'; - -import 'package:async/async.dart'; -import 'package:collection/collection.dart'; -import 'package:html_unescape/html_unescape.dart'; - import 'package:matrix/matrix.dart'; -import 'package:matrix/src/models/timeline_chunk.dart'; -import 'package:matrix/src/utils/cached_stream_controller.dart'; -import 'package:matrix/src/utils/file_send_request_credentials.dart'; -import 'package:matrix/src/utils/markdown.dart'; -import 'package:matrix/src/utils/marked_unread.dart'; -import 'package:matrix/src/utils/space_child.dart'; class Thread { final Room room; final MatrixEvent rootEvent; + final MatrixEvent? lastEvent; Thread({ - required Room this.room, - required MatrixEvent this.rootEvent - }) { + required this.room, + required this.rootEvent, + this.lastEvent, + }); + factory Thread.fromJson(Map json, Client client) { + final room = client.getRoomById(json['room_id']); + if (room == null) throw Error(); + MatrixEvent? lastEvent; + if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] != null) { + lastEvent = MatrixEvent.fromJson(json['unsigned']?['m.relations']?['m.thread']?['latest_event']); + } + final thread = Thread( + room: room, + rootEvent: MatrixEvent.fromJson(json), + lastEvent: lastEvent + ); + return thread; } -} \ No newline at end of file +} diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart new file mode 100644 index 00000000..032057b3 --- /dev/null +++ b/lib/src/thread_timeline.dart @@ -0,0 +1,69 @@ +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; +import 'package:matrix/src/thread.dart'; + +class ThreadTimeline extends Timeline { + final Thread thread; + + @override + List get events => chunk.events; + + TimelineChunk chunk; + + ThreadTimeline({ + required this.thread, + required this.chunk + }) { + + } + + + @override + // TODO: implement canRequestFuture + bool get canRequestFuture => throw UnimplementedError(); + + @override + // TODO: implement canRequestHistory + bool get canRequestHistory => throw UnimplementedError(); + + @override + void cancelSubscriptions() { + // TODO: implement cancelSubscriptions + } + + @override + Future getEventById(String id) { + // TODO: implement getEventById + throw UnimplementedError(); + } + + @override + Future requestFuture({int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { + // TODO: implement requestFuture + throw UnimplementedError(); + } + + @override + Future requestHistory({int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { + // TODO: implement requestHistory + throw UnimplementedError(); + } + + @override + void requestKeys({bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true}) { + // TODO: implement requestKeys + } + + @override + Future setReadMarker({String? eventId, bool? public}) { + // TODO: implement setReadMarker + throw UnimplementedError(); + } + + @override + Stream<(List, String?)> startSearch({String? searchTerm, int requestHistoryCount = 100, int maxHistoryRequests = 10, String? prevBatch, String? sinceEventId, int? limit, bool Function(Event p1)? searchFunc}) { + // TODO: implement startSearch + throw UnimplementedError(); + } + +} \ No newline at end of file From a64492e2d88f1b55b1402c7959bf91b120d7845d Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:04:30 +0500 Subject: [PATCH 04/18] chore: add trailing comma --- lib/src/thread.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index eb7842ea..d4193eaf 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -21,7 +21,7 @@ class Thread { final thread = Thread( room: room, rootEvent: MatrixEvent.fromJson(json), - lastEvent: lastEvent + lastEvent: lastEvent, ); return thread; } From dab7b8b5be17caf6d9f192128549d5efccdcd47c Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:06:37 +0500 Subject: [PATCH 05/18] use Event instead of MatrixEvent --- lib/src/thread.dart | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index d4193eaf..02bfd98e 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -2,8 +2,8 @@ import 'package:matrix/matrix.dart'; class Thread { final Room room; - final MatrixEvent rootEvent; - final MatrixEvent? lastEvent; + final Event rootEvent; + final Event? lastEvent; Thread({ required this.room, @@ -14,13 +14,13 @@ class Thread { factory Thread.fromJson(Map json, Client client) { final room = client.getRoomById(json['room_id']); if (room == null) throw Error(); - MatrixEvent? lastEvent; + Event? lastEvent; if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] != null) { - lastEvent = MatrixEvent.fromJson(json['unsigned']?['m.relations']?['m.thread']?['latest_event']); + lastEvent = MatrixEvent.fromJson(json['unsigned']?['m.relations']?['m.thread']?['latest_event']) as Event; } final thread = Thread( room: room, - rootEvent: MatrixEvent.fromJson(json), + rootEvent: MatrixEvent.fromJson(json) as Event, lastEvent: lastEvent, ); return thread; From c1f42cf8818b60762050b6140070db1fcd47de2d Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:12:05 +0500 Subject: [PATCH 06/18] add _refreshLastEvent to Thread use Event.fromMatrixEvent instead of casting --- lib/src/thread.dart | 63 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 02bfd98e..e9fe0798 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -3,11 +3,13 @@ import 'package:matrix/matrix.dart'; class Thread { final Room room; final Event rootEvent; - final Event? lastEvent; + Event? lastEvent; + final Client client; Thread({ required this.room, required this.rootEvent, + required this.client, this.lastEvent, }); @@ -15,14 +17,63 @@ class Thread { final room = client.getRoomById(json['room_id']); if (room == null) throw Error(); Event? lastEvent; - if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] != null) { - lastEvent = MatrixEvent.fromJson(json['unsigned']?['m.relations']?['m.thread']?['latest_event']) as Event; + if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] != + null) { + lastEvent = Event.fromMatrixEvent( + MatrixEvent.fromJson( + json['unsigned']?['m.relations']?['m.thread']?['latest_event']), + room, + ); } final thread = Thread( - room: room, - rootEvent: MatrixEvent.fromJson(json) as Event, - lastEvent: lastEvent, + room: room, + client: client, + rootEvent: Event.fromMatrixEvent( + MatrixEvent.fromJson(json), + room, + ), + lastEvent: lastEvent, ); return thread; } + + Future? _refreshingLastEvent; + + Future _refreshLastEvent({ + timeout = const Duration(seconds: 30), + }) async { + if (room.membership != Membership.join) return null; + + final result = await client + .getRelatingEventsWithRelType( + room.id, + rootEvent.eventId, + 'm.thread', + ) + .timeout(timeout); + final matrixEvent = result.chunk.firstOrNull; + if (matrixEvent == null) { + if (lastEvent?.type == EventTypes.refreshingLastEvent) { + lastEvent = null; + } + Logs().d( + 'No last event found for thread ${rootEvent.eventId} in ${rootEvent.roomId}', + ); + return null; + } + var event = Event.fromMatrixEvent( + matrixEvent, + room, + status: EventStatus.synced, + ); + if (event.type == EventTypes.Encrypted) { + final encryption = client.encryption; + if (encryption != null) { + event = await encryption.decryptRoomEvent(event); + } + } + lastEvent = event; + + return event; + } } From d5a743622f8529a2af7c69304bdf859e91919662 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:14:23 +0500 Subject: [PATCH 07/18] add trailing comma --- lib/src/thread.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index e9fe0798..fc68bc48 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -21,7 +21,8 @@ class Thread { null) { lastEvent = Event.fromMatrixEvent( MatrixEvent.fromJson( - json['unsigned']?['m.relations']?['m.thread']?['latest_event']), + json['unsigned']?['m.relations']?['m.thread']?['latest_event'], + ), room, ); } From 784d4e401039ce47087e8c44cf9654a34d98b7c0 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:20:12 +0500 Subject: [PATCH 08/18] implement Thread.sendTextEvent --- lib/src/thread.dart | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index fc68bc48..441c3f0f 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -77,4 +77,52 @@ class Thread { return event; } + + /// When was the last event received. + DateTime get latestEventReceivedTime { + final lastEventTime = lastEvent?.originServerTs; + if (lastEventTime != null) return lastEventTime; + + if (room.membership == Membership.invite) return DateTime.now(); + + return rootEvent.originServerTs; + } + + bool get hasNewMessages { + // TODO: Implement this + return false; + } + + Future sendTextEvent( + String message, { + String? txid, + Event? inReplyTo, + String? editEventId, + bool parseMarkdown = true, + bool parseCommands = true, + String msgtype = MessageTypes.Text, + StringBuffer? commandStdout, + bool addMentions = true, + + /// Displays an event in the timeline with the transaction ID as the event + /// ID and a status of SENDING, SENT or ERROR until it gets replaced by + /// the sync event. Using this can display a different sort order of events + /// as the sync event does replace but not relocate the pending event. + bool displayPendingEvent = true, + }) { + return room.sendTextEvent( + message, + txid: txid, + inReplyTo: inReplyTo, + editEventId: editEventId, + parseCommands: parseCommands, + parseMarkdown: parseMarkdown, + msgtype: msgtype, + commandStdout: commandStdout, + addMentions: addMentions, + displayPendingEvent: displayPendingEvent, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } } From 5e1783eaa23ecfcb1be8e4cc032467f0eb96e7d5 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:23:49 +0500 Subject: [PATCH 09/18] implement sendLocation and sendFileEvent in Thread --- lib/src/thread.dart | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 441c3f0f..1e09437f 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -125,4 +125,47 @@ class Thread { threadRootEventId: rootEvent.eventId, ); } + + Future sendLocation(String body, String geoUri, {String? txid}) { + final event = { + 'msgtype': 'm.location', + 'body': body, + 'geo_uri': geoUri, + }; + return room.sendEvent( + event, + txid: txid, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } + + Future sendFileEvent( + MatrixFile file, { + String? txid, + Event? inReplyTo, + String? editEventId, + int? shrinkImageMaxDimension, + MatrixImageFile? thumbnail, + Map? extraContent, + + /// Displays an event in the timeline with the transaction ID as the event + /// ID and a status of SENDING, SENT or ERROR until it gets replaced by + /// the sync event. Using this can display a different sort order of events + /// as the sync event does replace but not relocate the pending event. + bool displayPendingEvent = true, + }) async { + return await room.sendFileEvent( + file, + txid: txid, + inReplyTo: inReplyTo, + editEventId: editEventId, + shrinkImageMaxDimension: shrinkImageMaxDimension, + thumbnail: thumbnail, + extraContent: extraContent, + displayPendingEvent: displayPendingEvent, + threadLastEventId: lastEvent?.eventId, + threadRootEventId: rootEvent.eventId, + ); + } } From c4df17241658308179c54cf4da82cc217686435a Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 19:25:34 +0500 Subject: [PATCH 10/18] fix warnings in room_timeline --- lib/src/room_timeline.dart | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 46cf9551..6c55e0f0 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -25,7 +25,6 @@ import 'package:matrix/src/models/timeline_chunk.dart'; /// Represents the main timeline of a room. class RoomTimeline extends Timeline { - @override final Room room; @override List get events => chunk.events; @@ -429,17 +428,6 @@ class RoomTimeline extends Timeline { return i; } - /// Remove event from set based on event or transaction ID - void _removeEventFromSet(Set eventSet, Event event) { - eventSet.removeWhere( - (e) => - e.matchesEventOrTransactionId(event.eventId) || - event.unsigned != null && - e.matchesEventOrTransactionId(event.transactionId), - ); - } - - @override void _handleEventUpdate( Event event, EventUpdateType type, { From 889477e07c1822d2ce6f917013aa8913a1fbf808 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Tue, 21 Oct 2025 21:31:42 +0500 Subject: [PATCH 11/18] I forgot what I changed sorry --- lib/src/room_timeline.dart | 45 ++++++++-- lib/src/thread.dart | 2 + lib/src/thread_timeline.dart | 168 +++++++++++++++++++++++++++++++++-- lib/src/timeline.dart | 70 +++------------ 4 files changed, 209 insertions(+), 76 deletions(-) diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 6c55e0f0..da9d5312 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -598,13 +598,44 @@ class RoomTimeline extends Timeline { } return; } -} -extension on List { - int get firstIndexWhereNotError { - if (isEmpty) return 0; - final index = indexWhere((event) => !event.status.isError); - if (index == -1) return length; - return index; + /// Add an event to the aggregation tree + void addAggregatedEvent(Event event) { + final relationshipType = event.relationshipType; + final relationshipEventId = event.relationshipEventId; + if (relationshipType == null || relationshipEventId == null) { + return; + } + final e = (aggregatedEvents[relationshipEventId] ??= + >{})[relationshipType] ??= {}; + _removeEventFromSet(e, event); + e.add(event); + if (onChange != null) { + final index = _findEvent(event_id: relationshipEventId); + onChange?.call(index); + } + } + + /// Remove an event from aggregation + void removeAggregatedEvent(Event event) { + aggregatedEvents.remove(event.eventId); + if (event.transactionId != null) { + aggregatedEvents.remove(event.transactionId); + } + for (final types in aggregatedEvents.values) { + for (final e in types.values) { + _removeEventFromSet(e, event); + } + } + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); } } \ No newline at end of file diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 1e09437f..0e9b243b 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -4,12 +4,14 @@ class Thread { final Room room; final Event rootEvent; Event? lastEvent; + String? prev_batch; final Client client; Thread({ required this.room, required this.rootEvent, required this.client, + this.prev_batch, this.lastEvent, }); diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 032057b3..b480aad9 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -1,22 +1,163 @@ +import 'dart:async'; + import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; import 'package:matrix/src/thread.dart'; +// ThreadTimeline: hey RoomTimeline can i copy your homework? +// RoomTimeline: sure just don't make it too obvious +// ThreadTimeline: + class ThreadTimeline extends Timeline { final Thread thread; - + @override List get events => chunk.events; TimelineChunk chunk; + StreamSubscription? timelineSub; + StreamSubscription? historySub; + StreamSubscription? roomSub; + StreamSubscription? sessionIdReceivedSub; + StreamSubscription? cancelSendEventSub; + ThreadTimeline({ required this.thread, - required this.chunk + required this.chunk, + super.onUpdate, + super.onChange, + super.onInsert, + super.onRemove, + super.onNewEvent, }) { - + final room = thread.room; + timelineSub = room.client.onTimelineEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.timeline), + ); + historySub = room.client.onHistoryEvent.stream.listen( + (event) => _handleEventUpdate(event, EventUpdateType.history), + ); } + void _handleEventUpdate( + Event event, + EventUpdateType type, { + bool update = true, + }) { + try { + if (event.roomId != thread.room.id) return; + // Ignore events outside of this thread + if (event.relationshipType != RelationshipTypes.thread || + event.relationshipEventId != thread.rootEvent.eventId) return; + + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { + return; + } + + if (type == EventUpdateType.timeline) { + onNewEvent?.call(); + } + + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); + + addAggregatedEvent(event); + + // Handle redaction events + if (event.type == EventTypes.Redaction) { + final index = _findEvent(event_id: event.redacts); + if (index < events.length) { + removeAggregatedEvent(events[index]); + + // Is the redacted event a reaction? Then update the event this + // belongs to: + if (onChange != null) { + final relationshipEventId = events[index].relationshipEventId; + if (relationshipEventId != null) { + onChange?.call(_findEvent(event_id: relationshipEventId)); + return; + } + } + + events[index].setRedactionEvent(event); + onChange?.call(index); + } + } + + if (update) { + onUpdate?.call(); + } + } catch (e, s) { + Logs().w('Handle event update failed', e, s); + } + } + + /// Add an event to the aggregation tree + void addAggregatedEvent(Event event) { + final relationshipType = event.relationshipType; + final relationshipEventId = event.relationshipEventId; + if (relationshipType == null || relationshipEventId == null) { + return; + } + final e = (aggregatedEvents[relationshipEventId] ??= + >{})[relationshipType] ??= {}; + _removeEventFromSet(e, event); + e.add(event); + if (onChange != null) { + final index = _findEvent(event_id: relationshipEventId); + onChange?.call(index); + } + } + + /// Remove an event from aggregation + void removeAggregatedEvent(Event event) { + aggregatedEvents.remove(event.eventId); + if (event.transactionId != null) { + aggregatedEvents.remove(event.transactionId); + } + for (final types in aggregatedEvents.values) { + for (final e in types.values) { + _removeEventFromSet(e, event); + } + } + } + + /// Remove event from set based on event or transaction ID + void _removeEventFromSet(Set eventSet, Event event) { + eventSet.removeWhere( + (e) => + e.matchesEventOrTransactionId(event.eventId) || + event.unsigned != null && + e.matchesEventOrTransactionId(event.transactionId), + ); + } + + /// Find event index by event ID or transaction ID + int _findEvent({String? event_id, String? unsigned_txid}) { + final searchNeedle = {}; + if (event_id != null) searchNeedle.add(event_id); + if (unsigned_txid != null) searchNeedle.add(unsigned_txid); + + int i; + for (i = 0; i < events.length; i++) { + final searchHaystack = {events[i].eventId}; + final txnid = events[i].transactionId; + if (txnid != null) searchHaystack.add(txnid); + if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; + } + return i; + } @override // TODO: implement canRequestFuture @@ -38,19 +179,22 @@ class ThreadTimeline extends Timeline { } @override - Future requestFuture({int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { + Future requestFuture( + {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { // TODO: implement requestFuture throw UnimplementedError(); } @override - Future requestHistory({int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { + Future requestHistory( + {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { // TODO: implement requestHistory throw UnimplementedError(); } @override - void requestKeys({bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true}) { + void requestKeys( + {bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true}) { // TODO: implement requestKeys } @@ -61,9 +205,15 @@ class ThreadTimeline extends Timeline { } @override - Stream<(List, String?)> startSearch({String? searchTerm, int requestHistoryCount = 100, int maxHistoryRequests = 10, String? prevBatch, String? sinceEventId, int? limit, bool Function(Event p1)? searchFunc}) { + Stream<(List, String?)> startSearch( + {String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + String? sinceEventId, + int? limit, + bool Function(Event p1)? searchFunc}) { // TODO: implement startSearch throw UnimplementedError(); } - -} \ No newline at end of file +} diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 3decd3cb..4219da7b 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -17,11 +17,7 @@ */ import 'dart:async'; -import 'dart:convert'; - -import 'package:collection/collection.dart'; import 'package:matrix/matrix.dart'; -import 'package:matrix/src/models/timeline_chunk.dart'; /// Abstract base class for all timeline implementations. /// Provides common functionality for event management, aggregation, and search. @@ -93,62 +89,6 @@ abstract class Timeline { bool Function(Event)? searchFunc, }); - /// Add an event to the aggregation tree - void addAggregatedEvent(Event event) { - final relationshipType = event.relationshipType; - final relationshipEventId = event.relationshipEventId; - if (relationshipType == null || relationshipEventId == null) { - return; - } - final e = (aggregatedEvents[relationshipEventId] ??= - >{})[relationshipType] ??= {}; - _removeEventFromSet(e, event); - e.add(event); - if (onChange != null) { - final index = _findEvent(event_id: relationshipEventId); - onChange?.call(index); - } - } - - /// Remove an event from aggregation - void removeAggregatedEvent(Event event) { - aggregatedEvents.remove(event.eventId); - if (event.transactionId != null) { - aggregatedEvents.remove(event.transactionId); - } - for (final types in aggregatedEvents.values) { - for (final e in types.values) { - _removeEventFromSet(e, event); - } - } - } - - /// Find event index by event ID or transaction ID - int _findEvent({String? event_id, String? unsigned_txid}) { - final searchNeedle = {}; - if (event_id != null) searchNeedle.add(event_id); - if (unsigned_txid != null) searchNeedle.add(unsigned_txid); - - int i; - for (i = 0; i < events.length; i++) { - final searchHaystack = {events[i].eventId}; - final txnid = events[i].transactionId; - if (txnid != null) searchHaystack.add(txnid); - if (searchNeedle.intersection(searchHaystack).isNotEmpty) break; - } - return i; - } - - /// Remove event from set based on event or transaction ID - void _removeEventFromSet(Set eventSet, Event event) { - eventSet.removeWhere( - (e) => - e.matchesEventOrTransactionId(event.eventId) || - event.unsigned != null && - e.matchesEventOrTransactionId(event.transactionId), - ); - } - /// Handle event updates (to be implemented by subclasses) void _handleEventUpdate(Event event, EventUpdateType type, {bool update = true}); @@ -172,4 +112,14 @@ abstract class Timeline { limit: limit, searchFunc: searchFunc, ).map((result) => result.$1); +} + +// TODO: make up a better name +extension TimelineExtension on List { + int get firstIndexWhereNotError { + if (isEmpty) return 0; + final index = indexWhere((event) => !event.status.isError); + if (index == -1) return length; + return index; + } } \ No newline at end of file From 0fb0a6c47fd56b45b8d60ada84bca8ecd991dce7 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Wed, 22 Oct 2025 18:13:59 +0500 Subject: [PATCH 12/18] working on threads --- lib/matrix.dart | 3 + .../timeline_export.dart | 4 +- lib/src/database/database_api.dart | 27 ++ lib/src/database/matrix_sdk_database.dart | 107 +++++++ lib/src/room.dart | 2 +- lib/src/room_timeline.dart | 3 + lib/src/thread.dart | 186 +++++++++++++ lib/src/thread_timeline.dart | 262 ++++++++++++++++-- 8 files changed, 571 insertions(+), 23 deletions(-) diff --git a/lib/matrix.dart b/lib/matrix.dart index 8ad93add..461d7fb3 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -49,7 +49,10 @@ export 'src/voip/utils/famedly_call_extension.dart'; export 'src/voip/utils/types.dart'; export 'src/voip/utils/wrapped_media_stream.dart'; export 'src/room.dart'; +export 'src/thread.dart'; export 'src/timeline.dart'; +export 'src/room_timeline.dart'; +export 'src/thread_timeline.dart'; export 'src/user.dart'; export 'src/utils/cached_profile_information.dart'; export 'src/utils/commands_extension.dart'; diff --git a/lib/msc_extensions/extension_timeline_export/timeline_export.dart b/lib/msc_extensions/extension_timeline_export/timeline_export.dart index 47d7751f..6f829258 100644 --- a/lib/msc_extensions/extension_timeline_export/timeline_export.dart +++ b/lib/msc_extensions/extension_timeline_export/timeline_export.dart @@ -2,9 +2,9 @@ import 'dart:convert'; import 'package:matrix/matrix_api_lite.dart'; import 'package:matrix/src/event.dart'; -import 'package:matrix/src/timeline.dart'; +import 'package:matrix/src/room_timeline.dart'; -extension TimelineExportExtension on Timeline { +extension TimelineExportExtension on RoomTimeline { /// Exports timeline events from a Matrix room within a specified date range. /// /// The export process provides progress updates through the returned stream with the following information: diff --git a/lib/src/database/database_api.dart b/lib/src/database/database_api.dart index 644ecfff..1f18a071 100644 --- a/lib/src/database/database_api.dart +++ b/lib/src/database/database_api.dart @@ -59,6 +59,17 @@ abstract class DatabaseApi { Future> getRoomList(Client client); + Future> getThreadList(String roomId, Client client); + + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int count, + Client client, + ); + Future getSingleRoom( Client client, String roomId, { @@ -78,6 +89,8 @@ abstract class DatabaseApi { Future deleteTimelineForRoom(String roomId); + Future deleteTimelineForThread(String roomId, String threadRootEventId); + /// Stores an EventUpdate object in the database. Must be called inside of /// [transaction]. Future storeEventUpdate( @@ -115,6 +128,13 @@ abstract class DatabaseApi { int? limit, }); + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }); + Future> getEventIdList( Room room, { int start = 0, @@ -265,6 +285,13 @@ abstract class DatabaseApi { Future removeEvent(String eventId, String roomId); + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ); + Future setRoomPrevBatch( String? prevBatch, String roomId, diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index fc2ddeaf..f2ae5740 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -59,6 +59,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { late Box _clientBox; late Box _accountDataBox; late Box _roomsBox; + late Box _threadsBox; late Box _toDeviceQueueBox; /// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be @@ -122,6 +123,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { static const String _roomsBoxName = 'box_rooms'; + static const String _threadsBoxName = 'box_threads'; + static const String _toDeviceQueueBoxName = 'box_to_device_queue'; static const String _preloadRoomStateBoxName = 'box_preload_room_states'; @@ -218,6 +221,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBoxName, _accountDataBoxName, _roomsBoxName, + _threadsBoxName, _toDeviceQueueBoxName, _preloadRoomStateBoxName, _nonPreloadRoomStateBoxName, @@ -252,6 +256,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _roomsBox = _collection.openBox( _roomsBoxName, ); + _threadsBox = _collection.openBox(_threadsBoxName); _preloadRoomStateBox = _collection.openBox( _preloadRoomStateBoxName, ); @@ -357,6 +362,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBox.clearQuickAccessCache(); _accountDataBox.clearQuickAccessCache(); _roomsBox.clearQuickAccessCache(); + _threadsBox.clearQuickAccessCache(); _preloadRoomStateBox.clearQuickAccessCache(); _nonPreloadRoomStateBox.clearQuickAccessCache(); _roomMembersBox.clearQuickAccessCache(); @@ -383,6 +389,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { @override Future clearCache() => transaction(() async { await _roomsBox.clear(); + await _threadsBox.clear(); await _accountDataBox.clear(); await _roomAccountDataBox.clear(); await _preloadRoomStateBox.clear(); @@ -532,6 +539,46 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return await _getEventsByIds(eventIds.cast(), room); }); + @override + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }) => + runBenchmarked>('Get event list', () async { + // Get the synced event IDs from the store + final timelineKey = + TupleKey(thread.room.id, '', thread.rootEvent.eventId).toString(); + final timelineEventIds = + (await _timelineFragmentsBox.get(timelineKey) ?? []); + + // Get the local stored SENDING events from the store + late final List sendingEventIds; + if (start != 0) { + sendingEventIds = []; + } else { + final sendingTimelineKey = + TupleKey(thread.room.id, 'SENDING', thread.rootEvent.eventId) + .toString(); + sendingEventIds = + (await _timelineFragmentsBox.get(sendingTimelineKey) ?? []); + } + + // Combine those two lists while respecting the start and limit parameters. + final end = min( + timelineEventIds.length, + start + (limit ?? timelineEventIds.length), + ); + final eventIds = [ + ...sendingEventIds, + if (!onlySending && start < timelineEventIds.length) + ...timelineEventIds.getRange(start, end), + ]; + + return await _getEventsByIds(eventIds.cast(), thread.room); + }); + @override Future getInboundGroupSession( String roomId, @@ -1053,6 +1100,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ) async { + final raw = + await _threadsBox.get(TupleKey(roomId, threadRootEventId).toString()); + if (raw == null) return; + final thread = Thread.fromJson(copyMap(raw), client); + thread.prev_batch = prevBatch; + await _threadsBox.put(roomId, thread.toJson()); + return; + } + @override Future setVerifiedUserCrossSigningKey( bool verified, @@ -1302,6 +1365,43 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future> getThreadList(String roomId, Client client) async { + final allThreadsKeys = await _threadsBox.getAllKeys(); + final threadsKeys = {}; + // TERRIBLE implementation. Better to create another box (String[roomId]->List[event ids]) + for (final key in allThreadsKeys) { + if (key.startsWith(roomId)) threadsKeys.add(key); + } + final threads = {}; + + + + return threads.toList(); + } + + @override + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int count, + Client client, + ) async { + final key = TupleKey(roomId, threadRootEvent.eventId).toString(); + // final currentRawThread = await _threadsBox.get(key); + await _threadsBox.put( + key, + Thread( + room: Room(id: roomId, client: client), + rootEvent: threadRootEvent, + client: client, + currentUserParticipated: currentUserParticipated, + count: count, + ).toJson()); + } + @override Future storeRoomUpdate( String roomId, @@ -1314,6 +1414,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { await forgetRoom(roomId); return; } + final membership = roomUpdate is LeftRoomUpdate ? Membership.leave : roomUpdate is InvitedRoomUpdate @@ -1376,6 +1477,12 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { Future deleteTimelineForRoom(String roomId) => _timelineFragmentsBox.delete(TupleKey(roomId, '').toString()); + @override + Future deleteTimelineForThread( + String roomId, String threadRootEventId) => + _timelineFragmentsBox + .delete(TupleKey(roomId, '', threadRootEventId).toString()); + @override Future storeSSSSCache( String type, diff --git a/lib/src/room.dart b/lib/src/room.dart index a6d1f83a..b69f549c 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -1649,7 +1649,7 @@ class Room { /// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks. /// This method can also retrieve the timeline at a specific point by setting /// the [eventContextId] - Future getTimeline({ + Future getTimeline({ void Function(int index)? onChange, void Function(int index)? onRemove, void Function(int insertID)? onInsert, diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index da9d5312..2818d808 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -436,6 +436,9 @@ class RoomTimeline extends Timeline { try { if (event.roomId != room.id) return; + // This will be handled by ThreadTimeline + if (event.relationshipType == RelationshipTypes.thread) return; + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; } diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 0e9b243b..74c2a3c1 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -1,20 +1,37 @@ import 'package:matrix/matrix.dart'; +import 'package:matrix/matrix_api_lite/generated/internal.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; class Thread { final Room room; final Event rootEvent; Event? lastEvent; String? prev_batch; + bool currentUserParticipated; + int count; final Client client; Thread({ required this.room, required this.rootEvent, required this.client, + required this.currentUserParticipated, + required this.count, this.prev_batch, this.lastEvent, }); + Map toJson() => { + ...rootEvent.toJson(), + 'unsigned': { + 'm.thread': { + 'latest_event': lastEvent?.toJson(), + 'count': count, + 'current_user_participated': currentUserParticipated, + }, + }, + }; + factory Thread.fromJson(Map json, Client client) { final room = client.getRoomById(json['room_id']); if (room == null) throw Error(); @@ -36,6 +53,9 @@ class Thread { room, ), lastEvent: lastEvent, + count: json['unsigned']?['m.relations']?['m.thread']?['count'], + currentUserParticipated: json['unsigned']?['m.relations']?['m.thread'] + ?['current_user_participated'], ); return thread; } @@ -95,6 +115,120 @@ class Thread { return false; } + Future getEventContext(String eventId) async { + // TODO: probably find events with relationship + final resp = await client.getEventContext( + room.id, eventId, + limit: Room.defaultHistoryCount, + // filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + + final events = [ + if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed, + if (resp.event != null) resp.event!, + if (resp.eventsBefore != null) ...resp.eventsBefore!, + ].map((e) => Event.fromMatrixEvent(e, room)).toList(); + + // Try again to decrypt encrypted events but don't update the database. + if (room.encrypted && client.encryptionEnabled) { + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].content['can_request_session'] == true) { + events[i] = await client.encryption!.decryptRoomEvent(events[i]); + } + } + } + + final chunk = TimelineChunk( + nextBatch: resp.end ?? '', + prevBatch: resp.start ?? '', + events: events, + ); + + return chunk; + } + + Future getTimeline({ + void Function(int index)? onChange, + void Function(int index)? onRemove, + void Function(int insertID)? onInsert, + void Function()? onNewEvent, + void Function()? onUpdate, + String? eventContextId, + int? limit = Room.defaultHistoryCount, + }) async { + // await postLoad(); + + var events = []; + + await client.database.transaction(() async { + events = await client.database.getThreadEventList( + this, + limit: limit, + ); + }); + + var chunk = TimelineChunk(events: events); + // Load the timeline arround eventContextId if set + if (eventContextId != null) { + if (!events.any((Event event) => event.eventId == eventContextId)) { + chunk = + await getEventContext(eventContextId) ?? TimelineChunk(events: []); + } + } + + final timeline = ThreadTimeline( + thread: this, + chunk: chunk, + onChange: onChange, + onRemove: onRemove, + onInsert: onInsert, + onNewEvent: onNewEvent, + onUpdate: onUpdate, + ); + + // Fetch all users from database we have got here. + if (eventContextId == null) { + final userIds = events.map((event) => event.senderId).toSet(); + for (final userId in userIds) { + if (room.getState(EventTypes.RoomMember, userId) != null) continue; + final dbUser = await client.database.getUser(userId, room); + if (dbUser != null) room.setState(dbUser); + } + } + + // Try again to decrypt encrypted events and update the database. + if (room.encrypted && client.encryptionEnabled) { + // decrypt messages + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].type == EventTypes.Encrypted) { + if (eventContextId != null) { + // for the fragmented timeline, we don't cache the decrypted + //message in the database + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + ); + } else { + // else, we need the database + await client.database.transaction(() async { + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].content['can_request_session'] == true) { + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + store: !room.isArchived, + updateType: EventUpdateType.history, + ); + } + } + }); + } + } + } + } + + return timeline; + } + Future sendTextEvent( String message, { String? txid, @@ -170,4 +304,56 @@ class Thread { threadRootEventId: rootEvent.eventId, ); } + + Future setReadMarker({String? eventId, bool? public}) async { + if (eventId == null) return null; + return await client.postReceipt( + room.id, + (public ?? client.receiptsPublicByDefault) + ? ReceiptType.mRead + : ReceiptType.mReadPrivate, + eventId, + threadId: rootEvent.eventId, + ); + } + + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + void Function()? onHistoryReceived, + direction = Direction.b, + StateFilter? filter, + }) async { + final prev_batch = this.prev_batch; + + final storeInDatabase = !room.isArchived; + + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + if (prev_batch == null) { + throw 'Tried to request history without a prev_batch token'; + } + + final resp = await client.getRelatingEventsWithRelType( + room.id, + rootEvent.eventId, + RelationshipTypes.thread, + from: prev_batch, + limit: historyCount, + dir: direction, + recurse: true, + ); + + if (onHistoryReceived != null) onHistoryReceived(); + + await client.database.transaction(() async { + if (storeInDatabase && direction == Direction.b) { + this.prev_batch = resp.prevBatch; + await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client); + } + }); + + return resp.chunk.length; + } } diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index b480aad9..8501dc2a 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; @@ -22,6 +23,17 @@ class ThreadTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + bool isRequestingHistory = false; + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + bool _fetchedAllDatabaseEvents = false; + + bool allowNewEvent = true; + + bool _collectHistoryUpdates = false; + ThreadTimeline({ required this.thread, required this.chunk, @@ -59,20 +71,38 @@ class ThreadTimeline extends Timeline { onNewEvent?.call(); } - if (type == EventUpdateType.history && - events.indexWhere((e) => e.eventId == event.eventId) != -1) { - return; - } - var index = events.length; - if (type == EventUpdateType.history) { - events.add(event); + final status = event.status; + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); } else { - index = events.firstIndexWhereNotError; - events.insert(index, event); - } - onInsert?.call(index); + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); - addAggregatedEvent(event); + addAggregatedEvent(event); + } // Handle redaction events if (event.type == EventTypes.Redaction) { @@ -103,6 +133,184 @@ class ThreadTimeline extends Timeline { } } + /// Request more previous events from the server. + Future getThreadEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await thread.client.getRelatingEventsWithRelType( + thread.room.id, + thread.rootEvent.eventId, + RelationshipTypes.thread, + dir: direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + ); + + if (resp.nextBatch == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch; + final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + // I dont know what this piece of code does + // if ((resp.state?.length ?? 0) == 0 && + // resp.start != resp.end && + // newPrevBatch != null && + // newNextBatch != null) { + // if (type == EventUpdateType.history) { + // Logs().w( + // '[nav] we can still request history prevBatch: $type $newPrevBatch', + // ); + // } else { + // Logs().w( + // '[nav] we can still request timeline nextBatch: $type $newNextBatch', + // ); + // } + // } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, thread.room)).toList(); + + if (!allowNewEvent) { + if (resp.prevBatch == resp.nextBatch || + (resp.nextBatch == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await thread.client.database.getThreadEventList(thread, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (thread.room.encrypted && thread.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await thread.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + return resp.chunk.length; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await thread.client.database.getThreadEventList( + thread, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) { + continue; + } + final dbUser = + await thread.client.database.getUser(event.senderId, thread.room); + if (dbUser != null) thread.room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + _fetchedAllDatabaseEvents = true; + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getThreadEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (thread.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await thread.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () { + _collectHistoryUpdates = true; + }, + filter: filter, + ); + } + } + } + } finally { + _collectHistoryUpdates = false; + isRequestingHistory = false; + onUpdate?.call(); + } + } + /// Add an event to the aggregation tree void addAggregatedEvent(Event event) { final relationshipType = event.relationshipType; @@ -173,9 +381,15 @@ class ThreadTimeline extends Timeline { } @override - Future getEventById(String id) { - // TODO: implement getEventById - throw UnimplementedError(); + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await thread.room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; } @override @@ -187,9 +401,15 @@ class ThreadTimeline extends Timeline { @override Future requestHistory( - {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { - // TODO: implement requestHistory - throw UnimplementedError(); + {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; } @override @@ -200,8 +420,10 @@ class ThreadTimeline extends Timeline { @override Future setReadMarker({String? eventId, bool? public}) { - // TODO: implement setReadMarker - throw UnimplementedError(); + return thread.setReadMarker( + eventId: eventId, + public: public, + ); } @override From 74c39f91feb34c2537b079178d3571c18c6e487b Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Fri, 24 Oct 2025 21:21:20 +0500 Subject: [PATCH 13/18] did something --- lib/src/database/database_api.dart | 2 + lib/src/database/matrix_sdk_database.dart | 46 +++++++++++----- lib/src/room.dart | 52 ++++++++++++++++++ lib/src/room_timeline.dart | 15 +++--- lib/src/thread.dart | 4 +- lib/src/thread_timeline.dart | 66 +++++++++++++++-------- 6 files changed, 142 insertions(+), 43 deletions(-) diff --git a/lib/src/database/database_api.dart b/lib/src/database/database_api.dart index 1f18a071..509cf55c 100644 --- a/lib/src/database/database_api.dart +++ b/lib/src/database/database_api.dart @@ -61,6 +61,8 @@ abstract class DatabaseApi { Future> getThreadList(String roomId, Client client); + Future getThread(String roomId, String threadRootEventId, Client client); + Future storeThread( String roomId, Event threadRootEvent, diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index f2ae5740..295abf42 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -1368,18 +1368,34 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { @override Future> getThreadList(String roomId, Client client) async { final allThreadsKeys = await _threadsBox.getAllKeys(); - final threadsKeys = {}; - // TERRIBLE implementation. Better to create another box (String[roomId]->List[event ids]) - for (final key in allThreadsKeys) { - if (key.startsWith(roomId)) threadsKeys.add(key); - } final threads = {}; - + // TERRIBLE implementation. Better to create another box (String[roomId]->List[event ids]) + for (final key in allThreadsKeys) { + if (key.startsWith('$roomId|')) { + final thread = await getThread(roomId, key.split('|')[1], client); + if (thread != null) { + threads.add(thread); + } + } + } return threads.toList(); } + @override + Future getThread( + String roomId, + String threadRootEventId, + Client client, + ) async { + final key = TupleKey(roomId, threadRootEventId).toString(); + final thread = await _threadsBox.get(key); + if (thread == null) return null; + Logs().w(thread.toString()); + return Thread.fromJson(thread.cast(), client); + } + @override Future storeThread( String roomId, @@ -1392,14 +1408,16 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { final key = TupleKey(roomId, threadRootEvent.eventId).toString(); // final currentRawThread = await _threadsBox.get(key); await _threadsBox.put( - key, - Thread( - room: Room(id: roomId, client: client), - rootEvent: threadRootEvent, - client: client, - currentUserParticipated: currentUserParticipated, - count: count, - ).toJson()); + key, + Thread( + room: Room(id: roomId, client: client), + rootEvent: threadRootEvent, + lastEvent: lastEvent, + client: client, + currentUserParticipated: currentUserParticipated, + count: count, + ).toJson(), + ); } @override diff --git a/lib/src/room.dart b/lib/src/room.dart index b69f549c..275e18b8 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -130,9 +130,52 @@ class Room { for (final state in allStates) { setState(state); } + + await _loadThreadsFromServer(); + partial = false; } + Future _loadThreadsFromServer() async { + try { + final response = await client.getThreadRoots(id); + + for (final threadEvent in response.chunk) { + final event = Event.fromMatrixEvent(threadEvent, this); + // Store thread in database + await client.database.storeThread( + id, + event, + event, // lastEvent + false, // currentUserParticipated + 1, // count + client, + ); + } + } catch (e) { + Logs().w('Failed to load threads from server', e); + } + } + + Future handleThreadSync(Event event) async { + // This should be called from the client's sync handling + // when a thread-related event is received + + if (event.relationshipType == RelationshipTypes.thread && event.relationshipEventId != null) { + // Update thread metadata in database + final root = await getEventById(event.relationshipEventId!); + if (root == null) return; + await client.database.storeThread( + id, + root, + event, // update last event + event.senderId == client.userID, // currentUserParticipated + 1, // increment count - should be calculated properly + client, + ); + } + } + /// Returns the [Event] for the given [typeKey] and optional [stateKey]. /// If no [stateKey] is provided, it defaults to an empty string. /// This returns either a `StrippedStateEvent` for rooms with membership @@ -174,6 +217,15 @@ class Room { client.onRoomState.add((roomId: id, state: state)); } + Future> getThreads() async { + final dict = {}; + final list = await client.database.getThreadList(id, client); + for (final thread in list) { + dict[thread.rootEvent.eventId] = thread; + } + return dict; + } + /// ID of the fully read marker event. String get fullyRead => roomAccountData['m.fully_read']?.content.tryGet('event_id') ?? ''; diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 2818d808..3229ae4c 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -36,7 +36,7 @@ class RoomTimeline extends Timeline { StreamSubscription? roomSub; StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; - + bool isRequestingHistory = false; bool isRequestingFuture = false; bool allowNewEvent = true; @@ -417,7 +417,7 @@ class RoomTimeline extends Timeline { final searchNeedle = {}; if (event_id != null) searchNeedle.add(event_id); if (unsigned_txid != null) searchNeedle.add(unsigned_txid); - + int i; for (i = 0; i < events.length; i++) { final searchHaystack = {events[i].eventId}; @@ -436,13 +436,16 @@ class RoomTimeline extends Timeline { try { if (event.roomId != room.id) return; - // This will be handled by ThreadTimeline - if (event.relationshipType == RelationshipTypes.thread) return; - if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; } + // Skip thread events in main timeline - THEY SHOULD ONLY APPEAR IN THREAD TIMELINES + if (event.relationshipType == RelationshipTypes.thread && + event.relationshipEventId != null) { + unawaited(room.handleThreadSync(event)); + } + if (type == EventUpdateType.timeline) { onNewEvent?.call(); } @@ -641,4 +644,4 @@ class RoomTimeline extends Timeline { e.matchesEventOrTransactionId(event.transactionId), ); } -} \ No newline at end of file +} diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 74c2a3c1..75f776a8 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -7,8 +7,8 @@ class Thread { final Event rootEvent; Event? lastEvent; String? prev_batch; - bool currentUserParticipated; - int count; + bool? currentUserParticipated; + int? count; final Client client; Thread({ diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 8501dc2a..0d8e8e8e 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -33,6 +33,8 @@ class ThreadTimeline extends Timeline { bool allowNewEvent = true; bool _collectHistoryUpdates = false; + + bool isRequestingFuture = false; ThreadTimeline({ required this.thread, @@ -367,14 +369,6 @@ class ThreadTimeline extends Timeline { return i; } - @override - // TODO: implement canRequestFuture - bool get canRequestFuture => throw UnimplementedError(); - - @override - // TODO: implement canRequestHistory - bool get canRequestHistory => throw UnimplementedError(); - @override void cancelSubscriptions() { // TODO: implement cancelSubscriptions @@ -392,13 +386,6 @@ class ThreadTimeline extends Timeline { return _eventCache[id]; } - @override - Future requestFuture( - {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { - // TODO: implement requestFuture - throw UnimplementedError(); - } - @override Future requestHistory( {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async { @@ -412,12 +399,6 @@ class ThreadTimeline extends Timeline { isRequestingHistory = false; } - @override - void requestKeys( - {bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true}) { - // TODO: implement requestKeys - } - @override Future setReadMarker({String? eventId, bool? public}) { return thread.setReadMarker( @@ -438,4 +419,47 @@ class ThreadTimeline extends Timeline { // TODO: implement startSearch throw UnimplementedError(); } + + @override +bool get canRequestFuture => chunk.nextBatch != null && chunk.nextBatch!.isNotEmpty; + +@override +bool get canRequestHistory => chunk.prevBatch != null && chunk.prevBatch!.isNotEmpty; + +@override +Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, +}) async { + if (isRequestingFuture || !canRequestFuture) return; + isRequestingFuture = true; + + try { + await getThreadEvents( + historyCount: historyCount, + direction: Direction.f, + filter: filter, + ); + } finally { + isRequestingFuture = false; + } +} + +@override +void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, +}) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + thread.room.requestSessionKey(sessionId, senderKey); + } + } + } +} } From 98031bbb3dc16c82777364d2f869806b3eefca42 Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sat, 25 Oct 2025 18:43:31 +0500 Subject: [PATCH 14/18] working hard --- lib/src/database/matrix_sdk_database.dart | 1 - lib/src/room.dart | 18 +++- lib/src/room_timeline.dart | 4 + lib/src/thread.dart | 15 ++- lib/src/thread_timeline.dart | 126 +++++++++++++--------- lib/src/timeline.dart | 4 + 6 files changed, 111 insertions(+), 57 deletions(-) diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index 295abf42..317f8b75 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -1392,7 +1392,6 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { final key = TupleKey(roomId, threadRootEventId).toString(); final thread = await _threadsBox.get(key); if (thread == null) return null; - Logs().w(thread.toString()); return Thread.fromJson(thread.cast(), client); } diff --git a/lib/src/room.dart b/lib/src/room.dart index 275e18b8..e6b8dbaa 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -136,6 +136,8 @@ class Room { partial = false; } + Map threads = {}; + Future _loadThreadsFromServer() async { try { final response = await client.getThreadRoots(id); @@ -151,6 +153,7 @@ class Room { 1, // count client, ); + threads[event.eventId] = (await client.database.getThread(id, event.eventId, client))!; } } catch (e) { Logs().w('Failed to load threads from server', e); @@ -161,7 +164,8 @@ class Room { // This should be called from the client's sync handling // when a thread-related event is received - if (event.relationshipType == RelationshipTypes.thread && event.relationshipEventId != null) { + if (event.relationshipType == RelationshipTypes.thread && + event.relationshipEventId != null) { // Update thread metadata in database final root = await getEventById(event.relationshipEventId!); if (root == null) return; @@ -226,6 +230,18 @@ class Room { return dict; } + Future getThread(Event rootEvent) async { + final threads = await getThreads(); + if (threads.containsKey(rootEvent.eventId)) return threads[rootEvent.eventId]!; + return Thread( + room: this, + rootEvent: rootEvent, + client: client, + currentUserParticipated: false, + count: 0, + ); + } + /// ID of the fully read marker event. String get fullyRead => roomAccountData['m.fully_read']?.content.tryGet('event_id') ?? ''; diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 3229ae4c..5afd5015 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -37,9 +37,13 @@ class RoomTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + @override bool isRequestingHistory = false; + @override bool isRequestingFuture = false; + @override bool allowNewEvent = true; + @override bool isFragmentedTimeline = false; final Map _eventCache = {}; diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 75f776a8..7e99270b 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -45,6 +45,16 @@ class Thread { room, ); } + if (json['unsigned']?['m.thread']?['latest_event'] != null) { + lastEvent = Event.fromMatrixEvent( + MatrixEvent.fromJson( + json['unsigned']?['m.thread']?['latest_event'], + ), + room, + ); + } + // Although I was making this part according to specification, it's a bit off + // I have no clue why final thread = Thread( room: room, client: client, @@ -334,7 +344,7 @@ class Thread { if (prev_batch == null) { throw 'Tried to request history without a prev_batch token'; } - + final resp = await client.getRelatingEventsWithRelType( room.id, rootEvent.eventId, @@ -350,7 +360,8 @@ class Thread { await client.database.transaction(() async { if (storeInDatabase && direction == Direction.b) { this.prev_batch = resp.prevBatch; - await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client); + await client.database.setThreadPrevBatch( + resp.prevBatch, room.id, rootEvent.eventId, client); } }); diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 0d8e8e8e..4aa4fc08 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:convert'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; @@ -23,17 +22,18 @@ class ThreadTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + @override bool isRequestingHistory = false; + + @override bool isFragmentedTimeline = false; final Map _eventCache = {}; - - bool _fetchedAllDatabaseEvents = false; - + + @override bool allowNewEvent = true; - - bool _collectHistoryUpdates = false; - + + @override bool isRequestingFuture = false; ThreadTimeline({ @@ -52,6 +52,17 @@ class ThreadTimeline extends Timeline { historySub = room.client.onHistoryEvent.stream.listen( (event) => _handleEventUpdate(event, EventUpdateType.history), ); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + } } void _handleEventUpdate( @@ -63,7 +74,9 @@ class ThreadTimeline extends Timeline { if (event.roomId != thread.room.id) return; // Ignore events outside of this thread if (event.relationshipType != RelationshipTypes.thread || - event.relationshipEventId != thread.rootEvent.eventId) return; + event.relationshipEventId != thread.rootEvent.eventId) { + return; + } if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; @@ -152,14 +165,21 @@ class ThreadTimeline extends Timeline { dir: direction, from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, limit: historyCount, + recurse: true, + ); + + Logs().w( + 'Loading thread events from server ${resp.chunk.length} ${resp.prevBatch}', ); if (resp.nextBatch == null) { Logs().w('We reached the end of the timeline'); } - final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch; - final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch; + final newNextBatch = + direction == Direction.b ? resp.prevBatch : resp.nextBatch; + final newPrevBatch = + direction == Direction.b ? resp.nextBatch : resp.prevBatch; final type = direction == Direction.b ? EventUpdateType.history @@ -193,7 +213,8 @@ class ThreadTimeline extends Timeline { if (allowNewEvent) { Logs().d('We now allow sync update into the timeline.'); newEvents.addAll( - await thread.client.database.getThreadEventList(thread, onlySending: true), + await thread.client.database + .getThreadEventList(thread, onlySending: true), ); } } @@ -258,7 +279,8 @@ class ThreadTimeline extends Timeline { } // Fetch all users from database we have got here. for (final event in events) { - if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) { + if (thread.room.getState(EventTypes.RoomMember, event.senderId) != + null) { continue; } final dbUser = @@ -282,7 +304,6 @@ class ThreadTimeline extends Timeline { } } } else { - _fetchedAllDatabaseEvents = true; Logs().i('No more events found in the store. Request from server...'); if (isFragmentedTimeline) { @@ -298,16 +319,13 @@ class ThreadTimeline extends Timeline { await thread.requestHistory( historyCount: historyCount, direction: direction, - onHistoryReceived: () { - _collectHistoryUpdates = true; - }, + onHistoryReceived: () {}, filter: filter, ); } } } } finally { - _collectHistoryUpdates = false; isRequestingHistory = false; onUpdate?.call(); } @@ -387,8 +405,10 @@ class ThreadTimeline extends Timeline { } @override - Future requestHistory( - {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async { + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { if (isRequestingHistory) return; isRequestingHistory = true; await _requestEvents( @@ -421,45 +441,45 @@ class ThreadTimeline extends Timeline { } @override -bool get canRequestFuture => chunk.nextBatch != null && chunk.nextBatch!.isNotEmpty; + bool get canRequestFuture => chunk.nextBatch.isNotEmpty; -@override -bool get canRequestHistory => chunk.prevBatch != null && chunk.prevBatch!.isNotEmpty; + @override + bool get canRequestHistory => chunk.prevBatch.isNotEmpty; -@override -Future requestFuture({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, -}) async { - if (isRequestingFuture || !canRequestFuture) return; - isRequestingFuture = true; - - try { - await getThreadEvents( - historyCount: historyCount, - direction: Direction.f, - filter: filter, - ); - } finally { - isRequestingFuture = false; + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingFuture || !canRequestFuture) return; + isRequestingFuture = true; + + try { + await getThreadEvents( + historyCount: historyCount, + direction: Direction.f, + filter: filter, + ); + } finally { + isRequestingFuture = false; + } } -} -@override -void requestKeys({ - bool tryOnlineBackup = true, - bool onlineKeyBackupOnly = true, -}) { - for (final event in events) { - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - final sessionId = event.content.tryGet('session_id'); - final senderKey = event.content.tryGet('sender_key'); - if (sessionId != null && senderKey != null) { - thread.room.requestSessionKey(sessionId, senderKey); + @override + void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + thread.room.requestSessionKey(sessionId, senderKey); + } } } } } -} diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 4219da7b..11e50670 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -45,6 +45,10 @@ abstract class Timeline { bool get canRequestHistory; bool get canRequestFuture; + bool get allowNewEvent; + bool get isRequestingFuture; + bool get isRequestingHistory; + bool get isFragmentedTimeline; Timeline({ this.onUpdate, From 6b5ecb21d4fe6c5721f745764a17145bfb0d501f Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sat, 25 Oct 2025 21:53:33 +0500 Subject: [PATCH 15/18] load more threads when requesting history --- lib/src/room.dart | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lib/src/room.dart b/lib/src/room.dart index e6b8dbaa..5b5e1c8b 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -137,10 +137,14 @@ class Room { } Map threads = {}; + String? getThreadRootsBatch; + bool loadedAllThreads = false; Future _loadThreadsFromServer() async { try { - final response = await client.getThreadRoots(id); + if (loadedAllThreads) return; + final response = + await client.getThreadRoots(id, from: getThreadRootsBatch); for (final threadEvent in response.chunk) { final event = Event.fromMatrixEvent(threadEvent, this); @@ -153,7 +157,14 @@ class Room { 1, // count client, ); - threads[event.eventId] = (await client.database.getThread(id, event.eventId, client))!; + threads[event.eventId] = + (await client.database.getThread(id, event.eventId, client))!; + } + + if (response.nextBatch == null) { + loadedAllThreads = true; + } else { + getThreadRootsBatch = response.nextBatch; } } catch (e) { Logs().w('Failed to load threads from server', e); @@ -232,7 +243,8 @@ class Room { Future getThread(Event rootEvent) async { final threads = await getThreads(); - if (threads.containsKey(rootEvent.eventId)) return threads[rootEvent.eventId]!; + if (threads.containsKey(rootEvent.eventId)) + return threads[rootEvent.eventId]!; return Thread( room: this, rootEvent: rootEvent, @@ -1511,6 +1523,9 @@ class Room { direction = Direction.b, StateFilter? filter, }) async { + + unawaited(_loadThreadsFromServer()); + final prev_batch = this.prev_batch; final storeInDatabase = !isArchived; From 56f34ecb59808618b14dad30c3189d44021127bf Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sun, 26 Oct 2025 15:01:18 +0500 Subject: [PATCH 16/18] implement some changes --- lib/src/client.dart | 3 +- lib/src/database/database_api.dart | 2 + lib/src/database/matrix_sdk_database.dart | 4 ++ lib/src/room.dart | 22 ++++---- lib/src/thread.dart | 64 +++++++++++++++++++++-- lib/src/thread_timeline.dart | 25 +++++---- 6 files changed, 96 insertions(+), 24 deletions(-) diff --git a/lib/src/client.dart b/lib/src/client.dart index 38a0195d..b4f6c7bc 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -2776,13 +2776,13 @@ class Client extends MatrixApi { final List receipts = []; for (final event in events) { + room.setEphemeral(event); // Receipt events are deltas between two states. We will create a // fake room account data event for this and store the difference // there. if (event.type != 'm.receipt') continue; - receipts.add(ReceiptEventContent.fromJson(event.content)); } @@ -2797,6 +2797,7 @@ class Client extends MatrixApi { type: LatestReceiptState.eventType, content: receiptStateContent.toJson(), ); + await database.storeRoomAccountData(room.id, event); room.roomAccountData[event.type] = event; } diff --git a/lib/src/database/database_api.dart b/lib/src/database/database_api.dart index 509cf55c..5a56d22f 100644 --- a/lib/src/database/database_api.dart +++ b/lib/src/database/database_api.dart @@ -68,6 +68,8 @@ abstract class DatabaseApi { Event threadRootEvent, Event? lastEvent, bool currentUserParticipated, + int? notificationCount, + int? highlightCount, int count, Client client, ); diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index 317f8b75..3f0263c6 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -1401,6 +1401,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { Event threadRootEvent, Event? lastEvent, bool currentUserParticipated, + int? notificationCount, + int? highlightCount, int count, Client client, ) async { @@ -1415,6 +1417,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { client: client, currentUserParticipated: currentUserParticipated, count: count, + notificationCount: notificationCount ?? 0, + highlightCount: highlightCount ?? 0, ).toJson(), ); } diff --git a/lib/src/room.dart b/lib/src/room.dart index 5b5e1c8b..12b218bb 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -26,7 +26,6 @@ import 'package:html_unescape/html_unescape.dart'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; -import 'package:matrix/src/room_timeline.dart'; import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/file_send_request_credentials.dart'; import 'package:matrix/src/utils/markdown.dart'; @@ -148,17 +147,18 @@ class Room { for (final threadEvent in response.chunk) { final event = Event.fromMatrixEvent(threadEvent, this); + final thread = Thread.fromJson(threadEvent.toJson(), client); // Store thread in database await client.database.storeThread( id, event, - event, // lastEvent - false, // currentUserParticipated - 1, // count + thread.lastEvent, // lastEvent + thread.currentUserParticipated ?? false, // currentUserParticipated + 0, 0, + thread.count ?? 1, // count client, ); - threads[event.eventId] = - (await client.database.getThread(id, event.eventId, client))!; + threads[event.eventId] = thread; } if (response.nextBatch == null) { @@ -180,12 +180,14 @@ class Room { // Update thread metadata in database final root = await getEventById(event.relationshipEventId!); if (root == null) return; + final thread = await client.database.getThread(id, event.relationshipEventId!, client); await client.database.storeThread( id, root, event, // update last event event.senderId == client.userID, // currentUserParticipated - 1, // increment count - should be calculated properly + (thread?.count ?? 0) + 1, // increment count - should be calculated properly + 0, 0, client, ); } @@ -243,14 +245,17 @@ class Room { Future getThread(Event rootEvent) async { final threads = await getThreads(); - if (threads.containsKey(rootEvent.eventId)) + if (threads.containsKey(rootEvent.eventId)) { return threads[rootEvent.eventId]!; + } return Thread( room: this, rootEvent: rootEvent, client: client, currentUserParticipated: false, count: 0, + highlightCount: 0, + notificationCount: 0, ); } @@ -1523,7 +1528,6 @@ class Room { direction = Direction.b, StateFilter? filter, }) async { - unawaited(_loadThreadsFromServer()); final prev_batch = this.prev_batch; diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 7e99270b..726a48c9 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -1,5 +1,6 @@ +import 'dart:async'; + import 'package:matrix/matrix.dart'; -import 'package:matrix/matrix_api_lite/generated/internal.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; class Thread { @@ -11,16 +12,28 @@ class Thread { int? count; final Client client; + /// The count of unread notifications. + int notificationCount = 0; + + /// The count of highlighted notifications. + int highlightCount = 0; + Thread({ required this.room, required this.rootEvent, required this.client, required this.currentUserParticipated, required this.count, + required this.notificationCount, + required this.highlightCount, this.prev_batch, this.lastEvent, }); + /// Returns true if this room is unread. To check if there are new messages + /// in muted rooms, use [hasNewMessages]. + bool get isUnread => notificationCount > 0; + Map toJson() => { ...rootEvent.toJson(), 'unsigned': { @@ -66,10 +79,20 @@ class Thread { count: json['unsigned']?['m.relations']?['m.thread']?['count'], currentUserParticipated: json['unsigned']?['m.relations']?['m.thread'] ?['current_user_participated'], + highlightCount: 0, + notificationCount: 0, ); return thread; } + Future refreshLastEvent({ + timeout = const Duration(seconds: 30), + }) async { + final lastEvent = _refreshingLastEvent ??= _refreshLastEvent(); + _refreshingLastEvent = null; + return lastEvent; + } + Future? _refreshingLastEvent; Future _refreshLastEvent({ @@ -121,8 +144,27 @@ class Thread { } bool get hasNewMessages { - // TODO: Implement this - return false; + final lastEvent = this.lastEvent; + + // There is no known event or the last event is only a state fallback event, + // we assume there is no new messages. + if (lastEvent == null || + !client.roomPreviewLastEvents.contains(lastEvent.type)) { + return false; + } + + // Read marker is on the last event so no new messages. + if (lastEvent.receipts + .any((receipt) => receipt.user.senderId == client.userID!)) { + return false; + } + + // If the last event is sent, we mark the room as read. + if (lastEvent.senderId == client.userID) return false; + + // Get the timestamp of read marker and compare + final readAtMilliseconds = room.receiptState.byThread[rootEvent.eventId]?.latestOwnReceipt?.ts ?? 0; + return readAtMilliseconds < lastEvent.originServerTs.millisecondsSinceEpoch; } Future getEventContext(String eventId) async { @@ -327,6 +369,22 @@ class Thread { ); } + Future setLastEvent(Event event) async { + lastEvent = event; + final thread = await client.database.getThread(room.id, rootEvent.eventId, client); + Logs().v('Set lastEvent to ${room.id}:${rootEvent.eventId} (${event.senderId})'); + await client.database.storeThread( + room.id, + rootEvent, + lastEvent, + currentUserParticipated ?? false, + notificationCount, + highlightCount, + (thread?.count ?? 0) + 1, + client, + ); + } + Future requestHistory({ int historyCount = Room.defaultHistoryCount, void Function()? onHistoryReceived, diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 4aa4fc08..7772c3d3 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -119,6 +119,8 @@ class ThreadTimeline extends Timeline { addAggregatedEvent(event); } + unawaited(thread.setLastEvent(events[events.length - 1])); + // Handle redaction events if (event.type == EventTypes.Redaction) { final index = _findEvent(event_id: event.redacts); @@ -428,14 +430,15 @@ class ThreadTimeline extends Timeline { } @override - Stream<(List, String?)> startSearch( - {String? searchTerm, - int requestHistoryCount = 100, - int maxHistoryRequests = 10, - String? prevBatch, - String? sinceEventId, - int? limit, - bool Function(Event p1)? searchFunc}) { + Stream<(List, String?)> startSearch({ + String? searchTerm, + int requestHistoryCount = 100, + int maxHistoryRequests = 10, + String? prevBatch, + String? sinceEventId, + int? limit, + bool Function(Event p1)? searchFunc, + }) { // TODO: implement startSearch throw UnimplementedError(); } @@ -466,10 +469,10 @@ class ThreadTimeline extends Timeline { } @override - void requestKeys({ + Future requestKeys({ bool tryOnlineBackup = true, bool onlineKeyBackupOnly = true, - }) { + }) async { for (final event in events) { if (event.type == EventTypes.Encrypted && event.messageType == MessageTypes.BadEncrypted && @@ -477,7 +480,7 @@ class ThreadTimeline extends Timeline { final sessionId = event.content.tryGet('session_id'); final senderKey = event.content.tryGet('sender_key'); if (sessionId != null && senderKey != null) { - thread.room.requestSessionKey(sessionId, senderKey); + await thread.room.requestSessionKey(sessionId, senderKey); } } } From 09643a172f8699dd3f917b382582a205dbb35d6d Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Thu, 30 Oct 2025 11:50:17 +0500 Subject: [PATCH 17/18] make Room#loadThreadsFromServer public --- lib/src/room.dart | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/src/room.dart b/lib/src/room.dart index 12b218bb..040beb65 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -130,7 +130,7 @@ class Room { setState(state); } - await _loadThreadsFromServer(); + await loadThreadsFromServer(); partial = false; } @@ -139,7 +139,7 @@ class Room { String? getThreadRootsBatch; bool loadedAllThreads = false; - Future _loadThreadsFromServer() async { + Future loadThreadsFromServer() async { try { if (loadedAllThreads) return; final response = @@ -185,11 +185,12 @@ class Room { id, root, event, // update last event - event.senderId == client.userID, // currentUserParticipated + event.senderId == client.userID || (thread?.currentUserParticipated ?? false), // currentUserParticipated (thread?.count ?? 0) + 1, // increment count - should be calculated properly 0, 0, client, ); + threads[event.relationshipEventId!] = (await client.database.getThread(id, event.relationshipEventId!, client))!; } } @@ -1528,7 +1529,7 @@ class Room { direction = Direction.b, StateFilter? filter, }) async { - unawaited(_loadThreadsFromServer()); + unawaited(loadThreadsFromServer()); final prev_batch = this.prev_batch; From 92e69b5f1e8d41117b96b22507694f8fe976af8c Mon Sep 17 00:00:00 2001 From: OfficialDakari Date: Sun, 2 Nov 2025 14:11:11 +0500 Subject: [PATCH 18/18] edits and reactions in threads --- lib/src/room.dart | 6 +++--- lib/src/room_timeline.dart | 9 +++++++++ lib/src/thread.dart | 9 ++++++--- lib/src/thread_timeline.dart | 20 ++++++++++++++++---- 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/lib/src/room.dart b/lib/src/room.dart index 040beb65..1ae4c40f 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -175,8 +175,8 @@ class Room { // This should be called from the client's sync handling // when a thread-related event is received - if (event.relationshipType == RelationshipTypes.thread && - event.relationshipEventId != null) { + // if (event.relationshipType == RelationshipTypes.thread && + // event.relationshipEventId != null) { // Update thread metadata in database final root = await getEventById(event.relationshipEventId!); if (root == null) return; @@ -191,7 +191,7 @@ class Room { client, ); threads[event.relationshipEventId!] = (await client.database.getThread(id, event.relationshipEventId!, client))!; - } + //} } /// Returns the [Event] for the given [typeKey] and optional [stateKey]. diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 5afd5015..b0306a87 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -491,6 +491,15 @@ class RoomTimeline extends Timeline { addAggregatedEvent(event); } + if (event.relationshipEventId != null && + (event.relationshipType == RelationshipTypes.edit || + event.relationshipType == RelationshipTypes.reaction || + event.relationshipType == RelationshipTypes.reference)) { + final parentEventIndex = + _findEvent(event_id: event.relationshipEventId); + unawaited(room.handleThreadSync(events[parentEventIndex])); + } + // Handle redaction events if (event.type == EventTypes.Redaction) { final index = _findEvent(event_id: event.redacts); diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 726a48c9..8552392d 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -105,6 +105,7 @@ class Thread { room.id, rootEvent.eventId, 'm.thread', + recurse: true, ) .timeout(timeout); final matrixEvent = result.chunk.firstOrNull; @@ -172,14 +173,17 @@ class Thread { final resp = await client.getEventContext( room.id, eventId, limit: Room.defaultHistoryCount, + // filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), ); + + final events = [ if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed, if (resp.event != null) resp.event!, if (resp.eventsBefore != null) ...resp.eventsBefore!, - ].map((e) => Event.fromMatrixEvent(e, room)).toList(); + ].map((e) => Event.fromMatrixEvent(e, room)).where((e) => e.relationshipType == RelationshipTypes.thread && e.relationshipEventId == rootEvent.eventId).toList(); // Try again to decrypt encrypted events but don't update the database. if (room.encrypted && client.encryptionEnabled) { @@ -403,10 +407,9 @@ class Thread { throw 'Tried to request history without a prev_batch token'; } - final resp = await client.getRelatingEventsWithRelType( + final resp = await client.getRelatingEvents( room.id, rootEvent.eventId, - RelationshipTypes.thread, from: prev_batch, limit: historyCount, dir: direction, diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 7772c3d3..7a73c1fc 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -73,11 +73,15 @@ class ThreadTimeline extends Timeline { try { if (event.roomId != thread.room.id) return; // Ignore events outside of this thread - if (event.relationshipType != RelationshipTypes.thread || + if (event.relationshipType == RelationshipTypes.thread && event.relationshipEventId != thread.rootEvent.eventId) { return; } + if (event.relationshipType == null) { + return; + } + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; } @@ -160,10 +164,9 @@ class ThreadTimeline extends Timeline { filter ??= StateFilter(lazyLoadMembers: true); filter.lazyLoadMembers ??= true; - final resp = await thread.client.getRelatingEventsWithRelType( + final resp = await thread.client.getRelatingEvents( thread.room.id, thread.rootEvent.eventId, - RelationshipTypes.thread, dir: direction, from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, limit: historyCount, @@ -255,6 +258,11 @@ class ThreadTimeline extends Timeline { if (onUpdate != null) { onUpdate!(); } + + for (final e in events) { + addAggregatedEvent(e); + } + return resp.chunk.length; } @@ -337,9 +345,13 @@ class ThreadTimeline extends Timeline { void addAggregatedEvent(Event event) { final relationshipType = event.relationshipType; final relationshipEventId = event.relationshipEventId; - if (relationshipType == null || relationshipEventId == null) { + if (relationshipType == null || + relationshipType == RelationshipTypes.thread || + relationshipEventId == null) { return; } + // Logs().w( + // 'Adding aggregated event ${event.type} ${event.eventId} to $relationshipEventId ($relationshipType)'); final e = (aggregatedEvents[relationshipEventId] ??= >{})[relationshipType] ??= {}; _removeEventFromSet(e, event);