diff --git a/lib/matrix.dart b/lib/matrix.dart index 8ad93add..461d7fb3 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -49,7 +49,10 @@ export 'src/voip/utils/famedly_call_extension.dart'; export 'src/voip/utils/types.dart'; export 'src/voip/utils/wrapped_media_stream.dart'; export 'src/room.dart'; +export 'src/thread.dart'; export 'src/timeline.dart'; +export 'src/room_timeline.dart'; +export 'src/thread_timeline.dart'; export 'src/user.dart'; export 'src/utils/cached_profile_information.dart'; export 'src/utils/commands_extension.dart'; diff --git a/lib/msc_extensions/extension_timeline_export/timeline_export.dart b/lib/msc_extensions/extension_timeline_export/timeline_export.dart index 47d7751f..6f829258 100644 --- a/lib/msc_extensions/extension_timeline_export/timeline_export.dart +++ b/lib/msc_extensions/extension_timeline_export/timeline_export.dart @@ -2,9 +2,9 @@ import 'dart:convert'; import 'package:matrix/matrix_api_lite.dart'; import 'package:matrix/src/event.dart'; -import 'package:matrix/src/timeline.dart'; +import 'package:matrix/src/room_timeline.dart'; -extension TimelineExportExtension on Timeline { +extension TimelineExportExtension on RoomTimeline { /// Exports timeline events from a Matrix room within a specified date range. /// /// The export process provides progress updates through the returned stream with the following information: diff --git a/lib/src/database/database_api.dart b/lib/src/database/database_api.dart index 644ecfff..1f18a071 100644 --- a/lib/src/database/database_api.dart +++ b/lib/src/database/database_api.dart @@ -59,6 +59,17 @@ abstract class DatabaseApi { Future> getRoomList(Client client); + Future> getThreadList(String roomId, Client client); + + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int count, + Client client, + ); + Future getSingleRoom( Client client, String roomId, { @@ -78,6 +89,8 @@ abstract class DatabaseApi { Future deleteTimelineForRoom(String roomId); + Future deleteTimelineForThread(String roomId, String threadRootEventId); + /// Stores an EventUpdate object in the database. Must be called inside of /// [transaction]. Future storeEventUpdate( @@ -115,6 +128,13 @@ abstract class DatabaseApi { int? limit, }); + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }); + Future> getEventIdList( Room room, { int start = 0, @@ -265,6 +285,13 @@ abstract class DatabaseApi { Future removeEvent(String eventId, String roomId); + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ); + Future setRoomPrevBatch( String? prevBatch, String roomId, diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index fc2ddeaf..f2ae5740 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -59,6 +59,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { late Box _clientBox; late Box _accountDataBox; late Box _roomsBox; + late Box _threadsBox; late Box _toDeviceQueueBox; /// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be @@ -122,6 +123,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { static const String _roomsBoxName = 'box_rooms'; + static const String _threadsBoxName = 'box_threads'; + static const String _toDeviceQueueBoxName = 'box_to_device_queue'; static const String _preloadRoomStateBoxName = 'box_preload_room_states'; @@ -218,6 +221,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBoxName, _accountDataBoxName, _roomsBoxName, + _threadsBoxName, _toDeviceQueueBoxName, _preloadRoomStateBoxName, _nonPreloadRoomStateBoxName, @@ -252,6 +256,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _roomsBox = _collection.openBox( _roomsBoxName, ); + _threadsBox = _collection.openBox(_threadsBoxName); _preloadRoomStateBox = _collection.openBox( _preloadRoomStateBoxName, ); @@ -357,6 +362,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { _clientBox.clearQuickAccessCache(); _accountDataBox.clearQuickAccessCache(); _roomsBox.clearQuickAccessCache(); + _threadsBox.clearQuickAccessCache(); _preloadRoomStateBox.clearQuickAccessCache(); _nonPreloadRoomStateBox.clearQuickAccessCache(); _roomMembersBox.clearQuickAccessCache(); @@ -383,6 +389,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { @override Future clearCache() => transaction(() async { await _roomsBox.clear(); + await _threadsBox.clear(); await _accountDataBox.clear(); await _roomAccountDataBox.clear(); await _preloadRoomStateBox.clear(); @@ -532,6 +539,46 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return await _getEventsByIds(eventIds.cast(), room); }); + @override + Future> getThreadEventList( + Thread thread, { + int start = 0, + bool onlySending = false, + int? limit, + }) => + runBenchmarked>('Get event list', () async { + // Get the synced event IDs from the store + final timelineKey = + TupleKey(thread.room.id, '', thread.rootEvent.eventId).toString(); + final timelineEventIds = + (await _timelineFragmentsBox.get(timelineKey) ?? []); + + // Get the local stored SENDING events from the store + late final List sendingEventIds; + if (start != 0) { + sendingEventIds = []; + } else { + final sendingTimelineKey = + TupleKey(thread.room.id, 'SENDING', thread.rootEvent.eventId) + .toString(); + sendingEventIds = + (await _timelineFragmentsBox.get(sendingTimelineKey) ?? []); + } + + // Combine those two lists while respecting the start and limit parameters. + final end = min( + timelineEventIds.length, + start + (limit ?? timelineEventIds.length), + ); + final eventIds = [ + ...sendingEventIds, + if (!onlySending && start < timelineEventIds.length) + ...timelineEventIds.getRange(start, end), + ]; + + return await _getEventsByIds(eventIds.cast(), thread.room); + }); + @override Future getInboundGroupSession( String roomId, @@ -1053,6 +1100,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future setThreadPrevBatch( + String? prevBatch, + String roomId, + String threadRootEventId, + Client client, + ) async { + final raw = + await _threadsBox.get(TupleKey(roomId, threadRootEventId).toString()); + if (raw == null) return; + final thread = Thread.fromJson(copyMap(raw), client); + thread.prev_batch = prevBatch; + await _threadsBox.put(roomId, thread.toJson()); + return; + } + @override Future setVerifiedUserCrossSigningKey( bool verified, @@ -1302,6 +1365,43 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { return; } + @override + Future> getThreadList(String roomId, Client client) async { + final allThreadsKeys = await _threadsBox.getAllKeys(); + final threadsKeys = {}; + // TERRIBLE implementation. Better to create another box (String[roomId]->List[event ids]) + for (final key in allThreadsKeys) { + if (key.startsWith(roomId)) threadsKeys.add(key); + } + final threads = {}; + + + + return threads.toList(); + } + + @override + Future storeThread( + String roomId, + Event threadRootEvent, + Event? lastEvent, + bool currentUserParticipated, + int count, + Client client, + ) async { + final key = TupleKey(roomId, threadRootEvent.eventId).toString(); + // final currentRawThread = await _threadsBox.get(key); + await _threadsBox.put( + key, + Thread( + room: Room(id: roomId, client: client), + rootEvent: threadRootEvent, + client: client, + currentUserParticipated: currentUserParticipated, + count: count, + ).toJson()); + } + @override Future storeRoomUpdate( String roomId, @@ -1314,6 +1414,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { await forgetRoom(roomId); return; } + final membership = roomUpdate is LeftRoomUpdate ? Membership.leave : roomUpdate is InvitedRoomUpdate @@ -1376,6 +1477,12 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { Future deleteTimelineForRoom(String roomId) => _timelineFragmentsBox.delete(TupleKey(roomId, '').toString()); + @override + Future deleteTimelineForThread( + String roomId, String threadRootEventId) => + _timelineFragmentsBox + .delete(TupleKey(roomId, '', threadRootEventId).toString()); + @override Future storeSSSSCache( String type, diff --git a/lib/src/room.dart b/lib/src/room.dart index a6d1f83a..b69f549c 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -1649,7 +1649,7 @@ class Room { /// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks. /// This method can also retrieve the timeline at a specific point by setting /// the [eventContextId] - Future getTimeline({ + Future getTimeline({ void Function(int index)? onChange, void Function(int index)? onRemove, void Function(int insertID)? onInsert, diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index da9d5312..2818d808 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -436,6 +436,9 @@ class RoomTimeline extends Timeline { try { if (event.roomId != room.id) return; + // This will be handled by ThreadTimeline + if (event.relationshipType == RelationshipTypes.thread) return; + if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; } diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 0e9b243b..74c2a3c1 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -1,20 +1,37 @@ import 'package:matrix/matrix.dart'; +import 'package:matrix/matrix_api_lite/generated/internal.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; class Thread { final Room room; final Event rootEvent; Event? lastEvent; String? prev_batch; + bool currentUserParticipated; + int count; final Client client; Thread({ required this.room, required this.rootEvent, required this.client, + required this.currentUserParticipated, + required this.count, this.prev_batch, this.lastEvent, }); + Map toJson() => { + ...rootEvent.toJson(), + 'unsigned': { + 'm.thread': { + 'latest_event': lastEvent?.toJson(), + 'count': count, + 'current_user_participated': currentUserParticipated, + }, + }, + }; + factory Thread.fromJson(Map json, Client client) { final room = client.getRoomById(json['room_id']); if (room == null) throw Error(); @@ -36,6 +53,9 @@ class Thread { room, ), lastEvent: lastEvent, + count: json['unsigned']?['m.relations']?['m.thread']?['count'], + currentUserParticipated: json['unsigned']?['m.relations']?['m.thread'] + ?['current_user_participated'], ); return thread; } @@ -95,6 +115,120 @@ class Thread { return false; } + Future getEventContext(String eventId) async { + // TODO: probably find events with relationship + final resp = await client.getEventContext( + room.id, eventId, + limit: Room.defaultHistoryCount, + // filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + + final events = [ + if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed, + if (resp.event != null) resp.event!, + if (resp.eventsBefore != null) ...resp.eventsBefore!, + ].map((e) => Event.fromMatrixEvent(e, room)).toList(); + + // Try again to decrypt encrypted events but don't update the database. + if (room.encrypted && client.encryptionEnabled) { + for (var i = 0; i < events.length; i++) { + if (events[i].type == EventTypes.Encrypted && + events[i].content['can_request_session'] == true) { + events[i] = await client.encryption!.decryptRoomEvent(events[i]); + } + } + } + + final chunk = TimelineChunk( + nextBatch: resp.end ?? '', + prevBatch: resp.start ?? '', + events: events, + ); + + return chunk; + } + + Future getTimeline({ + void Function(int index)? onChange, + void Function(int index)? onRemove, + void Function(int insertID)? onInsert, + void Function()? onNewEvent, + void Function()? onUpdate, + String? eventContextId, + int? limit = Room.defaultHistoryCount, + }) async { + // await postLoad(); + + var events = []; + + await client.database.transaction(() async { + events = await client.database.getThreadEventList( + this, + limit: limit, + ); + }); + + var chunk = TimelineChunk(events: events); + // Load the timeline arround eventContextId if set + if (eventContextId != null) { + if (!events.any((Event event) => event.eventId == eventContextId)) { + chunk = + await getEventContext(eventContextId) ?? TimelineChunk(events: []); + } + } + + final timeline = ThreadTimeline( + thread: this, + chunk: chunk, + onChange: onChange, + onRemove: onRemove, + onInsert: onInsert, + onNewEvent: onNewEvent, + onUpdate: onUpdate, + ); + + // Fetch all users from database we have got here. + if (eventContextId == null) { + final userIds = events.map((event) => event.senderId).toSet(); + for (final userId in userIds) { + if (room.getState(EventTypes.RoomMember, userId) != null) continue; + final dbUser = await client.database.getUser(userId, room); + if (dbUser != null) room.setState(dbUser); + } + } + + // Try again to decrypt encrypted events and update the database. + if (room.encrypted && client.encryptionEnabled) { + // decrypt messages + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].type == EventTypes.Encrypted) { + if (eventContextId != null) { + // for the fragmented timeline, we don't cache the decrypted + //message in the database + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + ); + } else { + // else, we need the database + await client.database.transaction(() async { + for (var i = 0; i < chunk.events.length; i++) { + if (chunk.events[i].content['can_request_session'] == true) { + chunk.events[i] = await client.encryption!.decryptRoomEvent( + chunk.events[i], + store: !room.isArchived, + updateType: EventUpdateType.history, + ); + } + } + }); + } + } + } + } + + return timeline; + } + Future sendTextEvent( String message, { String? txid, @@ -170,4 +304,56 @@ class Thread { threadRootEventId: rootEvent.eventId, ); } + + Future setReadMarker({String? eventId, bool? public}) async { + if (eventId == null) return null; + return await client.postReceipt( + room.id, + (public ?? client.receiptsPublicByDefault) + ? ReceiptType.mRead + : ReceiptType.mReadPrivate, + eventId, + threadId: rootEvent.eventId, + ); + } + + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + void Function()? onHistoryReceived, + direction = Direction.b, + StateFilter? filter, + }) async { + final prev_batch = this.prev_batch; + + final storeInDatabase = !room.isArchived; + + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + if (prev_batch == null) { + throw 'Tried to request history without a prev_batch token'; + } + + final resp = await client.getRelatingEventsWithRelType( + room.id, + rootEvent.eventId, + RelationshipTypes.thread, + from: prev_batch, + limit: historyCount, + dir: direction, + recurse: true, + ); + + if (onHistoryReceived != null) onHistoryReceived(); + + await client.database.transaction(() async { + if (storeInDatabase && direction == Direction.b) { + this.prev_batch = resp.prevBatch; + await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client); + } + }); + + return resp.chunk.length; + } } diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index b480aad9..8501dc2a 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; @@ -22,6 +23,17 @@ class ThreadTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + bool isRequestingHistory = false; + bool isFragmentedTimeline = false; + + final Map _eventCache = {}; + + bool _fetchedAllDatabaseEvents = false; + + bool allowNewEvent = true; + + bool _collectHistoryUpdates = false; + ThreadTimeline({ required this.thread, required this.chunk, @@ -59,20 +71,38 @@ class ThreadTimeline extends Timeline { onNewEvent?.call(); } - if (type == EventUpdateType.history && - events.indexWhere((e) => e.eventId == event.eventId) != -1) { - return; - } - var index = events.length; - if (type == EventUpdateType.history) { - events.add(event); + final status = event.status; + final i = _findEvent( + event_id: event.eventId, + unsigned_txid: event.transactionId, + ); + if (i < events.length) { + // if the old status is larger than the new one, we also want to preserve the old status + final oldStatus = events[i].status; + events[i] = event; + // do we preserve the status? we should allow 0 -> -1 updates and status increases + if ((latestEventStatus(status, oldStatus) == oldStatus) && + !(status.isError && oldStatus.isSending)) { + events[i].status = oldStatus; + } + addAggregatedEvent(events[i]); + onChange?.call(i); } else { - index = events.firstIndexWhereNotError; - events.insert(index, event); - } - onInsert?.call(index); + if (type == EventUpdateType.history && + events.indexWhere((e) => e.eventId == event.eventId) != -1) { + return; + } + var index = events.length; + if (type == EventUpdateType.history) { + events.add(event); + } else { + index = events.firstIndexWhereNotError; + events.insert(index, event); + } + onInsert?.call(index); - addAggregatedEvent(event); + addAggregatedEvent(event); + } // Handle redaction events if (event.type == EventTypes.Redaction) { @@ -103,6 +133,184 @@ class ThreadTimeline extends Timeline { } } + /// Request more previous events from the server. + Future getThreadEvents({ + int historyCount = Room.defaultHistoryCount, + direction = Direction.b, + StateFilter? filter, + }) async { + // Ensure stateFilter is not null and set lazyLoadMembers to true if not already set + filter ??= StateFilter(lazyLoadMembers: true); + filter.lazyLoadMembers ??= true; + + final resp = await thread.client.getRelatingEventsWithRelType( + thread.room.id, + thread.rootEvent.eventId, + RelationshipTypes.thread, + dir: direction, + from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, + limit: historyCount, + ); + + if (resp.nextBatch == null) { + Logs().w('We reached the end of the timeline'); + } + + final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch; + final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch; + + final type = direction == Direction.b + ? EventUpdateType.history + : EventUpdateType.timeline; + + // I dont know what this piece of code does + // if ((resp.state?.length ?? 0) == 0 && + // resp.start != resp.end && + // newPrevBatch != null && + // newNextBatch != null) { + // if (type == EventUpdateType.history) { + // Logs().w( + // '[nav] we can still request history prevBatch: $type $newPrevBatch', + // ); + // } else { + // Logs().w( + // '[nav] we can still request timeline nextBatch: $type $newNextBatch', + // ); + // } + // } + + final newEvents = + resp.chunk.map((e) => Event.fromMatrixEvent(e, thread.room)).toList(); + + if (!allowNewEvent) { + if (resp.prevBatch == resp.nextBatch || + (resp.nextBatch == null && direction == Direction.f)) { + allowNewEvent = true; + } + + if (allowNewEvent) { + Logs().d('We now allow sync update into the timeline.'); + newEvents.addAll( + await thread.client.database.getThreadEventList(thread, onlySending: true), + ); + } + } + + // Try to decrypt encrypted events but don't update the database. + if (thread.room.encrypted && thread.client.encryptionEnabled) { + for (var i = 0; i < newEvents.length; i++) { + if (newEvents[i].type == EventTypes.Encrypted) { + newEvents[i] = await thread.client.encryption!.decryptRoomEvent( + newEvents[i], + ); + } + } + } + + // update chunk anchors + if (type == EventUpdateType.history) { + chunk.prevBatch = newPrevBatch ?? ''; + + final offset = chunk.events.length; + + chunk.events.addAll(newEvents); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i + offset); + } + } else { + chunk.nextBatch = newNextBatch ?? ''; + chunk.events.insertAll(0, newEvents.reversed); + + for (var i = 0; i < newEvents.length; i++) { + onInsert?.call(i); + } + } + + if (onUpdate != null) { + onUpdate!(); + } + return resp.chunk.length; + } + + Future _requestEvents({ + int historyCount = Room.defaultHistoryCount, + required Direction direction, + StateFilter? filter, + }) async { + onUpdate?.call(); + + try { + // Look up for events in the database first. With fragmented view, we should delete the database cache + final eventsFromStore = isFragmentedTimeline + ? null + : await thread.client.database.getThreadEventList( + thread, + start: events.length, + limit: historyCount, + ); + + if (eventsFromStore != null && eventsFromStore.isNotEmpty) { + for (final e in eventsFromStore) { + addAggregatedEvent(e); + } + // Fetch all users from database we have got here. + for (final event in events) { + if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) { + continue; + } + final dbUser = + await thread.client.database.getUser(event.senderId, thread.room); + if (dbUser != null) thread.room.setState(dbUser); + } + + if (direction == Direction.b) { + events.addAll(eventsFromStore); + final startIndex = events.length - eventsFromStore.length; + final endIndex = events.length; + for (var i = startIndex; i < endIndex; i++) { + onInsert?.call(i); + } + } else { + events.insertAll(0, eventsFromStore); + final startIndex = eventsFromStore.length; + final endIndex = 0; + for (var i = startIndex; i > endIndex; i--) { + onInsert?.call(i); + } + } + } else { + _fetchedAllDatabaseEvents = true; + Logs().i('No more events found in the store. Request from server...'); + + if (isFragmentedTimeline) { + await getThreadEvents( + historyCount: historyCount, + direction: direction, + filter: filter, + ); + } else { + if (thread.prev_batch == null) { + Logs().i('No more events to request from server...'); + } else { + await thread.requestHistory( + historyCount: historyCount, + direction: direction, + onHistoryReceived: () { + _collectHistoryUpdates = true; + }, + filter: filter, + ); + } + } + } + } finally { + _collectHistoryUpdates = false; + isRequestingHistory = false; + onUpdate?.call(); + } + } + /// Add an event to the aggregation tree void addAggregatedEvent(Event event) { final relationshipType = event.relationshipType; @@ -173,9 +381,15 @@ class ThreadTimeline extends Timeline { } @override - Future getEventById(String id) { - // TODO: implement getEventById - throw UnimplementedError(); + Future getEventById(String id) async { + for (final event in events) { + if (event.eventId == id) return event; + } + if (_eventCache.containsKey(id)) return _eventCache[id]; + final requestedEvent = await thread.room.getEventById(id); + if (requestedEvent == null) return null; + _eventCache[id] = requestedEvent; + return _eventCache[id]; } @override @@ -187,9 +401,15 @@ class ThreadTimeline extends Timeline { @override Future requestHistory( - {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) { - // TODO: implement requestHistory - throw UnimplementedError(); + {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async { + if (isRequestingHistory) return; + isRequestingHistory = true; + await _requestEvents( + direction: Direction.b, + historyCount: historyCount, + filter: filter, + ); + isRequestingHistory = false; } @override @@ -200,8 +420,10 @@ class ThreadTimeline extends Timeline { @override Future setReadMarker({String? eventId, bool? public}) { - // TODO: implement setReadMarker - throw UnimplementedError(); + return thread.setReadMarker( + eventId: eventId, + public: public, + ); } @override