diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart index 295abf42..317f8b75 100644 --- a/lib/src/database/matrix_sdk_database.dart +++ b/lib/src/database/matrix_sdk_database.dart @@ -1392,7 +1392,6 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage { final key = TupleKey(roomId, threadRootEventId).toString(); final thread = await _threadsBox.get(key); if (thread == null) return null; - Logs().w(thread.toString()); return Thread.fromJson(thread.cast(), client); } diff --git a/lib/src/room.dart b/lib/src/room.dart index 275e18b8..e6b8dbaa 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -136,6 +136,8 @@ class Room { partial = false; } + Map threads = {}; + Future _loadThreadsFromServer() async { try { final response = await client.getThreadRoots(id); @@ -151,6 +153,7 @@ class Room { 1, // count client, ); + threads[event.eventId] = (await client.database.getThread(id, event.eventId, client))!; } } catch (e) { Logs().w('Failed to load threads from server', e); @@ -161,7 +164,8 @@ class Room { // This should be called from the client's sync handling // when a thread-related event is received - if (event.relationshipType == RelationshipTypes.thread && event.relationshipEventId != null) { + if (event.relationshipType == RelationshipTypes.thread && + event.relationshipEventId != null) { // Update thread metadata in database final root = await getEventById(event.relationshipEventId!); if (root == null) return; @@ -226,6 +230,18 @@ class Room { return dict; } + Future getThread(Event rootEvent) async { + final threads = await getThreads(); + if (threads.containsKey(rootEvent.eventId)) return threads[rootEvent.eventId]!; + return Thread( + room: this, + rootEvent: rootEvent, + client: client, + currentUserParticipated: false, + count: 0, + ); + } + /// ID of the fully read marker event. String get fullyRead => roomAccountData['m.fully_read']?.content.tryGet('event_id') ?? ''; diff --git a/lib/src/room_timeline.dart b/lib/src/room_timeline.dart index 3229ae4c..5afd5015 100644 --- a/lib/src/room_timeline.dart +++ b/lib/src/room_timeline.dart @@ -37,9 +37,13 @@ class RoomTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + @override bool isRequestingHistory = false; + @override bool isRequestingFuture = false; + @override bool allowNewEvent = true; + @override bool isFragmentedTimeline = false; final Map _eventCache = {}; diff --git a/lib/src/thread.dart b/lib/src/thread.dart index 75f776a8..7e99270b 100644 --- a/lib/src/thread.dart +++ b/lib/src/thread.dart @@ -45,6 +45,16 @@ class Thread { room, ); } + if (json['unsigned']?['m.thread']?['latest_event'] != null) { + lastEvent = Event.fromMatrixEvent( + MatrixEvent.fromJson( + json['unsigned']?['m.thread']?['latest_event'], + ), + room, + ); + } + // Although I was making this part according to specification, it's a bit off + // I have no clue why final thread = Thread( room: room, client: client, @@ -334,7 +344,7 @@ class Thread { if (prev_batch == null) { throw 'Tried to request history without a prev_batch token'; } - + final resp = await client.getRelatingEventsWithRelType( room.id, rootEvent.eventId, @@ -350,7 +360,8 @@ class Thread { await client.database.transaction(() async { if (storeInDatabase && direction == Direction.b) { this.prev_batch = resp.prevBatch; - await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client); + await client.database.setThreadPrevBatch( + resp.prevBatch, room.id, rootEvent.eventId, client); } }); diff --git a/lib/src/thread_timeline.dart b/lib/src/thread_timeline.dart index 0d8e8e8e..4aa4fc08 100644 --- a/lib/src/thread_timeline.dart +++ b/lib/src/thread_timeline.dart @@ -1,5 +1,4 @@ import 'dart:async'; -import 'dart:convert'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; @@ -23,17 +22,18 @@ class ThreadTimeline extends Timeline { StreamSubscription? sessionIdReceivedSub; StreamSubscription? cancelSendEventSub; + @override bool isRequestingHistory = false; + + @override bool isFragmentedTimeline = false; final Map _eventCache = {}; - - bool _fetchedAllDatabaseEvents = false; - + + @override bool allowNewEvent = true; - - bool _collectHistoryUpdates = false; - + + @override bool isRequestingFuture = false; ThreadTimeline({ @@ -52,6 +52,17 @@ class ThreadTimeline extends Timeline { historySub = room.client.onHistoryEvent.stream.listen( (event) => _handleEventUpdate(event, EventUpdateType.history), ); + + // we want to populate our aggregated events + for (final e in events) { + addAggregatedEvent(e); + } + + // we are using a fragmented timeline + if (chunk.nextBatch != '') { + allowNewEvent = false; + isFragmentedTimeline = true; + } } void _handleEventUpdate( @@ -63,7 +74,9 @@ class ThreadTimeline extends Timeline { if (event.roomId != thread.room.id) return; // Ignore events outside of this thread if (event.relationshipType != RelationshipTypes.thread || - event.relationshipEventId != thread.rootEvent.eventId) return; + event.relationshipEventId != thread.rootEvent.eventId) { + return; + } if (type != EventUpdateType.timeline && type != EventUpdateType.history) { return; @@ -152,14 +165,21 @@ class ThreadTimeline extends Timeline { dir: direction, from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch, limit: historyCount, + recurse: true, + ); + + Logs().w( + 'Loading thread events from server ${resp.chunk.length} ${resp.prevBatch}', ); if (resp.nextBatch == null) { Logs().w('We reached the end of the timeline'); } - final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch; - final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch; + final newNextBatch = + direction == Direction.b ? resp.prevBatch : resp.nextBatch; + final newPrevBatch = + direction == Direction.b ? resp.nextBatch : resp.prevBatch; final type = direction == Direction.b ? EventUpdateType.history @@ -193,7 +213,8 @@ class ThreadTimeline extends Timeline { if (allowNewEvent) { Logs().d('We now allow sync update into the timeline.'); newEvents.addAll( - await thread.client.database.getThreadEventList(thread, onlySending: true), + await thread.client.database + .getThreadEventList(thread, onlySending: true), ); } } @@ -258,7 +279,8 @@ class ThreadTimeline extends Timeline { } // Fetch all users from database we have got here. for (final event in events) { - if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) { + if (thread.room.getState(EventTypes.RoomMember, event.senderId) != + null) { continue; } final dbUser = @@ -282,7 +304,6 @@ class ThreadTimeline extends Timeline { } } } else { - _fetchedAllDatabaseEvents = true; Logs().i('No more events found in the store. Request from server...'); if (isFragmentedTimeline) { @@ -298,16 +319,13 @@ class ThreadTimeline extends Timeline { await thread.requestHistory( historyCount: historyCount, direction: direction, - onHistoryReceived: () { - _collectHistoryUpdates = true; - }, + onHistoryReceived: () {}, filter: filter, ); } } } } finally { - _collectHistoryUpdates = false; isRequestingHistory = false; onUpdate?.call(); } @@ -387,8 +405,10 @@ class ThreadTimeline extends Timeline { } @override - Future requestHistory( - {int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async { + Future requestHistory({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { if (isRequestingHistory) return; isRequestingHistory = true; await _requestEvents( @@ -421,45 +441,45 @@ class ThreadTimeline extends Timeline { } @override -bool get canRequestFuture => chunk.nextBatch != null && chunk.nextBatch!.isNotEmpty; + bool get canRequestFuture => chunk.nextBatch.isNotEmpty; -@override -bool get canRequestHistory => chunk.prevBatch != null && chunk.prevBatch!.isNotEmpty; + @override + bool get canRequestHistory => chunk.prevBatch.isNotEmpty; -@override -Future requestFuture({ - int historyCount = Room.defaultHistoryCount, - StateFilter? filter, -}) async { - if (isRequestingFuture || !canRequestFuture) return; - isRequestingFuture = true; - - try { - await getThreadEvents( - historyCount: historyCount, - direction: Direction.f, - filter: filter, - ); - } finally { - isRequestingFuture = false; + @override + Future requestFuture({ + int historyCount = Room.defaultHistoryCount, + StateFilter? filter, + }) async { + if (isRequestingFuture || !canRequestFuture) return; + isRequestingFuture = true; + + try { + await getThreadEvents( + historyCount: historyCount, + direction: Direction.f, + filter: filter, + ); + } finally { + isRequestingFuture = false; + } } -} -@override -void requestKeys({ - bool tryOnlineBackup = true, - bool onlineKeyBackupOnly = true, -}) { - for (final event in events) { - if (event.type == EventTypes.Encrypted && - event.messageType == MessageTypes.BadEncrypted && - event.content['can_request_session'] == true) { - final sessionId = event.content.tryGet('session_id'); - final senderKey = event.content.tryGet('sender_key'); - if (sessionId != null && senderKey != null) { - thread.room.requestSessionKey(sessionId, senderKey); + @override + void requestKeys({ + bool tryOnlineBackup = true, + bool onlineKeyBackupOnly = true, + }) { + for (final event in events) { + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + final sessionId = event.content.tryGet('session_id'); + final senderKey = event.content.tryGet('sender_key'); + if (sessionId != null && senderKey != null) { + thread.room.requestSessionKey(sessionId, senderKey); + } } } } } -} diff --git a/lib/src/timeline.dart b/lib/src/timeline.dart index 4219da7b..11e50670 100644 --- a/lib/src/timeline.dart +++ b/lib/src/timeline.dart @@ -45,6 +45,10 @@ abstract class Timeline { bool get canRequestHistory; bool get canRequestFuture; + bool get allowNewEvent; + bool get isRequestingFuture; + bool get isRequestingHistory; + bool get isFragmentedTimeline; Timeline({ this.onUpdate,