Merge pull request 'Feature: threads' (#1) from feature/threads into main
Reviewed-on: #1
This commit is contained in:
commit
f678376e3a
|
|
@ -0,0 +1,23 @@
|
||||||
|
import fs from 'fs';
|
||||||
|
|
||||||
|
const files = fs.readdirSync('lib/', {
|
||||||
|
recursive: true
|
||||||
|
});
|
||||||
|
|
||||||
|
const q = process.argv[2];
|
||||||
|
|
||||||
|
var total = 0;
|
||||||
|
|
||||||
|
for (const f of files) {
|
||||||
|
try {
|
||||||
|
const b = fs.readFileSync(`lib/${f}`, 'utf-8');
|
||||||
|
if (b.includes(q) || f.includes(q)) {
|
||||||
|
total ++;
|
||||||
|
console.log(f);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`${total} files in total`);
|
||||||
|
|
@ -49,7 +49,10 @@ export 'src/voip/utils/famedly_call_extension.dart';
|
||||||
export 'src/voip/utils/types.dart';
|
export 'src/voip/utils/types.dart';
|
||||||
export 'src/voip/utils/wrapped_media_stream.dart';
|
export 'src/voip/utils/wrapped_media_stream.dart';
|
||||||
export 'src/room.dart';
|
export 'src/room.dart';
|
||||||
|
export 'src/thread.dart';
|
||||||
export 'src/timeline.dart';
|
export 'src/timeline.dart';
|
||||||
|
export 'src/room_timeline.dart';
|
||||||
|
export 'src/thread_timeline.dart';
|
||||||
export 'src/user.dart';
|
export 'src/user.dart';
|
||||||
export 'src/utils/cached_profile_information.dart';
|
export 'src/utils/cached_profile_information.dart';
|
||||||
export 'src/utils/commands_extension.dart';
|
export 'src/utils/commands_extension.dart';
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,9 @@ import 'dart:convert';
|
||||||
|
|
||||||
import 'package:matrix/matrix_api_lite.dart';
|
import 'package:matrix/matrix_api_lite.dart';
|
||||||
import 'package:matrix/src/event.dart';
|
import 'package:matrix/src/event.dart';
|
||||||
import 'package:matrix/src/timeline.dart';
|
import 'package:matrix/src/room_timeline.dart';
|
||||||
|
|
||||||
extension TimelineExportExtension on Timeline {
|
extension TimelineExportExtension on RoomTimeline {
|
||||||
/// Exports timeline events from a Matrix room within a specified date range.
|
/// Exports timeline events from a Matrix room within a specified date range.
|
||||||
///
|
///
|
||||||
/// The export process provides progress updates through the returned stream with the following information:
|
/// The export process provides progress updates through the returned stream with the following information:
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ import 'dart:typed_data';
|
||||||
import 'package:async/async.dart';
|
import 'package:async/async.dart';
|
||||||
import 'package:collection/collection.dart' show IterableExtension;
|
import 'package:collection/collection.dart' show IterableExtension;
|
||||||
import 'package:http/http.dart' as http;
|
import 'package:http/http.dart' as http;
|
||||||
|
import 'package:matrix/src/room_timeline.dart';
|
||||||
import 'package:mime/mime.dart';
|
import 'package:mime/mime.dart';
|
||||||
import 'package:random_string/random_string.dart';
|
import 'package:random_string/random_string.dart';
|
||||||
import 'package:vodozemac/vodozemac.dart' as vod;
|
import 'package:vodozemac/vodozemac.dart' as vod;
|
||||||
|
|
@ -1213,7 +1214,7 @@ class Client extends MatrixApi {
|
||||||
// Set membership of room to leave, in the case we got a left room passed, otherwise
|
// Set membership of room to leave, in the case we got a left room passed, otherwise
|
||||||
// the left room would have still membership join, which would be wrong for the setState later
|
// the left room would have still membership join, which would be wrong for the setState later
|
||||||
archivedRoom.membership = Membership.leave;
|
archivedRoom.membership = Membership.leave;
|
||||||
final timeline = Timeline(
|
final timeline = RoomTimeline(
|
||||||
room: archivedRoom,
|
room: archivedRoom,
|
||||||
chunk: TimelineChunk(
|
chunk: TimelineChunk(
|
||||||
events: roomUpdate.timeline?.events?.reversed
|
events: roomUpdate.timeline?.events?.reversed
|
||||||
|
|
@ -2775,13 +2776,13 @@ class Client extends MatrixApi {
|
||||||
final List<ReceiptEventContent> receipts = [];
|
final List<ReceiptEventContent> receipts = [];
|
||||||
|
|
||||||
for (final event in events) {
|
for (final event in events) {
|
||||||
|
|
||||||
room.setEphemeral(event);
|
room.setEphemeral(event);
|
||||||
|
|
||||||
// Receipt events are deltas between two states. We will create a
|
// Receipt events are deltas between two states. We will create a
|
||||||
// fake room account data event for this and store the difference
|
// fake room account data event for this and store the difference
|
||||||
// there.
|
// there.
|
||||||
if (event.type != 'm.receipt') continue;
|
if (event.type != 'm.receipt') continue;
|
||||||
|
|
||||||
receipts.add(ReceiptEventContent.fromJson(event.content));
|
receipts.add(ReceiptEventContent.fromJson(event.content));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2796,6 +2797,7 @@ class Client extends MatrixApi {
|
||||||
type: LatestReceiptState.eventType,
|
type: LatestReceiptState.eventType,
|
||||||
content: receiptStateContent.toJson(),
|
content: receiptStateContent.toJson(),
|
||||||
);
|
);
|
||||||
|
|
||||||
await database.storeRoomAccountData(room.id, event);
|
await database.storeRoomAccountData(room.id, event);
|
||||||
room.roomAccountData[event.type] = event;
|
room.roomAccountData[event.type] = event;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,21 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future<List<Room>> getRoomList(Client client);
|
Future<List<Room>> getRoomList(Client client);
|
||||||
|
|
||||||
|
Future<List<Thread>> getThreadList(String roomId, Client client);
|
||||||
|
|
||||||
|
Future<Thread?> getThread(String roomId, String threadRootEventId, Client client);
|
||||||
|
|
||||||
|
Future<void> storeThread(
|
||||||
|
String roomId,
|
||||||
|
Event threadRootEvent,
|
||||||
|
Event? lastEvent,
|
||||||
|
bool currentUserParticipated,
|
||||||
|
int? notificationCount,
|
||||||
|
int? highlightCount,
|
||||||
|
int count,
|
||||||
|
Client client,
|
||||||
|
);
|
||||||
|
|
||||||
Future<Room?> getSingleRoom(
|
Future<Room?> getSingleRoom(
|
||||||
Client client,
|
Client client,
|
||||||
String roomId, {
|
String roomId, {
|
||||||
|
|
@ -78,6 +93,8 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future<void> deleteTimelineForRoom(String roomId);
|
Future<void> deleteTimelineForRoom(String roomId);
|
||||||
|
|
||||||
|
Future<void> deleteTimelineForThread(String roomId, String threadRootEventId);
|
||||||
|
|
||||||
/// Stores an EventUpdate object in the database. Must be called inside of
|
/// Stores an EventUpdate object in the database. Must be called inside of
|
||||||
/// [transaction].
|
/// [transaction].
|
||||||
Future<void> storeEventUpdate(
|
Future<void> storeEventUpdate(
|
||||||
|
|
@ -115,6 +132,13 @@ abstract class DatabaseApi {
|
||||||
int? limit,
|
int? limit,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Future<List<Event>> getThreadEventList(
|
||||||
|
Thread thread, {
|
||||||
|
int start = 0,
|
||||||
|
bool onlySending = false,
|
||||||
|
int? limit,
|
||||||
|
});
|
||||||
|
|
||||||
Future<List<String>> getEventIdList(
|
Future<List<String>> getEventIdList(
|
||||||
Room room, {
|
Room room, {
|
||||||
int start = 0,
|
int start = 0,
|
||||||
|
|
@ -265,6 +289,13 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future removeEvent(String eventId, String roomId);
|
Future removeEvent(String eventId, String roomId);
|
||||||
|
|
||||||
|
Future setThreadPrevBatch(
|
||||||
|
String? prevBatch,
|
||||||
|
String roomId,
|
||||||
|
String threadRootEventId,
|
||||||
|
Client client,
|
||||||
|
);
|
||||||
|
|
||||||
Future setRoomPrevBatch(
|
Future setRoomPrevBatch(
|
||||||
String? prevBatch,
|
String? prevBatch,
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
late Box<String> _clientBox;
|
late Box<String> _clientBox;
|
||||||
late Box<Map> _accountDataBox;
|
late Box<Map> _accountDataBox;
|
||||||
late Box<Map> _roomsBox;
|
late Box<Map> _roomsBox;
|
||||||
|
late Box<Map> _threadsBox;
|
||||||
late Box<Map> _toDeviceQueueBox;
|
late Box<Map> _toDeviceQueueBox;
|
||||||
|
|
||||||
/// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be
|
/// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be
|
||||||
|
|
@ -122,6 +123,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
|
|
||||||
static const String _roomsBoxName = 'box_rooms';
|
static const String _roomsBoxName = 'box_rooms';
|
||||||
|
|
||||||
|
static const String _threadsBoxName = 'box_threads';
|
||||||
|
|
||||||
static const String _toDeviceQueueBoxName = 'box_to_device_queue';
|
static const String _toDeviceQueueBoxName = 'box_to_device_queue';
|
||||||
|
|
||||||
static const String _preloadRoomStateBoxName = 'box_preload_room_states';
|
static const String _preloadRoomStateBoxName = 'box_preload_room_states';
|
||||||
|
|
@ -218,6 +221,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_clientBoxName,
|
_clientBoxName,
|
||||||
_accountDataBoxName,
|
_accountDataBoxName,
|
||||||
_roomsBoxName,
|
_roomsBoxName,
|
||||||
|
_threadsBoxName,
|
||||||
_toDeviceQueueBoxName,
|
_toDeviceQueueBoxName,
|
||||||
_preloadRoomStateBoxName,
|
_preloadRoomStateBoxName,
|
||||||
_nonPreloadRoomStateBoxName,
|
_nonPreloadRoomStateBoxName,
|
||||||
|
|
@ -252,6 +256,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_roomsBox = _collection.openBox<Map>(
|
_roomsBox = _collection.openBox<Map>(
|
||||||
_roomsBoxName,
|
_roomsBoxName,
|
||||||
);
|
);
|
||||||
|
_threadsBox = _collection.openBox<Map>(_threadsBoxName);
|
||||||
_preloadRoomStateBox = _collection.openBox(
|
_preloadRoomStateBox = _collection.openBox(
|
||||||
_preloadRoomStateBoxName,
|
_preloadRoomStateBoxName,
|
||||||
);
|
);
|
||||||
|
|
@ -357,6 +362,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_clientBox.clearQuickAccessCache();
|
_clientBox.clearQuickAccessCache();
|
||||||
_accountDataBox.clearQuickAccessCache();
|
_accountDataBox.clearQuickAccessCache();
|
||||||
_roomsBox.clearQuickAccessCache();
|
_roomsBox.clearQuickAccessCache();
|
||||||
|
_threadsBox.clearQuickAccessCache();
|
||||||
_preloadRoomStateBox.clearQuickAccessCache();
|
_preloadRoomStateBox.clearQuickAccessCache();
|
||||||
_nonPreloadRoomStateBox.clearQuickAccessCache();
|
_nonPreloadRoomStateBox.clearQuickAccessCache();
|
||||||
_roomMembersBox.clearQuickAccessCache();
|
_roomMembersBox.clearQuickAccessCache();
|
||||||
|
|
@ -383,6 +389,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
@override
|
@override
|
||||||
Future<void> clearCache() => transaction(() async {
|
Future<void> clearCache() => transaction(() async {
|
||||||
await _roomsBox.clear();
|
await _roomsBox.clear();
|
||||||
|
await _threadsBox.clear();
|
||||||
await _accountDataBox.clear();
|
await _accountDataBox.clear();
|
||||||
await _roomAccountDataBox.clear();
|
await _roomAccountDataBox.clear();
|
||||||
await _preloadRoomStateBox.clear();
|
await _preloadRoomStateBox.clear();
|
||||||
|
|
@ -532,6 +539,46 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return await _getEventsByIds(eventIds.cast<String>(), room);
|
return await _getEventsByIds(eventIds.cast<String>(), room);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<List<Event>> getThreadEventList(
|
||||||
|
Thread thread, {
|
||||||
|
int start = 0,
|
||||||
|
bool onlySending = false,
|
||||||
|
int? limit,
|
||||||
|
}) =>
|
||||||
|
runBenchmarked<List<Event>>('Get event list', () async {
|
||||||
|
// Get the synced event IDs from the store
|
||||||
|
final timelineKey =
|
||||||
|
TupleKey(thread.room.id, '', thread.rootEvent.eventId).toString();
|
||||||
|
final timelineEventIds =
|
||||||
|
(await _timelineFragmentsBox.get(timelineKey) ?? []);
|
||||||
|
|
||||||
|
// Get the local stored SENDING events from the store
|
||||||
|
late final List sendingEventIds;
|
||||||
|
if (start != 0) {
|
||||||
|
sendingEventIds = [];
|
||||||
|
} else {
|
||||||
|
final sendingTimelineKey =
|
||||||
|
TupleKey(thread.room.id, 'SENDING', thread.rootEvent.eventId)
|
||||||
|
.toString();
|
||||||
|
sendingEventIds =
|
||||||
|
(await _timelineFragmentsBox.get(sendingTimelineKey) ?? []);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Combine those two lists while respecting the start and limit parameters.
|
||||||
|
final end = min(
|
||||||
|
timelineEventIds.length,
|
||||||
|
start + (limit ?? timelineEventIds.length),
|
||||||
|
);
|
||||||
|
final eventIds = [
|
||||||
|
...sendingEventIds,
|
||||||
|
if (!onlySending && start < timelineEventIds.length)
|
||||||
|
...timelineEventIds.getRange(start, end),
|
||||||
|
];
|
||||||
|
|
||||||
|
return await _getEventsByIds(eventIds.cast<String>(), thread.room);
|
||||||
|
});
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<StoredInboundGroupSession?> getInboundGroupSession(
|
Future<StoredInboundGroupSession?> getInboundGroupSession(
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
@ -1053,6 +1100,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> setThreadPrevBatch(
|
||||||
|
String? prevBatch,
|
||||||
|
String roomId,
|
||||||
|
String threadRootEventId,
|
||||||
|
Client client,
|
||||||
|
) async {
|
||||||
|
final raw =
|
||||||
|
await _threadsBox.get(TupleKey(roomId, threadRootEventId).toString());
|
||||||
|
if (raw == null) return;
|
||||||
|
final thread = Thread.fromJson(copyMap(raw), client);
|
||||||
|
thread.prev_batch = prevBatch;
|
||||||
|
await _threadsBox.put(roomId, thread.toJson());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> setVerifiedUserCrossSigningKey(
|
Future<void> setVerifiedUserCrossSigningKey(
|
||||||
bool verified,
|
bool verified,
|
||||||
|
|
@ -1302,6 +1365,64 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<List<Thread>> getThreadList(String roomId, Client client) async {
|
||||||
|
final allThreadsKeys = await _threadsBox.getAllKeys();
|
||||||
|
final threads = <Thread>{};
|
||||||
|
|
||||||
|
// TERRIBLE implementation. Better to create another box (String[roomId]->List<string>[event ids])
|
||||||
|
for (final key in allThreadsKeys) {
|
||||||
|
if (key.startsWith('$roomId|')) {
|
||||||
|
final thread = await getThread(roomId, key.split('|')[1], client);
|
||||||
|
if (thread != null) {
|
||||||
|
threads.add(thread);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return threads.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<Thread?> getThread(
|
||||||
|
String roomId,
|
||||||
|
String threadRootEventId,
|
||||||
|
Client client,
|
||||||
|
) async {
|
||||||
|
final key = TupleKey(roomId, threadRootEventId).toString();
|
||||||
|
final thread = await _threadsBox.get(key);
|
||||||
|
if (thread == null) return null;
|
||||||
|
return Thread.fromJson(thread.cast<String, dynamic>(), client);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> storeThread(
|
||||||
|
String roomId,
|
||||||
|
Event threadRootEvent,
|
||||||
|
Event? lastEvent,
|
||||||
|
bool currentUserParticipated,
|
||||||
|
int? notificationCount,
|
||||||
|
int? highlightCount,
|
||||||
|
int count,
|
||||||
|
Client client,
|
||||||
|
) async {
|
||||||
|
final key = TupleKey(roomId, threadRootEvent.eventId).toString();
|
||||||
|
// final currentRawThread = await _threadsBox.get(key);
|
||||||
|
await _threadsBox.put(
|
||||||
|
key,
|
||||||
|
Thread(
|
||||||
|
room: Room(id: roomId, client: client),
|
||||||
|
rootEvent: threadRootEvent,
|
||||||
|
lastEvent: lastEvent,
|
||||||
|
client: client,
|
||||||
|
currentUserParticipated: currentUserParticipated,
|
||||||
|
count: count,
|
||||||
|
notificationCount: notificationCount ?? 0,
|
||||||
|
highlightCount: highlightCount ?? 0,
|
||||||
|
).toJson(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> storeRoomUpdate(
|
Future<void> storeRoomUpdate(
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
@ -1314,6 +1435,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
await forgetRoom(roomId);
|
await forgetRoom(roomId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final membership = roomUpdate is LeftRoomUpdate
|
final membership = roomUpdate is LeftRoomUpdate
|
||||||
? Membership.leave
|
? Membership.leave
|
||||||
: roomUpdate is InvitedRoomUpdate
|
: roomUpdate is InvitedRoomUpdate
|
||||||
|
|
@ -1376,6 +1498,12 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
Future<void> deleteTimelineForRoom(String roomId) =>
|
Future<void> deleteTimelineForRoom(String roomId) =>
|
||||||
_timelineFragmentsBox.delete(TupleKey(roomId, '').toString());
|
_timelineFragmentsBox.delete(TupleKey(roomId, '').toString());
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> deleteTimelineForThread(
|
||||||
|
String roomId, String threadRootEventId) =>
|
||||||
|
_timelineFragmentsBox
|
||||||
|
.delete(TupleKey(roomId, '', threadRootEventId).toString());
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> storeSSSSCache(
|
Future<void> storeSSSSCache(
|
||||||
String type,
|
String type,
|
||||||
|
|
|
||||||
|
|
@ -129,9 +129,71 @@ class Room {
|
||||||
for (final state in allStates) {
|
for (final state in allStates) {
|
||||||
setState(state);
|
setState(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await loadThreadsFromServer();
|
||||||
|
|
||||||
partial = false;
|
partial = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Map<String, Thread> threads = <String, Thread>{};
|
||||||
|
String? getThreadRootsBatch;
|
||||||
|
bool loadedAllThreads = false;
|
||||||
|
|
||||||
|
Future<void> loadThreadsFromServer() async {
|
||||||
|
try {
|
||||||
|
if (loadedAllThreads) return;
|
||||||
|
final response =
|
||||||
|
await client.getThreadRoots(id, from: getThreadRootsBatch);
|
||||||
|
|
||||||
|
for (final threadEvent in response.chunk) {
|
||||||
|
final event = Event.fromMatrixEvent(threadEvent, this);
|
||||||
|
final thread = Thread.fromJson(threadEvent.toJson(), client);
|
||||||
|
// Store thread in database
|
||||||
|
await client.database.storeThread(
|
||||||
|
id,
|
||||||
|
event,
|
||||||
|
thread.lastEvent, // lastEvent
|
||||||
|
thread.currentUserParticipated ?? false, // currentUserParticipated
|
||||||
|
0, 0,
|
||||||
|
thread.count ?? 1, // count
|
||||||
|
client,
|
||||||
|
);
|
||||||
|
threads[event.eventId] = thread;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (response.nextBatch == null) {
|
||||||
|
loadedAllThreads = true;
|
||||||
|
} else {
|
||||||
|
getThreadRootsBatch = response.nextBatch;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
Logs().w('Failed to load threads from server', e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> handleThreadSync(Event event) async {
|
||||||
|
// This should be called from the client's sync handling
|
||||||
|
// when a thread-related event is received
|
||||||
|
|
||||||
|
// if (event.relationshipType == RelationshipTypes.thread &&
|
||||||
|
// event.relationshipEventId != null) {
|
||||||
|
// Update thread metadata in database
|
||||||
|
final root = await getEventById(event.relationshipEventId!);
|
||||||
|
if (root == null) return;
|
||||||
|
final thread = await client.database.getThread(id, event.relationshipEventId!, client);
|
||||||
|
await client.database.storeThread(
|
||||||
|
id,
|
||||||
|
root,
|
||||||
|
event, // update last event
|
||||||
|
event.senderId == client.userID || (thread?.currentUserParticipated ?? false), // currentUserParticipated
|
||||||
|
(thread?.count ?? 0) + 1, // increment count - should be calculated properly
|
||||||
|
0, 0,
|
||||||
|
client,
|
||||||
|
);
|
||||||
|
threads[event.relationshipEventId!] = (await client.database.getThread(id, event.relationshipEventId!, client))!;
|
||||||
|
//}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the [Event] for the given [typeKey] and optional [stateKey].
|
/// Returns the [Event] for the given [typeKey] and optional [stateKey].
|
||||||
/// If no [stateKey] is provided, it defaults to an empty string.
|
/// If no [stateKey] is provided, it defaults to an empty string.
|
||||||
/// This returns either a `StrippedStateEvent` for rooms with membership
|
/// This returns either a `StrippedStateEvent` for rooms with membership
|
||||||
|
|
@ -173,6 +235,31 @@ class Room {
|
||||||
client.onRoomState.add((roomId: id, state: state));
|
client.onRoomState.add((roomId: id, state: state));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<Map<String, Thread>> getThreads() async {
|
||||||
|
final dict = <String, Thread>{};
|
||||||
|
final list = await client.database.getThreadList(id, client);
|
||||||
|
for (final thread in list) {
|
||||||
|
dict[thread.rootEvent.eventId] = thread;
|
||||||
|
}
|
||||||
|
return dict;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Thread> getThread(Event rootEvent) async {
|
||||||
|
final threads = await getThreads();
|
||||||
|
if (threads.containsKey(rootEvent.eventId)) {
|
||||||
|
return threads[rootEvent.eventId]!;
|
||||||
|
}
|
||||||
|
return Thread(
|
||||||
|
room: this,
|
||||||
|
rootEvent: rootEvent,
|
||||||
|
client: client,
|
||||||
|
currentUserParticipated: false,
|
||||||
|
count: 0,
|
||||||
|
highlightCount: 0,
|
||||||
|
notificationCount: 0,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/// ID of the fully read marker event.
|
/// ID of the fully read marker event.
|
||||||
String get fullyRead =>
|
String get fullyRead =>
|
||||||
roomAccountData['m.fully_read']?.content.tryGet<String>('event_id') ?? '';
|
roomAccountData['m.fully_read']?.content.tryGet<String>('event_id') ?? '';
|
||||||
|
|
@ -1442,6 +1529,8 @@ class Room {
|
||||||
direction = Direction.b,
|
direction = Direction.b,
|
||||||
StateFilter? filter,
|
StateFilter? filter,
|
||||||
}) async {
|
}) async {
|
||||||
|
unawaited(loadThreadsFromServer());
|
||||||
|
|
||||||
final prev_batch = this.prev_batch;
|
final prev_batch = this.prev_batch;
|
||||||
|
|
||||||
final storeInDatabase = !isArchived;
|
final storeInDatabase = !isArchived;
|
||||||
|
|
@ -1648,7 +1737,7 @@ class Room {
|
||||||
/// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks.
|
/// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks.
|
||||||
/// This method can also retrieve the timeline at a specific point by setting
|
/// This method can also retrieve the timeline at a specific point by setting
|
||||||
/// the [eventContextId]
|
/// the [eventContextId]
|
||||||
Future<Timeline> getTimeline({
|
Future<RoomTimeline> getTimeline({
|
||||||
void Function(int index)? onChange,
|
void Function(int index)? onChange,
|
||||||
void Function(int index)? onRemove,
|
void Function(int index)? onRemove,
|
||||||
void Function(int insertID)? onInsert,
|
void Function(int insertID)? onInsert,
|
||||||
|
|
@ -1690,7 +1779,7 @@ class Room {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final timeline = Timeline(
|
final timeline = RoomTimeline(
|
||||||
room: this,
|
room: this,
|
||||||
chunk: chunk,
|
chunk: chunk,
|
||||||
onChange: onChange,
|
onChange: onChange,
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,660 @@
|
||||||
|
/*
|
||||||
|
* Famedly Matrix SDK
|
||||||
|
* Copyright (C) 2019, 2020, 2021 Famedly GmbH
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import 'dart:async';
|
||||||
|
import 'dart:convert';
|
||||||
|
|
||||||
|
import 'package:collection/collection.dart';
|
||||||
|
import 'package:matrix/matrix.dart';
|
||||||
|
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||||
|
|
||||||
|
/// Represents the main timeline of a room.
|
||||||
|
class RoomTimeline extends Timeline {
|
||||||
|
final Room room;
|
||||||
|
@override
|
||||||
|
List<Event> get events => chunk.events;
|
||||||
|
|
||||||
|
TimelineChunk chunk;
|
||||||
|
|
||||||
|
StreamSubscription<Event>? timelineSub;
|
||||||
|
StreamSubscription<Event>? historySub;
|
||||||
|
StreamSubscription<SyncUpdate>? roomSub;
|
||||||
|
StreamSubscription<String>? sessionIdReceivedSub;
|
||||||
|
StreamSubscription<String>? cancelSendEventSub;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool isRequestingHistory = false;
|
||||||
|
@override
|
||||||
|
bool isRequestingFuture = false;
|
||||||
|
@override
|
||||||
|
bool allowNewEvent = true;
|
||||||
|
@override
|
||||||
|
bool isFragmentedTimeline = false;
|
||||||
|
|
||||||
|
final Map<String, Event> _eventCache = {};
|
||||||
|
|
||||||
|
// When fetching history, we will collect them into the `_historyUpdates` set
|
||||||
|
// first, and then only process all events at once, once we have the full history.
|
||||||
|
// This ensures that the entire history fetching only triggers `onUpdate` only *once*,
|
||||||
|
// even if /sync's complete while history is being proccessed.
|
||||||
|
bool _collectHistoryUpdates = false;
|
||||||
|
|
||||||
|
// We confirmed, that there are no more events to load from the database.
|
||||||
|
bool _fetchedAllDatabaseEvents = false;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get canRequestHistory {
|
||||||
|
if (!{Membership.join, Membership.leave}.contains(room.membership)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (events.isEmpty) return true;
|
||||||
|
return !_fetchedAllDatabaseEvents ||
|
||||||
|
(room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get canRequestFuture => !allowNewEvent;
|
||||||
|
|
||||||
|
RoomTimeline({
|
||||||
|
required this.room,
|
||||||
|
required this.chunk,
|
||||||
|
super.onUpdate,
|
||||||
|
super.onChange,
|
||||||
|
super.onInsert,
|
||||||
|
super.onRemove,
|
||||||
|
super.onNewEvent,
|
||||||
|
}) {
|
||||||
|
timelineSub = room.client.onTimelineEvent.stream.listen(
|
||||||
|
(event) => _handleEventUpdate(event, EventUpdateType.timeline),
|
||||||
|
);
|
||||||
|
historySub = room.client.onHistoryEvent.stream.listen(
|
||||||
|
(event) => _handleEventUpdate(event, EventUpdateType.history),
|
||||||
|
);
|
||||||
|
|
||||||
|
// If the timeline is limited we want to clear our events cache
|
||||||
|
roomSub = room.client.onSync.stream
|
||||||
|
.where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
|
||||||
|
.listen(_removeEventsNotInThisSync);
|
||||||
|
|
||||||
|
sessionIdReceivedSub =
|
||||||
|
room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
|
||||||
|
cancelSendEventSub =
|
||||||
|
room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
|
||||||
|
|
||||||
|
// we want to populate our aggregated events
|
||||||
|
for (final e in events) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are using a fragmented timeline
|
||||||
|
if (chunk.nextBatch != '') {
|
||||||
|
allowNewEvent = false;
|
||||||
|
isFragmentedTimeline = true;
|
||||||
|
// fragmented timelines never read from the database.
|
||||||
|
_fetchedAllDatabaseEvents = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<Event?> getEventById(String id) async {
|
||||||
|
for (final event in events) {
|
||||||
|
if (event.eventId == id) return event;
|
||||||
|
}
|
||||||
|
if (_eventCache.containsKey(id)) return _eventCache[id];
|
||||||
|
final requestedEvent = await room.getEventById(id);
|
||||||
|
if (requestedEvent == null) return null;
|
||||||
|
_eventCache[id] = requestedEvent;
|
||||||
|
return _eventCache[id];
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> requestHistory({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
if (isRequestingHistory) return;
|
||||||
|
isRequestingHistory = true;
|
||||||
|
await _requestEvents(
|
||||||
|
direction: Direction.b,
|
||||||
|
historyCount: historyCount,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
isRequestingHistory = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> requestFuture({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
if (allowNewEvent) return;
|
||||||
|
if (isRequestingFuture) return;
|
||||||
|
isRequestingFuture = true;
|
||||||
|
await _requestEvents(
|
||||||
|
direction: Direction.f,
|
||||||
|
historyCount: historyCount,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
isRequestingFuture = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _requestEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
required Direction direction,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
onUpdate?.call();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Look up for events in the database first. With fragmented view, we should delete the database cache
|
||||||
|
final eventsFromStore = isFragmentedTimeline
|
||||||
|
? null
|
||||||
|
: await room.client.database.getEventList(
|
||||||
|
room,
|
||||||
|
start: events.length,
|
||||||
|
limit: historyCount,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
|
||||||
|
for (final e in eventsFromStore) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
// Fetch all users from database we have got here.
|
||||||
|
for (final event in events) {
|
||||||
|
if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final dbUser =
|
||||||
|
await room.client.database.getUser(event.senderId, room);
|
||||||
|
if (dbUser != null) room.setState(dbUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (direction == Direction.b) {
|
||||||
|
events.addAll(eventsFromStore);
|
||||||
|
final startIndex = events.length - eventsFromStore.length;
|
||||||
|
final endIndex = events.length;
|
||||||
|
for (var i = startIndex; i < endIndex; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
events.insertAll(0, eventsFromStore);
|
||||||
|
final startIndex = eventsFromStore.length;
|
||||||
|
final endIndex = 0;
|
||||||
|
for (var i = startIndex; i > endIndex; i--) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_fetchedAllDatabaseEvents = true;
|
||||||
|
Logs().i('No more events found in the store. Request from server...');
|
||||||
|
|
||||||
|
if (isFragmentedTimeline) {
|
||||||
|
await getRoomEvents(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
if (room.prev_batch == null) {
|
||||||
|
Logs().i('No more events to request from server...');
|
||||||
|
} else {
|
||||||
|
await room.requestHistory(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
onHistoryReceived: () {
|
||||||
|
_collectHistoryUpdates = true;
|
||||||
|
},
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
_collectHistoryUpdates = false;
|
||||||
|
isRequestingHistory = false;
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request more previous events from the server.
|
||||||
|
Future<int> getRoomEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
direction = Direction.b,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
||||||
|
filter ??= StateFilter(lazyLoadMembers: true);
|
||||||
|
filter.lazyLoadMembers ??= true;
|
||||||
|
|
||||||
|
final resp = await room.client.getRoomEvents(
|
||||||
|
room.id,
|
||||||
|
direction,
|
||||||
|
from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
|
||||||
|
limit: historyCount,
|
||||||
|
filter: jsonEncode(filter.toJson()),
|
||||||
|
);
|
||||||
|
|
||||||
|
if (resp.end == null) {
|
||||||
|
Logs().w('We reached the end of the timeline');
|
||||||
|
}
|
||||||
|
|
||||||
|
final newNextBatch = direction == Direction.b ? resp.start : resp.end;
|
||||||
|
final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
|
||||||
|
|
||||||
|
final type = direction == Direction.b
|
||||||
|
? EventUpdateType.history
|
||||||
|
: EventUpdateType.timeline;
|
||||||
|
|
||||||
|
if ((resp.state?.length ?? 0) == 0 &&
|
||||||
|
resp.start != resp.end &&
|
||||||
|
newPrevBatch != null &&
|
||||||
|
newNextBatch != null) {
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
Logs().w(
|
||||||
|
'[nav] we can still request history prevBatch: $type $newPrevBatch',
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
Logs().w(
|
||||||
|
'[nav] we can still request timeline nextBatch: $type $newNextBatch',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final newEvents =
|
||||||
|
resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
|
||||||
|
|
||||||
|
if (!allowNewEvent) {
|
||||||
|
if (resp.start == resp.end ||
|
||||||
|
(resp.end == null && direction == Direction.f)) {
|
||||||
|
allowNewEvent = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allowNewEvent) {
|
||||||
|
Logs().d('We now allow sync update into the timeline.');
|
||||||
|
newEvents.addAll(
|
||||||
|
await room.client.database.getEventList(room, onlySending: true),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to decrypt encrypted events but don't update the database.
|
||||||
|
if (room.encrypted && room.client.encryptionEnabled) {
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
if (newEvents[i].type == EventTypes.Encrypted) {
|
||||||
|
newEvents[i] = await room.client.encryption!.decryptRoomEvent(
|
||||||
|
newEvents[i],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update chunk anchors
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
chunk.prevBatch = newPrevBatch ?? '';
|
||||||
|
|
||||||
|
final offset = chunk.events.length;
|
||||||
|
|
||||||
|
chunk.events.addAll(newEvents);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i + offset);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
chunk.nextBatch = newNextBatch ?? '';
|
||||||
|
chunk.events.insertAll(0, newEvents.reversed);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onUpdate != null) {
|
||||||
|
onUpdate!();
|
||||||
|
}
|
||||||
|
return resp.chunk.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _cleanUpCancelledEvent(String eventId) {
|
||||||
|
final i = _findEvent(event_id: eventId);
|
||||||
|
if (i < events.length) {
|
||||||
|
removeAggregatedEvent(events[i]);
|
||||||
|
events.removeAt(i);
|
||||||
|
onRemove?.call(i);
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes all entries from [events] which are not in this SyncUpdate.
|
||||||
|
void _removeEventsNotInThisSync(SyncUpdate sync) {
|
||||||
|
final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
|
||||||
|
final keepEventIds = newSyncEvents.map((e) => e.eventId);
|
||||||
|
events.removeWhere((e) => !keepEventIds.contains(e.eventId));
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void cancelSubscriptions() {
|
||||||
|
timelineSub?.cancel();
|
||||||
|
historySub?.cancel();
|
||||||
|
roomSub?.cancel();
|
||||||
|
sessionIdReceivedSub?.cancel();
|
||||||
|
cancelSendEventSub?.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
void _sessionKeyReceived(String sessionId) async {
|
||||||
|
var decryptAtLeastOneEvent = false;
|
||||||
|
Future<void> decryptFn() async {
|
||||||
|
final encryption = room.client.encryption;
|
||||||
|
if (!room.client.encryptionEnabled || encryption == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (var i = 0; i < events.length; i++) {
|
||||||
|
if (events[i].type == EventTypes.Encrypted &&
|
||||||
|
events[i].messageType == MessageTypes.BadEncrypted &&
|
||||||
|
events[i].content['session_id'] == sessionId) {
|
||||||
|
events[i] = await encryption.decryptRoomEvent(
|
||||||
|
events[i],
|
||||||
|
store: true,
|
||||||
|
updateType: EventUpdateType.history,
|
||||||
|
);
|
||||||
|
addAggregatedEvent(events[i]);
|
||||||
|
onChange?.call(i);
|
||||||
|
if (events[i].type != EventTypes.Encrypted) {
|
||||||
|
decryptAtLeastOneEvent = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await room.client.database.transaction(decryptFn);
|
||||||
|
if (decryptAtLeastOneEvent) onUpdate?.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void requestKeys({
|
||||||
|
bool tryOnlineBackup = true,
|
||||||
|
bool onlineKeyBackupOnly = true,
|
||||||
|
}) {
|
||||||
|
for (final event in events) {
|
||||||
|
if (event.type == EventTypes.Encrypted &&
|
||||||
|
event.messageType == MessageTypes.BadEncrypted &&
|
||||||
|
event.content['can_request_session'] == true) {
|
||||||
|
final sessionId = event.content.tryGet<String>('session_id');
|
||||||
|
final senderKey = event.content.tryGet<String>('sender_key');
|
||||||
|
if (sessionId != null && senderKey != null) {
|
||||||
|
room.client.encryption?.keyManager.maybeAutoRequest(
|
||||||
|
room.id,
|
||||||
|
sessionId,
|
||||||
|
senderKey,
|
||||||
|
tryOnlineBackup: tryOnlineBackup,
|
||||||
|
onlineKeyBackupOnly: onlineKeyBackupOnly,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> setReadMarker({String? eventId, bool? public}) async {
|
||||||
|
eventId ??=
|
||||||
|
events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
|
||||||
|
if (eventId == null) return;
|
||||||
|
return room.setReadMarker(eventId, mRead: eventId, public: public);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find event index by event ID or transaction ID
|
||||||
|
int _findEvent({String? event_id, String? unsigned_txid}) {
|
||||||
|
final searchNeedle = <String>{};
|
||||||
|
if (event_id != null) searchNeedle.add(event_id);
|
||||||
|
if (unsigned_txid != null) searchNeedle.add(unsigned_txid);
|
||||||
|
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < events.length; i++) {
|
||||||
|
final searchHaystack = <String>{events[i].eventId};
|
||||||
|
final txnid = events[i].transactionId;
|
||||||
|
if (txnid != null) searchHaystack.add(txnid);
|
||||||
|
if (searchNeedle.intersection(searchHaystack).isNotEmpty) break;
|
||||||
|
}
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
|
void _handleEventUpdate(
|
||||||
|
Event event,
|
||||||
|
EventUpdateType type, {
|
||||||
|
bool update = true,
|
||||||
|
}) {
|
||||||
|
try {
|
||||||
|
if (event.roomId != room.id) return;
|
||||||
|
|
||||||
|
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip thread events in main timeline - THEY SHOULD ONLY APPEAR IN THREAD TIMELINES
|
||||||
|
if (event.relationshipType == RelationshipTypes.thread &&
|
||||||
|
event.relationshipEventId != null) {
|
||||||
|
unawaited(room.handleThreadSync(event));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type == EventUpdateType.timeline) {
|
||||||
|
onNewEvent?.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!allowNewEvent) return;
|
||||||
|
|
||||||
|
final status = event.status;
|
||||||
|
|
||||||
|
final i = _findEvent(
|
||||||
|
event_id: event.eventId,
|
||||||
|
unsigned_txid: event.transactionId,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (i < events.length) {
|
||||||
|
// if the old status is larger than the new one, we also want to preserve the old status
|
||||||
|
final oldStatus = events[i].status;
|
||||||
|
events[i] = event;
|
||||||
|
// do we preserve the status? we should allow 0 -> -1 updates and status increases
|
||||||
|
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
|
||||||
|
!(status.isError && oldStatus.isSending)) {
|
||||||
|
events[i].status = oldStatus;
|
||||||
|
}
|
||||||
|
addAggregatedEvent(events[i]);
|
||||||
|
onChange?.call(i);
|
||||||
|
} else {
|
||||||
|
if (type == EventUpdateType.history &&
|
||||||
|
events.indexWhere((e) => e.eventId == event.eventId) != -1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var index = events.length;
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
events.add(event);
|
||||||
|
} else {
|
||||||
|
index = events.firstIndexWhereNotError;
|
||||||
|
events.insert(index, event);
|
||||||
|
}
|
||||||
|
onInsert?.call(index);
|
||||||
|
|
||||||
|
addAggregatedEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.relationshipEventId != null &&
|
||||||
|
(event.relationshipType == RelationshipTypes.edit ||
|
||||||
|
event.relationshipType == RelationshipTypes.reaction ||
|
||||||
|
event.relationshipType == RelationshipTypes.reference)) {
|
||||||
|
final parentEventIndex =
|
||||||
|
_findEvent(event_id: event.relationshipEventId);
|
||||||
|
unawaited(room.handleThreadSync(events[parentEventIndex]));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle redaction events
|
||||||
|
if (event.type == EventTypes.Redaction) {
|
||||||
|
final index = _findEvent(event_id: event.redacts);
|
||||||
|
if (index < events.length) {
|
||||||
|
removeAggregatedEvent(events[index]);
|
||||||
|
|
||||||
|
// Is the redacted event a reaction? Then update the event this
|
||||||
|
// belongs to:
|
||||||
|
if (onChange != null) {
|
||||||
|
final relationshipEventId = events[index].relationshipEventId;
|
||||||
|
if (relationshipEventId != null) {
|
||||||
|
onChange?.call(_findEvent(event_id: relationshipEventId));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events[index].setRedactionEvent(event);
|
||||||
|
onChange?.call(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (update && !_collectHistoryUpdates) {
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
} catch (e, s) {
|
||||||
|
Logs().w('Handle event update failed', e, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<(List<Event>, String?)> startSearch({
|
||||||
|
String? searchTerm,
|
||||||
|
int requestHistoryCount = 100,
|
||||||
|
int maxHistoryRequests = 10,
|
||||||
|
String? prevBatch,
|
||||||
|
@Deprecated('Use [prevBatch] instead.') String? sinceEventId,
|
||||||
|
int? limit,
|
||||||
|
bool Function(Event)? searchFunc,
|
||||||
|
}) async* {
|
||||||
|
assert(searchTerm != null || searchFunc != null);
|
||||||
|
searchFunc ??= (event) =>
|
||||||
|
event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
|
||||||
|
final found = <Event>[];
|
||||||
|
|
||||||
|
if (sinceEventId == null) {
|
||||||
|
// Search locally
|
||||||
|
for (final event in events) {
|
||||||
|
if (searchFunc(event)) {
|
||||||
|
yield (found..add(event), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search in database
|
||||||
|
var start = events.length;
|
||||||
|
while (true) {
|
||||||
|
final eventsFromStore = await room.client.database.getEventList(
|
||||||
|
room,
|
||||||
|
start: start,
|
||||||
|
limit: requestHistoryCount,
|
||||||
|
);
|
||||||
|
if (eventsFromStore.isEmpty) break;
|
||||||
|
start += eventsFromStore.length;
|
||||||
|
for (final event in eventsFromStore) {
|
||||||
|
if (searchFunc(event)) {
|
||||||
|
yield (found..add(event), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Search on the server
|
||||||
|
prevBatch ??= room.prev_batch;
|
||||||
|
if (sinceEventId != null) {
|
||||||
|
prevBatch =
|
||||||
|
(await room.client.getEventContext(room.id, sinceEventId)).end;
|
||||||
|
}
|
||||||
|
final encryption = room.client.encryption;
|
||||||
|
for (var i = 0; i < maxHistoryRequests; i++) {
|
||||||
|
if (prevBatch == null) break;
|
||||||
|
if (limit != null && found.length >= limit) break;
|
||||||
|
try {
|
||||||
|
final resp = await room.client.getRoomEvents(
|
||||||
|
room.id,
|
||||||
|
Direction.b,
|
||||||
|
from: prevBatch,
|
||||||
|
limit: requestHistoryCount,
|
||||||
|
filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
|
||||||
|
);
|
||||||
|
for (final matrixEvent in resp.chunk) {
|
||||||
|
var event = Event.fromMatrixEvent(matrixEvent, room);
|
||||||
|
if (event.type == EventTypes.Encrypted && encryption != null) {
|
||||||
|
event = await encryption.decryptRoomEvent(event);
|
||||||
|
if (event.type == EventTypes.Encrypted &&
|
||||||
|
event.messageType == MessageTypes.BadEncrypted &&
|
||||||
|
event.content['can_request_session'] == true) {
|
||||||
|
// Await requestKey() here to ensure decrypted message bodies
|
||||||
|
await event.requestKey();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (searchFunc(event)) {
|
||||||
|
yield (found..add(event), resp.end);
|
||||||
|
if (limit != null && found.length >= limit) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prevBatch = resp.end;
|
||||||
|
// We are at the beginning of the room
|
||||||
|
if (resp.chunk.length < requestHistoryCount) break;
|
||||||
|
} on MatrixException catch (e) {
|
||||||
|
// We have no permission anymore to request the history
|
||||||
|
if (e.error == MatrixError.M_FORBIDDEN) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
rethrow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add an event to the aggregation tree
|
||||||
|
void addAggregatedEvent(Event event) {
|
||||||
|
final relationshipType = event.relationshipType;
|
||||||
|
final relationshipEventId = event.relationshipEventId;
|
||||||
|
if (relationshipType == null || relationshipEventId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final e = (aggregatedEvents[relationshipEventId] ??=
|
||||||
|
<String, Set<Event>>{})[relationshipType] ??= <Event>{};
|
||||||
|
_removeEventFromSet(e, event);
|
||||||
|
e.add(event);
|
||||||
|
if (onChange != null) {
|
||||||
|
final index = _findEvent(event_id: relationshipEventId);
|
||||||
|
onChange?.call(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove an event from aggregation
|
||||||
|
void removeAggregatedEvent(Event event) {
|
||||||
|
aggregatedEvents.remove(event.eventId);
|
||||||
|
if (event.transactionId != null) {
|
||||||
|
aggregatedEvents.remove(event.transactionId);
|
||||||
|
}
|
||||||
|
for (final types in aggregatedEvents.values) {
|
||||||
|
for (final e in types.values) {
|
||||||
|
_removeEventFromSet(e, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove event from set based on event or transaction ID
|
||||||
|
void _removeEventFromSet(Set<Event> eventSet, Event event) {
|
||||||
|
eventSet.removeWhere(
|
||||||
|
(e) =>
|
||||||
|
e.matchesEventOrTransactionId(event.eventId) ||
|
||||||
|
event.unsigned != null &&
|
||||||
|
e.matchesEventOrTransactionId(event.transactionId),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,431 @@
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:matrix/matrix.dart';
|
||||||
|
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||||
|
|
||||||
|
class Thread {
|
||||||
|
final Room room;
|
||||||
|
final Event rootEvent;
|
||||||
|
Event? lastEvent;
|
||||||
|
String? prev_batch;
|
||||||
|
bool? currentUserParticipated;
|
||||||
|
int? count;
|
||||||
|
final Client client;
|
||||||
|
|
||||||
|
/// The count of unread notifications.
|
||||||
|
int notificationCount = 0;
|
||||||
|
|
||||||
|
/// The count of highlighted notifications.
|
||||||
|
int highlightCount = 0;
|
||||||
|
|
||||||
|
Thread({
|
||||||
|
required this.room,
|
||||||
|
required this.rootEvent,
|
||||||
|
required this.client,
|
||||||
|
required this.currentUserParticipated,
|
||||||
|
required this.count,
|
||||||
|
required this.notificationCount,
|
||||||
|
required this.highlightCount,
|
||||||
|
this.prev_batch,
|
||||||
|
this.lastEvent,
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Returns true if this room is unread. To check if there are new messages
|
||||||
|
/// in muted rooms, use [hasNewMessages].
|
||||||
|
bool get isUnread => notificationCount > 0;
|
||||||
|
|
||||||
|
Map<String, dynamic> toJson() => {
|
||||||
|
...rootEvent.toJson(),
|
||||||
|
'unsigned': {
|
||||||
|
'm.thread': {
|
||||||
|
'latest_event': lastEvent?.toJson(),
|
||||||
|
'count': count,
|
||||||
|
'current_user_participated': currentUserParticipated,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
factory Thread.fromJson(Map<String, dynamic> json, Client client) {
|
||||||
|
final room = client.getRoomById(json['room_id']);
|
||||||
|
if (room == null) throw Error();
|
||||||
|
Event? lastEvent;
|
||||||
|
if (json['unsigned']?['m.relations']?['m.thread']?['latest_event'] !=
|
||||||
|
null) {
|
||||||
|
lastEvent = Event.fromMatrixEvent(
|
||||||
|
MatrixEvent.fromJson(
|
||||||
|
json['unsigned']?['m.relations']?['m.thread']?['latest_event'],
|
||||||
|
),
|
||||||
|
room,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (json['unsigned']?['m.thread']?['latest_event'] != null) {
|
||||||
|
lastEvent = Event.fromMatrixEvent(
|
||||||
|
MatrixEvent.fromJson(
|
||||||
|
json['unsigned']?['m.thread']?['latest_event'],
|
||||||
|
),
|
||||||
|
room,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// Although I was making this part according to specification, it's a bit off
|
||||||
|
// I have no clue why
|
||||||
|
final thread = Thread(
|
||||||
|
room: room,
|
||||||
|
client: client,
|
||||||
|
rootEvent: Event.fromMatrixEvent(
|
||||||
|
MatrixEvent.fromJson(json),
|
||||||
|
room,
|
||||||
|
),
|
||||||
|
lastEvent: lastEvent,
|
||||||
|
count: json['unsigned']?['m.relations']?['m.thread']?['count'],
|
||||||
|
currentUserParticipated: json['unsigned']?['m.relations']?['m.thread']
|
||||||
|
?['current_user_participated'],
|
||||||
|
highlightCount: 0,
|
||||||
|
notificationCount: 0,
|
||||||
|
);
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Event?> refreshLastEvent({
|
||||||
|
timeout = const Duration(seconds: 30),
|
||||||
|
}) async {
|
||||||
|
final lastEvent = _refreshingLastEvent ??= _refreshLastEvent();
|
||||||
|
_refreshingLastEvent = null;
|
||||||
|
return lastEvent;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Event?>? _refreshingLastEvent;
|
||||||
|
|
||||||
|
Future<Event?> _refreshLastEvent({
|
||||||
|
timeout = const Duration(seconds: 30),
|
||||||
|
}) async {
|
||||||
|
if (room.membership != Membership.join) return null;
|
||||||
|
|
||||||
|
final result = await client
|
||||||
|
.getRelatingEventsWithRelType(
|
||||||
|
room.id,
|
||||||
|
rootEvent.eventId,
|
||||||
|
'm.thread',
|
||||||
|
recurse: true,
|
||||||
|
)
|
||||||
|
.timeout(timeout);
|
||||||
|
final matrixEvent = result.chunk.firstOrNull;
|
||||||
|
if (matrixEvent == null) {
|
||||||
|
if (lastEvent?.type == EventTypes.refreshingLastEvent) {
|
||||||
|
lastEvent = null;
|
||||||
|
}
|
||||||
|
Logs().d(
|
||||||
|
'No last event found for thread ${rootEvent.eventId} in ${rootEvent.roomId}',
|
||||||
|
);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
var event = Event.fromMatrixEvent(
|
||||||
|
matrixEvent,
|
||||||
|
room,
|
||||||
|
status: EventStatus.synced,
|
||||||
|
);
|
||||||
|
if (event.type == EventTypes.Encrypted) {
|
||||||
|
final encryption = client.encryption;
|
||||||
|
if (encryption != null) {
|
||||||
|
event = await encryption.decryptRoomEvent(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastEvent = event;
|
||||||
|
|
||||||
|
return event;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When was the last event received.
|
||||||
|
DateTime get latestEventReceivedTime {
|
||||||
|
final lastEventTime = lastEvent?.originServerTs;
|
||||||
|
if (lastEventTime != null) return lastEventTime;
|
||||||
|
|
||||||
|
if (room.membership == Membership.invite) return DateTime.now();
|
||||||
|
|
||||||
|
return rootEvent.originServerTs;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool get hasNewMessages {
|
||||||
|
final lastEvent = this.lastEvent;
|
||||||
|
|
||||||
|
// There is no known event or the last event is only a state fallback event,
|
||||||
|
// we assume there is no new messages.
|
||||||
|
if (lastEvent == null ||
|
||||||
|
!client.roomPreviewLastEvents.contains(lastEvent.type)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read marker is on the last event so no new messages.
|
||||||
|
if (lastEvent.receipts
|
||||||
|
.any((receipt) => receipt.user.senderId == client.userID!)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the last event is sent, we mark the room as read.
|
||||||
|
if (lastEvent.senderId == client.userID) return false;
|
||||||
|
|
||||||
|
// Get the timestamp of read marker and compare
|
||||||
|
final readAtMilliseconds = room.receiptState.byThread[rootEvent.eventId]?.latestOwnReceipt?.ts ?? 0;
|
||||||
|
return readAtMilliseconds < lastEvent.originServerTs.millisecondsSinceEpoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<TimelineChunk?> getEventContext(String eventId) async {
|
||||||
|
// TODO: probably find events with relationship
|
||||||
|
final resp = await client.getEventContext(
|
||||||
|
room.id, eventId,
|
||||||
|
limit: Room.defaultHistoryCount,
|
||||||
|
|
||||||
|
// filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
final events = [
|
||||||
|
if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed,
|
||||||
|
if (resp.event != null) resp.event!,
|
||||||
|
if (resp.eventsBefore != null) ...resp.eventsBefore!,
|
||||||
|
].map((e) => Event.fromMatrixEvent(e, room)).where((e) => e.relationshipType == RelationshipTypes.thread && e.relationshipEventId == rootEvent.eventId).toList();
|
||||||
|
|
||||||
|
// Try again to decrypt encrypted events but don't update the database.
|
||||||
|
if (room.encrypted && client.encryptionEnabled) {
|
||||||
|
for (var i = 0; i < events.length; i++) {
|
||||||
|
if (events[i].type == EventTypes.Encrypted &&
|
||||||
|
events[i].content['can_request_session'] == true) {
|
||||||
|
events[i] = await client.encryption!.decryptRoomEvent(events[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final chunk = TimelineChunk(
|
||||||
|
nextBatch: resp.end ?? '',
|
||||||
|
prevBatch: resp.start ?? '',
|
||||||
|
events: events,
|
||||||
|
);
|
||||||
|
|
||||||
|
return chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<ThreadTimeline> getTimeline({
|
||||||
|
void Function(int index)? onChange,
|
||||||
|
void Function(int index)? onRemove,
|
||||||
|
void Function(int insertID)? onInsert,
|
||||||
|
void Function()? onNewEvent,
|
||||||
|
void Function()? onUpdate,
|
||||||
|
String? eventContextId,
|
||||||
|
int? limit = Room.defaultHistoryCount,
|
||||||
|
}) async {
|
||||||
|
// await postLoad();
|
||||||
|
|
||||||
|
var events = <Event>[];
|
||||||
|
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
events = await client.database.getThreadEventList(
|
||||||
|
this,
|
||||||
|
limit: limit,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
var chunk = TimelineChunk(events: events);
|
||||||
|
// Load the timeline arround eventContextId if set
|
||||||
|
if (eventContextId != null) {
|
||||||
|
if (!events.any((Event event) => event.eventId == eventContextId)) {
|
||||||
|
chunk =
|
||||||
|
await getEventContext(eventContextId) ?? TimelineChunk(events: []);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final timeline = ThreadTimeline(
|
||||||
|
thread: this,
|
||||||
|
chunk: chunk,
|
||||||
|
onChange: onChange,
|
||||||
|
onRemove: onRemove,
|
||||||
|
onInsert: onInsert,
|
||||||
|
onNewEvent: onNewEvent,
|
||||||
|
onUpdate: onUpdate,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Fetch all users from database we have got here.
|
||||||
|
if (eventContextId == null) {
|
||||||
|
final userIds = events.map((event) => event.senderId).toSet();
|
||||||
|
for (final userId in userIds) {
|
||||||
|
if (room.getState(EventTypes.RoomMember, userId) != null) continue;
|
||||||
|
final dbUser = await client.database.getUser(userId, room);
|
||||||
|
if (dbUser != null) room.setState(dbUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try again to decrypt encrypted events and update the database.
|
||||||
|
if (room.encrypted && client.encryptionEnabled) {
|
||||||
|
// decrypt messages
|
||||||
|
for (var i = 0; i < chunk.events.length; i++) {
|
||||||
|
if (chunk.events[i].type == EventTypes.Encrypted) {
|
||||||
|
if (eventContextId != null) {
|
||||||
|
// for the fragmented timeline, we don't cache the decrypted
|
||||||
|
//message in the database
|
||||||
|
chunk.events[i] = await client.encryption!.decryptRoomEvent(
|
||||||
|
chunk.events[i],
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// else, we need the database
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
for (var i = 0; i < chunk.events.length; i++) {
|
||||||
|
if (chunk.events[i].content['can_request_session'] == true) {
|
||||||
|
chunk.events[i] = await client.encryption!.decryptRoomEvent(
|
||||||
|
chunk.events[i],
|
||||||
|
store: !room.isArchived,
|
||||||
|
updateType: EventUpdateType.history,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return timeline;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<String?> sendTextEvent(
|
||||||
|
String message, {
|
||||||
|
String? txid,
|
||||||
|
Event? inReplyTo,
|
||||||
|
String? editEventId,
|
||||||
|
bool parseMarkdown = true,
|
||||||
|
bool parseCommands = true,
|
||||||
|
String msgtype = MessageTypes.Text,
|
||||||
|
StringBuffer? commandStdout,
|
||||||
|
bool addMentions = true,
|
||||||
|
|
||||||
|
/// Displays an event in the timeline with the transaction ID as the event
|
||||||
|
/// ID and a status of SENDING, SENT or ERROR until it gets replaced by
|
||||||
|
/// the sync event. Using this can display a different sort order of events
|
||||||
|
/// as the sync event does replace but not relocate the pending event.
|
||||||
|
bool displayPendingEvent = true,
|
||||||
|
}) {
|
||||||
|
return room.sendTextEvent(
|
||||||
|
message,
|
||||||
|
txid: txid,
|
||||||
|
inReplyTo: inReplyTo,
|
||||||
|
editEventId: editEventId,
|
||||||
|
parseCommands: parseCommands,
|
||||||
|
parseMarkdown: parseMarkdown,
|
||||||
|
msgtype: msgtype,
|
||||||
|
commandStdout: commandStdout,
|
||||||
|
addMentions: addMentions,
|
||||||
|
displayPendingEvent: displayPendingEvent,
|
||||||
|
threadLastEventId: lastEvent?.eventId,
|
||||||
|
threadRootEventId: rootEvent.eventId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<String?> sendLocation(String body, String geoUri, {String? txid}) {
|
||||||
|
final event = <String, dynamic>{
|
||||||
|
'msgtype': 'm.location',
|
||||||
|
'body': body,
|
||||||
|
'geo_uri': geoUri,
|
||||||
|
};
|
||||||
|
return room.sendEvent(
|
||||||
|
event,
|
||||||
|
txid: txid,
|
||||||
|
threadLastEventId: lastEvent?.eventId,
|
||||||
|
threadRootEventId: rootEvent.eventId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<String?> sendFileEvent(
|
||||||
|
MatrixFile file, {
|
||||||
|
String? txid,
|
||||||
|
Event? inReplyTo,
|
||||||
|
String? editEventId,
|
||||||
|
int? shrinkImageMaxDimension,
|
||||||
|
MatrixImageFile? thumbnail,
|
||||||
|
Map<String, dynamic>? extraContent,
|
||||||
|
|
||||||
|
/// Displays an event in the timeline with the transaction ID as the event
|
||||||
|
/// ID and a status of SENDING, SENT or ERROR until it gets replaced by
|
||||||
|
/// the sync event. Using this can display a different sort order of events
|
||||||
|
/// as the sync event does replace but not relocate the pending event.
|
||||||
|
bool displayPendingEvent = true,
|
||||||
|
}) async {
|
||||||
|
return await room.sendFileEvent(
|
||||||
|
file,
|
||||||
|
txid: txid,
|
||||||
|
inReplyTo: inReplyTo,
|
||||||
|
editEventId: editEventId,
|
||||||
|
shrinkImageMaxDimension: shrinkImageMaxDimension,
|
||||||
|
thumbnail: thumbnail,
|
||||||
|
extraContent: extraContent,
|
||||||
|
displayPendingEvent: displayPendingEvent,
|
||||||
|
threadLastEventId: lastEvent?.eventId,
|
||||||
|
threadRootEventId: rootEvent.eventId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> setReadMarker({String? eventId, bool? public}) async {
|
||||||
|
if (eventId == null) return null;
|
||||||
|
return await client.postReceipt(
|
||||||
|
room.id,
|
||||||
|
(public ?? client.receiptsPublicByDefault)
|
||||||
|
? ReceiptType.mRead
|
||||||
|
: ReceiptType.mReadPrivate,
|
||||||
|
eventId,
|
||||||
|
threadId: rootEvent.eventId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> setLastEvent(Event event) async {
|
||||||
|
lastEvent = event;
|
||||||
|
final thread = await client.database.getThread(room.id, rootEvent.eventId, client);
|
||||||
|
Logs().v('Set lastEvent to ${room.id}:${rootEvent.eventId} (${event.senderId})');
|
||||||
|
await client.database.storeThread(
|
||||||
|
room.id,
|
||||||
|
rootEvent,
|
||||||
|
lastEvent,
|
||||||
|
currentUserParticipated ?? false,
|
||||||
|
notificationCount,
|
||||||
|
highlightCount,
|
||||||
|
(thread?.count ?? 0) + 1,
|
||||||
|
client,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<int> requestHistory({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
void Function()? onHistoryReceived,
|
||||||
|
direction = Direction.b,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
final prev_batch = this.prev_batch;
|
||||||
|
|
||||||
|
final storeInDatabase = !room.isArchived;
|
||||||
|
|
||||||
|
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
||||||
|
filter ??= StateFilter(lazyLoadMembers: true);
|
||||||
|
filter.lazyLoadMembers ??= true;
|
||||||
|
|
||||||
|
if (prev_batch == null) {
|
||||||
|
throw 'Tried to request history without a prev_batch token';
|
||||||
|
}
|
||||||
|
|
||||||
|
final resp = await client.getRelatingEvents(
|
||||||
|
room.id,
|
||||||
|
rootEvent.eventId,
|
||||||
|
from: prev_batch,
|
||||||
|
limit: historyCount,
|
||||||
|
dir: direction,
|
||||||
|
recurse: true,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (onHistoryReceived != null) onHistoryReceived();
|
||||||
|
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
if (storeInDatabase && direction == Direction.b) {
|
||||||
|
this.prev_batch = resp.prevBatch;
|
||||||
|
await client.database.setThreadPrevBatch(
|
||||||
|
resp.prevBatch, room.id, rootEvent.eventId, client);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return resp.chunk.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,500 @@
|
||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:matrix/matrix.dart';
|
||||||
|
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||||
|
import 'package:matrix/src/thread.dart';
|
||||||
|
|
||||||
|
// ThreadTimeline: hey RoomTimeline can i copy your homework?
|
||||||
|
// RoomTimeline: sure just don't make it too obvious
|
||||||
|
// ThreadTimeline:
|
||||||
|
|
||||||
|
class ThreadTimeline extends Timeline {
|
||||||
|
final Thread thread;
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<Event> get events => chunk.events;
|
||||||
|
|
||||||
|
TimelineChunk chunk;
|
||||||
|
|
||||||
|
StreamSubscription<Event>? timelineSub;
|
||||||
|
StreamSubscription<Event>? historySub;
|
||||||
|
StreamSubscription<SyncUpdate>? roomSub;
|
||||||
|
StreamSubscription<String>? sessionIdReceivedSub;
|
||||||
|
StreamSubscription<String>? cancelSendEventSub;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool isRequestingHistory = false;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool isFragmentedTimeline = false;
|
||||||
|
|
||||||
|
final Map<String, Event> _eventCache = {};
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool allowNewEvent = true;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool isRequestingFuture = false;
|
||||||
|
|
||||||
|
ThreadTimeline({
|
||||||
|
required this.thread,
|
||||||
|
required this.chunk,
|
||||||
|
super.onUpdate,
|
||||||
|
super.onChange,
|
||||||
|
super.onInsert,
|
||||||
|
super.onRemove,
|
||||||
|
super.onNewEvent,
|
||||||
|
}) {
|
||||||
|
final room = thread.room;
|
||||||
|
timelineSub = room.client.onTimelineEvent.stream.listen(
|
||||||
|
(event) => _handleEventUpdate(event, EventUpdateType.timeline),
|
||||||
|
);
|
||||||
|
historySub = room.client.onHistoryEvent.stream.listen(
|
||||||
|
(event) => _handleEventUpdate(event, EventUpdateType.history),
|
||||||
|
);
|
||||||
|
|
||||||
|
// we want to populate our aggregated events
|
||||||
|
for (final e in events) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// we are using a fragmented timeline
|
||||||
|
if (chunk.nextBatch != '') {
|
||||||
|
allowNewEvent = false;
|
||||||
|
isFragmentedTimeline = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _handleEventUpdate(
|
||||||
|
Event event,
|
||||||
|
EventUpdateType type, {
|
||||||
|
bool update = true,
|
||||||
|
}) {
|
||||||
|
try {
|
||||||
|
if (event.roomId != thread.room.id) return;
|
||||||
|
// Ignore events outside of this thread
|
||||||
|
if (event.relationshipType == RelationshipTypes.thread &&
|
||||||
|
event.relationshipEventId != thread.rootEvent.eventId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.relationshipType == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type == EventUpdateType.timeline) {
|
||||||
|
onNewEvent?.call();
|
||||||
|
}
|
||||||
|
|
||||||
|
final status = event.status;
|
||||||
|
final i = _findEvent(
|
||||||
|
event_id: event.eventId,
|
||||||
|
unsigned_txid: event.transactionId,
|
||||||
|
);
|
||||||
|
if (i < events.length) {
|
||||||
|
// if the old status is larger than the new one, we also want to preserve the old status
|
||||||
|
final oldStatus = events[i].status;
|
||||||
|
events[i] = event;
|
||||||
|
// do we preserve the status? we should allow 0 -> -1 updates and status increases
|
||||||
|
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
|
||||||
|
!(status.isError && oldStatus.isSending)) {
|
||||||
|
events[i].status = oldStatus;
|
||||||
|
}
|
||||||
|
addAggregatedEvent(events[i]);
|
||||||
|
onChange?.call(i);
|
||||||
|
} else {
|
||||||
|
if (type == EventUpdateType.history &&
|
||||||
|
events.indexWhere((e) => e.eventId == event.eventId) != -1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var index = events.length;
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
events.add(event);
|
||||||
|
} else {
|
||||||
|
index = events.firstIndexWhereNotError;
|
||||||
|
events.insert(index, event);
|
||||||
|
}
|
||||||
|
onInsert?.call(index);
|
||||||
|
|
||||||
|
addAggregatedEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
unawaited(thread.setLastEvent(events[events.length - 1]));
|
||||||
|
|
||||||
|
// Handle redaction events
|
||||||
|
if (event.type == EventTypes.Redaction) {
|
||||||
|
final index = _findEvent(event_id: event.redacts);
|
||||||
|
if (index < events.length) {
|
||||||
|
removeAggregatedEvent(events[index]);
|
||||||
|
|
||||||
|
// Is the redacted event a reaction? Then update the event this
|
||||||
|
// belongs to:
|
||||||
|
if (onChange != null) {
|
||||||
|
final relationshipEventId = events[index].relationshipEventId;
|
||||||
|
if (relationshipEventId != null) {
|
||||||
|
onChange?.call(_findEvent(event_id: relationshipEventId));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
events[index].setRedactionEvent(event);
|
||||||
|
onChange?.call(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (update) {
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
} catch (e, s) {
|
||||||
|
Logs().w('Handle event update failed', e, s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Request more previous events from the server.
|
||||||
|
Future<int> getThreadEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
direction = Direction.b,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
||||||
|
filter ??= StateFilter(lazyLoadMembers: true);
|
||||||
|
filter.lazyLoadMembers ??= true;
|
||||||
|
|
||||||
|
final resp = await thread.client.getRelatingEvents(
|
||||||
|
thread.room.id,
|
||||||
|
thread.rootEvent.eventId,
|
||||||
|
dir: direction,
|
||||||
|
from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
|
||||||
|
limit: historyCount,
|
||||||
|
recurse: true,
|
||||||
|
);
|
||||||
|
|
||||||
|
Logs().w(
|
||||||
|
'Loading thread events from server ${resp.chunk.length} ${resp.prevBatch}',
|
||||||
|
);
|
||||||
|
|
||||||
|
if (resp.nextBatch == null) {
|
||||||
|
Logs().w('We reached the end of the timeline');
|
||||||
|
}
|
||||||
|
|
||||||
|
final newNextBatch =
|
||||||
|
direction == Direction.b ? resp.prevBatch : resp.nextBatch;
|
||||||
|
final newPrevBatch =
|
||||||
|
direction == Direction.b ? resp.nextBatch : resp.prevBatch;
|
||||||
|
|
||||||
|
final type = direction == Direction.b
|
||||||
|
? EventUpdateType.history
|
||||||
|
: EventUpdateType.timeline;
|
||||||
|
|
||||||
|
// I dont know what this piece of code does
|
||||||
|
// if ((resp.state?.length ?? 0) == 0 &&
|
||||||
|
// resp.start != resp.end &&
|
||||||
|
// newPrevBatch != null &&
|
||||||
|
// newNextBatch != null) {
|
||||||
|
// if (type == EventUpdateType.history) {
|
||||||
|
// Logs().w(
|
||||||
|
// '[nav] we can still request history prevBatch: $type $newPrevBatch',
|
||||||
|
// );
|
||||||
|
// } else {
|
||||||
|
// Logs().w(
|
||||||
|
// '[nav] we can still request timeline nextBatch: $type $newNextBatch',
|
||||||
|
// );
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
final newEvents =
|
||||||
|
resp.chunk.map((e) => Event.fromMatrixEvent(e, thread.room)).toList();
|
||||||
|
|
||||||
|
if (!allowNewEvent) {
|
||||||
|
if (resp.prevBatch == resp.nextBatch ||
|
||||||
|
(resp.nextBatch == null && direction == Direction.f)) {
|
||||||
|
allowNewEvent = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allowNewEvent) {
|
||||||
|
Logs().d('We now allow sync update into the timeline.');
|
||||||
|
newEvents.addAll(
|
||||||
|
await thread.client.database
|
||||||
|
.getThreadEventList(thread, onlySending: true),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to decrypt encrypted events but don't update the database.
|
||||||
|
if (thread.room.encrypted && thread.client.encryptionEnabled) {
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
if (newEvents[i].type == EventTypes.Encrypted) {
|
||||||
|
newEvents[i] = await thread.client.encryption!.decryptRoomEvent(
|
||||||
|
newEvents[i],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update chunk anchors
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
chunk.prevBatch = newPrevBatch ?? '';
|
||||||
|
|
||||||
|
final offset = chunk.events.length;
|
||||||
|
|
||||||
|
chunk.events.addAll(newEvents);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i + offset);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
chunk.nextBatch = newNextBatch ?? '';
|
||||||
|
chunk.events.insertAll(0, newEvents.reversed);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onUpdate != null) {
|
||||||
|
onUpdate!();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (final e in events) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.chunk.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _requestEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
required Direction direction,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
onUpdate?.call();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Look up for events in the database first. With fragmented view, we should delete the database cache
|
||||||
|
final eventsFromStore = isFragmentedTimeline
|
||||||
|
? null
|
||||||
|
: await thread.client.database.getThreadEventList(
|
||||||
|
thread,
|
||||||
|
start: events.length,
|
||||||
|
limit: historyCount,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
|
||||||
|
for (final e in eventsFromStore) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
// Fetch all users from database we have got here.
|
||||||
|
for (final event in events) {
|
||||||
|
if (thread.room.getState(EventTypes.RoomMember, event.senderId) !=
|
||||||
|
null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final dbUser =
|
||||||
|
await thread.client.database.getUser(event.senderId, thread.room);
|
||||||
|
if (dbUser != null) thread.room.setState(dbUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (direction == Direction.b) {
|
||||||
|
events.addAll(eventsFromStore);
|
||||||
|
final startIndex = events.length - eventsFromStore.length;
|
||||||
|
final endIndex = events.length;
|
||||||
|
for (var i = startIndex; i < endIndex; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
events.insertAll(0, eventsFromStore);
|
||||||
|
final startIndex = eventsFromStore.length;
|
||||||
|
final endIndex = 0;
|
||||||
|
for (var i = startIndex; i > endIndex; i--) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Logs().i('No more events found in the store. Request from server...');
|
||||||
|
|
||||||
|
if (isFragmentedTimeline) {
|
||||||
|
await getThreadEvents(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
if (thread.prev_batch == null) {
|
||||||
|
Logs().i('No more events to request from server...');
|
||||||
|
} else {
|
||||||
|
await thread.requestHistory(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
onHistoryReceived: () {},
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
isRequestingHistory = false;
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add an event to the aggregation tree
|
||||||
|
void addAggregatedEvent(Event event) {
|
||||||
|
final relationshipType = event.relationshipType;
|
||||||
|
final relationshipEventId = event.relationshipEventId;
|
||||||
|
if (relationshipType == null ||
|
||||||
|
relationshipType == RelationshipTypes.thread ||
|
||||||
|
relationshipEventId == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Logs().w(
|
||||||
|
// 'Adding aggregated event ${event.type} ${event.eventId} to $relationshipEventId ($relationshipType)');
|
||||||
|
final e = (aggregatedEvents[relationshipEventId] ??=
|
||||||
|
<String, Set<Event>>{})[relationshipType] ??= <Event>{};
|
||||||
|
_removeEventFromSet(e, event);
|
||||||
|
e.add(event);
|
||||||
|
if (onChange != null) {
|
||||||
|
final index = _findEvent(event_id: relationshipEventId);
|
||||||
|
onChange?.call(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove an event from aggregation
|
||||||
|
void removeAggregatedEvent(Event event) {
|
||||||
|
aggregatedEvents.remove(event.eventId);
|
||||||
|
if (event.transactionId != null) {
|
||||||
|
aggregatedEvents.remove(event.transactionId);
|
||||||
|
}
|
||||||
|
for (final types in aggregatedEvents.values) {
|
||||||
|
for (final e in types.values) {
|
||||||
|
_removeEventFromSet(e, event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove event from set based on event or transaction ID
|
||||||
|
void _removeEventFromSet(Set<Event> eventSet, Event event) {
|
||||||
|
eventSet.removeWhere(
|
||||||
|
(e) =>
|
||||||
|
e.matchesEventOrTransactionId(event.eventId) ||
|
||||||
|
event.unsigned != null &&
|
||||||
|
e.matchesEventOrTransactionId(event.transactionId),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find event index by event ID or transaction ID
|
||||||
|
int _findEvent({String? event_id, String? unsigned_txid}) {
|
||||||
|
final searchNeedle = <String>{};
|
||||||
|
if (event_id != null) searchNeedle.add(event_id);
|
||||||
|
if (unsigned_txid != null) searchNeedle.add(unsigned_txid);
|
||||||
|
|
||||||
|
int i;
|
||||||
|
for (i = 0; i < events.length; i++) {
|
||||||
|
final searchHaystack = <String>{events[i].eventId};
|
||||||
|
final txnid = events[i].transactionId;
|
||||||
|
if (txnid != null) searchHaystack.add(txnid);
|
||||||
|
if (searchNeedle.intersection(searchHaystack).isNotEmpty) break;
|
||||||
|
}
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
void cancelSubscriptions() {
|
||||||
|
// TODO: implement cancelSubscriptions
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<Event?> getEventById(String id) async {
|
||||||
|
for (final event in events) {
|
||||||
|
if (event.eventId == id) return event;
|
||||||
|
}
|
||||||
|
if (_eventCache.containsKey(id)) return _eventCache[id];
|
||||||
|
final requestedEvent = await thread.room.getEventById(id);
|
||||||
|
if (requestedEvent == null) return null;
|
||||||
|
_eventCache[id] = requestedEvent;
|
||||||
|
return _eventCache[id];
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> requestHistory({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
if (isRequestingHistory) return;
|
||||||
|
isRequestingHistory = true;
|
||||||
|
await _requestEvents(
|
||||||
|
direction: Direction.b,
|
||||||
|
historyCount: historyCount,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
isRequestingHistory = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> setReadMarker({String? eventId, bool? public}) {
|
||||||
|
return thread.setReadMarker(
|
||||||
|
eventId: eventId,
|
||||||
|
public: public,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Stream<(List<Event>, String?)> startSearch({
|
||||||
|
String? searchTerm,
|
||||||
|
int requestHistoryCount = 100,
|
||||||
|
int maxHistoryRequests = 10,
|
||||||
|
String? prevBatch,
|
||||||
|
String? sinceEventId,
|
||||||
|
int? limit,
|
||||||
|
bool Function(Event p1)? searchFunc,
|
||||||
|
}) {
|
||||||
|
// TODO: implement startSearch
|
||||||
|
throw UnimplementedError();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get canRequestFuture => chunk.nextBatch.isNotEmpty;
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool get canRequestHistory => chunk.prevBatch.isNotEmpty;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> requestFuture({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
if (isRequestingFuture || !canRequestFuture) return;
|
||||||
|
isRequestingFuture = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await getThreadEvents(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: Direction.f,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
isRequestingFuture = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> requestKeys({
|
||||||
|
bool tryOnlineBackup = true,
|
||||||
|
bool onlineKeyBackupOnly = true,
|
||||||
|
}) async {
|
||||||
|
for (final event in events) {
|
||||||
|
if (event.type == EventTypes.Encrypted &&
|
||||||
|
event.messageType == MessageTypes.BadEncrypted &&
|
||||||
|
event.content['can_request_session'] == true) {
|
||||||
|
final sessionId = event.content.tryGet<String>('session_id');
|
||||||
|
final senderKey = event.content.tryGet<String>('sender_key');
|
||||||
|
if (sessionId != null && senderKey != null) {
|
||||||
|
await thread.room.requestSessionKey(sessionId, senderKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -17,598 +17,87 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:convert';
|
|
||||||
|
|
||||||
import 'package:collection/collection.dart';
|
|
||||||
|
|
||||||
import 'package:matrix/matrix.dart';
|
import 'package:matrix/matrix.dart';
|
||||||
import 'package:matrix/src/models/timeline_chunk.dart';
|
|
||||||
|
|
||||||
/// Represents the timeline of a room. The callback [onUpdate] will be triggered
|
/// Abstract base class for all timeline implementations.
|
||||||
/// automatically. The initial
|
/// Provides common functionality for event management, aggregation, and search.
|
||||||
/// event list will be retreived when created by the `room.getTimeline()` method.
|
abstract class Timeline {
|
||||||
|
/// The list of events in this timeline
|
||||||
class Timeline {
|
List<Event> get events;
|
||||||
final Room room;
|
|
||||||
List<Event> get events => chunk.events;
|
|
||||||
|
|
||||||
/// Map of event ID to map of type to set of aggregated events
|
/// Map of event ID to map of type to set of aggregated events
|
||||||
final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
|
final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
|
||||||
|
|
||||||
|
/// Called when the timeline is updated
|
||||||
final void Function()? onUpdate;
|
final void Function()? onUpdate;
|
||||||
|
|
||||||
|
/// Called when an event at specific index changes
|
||||||
final void Function(int index)? onChange;
|
final void Function(int index)? onChange;
|
||||||
|
|
||||||
|
/// Called when an event is inserted at specific index
|
||||||
final void Function(int index)? onInsert;
|
final void Function(int index)? onInsert;
|
||||||
|
|
||||||
|
/// Called when an event is removed from specific index
|
||||||
final void Function(int index)? onRemove;
|
final void Function(int index)? onRemove;
|
||||||
|
|
||||||
|
/// Called when a new event is added to the timeline
|
||||||
final void Function()? onNewEvent;
|
final void Function()? onNewEvent;
|
||||||
|
|
||||||
StreamSubscription<Event>? timelineSub;
|
bool get canRequestHistory;
|
||||||
StreamSubscription<Event>? historySub;
|
bool get canRequestFuture;
|
||||||
StreamSubscription<SyncUpdate>? roomSub;
|
bool get allowNewEvent;
|
||||||
StreamSubscription<String>? sessionIdReceivedSub;
|
bool get isRequestingFuture;
|
||||||
StreamSubscription<String>? cancelSendEventSub;
|
bool get isRequestingHistory;
|
||||||
bool isRequestingHistory = false;
|
bool get isFragmentedTimeline;
|
||||||
bool isRequestingFuture = false;
|
|
||||||
|
|
||||||
bool allowNewEvent = true;
|
|
||||||
bool isFragmentedTimeline = false;
|
|
||||||
|
|
||||||
final Map<String, Event> _eventCache = {};
|
|
||||||
|
|
||||||
TimelineChunk chunk;
|
|
||||||
|
|
||||||
/// Searches for the event in this timeline. If not
|
|
||||||
/// found, requests from the server. Requested events
|
|
||||||
/// are cached.
|
|
||||||
Future<Event?> getEventById(String id) async {
|
|
||||||
for (final event in events) {
|
|
||||||
if (event.eventId == id) return event;
|
|
||||||
}
|
|
||||||
if (_eventCache.containsKey(id)) return _eventCache[id];
|
|
||||||
final requestedEvent = await room.getEventById(id);
|
|
||||||
if (requestedEvent == null) return null;
|
|
||||||
_eventCache[id] = requestedEvent;
|
|
||||||
return _eventCache[id];
|
|
||||||
}
|
|
||||||
|
|
||||||
// When fetching history, we will collect them into the `_historyUpdates` set
|
|
||||||
// first, and then only process all events at once, once we have the full history.
|
|
||||||
// This ensures that the entire history fetching only triggers `onUpdate` only *once*,
|
|
||||||
// even if /sync's complete while history is being proccessed.
|
|
||||||
bool _collectHistoryUpdates = false;
|
|
||||||
|
|
||||||
// We confirmed, that there are no more events to load from the database.
|
|
||||||
bool _fetchedAllDatabaseEvents = false;
|
|
||||||
|
|
||||||
bool get canRequestHistory {
|
|
||||||
if (!{Membership.join, Membership.leave}.contains(room.membership)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (events.isEmpty) return true;
|
|
||||||
return !_fetchedAllDatabaseEvents ||
|
|
||||||
(room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request more previous events from the server. [historyCount] defines how many events should
|
|
||||||
/// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
|
|
||||||
/// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
|
|
||||||
/// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
|
|
||||||
/// true by default, but this can be overridden.
|
|
||||||
/// This method does not return a value.
|
|
||||||
Future<void> requestHistory({
|
|
||||||
int historyCount = Room.defaultHistoryCount,
|
|
||||||
StateFilter? filter,
|
|
||||||
}) async {
|
|
||||||
if (isRequestingHistory) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
isRequestingHistory = true;
|
|
||||||
await _requestEvents(
|
|
||||||
direction: Direction.b,
|
|
||||||
historyCount: historyCount,
|
|
||||||
filter: filter,
|
|
||||||
);
|
|
||||||
isRequestingHistory = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool get canRequestFuture => !allowNewEvent;
|
|
||||||
|
|
||||||
/// Request more future events from the server. [historyCount] defines how many events should
|
|
||||||
/// be received maximum. [filter] allows you to specify a [StateFilter] object to filter the
|
|
||||||
/// events, which can include various criteria such as event types (e.g., [EventTypes.Message])
|
|
||||||
/// and other state-related filters. The [StateFilter] object will have [lazyLoadMembers] set to
|
|
||||||
/// true by default, but this can be overridden.
|
|
||||||
/// This method does not return a value.
|
|
||||||
Future<void> requestFuture({
|
|
||||||
int historyCount = Room.defaultHistoryCount,
|
|
||||||
StateFilter? filter,
|
|
||||||
}) async {
|
|
||||||
if (allowNewEvent) {
|
|
||||||
return; // we shouldn't force to add new events if they will autatically be added
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isRequestingFuture) return;
|
|
||||||
isRequestingFuture = true;
|
|
||||||
await _requestEvents(
|
|
||||||
direction: Direction.f,
|
|
||||||
historyCount: historyCount,
|
|
||||||
filter: filter,
|
|
||||||
);
|
|
||||||
isRequestingFuture = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<void> _requestEvents({
|
|
||||||
int historyCount = Room.defaultHistoryCount,
|
|
||||||
required Direction direction,
|
|
||||||
StateFilter? filter,
|
|
||||||
}) async {
|
|
||||||
onUpdate?.call();
|
|
||||||
|
|
||||||
try {
|
|
||||||
// Look up for events in the database first. With fragmented view, we should delete the database cache
|
|
||||||
final eventsFromStore = isFragmentedTimeline
|
|
||||||
? null
|
|
||||||
: await room.client.database.getEventList(
|
|
||||||
room,
|
|
||||||
start: events.length,
|
|
||||||
limit: historyCount,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
|
|
||||||
for (final e in eventsFromStore) {
|
|
||||||
addAggregatedEvent(e);
|
|
||||||
}
|
|
||||||
// Fetch all users from database we have got here.
|
|
||||||
for (final event in events) {
|
|
||||||
if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
final dbUser =
|
|
||||||
await room.client.database.getUser(event.senderId, room);
|
|
||||||
if (dbUser != null) room.setState(dbUser);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (direction == Direction.b) {
|
|
||||||
events.addAll(eventsFromStore);
|
|
||||||
final startIndex = events.length - eventsFromStore.length;
|
|
||||||
final endIndex = events.length;
|
|
||||||
for (var i = startIndex; i < endIndex; i++) {
|
|
||||||
onInsert?.call(i);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
events.insertAll(0, eventsFromStore);
|
|
||||||
final startIndex = eventsFromStore.length;
|
|
||||||
final endIndex = 0;
|
|
||||||
for (var i = startIndex; i > endIndex; i--) {
|
|
||||||
onInsert?.call(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
_fetchedAllDatabaseEvents = true;
|
|
||||||
Logs().i('No more events found in the store. Request from server...');
|
|
||||||
|
|
||||||
if (isFragmentedTimeline) {
|
|
||||||
await getRoomEvents(
|
|
||||||
historyCount: historyCount,
|
|
||||||
direction: direction,
|
|
||||||
filter: filter,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
if (room.prev_batch == null) {
|
|
||||||
Logs().i('No more events to request from server...');
|
|
||||||
} else {
|
|
||||||
await room.requestHistory(
|
|
||||||
historyCount: historyCount,
|
|
||||||
direction: direction,
|
|
||||||
onHistoryReceived: () {
|
|
||||||
_collectHistoryUpdates = true;
|
|
||||||
},
|
|
||||||
filter: filter,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
_collectHistoryUpdates = false;
|
|
||||||
isRequestingHistory = false;
|
|
||||||
onUpdate?.call();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request more previous events from the server. [historyCount] defines how much events should
|
|
||||||
/// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
|
|
||||||
/// the historical events will be published in the onEvent stream. [filter] allows you to specify a
|
|
||||||
/// [StateFilter] object to filter the events, which can include various criteria such as
|
|
||||||
/// event types (e.g., [EventTypes.Message]) and other state-related filters.
|
|
||||||
/// The [StateFilter] object will have [lazyLoadMembers] set to true by default, but this can be overridden.
|
|
||||||
/// Returns the actual count of received timeline events.
|
|
||||||
Future<int> getRoomEvents({
|
|
||||||
int historyCount = Room.defaultHistoryCount,
|
|
||||||
direction = Direction.b,
|
|
||||||
StateFilter? filter,
|
|
||||||
}) async {
|
|
||||||
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
|
||||||
filter ??= StateFilter(lazyLoadMembers: true);
|
|
||||||
filter.lazyLoadMembers ??= true;
|
|
||||||
|
|
||||||
final resp = await room.client.getRoomEvents(
|
|
||||||
room.id,
|
|
||||||
direction,
|
|
||||||
from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
|
|
||||||
limit: historyCount,
|
|
||||||
filter: jsonEncode(filter.toJson()),
|
|
||||||
);
|
|
||||||
|
|
||||||
if (resp.end == null) {
|
|
||||||
Logs().w('We reached the end of the timeline');
|
|
||||||
}
|
|
||||||
|
|
||||||
final newNextBatch = direction == Direction.b ? resp.start : resp.end;
|
|
||||||
final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
|
|
||||||
|
|
||||||
final type = direction == Direction.b
|
|
||||||
? EventUpdateType.history
|
|
||||||
: EventUpdateType.timeline;
|
|
||||||
|
|
||||||
if ((resp.state?.length ?? 0) == 0 &&
|
|
||||||
resp.start != resp.end &&
|
|
||||||
newPrevBatch != null &&
|
|
||||||
newNextBatch != null) {
|
|
||||||
if (type == EventUpdateType.history) {
|
|
||||||
Logs().w(
|
|
||||||
'[nav] we can still request history prevBatch: $type $newPrevBatch',
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
Logs().w(
|
|
||||||
'[nav] we can still request timeline nextBatch: $type $newNextBatch',
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final newEvents =
|
|
||||||
resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
|
|
||||||
|
|
||||||
if (!allowNewEvent) {
|
|
||||||
if (resp.start == resp.end ||
|
|
||||||
(resp.end == null && direction == Direction.f)) {
|
|
||||||
allowNewEvent = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (allowNewEvent) {
|
|
||||||
Logs().d('We now allow sync update into the timeline.');
|
|
||||||
newEvents.addAll(
|
|
||||||
await room.client.database.getEventList(room, onlySending: true),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to decrypt encrypted events but don't update the database.
|
|
||||||
if (room.encrypted && room.client.encryptionEnabled) {
|
|
||||||
for (var i = 0; i < newEvents.length; i++) {
|
|
||||||
if (newEvents[i].type == EventTypes.Encrypted) {
|
|
||||||
newEvents[i] = await room.client.encryption!.decryptRoomEvent(
|
|
||||||
newEvents[i],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// update chunk anchors
|
|
||||||
if (type == EventUpdateType.history) {
|
|
||||||
chunk.prevBatch = newPrevBatch ?? '';
|
|
||||||
|
|
||||||
final offset = chunk.events.length;
|
|
||||||
|
|
||||||
chunk.events.addAll(newEvents);
|
|
||||||
|
|
||||||
for (var i = 0; i < newEvents.length; i++) {
|
|
||||||
onInsert?.call(i + offset);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
chunk.nextBatch = newNextBatch ?? '';
|
|
||||||
chunk.events.insertAll(0, newEvents.reversed);
|
|
||||||
|
|
||||||
for (var i = 0; i < newEvents.length; i++) {
|
|
||||||
onInsert?.call(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (onUpdate != null) {
|
|
||||||
onUpdate!();
|
|
||||||
}
|
|
||||||
return resp.chunk.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
Timeline({
|
Timeline({
|
||||||
required this.room,
|
|
||||||
this.onUpdate,
|
this.onUpdate,
|
||||||
this.onChange,
|
this.onChange,
|
||||||
this.onInsert,
|
this.onInsert,
|
||||||
this.onRemove,
|
this.onRemove,
|
||||||
this.onNewEvent,
|
this.onNewEvent,
|
||||||
required this.chunk,
|
});
|
||||||
}) {
|
|
||||||
timelineSub = room.client.onTimelineEvent.stream.listen(
|
|
||||||
(event) => _handleEventUpdate(
|
|
||||||
event,
|
|
||||||
EventUpdateType.timeline,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
historySub = room.client.onHistoryEvent.stream.listen(
|
|
||||||
(event) => _handleEventUpdate(
|
|
||||||
event,
|
|
||||||
EventUpdateType.history,
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
// If the timeline is limited we want to clear our events cache
|
/// Searches for the event in this timeline. If not found, requests from server.
|
||||||
roomSub = room.client.onSync.stream
|
Future<Event?> getEventById(String id);
|
||||||
.where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
|
|
||||||
.listen(_removeEventsNotInThisSync);
|
|
||||||
|
|
||||||
sessionIdReceivedSub =
|
/// Request more previous events
|
||||||
room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
|
Future<void> requestHistory({
|
||||||
cancelSendEventSub =
|
int historyCount = Room.defaultHistoryCount,
|
||||||
room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
|
StateFilter? filter,
|
||||||
|
});
|
||||||
|
|
||||||
// we want to populate our aggregated events
|
/// Request more future events
|
||||||
for (final e in events) {
|
Future<void> requestFuture({
|
||||||
addAggregatedEvent(e);
|
int historyCount = Room.defaultHistoryCount,
|
||||||
}
|
StateFilter? filter,
|
||||||
|
});
|
||||||
|
|
||||||
// we are using a fragmented timeline
|
/// Set the read marker to an event in this timeline
|
||||||
if (chunk.nextBatch != '') {
|
Future<void> setReadMarker({String? eventId, bool? public});
|
||||||
allowNewEvent = false;
|
|
||||||
isFragmentedTimeline = true;
|
|
||||||
// fragmented timelines never read from the database.
|
|
||||||
_fetchedAllDatabaseEvents = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void _cleanUpCancelledEvent(String eventId) {
|
/// Request keys for undecryptable events
|
||||||
final i = _findEvent(event_id: eventId);
|
|
||||||
if (i < events.length) {
|
|
||||||
removeAggregatedEvent(events[i]);
|
|
||||||
events.removeAt(i);
|
|
||||||
onRemove?.call(i);
|
|
||||||
onUpdate?.call();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Removes all entries from [events] which are not in this SyncUpdate.
|
|
||||||
void _removeEventsNotInThisSync(SyncUpdate sync) {
|
|
||||||
final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
|
|
||||||
final keepEventIds = newSyncEvents.map((e) => e.eventId);
|
|
||||||
events.removeWhere((e) => !keepEventIds.contains(e.eventId));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Don't forget to call this before you dismiss this object!
|
|
||||||
void cancelSubscriptions() {
|
|
||||||
// ignore: discarded_futures
|
|
||||||
timelineSub?.cancel();
|
|
||||||
// ignore: discarded_futures
|
|
||||||
historySub?.cancel();
|
|
||||||
// ignore: discarded_futures
|
|
||||||
roomSub?.cancel();
|
|
||||||
// ignore: discarded_futures
|
|
||||||
sessionIdReceivedSub?.cancel();
|
|
||||||
// ignore: discarded_futures
|
|
||||||
cancelSendEventSub?.cancel();
|
|
||||||
}
|
|
||||||
|
|
||||||
void _sessionKeyReceived(String sessionId) async {
|
|
||||||
var decryptAtLeastOneEvent = false;
|
|
||||||
Future<void> decryptFn() async {
|
|
||||||
final encryption = room.client.encryption;
|
|
||||||
if (!room.client.encryptionEnabled || encryption == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (var i = 0; i < events.length; i++) {
|
|
||||||
if (events[i].type == EventTypes.Encrypted &&
|
|
||||||
events[i].messageType == MessageTypes.BadEncrypted &&
|
|
||||||
events[i].content['session_id'] == sessionId) {
|
|
||||||
events[i] = await encryption.decryptRoomEvent(
|
|
||||||
events[i],
|
|
||||||
store: true,
|
|
||||||
updateType: EventUpdateType.history,
|
|
||||||
);
|
|
||||||
addAggregatedEvent(events[i]);
|
|
||||||
onChange?.call(i);
|
|
||||||
if (events[i].type != EventTypes.Encrypted) {
|
|
||||||
decryptAtLeastOneEvent = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await room.client.database.transaction(decryptFn);
|
|
||||||
if (decryptAtLeastOneEvent) onUpdate?.call();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Request the keys for undecryptable events of this timeline
|
|
||||||
void requestKeys({
|
void requestKeys({
|
||||||
bool tryOnlineBackup = true,
|
bool tryOnlineBackup = true,
|
||||||
bool onlineKeyBackupOnly = true,
|
bool onlineKeyBackupOnly = true,
|
||||||
}) {
|
});
|
||||||
for (final event in events) {
|
|
||||||
if (event.type == EventTypes.Encrypted &&
|
|
||||||
event.messageType == MessageTypes.BadEncrypted &&
|
|
||||||
event.content['can_request_session'] == true) {
|
|
||||||
final sessionId = event.content.tryGet<String>('session_id');
|
|
||||||
final senderKey = event.content.tryGet<String>('sender_key');
|
|
||||||
if (sessionId != null && senderKey != null) {
|
|
||||||
room.client.encryption?.keyManager.maybeAutoRequest(
|
|
||||||
room.id,
|
|
||||||
sessionId,
|
|
||||||
senderKey,
|
|
||||||
tryOnlineBackup: tryOnlineBackup,
|
|
||||||
onlineKeyBackupOnly: onlineKeyBackupOnly,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Set the read marker to the last synced event in this timeline.
|
/// Search events in this timeline
|
||||||
Future<void> setReadMarker({String? eventId, bool? public}) async {
|
Stream<(List<Event>, String?)> startSearch({
|
||||||
eventId ??=
|
String? searchTerm,
|
||||||
events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
|
int requestHistoryCount = 100,
|
||||||
if (eventId == null) return;
|
int maxHistoryRequests = 10,
|
||||||
return room.setReadMarker(eventId, mRead: eventId, public: public);
|
String? prevBatch,
|
||||||
}
|
@Deprecated('Use [prevBatch] instead.') String? sinceEventId,
|
||||||
|
int? limit,
|
||||||
|
bool Function(Event)? searchFunc,
|
||||||
|
});
|
||||||
|
|
||||||
int _findEvent({String? event_id, String? unsigned_txid}) {
|
/// Handle event updates (to be implemented by subclasses)
|
||||||
// we want to find any existing event where either the passed event_id or the passed unsigned_txid
|
void _handleEventUpdate(Event event, EventUpdateType type, {bool update = true});
|
||||||
// matches either the event_id or transaction_id of the existing event.
|
|
||||||
// For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
|
|
||||||
// Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
|
|
||||||
// thus meaning we found our element.
|
|
||||||
final searchNeedle = <String>{};
|
|
||||||
if (event_id != null) {
|
|
||||||
searchNeedle.add(event_id);
|
|
||||||
}
|
|
||||||
if (unsigned_txid != null) {
|
|
||||||
searchNeedle.add(unsigned_txid);
|
|
||||||
}
|
|
||||||
int i;
|
|
||||||
for (i = 0; i < events.length; i++) {
|
|
||||||
final searchHaystack = <String>{events[i].eventId};
|
|
||||||
|
|
||||||
final txnid = events[i].transactionId;
|
/// Cancel all subscriptions
|
||||||
if (txnid != null) {
|
void cancelSubscriptions();
|
||||||
searchHaystack.add(txnid);
|
|
||||||
}
|
|
||||||
if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
|
|
||||||
void _removeEventFromSet(Set<Event> eventSet, Event event) {
|
|
||||||
eventSet.removeWhere(
|
|
||||||
(e) =>
|
|
||||||
e.matchesEventOrTransactionId(event.eventId) ||
|
|
||||||
event.unsigned != null &&
|
|
||||||
e.matchesEventOrTransactionId(event.transactionId),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
void addAggregatedEvent(Event event) {
|
|
||||||
// we want to add an event to the aggregation tree
|
|
||||||
final relationshipType = event.relationshipType;
|
|
||||||
final relationshipEventId = event.relationshipEventId;
|
|
||||||
if (relationshipType == null || relationshipEventId == null) {
|
|
||||||
return; // nothing to do
|
|
||||||
}
|
|
||||||
final e = (aggregatedEvents[relationshipEventId] ??=
|
|
||||||
<String, Set<Event>>{})[relationshipType] ??= <Event>{};
|
|
||||||
// remove a potential old event
|
|
||||||
_removeEventFromSet(e, event);
|
|
||||||
// add the new one
|
|
||||||
e.add(event);
|
|
||||||
if (onChange != null) {
|
|
||||||
final index = _findEvent(event_id: relationshipEventId);
|
|
||||||
onChange?.call(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void removeAggregatedEvent(Event event) {
|
|
||||||
aggregatedEvents.remove(event.eventId);
|
|
||||||
if (event.transactionId != null) {
|
|
||||||
aggregatedEvents.remove(event.transactionId);
|
|
||||||
}
|
|
||||||
for (final types in aggregatedEvents.values) {
|
|
||||||
for (final e in types.values) {
|
|
||||||
_removeEventFromSet(e, event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void _handleEventUpdate(
|
|
||||||
Event event,
|
|
||||||
EventUpdateType type, {
|
|
||||||
bool update = true,
|
|
||||||
}) {
|
|
||||||
try {
|
|
||||||
if (event.roomId != room.id) return;
|
|
||||||
|
|
||||||
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type == EventUpdateType.timeline) {
|
|
||||||
onNewEvent?.call();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!allowNewEvent) return;
|
|
||||||
|
|
||||||
final status = event.status;
|
|
||||||
|
|
||||||
final i = _findEvent(
|
|
||||||
event_id: event.eventId,
|
|
||||||
unsigned_txid: event.transactionId,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (i < events.length) {
|
|
||||||
// if the old status is larger than the new one, we also want to preserve the old status
|
|
||||||
final oldStatus = events[i].status;
|
|
||||||
events[i] = event;
|
|
||||||
// do we preserve the status? we should allow 0 -> -1 updates and status increases
|
|
||||||
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
|
|
||||||
!(status.isError && oldStatus.isSending)) {
|
|
||||||
events[i].status = oldStatus;
|
|
||||||
}
|
|
||||||
addAggregatedEvent(events[i]);
|
|
||||||
onChange?.call(i);
|
|
||||||
} else {
|
|
||||||
if (type == EventUpdateType.history &&
|
|
||||||
events.indexWhere(
|
|
||||||
(e) => e.eventId == event.eventId,
|
|
||||||
) !=
|
|
||||||
-1) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
var index = events.length;
|
|
||||||
if (type == EventUpdateType.history) {
|
|
||||||
events.add(event);
|
|
||||||
} else {
|
|
||||||
index = events.firstIndexWhereNotError;
|
|
||||||
events.insert(index, event);
|
|
||||||
}
|
|
||||||
onInsert?.call(index);
|
|
||||||
|
|
||||||
addAggregatedEvent(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle redaction events
|
|
||||||
if (event.type == EventTypes.Redaction) {
|
|
||||||
final index = _findEvent(event_id: event.redacts);
|
|
||||||
if (index < events.length) {
|
|
||||||
removeAggregatedEvent(events[index]);
|
|
||||||
|
|
||||||
// Is the redacted event a reaction? Then update the event this
|
|
||||||
// belongs to:
|
|
||||||
if (onChange != null) {
|
|
||||||
final relationshipEventId = events[index].relationshipEventId;
|
|
||||||
if (relationshipEventId != null) {
|
|
||||||
onChange?.call(_findEvent(event_id: relationshipEventId));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
events[index].setRedactionEvent(event);
|
|
||||||
onChange?.call(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (update && !_collectHistoryUpdates) {
|
|
||||||
onUpdate?.call();
|
|
||||||
}
|
|
||||||
} catch (e, s) {
|
|
||||||
Logs().w('Handle event update failed', e, s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Deprecated('Use [startSearch] instead.')
|
@Deprecated('Use [startSearch] instead.')
|
||||||
Stream<List<Event>> searchEvent({
|
Stream<List<Event>> searchEvent({
|
||||||
|
|
@ -623,114 +112,18 @@ class Timeline {
|
||||||
searchTerm: searchTerm,
|
searchTerm: searchTerm,
|
||||||
requestHistoryCount: requestHistoryCount,
|
requestHistoryCount: requestHistoryCount,
|
||||||
maxHistoryRequests: maxHistoryRequests,
|
maxHistoryRequests: maxHistoryRequests,
|
||||||
// ignore: deprecated_member_use_from_same_package
|
|
||||||
sinceEventId: sinceEventId,
|
sinceEventId: sinceEventId,
|
||||||
limit: limit,
|
limit: limit,
|
||||||
searchFunc: searchFunc,
|
searchFunc: searchFunc,
|
||||||
).map((result) => result.$1);
|
).map((result) => result.$1);
|
||||||
|
|
||||||
/// Searches [searchTerm] in this timeline. It first searches in the
|
|
||||||
/// cache, then in the database and then on the server. The search can
|
|
||||||
/// take a while, which is why this returns a stream so the already found
|
|
||||||
/// events can already be displayed.
|
|
||||||
/// Override the [searchFunc] if you need another search. This will then
|
|
||||||
/// ignore [searchTerm].
|
|
||||||
/// Returns the List of Events and the next prevBatch at the end of the
|
|
||||||
/// search.
|
|
||||||
Stream<(List<Event>, String?)> startSearch({
|
|
||||||
String? searchTerm,
|
|
||||||
int requestHistoryCount = 100,
|
|
||||||
int maxHistoryRequests = 10,
|
|
||||||
String? prevBatch,
|
|
||||||
@Deprecated('Use [prevBatch] instead.') String? sinceEventId,
|
|
||||||
int? limit,
|
|
||||||
bool Function(Event)? searchFunc,
|
|
||||||
}) async* {
|
|
||||||
assert(searchTerm != null || searchFunc != null);
|
|
||||||
searchFunc ??= (event) =>
|
|
||||||
event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
|
|
||||||
final found = <Event>[];
|
|
||||||
|
|
||||||
if (sinceEventId == null) {
|
|
||||||
// Search locally
|
|
||||||
for (final event in events) {
|
|
||||||
if (searchFunc(event)) {
|
|
||||||
yield (found..add(event), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Search in database
|
|
||||||
var start = events.length;
|
|
||||||
while (true) {
|
|
||||||
final eventsFromStore = await room.client.database.getEventList(
|
|
||||||
room,
|
|
||||||
start: start,
|
|
||||||
limit: requestHistoryCount,
|
|
||||||
);
|
|
||||||
if (eventsFromStore.isEmpty) break;
|
|
||||||
start += eventsFromStore.length;
|
|
||||||
for (final event in eventsFromStore) {
|
|
||||||
if (searchFunc(event)) {
|
|
||||||
yield (found..add(event), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Search on the server
|
|
||||||
prevBatch ??= room.prev_batch;
|
|
||||||
if (sinceEventId != null) {
|
|
||||||
prevBatch =
|
|
||||||
(await room.client.getEventContext(room.id, sinceEventId)).end;
|
|
||||||
}
|
|
||||||
final encryption = room.client.encryption;
|
|
||||||
for (var i = 0; i < maxHistoryRequests; i++) {
|
|
||||||
if (prevBatch == null) break;
|
|
||||||
if (limit != null && found.length >= limit) break;
|
|
||||||
try {
|
|
||||||
final resp = await room.client.getRoomEvents(
|
|
||||||
room.id,
|
|
||||||
Direction.b,
|
|
||||||
from: prevBatch,
|
|
||||||
limit: requestHistoryCount,
|
|
||||||
filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
|
|
||||||
);
|
|
||||||
for (final matrixEvent in resp.chunk) {
|
|
||||||
var event = Event.fromMatrixEvent(matrixEvent, room);
|
|
||||||
if (event.type == EventTypes.Encrypted && encryption != null) {
|
|
||||||
event = await encryption.decryptRoomEvent(event);
|
|
||||||
if (event.type == EventTypes.Encrypted &&
|
|
||||||
event.messageType == MessageTypes.BadEncrypted &&
|
|
||||||
event.content['can_request_session'] == true) {
|
|
||||||
// Await requestKey() here to ensure decrypted message bodies
|
|
||||||
await event.requestKey();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (searchFunc(event)) {
|
|
||||||
yield (found..add(event), resp.end);
|
|
||||||
if (limit != null && found.length >= limit) break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
prevBatch = resp.end;
|
|
||||||
// We are at the beginning of the room
|
|
||||||
if (resp.chunk.length < requestHistoryCount) break;
|
|
||||||
} on MatrixException catch (e) {
|
|
||||||
// We have no permission anymore to request the history
|
|
||||||
if (e.error == MatrixError.M_FORBIDDEN) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
rethrow;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
extension on List<Event> {
|
// TODO: make up a better name
|
||||||
|
extension TimelineExtension on List<Event> {
|
||||||
int get firstIndexWhereNotError {
|
int get firstIndexWhereNotError {
|
||||||
if (isEmpty) return 0;
|
if (isEmpty) return 0;
|
||||||
final index = indexWhere((event) => !event.status.isError);
|
final index = indexWhere((event) => !event.status.isError);
|
||||||
if (index == -1) return length;
|
if (index == -1) return length;
|
||||||
return index;
|
return index;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue