working on threads
This commit is contained in:
parent
889477e07c
commit
0fb0a6c47f
|
|
@ -49,7 +49,10 @@ export 'src/voip/utils/famedly_call_extension.dart';
|
||||||
export 'src/voip/utils/types.dart';
|
export 'src/voip/utils/types.dart';
|
||||||
export 'src/voip/utils/wrapped_media_stream.dart';
|
export 'src/voip/utils/wrapped_media_stream.dart';
|
||||||
export 'src/room.dart';
|
export 'src/room.dart';
|
||||||
|
export 'src/thread.dart';
|
||||||
export 'src/timeline.dart';
|
export 'src/timeline.dart';
|
||||||
|
export 'src/room_timeline.dart';
|
||||||
|
export 'src/thread_timeline.dart';
|
||||||
export 'src/user.dart';
|
export 'src/user.dart';
|
||||||
export 'src/utils/cached_profile_information.dart';
|
export 'src/utils/cached_profile_information.dart';
|
||||||
export 'src/utils/commands_extension.dart';
|
export 'src/utils/commands_extension.dart';
|
||||||
|
|
|
||||||
|
|
@ -2,9 +2,9 @@ import 'dart:convert';
|
||||||
|
|
||||||
import 'package:matrix/matrix_api_lite.dart';
|
import 'package:matrix/matrix_api_lite.dart';
|
||||||
import 'package:matrix/src/event.dart';
|
import 'package:matrix/src/event.dart';
|
||||||
import 'package:matrix/src/timeline.dart';
|
import 'package:matrix/src/room_timeline.dart';
|
||||||
|
|
||||||
extension TimelineExportExtension on Timeline {
|
extension TimelineExportExtension on RoomTimeline {
|
||||||
/// Exports timeline events from a Matrix room within a specified date range.
|
/// Exports timeline events from a Matrix room within a specified date range.
|
||||||
///
|
///
|
||||||
/// The export process provides progress updates through the returned stream with the following information:
|
/// The export process provides progress updates through the returned stream with the following information:
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,17 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future<List<Room>> getRoomList(Client client);
|
Future<List<Room>> getRoomList(Client client);
|
||||||
|
|
||||||
|
Future<List<Thread>> getThreadList(String roomId, Client client);
|
||||||
|
|
||||||
|
Future<void> storeThread(
|
||||||
|
String roomId,
|
||||||
|
Event threadRootEvent,
|
||||||
|
Event? lastEvent,
|
||||||
|
bool currentUserParticipated,
|
||||||
|
int count,
|
||||||
|
Client client,
|
||||||
|
);
|
||||||
|
|
||||||
Future<Room?> getSingleRoom(
|
Future<Room?> getSingleRoom(
|
||||||
Client client,
|
Client client,
|
||||||
String roomId, {
|
String roomId, {
|
||||||
|
|
@ -78,6 +89,8 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future<void> deleteTimelineForRoom(String roomId);
|
Future<void> deleteTimelineForRoom(String roomId);
|
||||||
|
|
||||||
|
Future<void> deleteTimelineForThread(String roomId, String threadRootEventId);
|
||||||
|
|
||||||
/// Stores an EventUpdate object in the database. Must be called inside of
|
/// Stores an EventUpdate object in the database. Must be called inside of
|
||||||
/// [transaction].
|
/// [transaction].
|
||||||
Future<void> storeEventUpdate(
|
Future<void> storeEventUpdate(
|
||||||
|
|
@ -115,6 +128,13 @@ abstract class DatabaseApi {
|
||||||
int? limit,
|
int? limit,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Future<List<Event>> getThreadEventList(
|
||||||
|
Thread thread, {
|
||||||
|
int start = 0,
|
||||||
|
bool onlySending = false,
|
||||||
|
int? limit,
|
||||||
|
});
|
||||||
|
|
||||||
Future<List<String>> getEventIdList(
|
Future<List<String>> getEventIdList(
|
||||||
Room room, {
|
Room room, {
|
||||||
int start = 0,
|
int start = 0,
|
||||||
|
|
@ -265,6 +285,13 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future removeEvent(String eventId, String roomId);
|
Future removeEvent(String eventId, String roomId);
|
||||||
|
|
||||||
|
Future setThreadPrevBatch(
|
||||||
|
String? prevBatch,
|
||||||
|
String roomId,
|
||||||
|
String threadRootEventId,
|
||||||
|
Client client,
|
||||||
|
);
|
||||||
|
|
||||||
Future setRoomPrevBatch(
|
Future setRoomPrevBatch(
|
||||||
String? prevBatch,
|
String? prevBatch,
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
|
||||||
|
|
@ -59,6 +59,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
late Box<String> _clientBox;
|
late Box<String> _clientBox;
|
||||||
late Box<Map> _accountDataBox;
|
late Box<Map> _accountDataBox;
|
||||||
late Box<Map> _roomsBox;
|
late Box<Map> _roomsBox;
|
||||||
|
late Box<Map> _threadsBox;
|
||||||
late Box<Map> _toDeviceQueueBox;
|
late Box<Map> _toDeviceQueueBox;
|
||||||
|
|
||||||
/// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be
|
/// Key is a tuple as TupleKey(roomId, type, stateKey) where stateKey can be
|
||||||
|
|
@ -122,6 +123,8 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
|
|
||||||
static const String _roomsBoxName = 'box_rooms';
|
static const String _roomsBoxName = 'box_rooms';
|
||||||
|
|
||||||
|
static const String _threadsBoxName = 'box_threads';
|
||||||
|
|
||||||
static const String _toDeviceQueueBoxName = 'box_to_device_queue';
|
static const String _toDeviceQueueBoxName = 'box_to_device_queue';
|
||||||
|
|
||||||
static const String _preloadRoomStateBoxName = 'box_preload_room_states';
|
static const String _preloadRoomStateBoxName = 'box_preload_room_states';
|
||||||
|
|
@ -218,6 +221,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_clientBoxName,
|
_clientBoxName,
|
||||||
_accountDataBoxName,
|
_accountDataBoxName,
|
||||||
_roomsBoxName,
|
_roomsBoxName,
|
||||||
|
_threadsBoxName,
|
||||||
_toDeviceQueueBoxName,
|
_toDeviceQueueBoxName,
|
||||||
_preloadRoomStateBoxName,
|
_preloadRoomStateBoxName,
|
||||||
_nonPreloadRoomStateBoxName,
|
_nonPreloadRoomStateBoxName,
|
||||||
|
|
@ -252,6 +256,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_roomsBox = _collection.openBox<Map>(
|
_roomsBox = _collection.openBox<Map>(
|
||||||
_roomsBoxName,
|
_roomsBoxName,
|
||||||
);
|
);
|
||||||
|
_threadsBox = _collection.openBox<Map>(_threadsBoxName);
|
||||||
_preloadRoomStateBox = _collection.openBox(
|
_preloadRoomStateBox = _collection.openBox(
|
||||||
_preloadRoomStateBoxName,
|
_preloadRoomStateBoxName,
|
||||||
);
|
);
|
||||||
|
|
@ -357,6 +362,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
_clientBox.clearQuickAccessCache();
|
_clientBox.clearQuickAccessCache();
|
||||||
_accountDataBox.clearQuickAccessCache();
|
_accountDataBox.clearQuickAccessCache();
|
||||||
_roomsBox.clearQuickAccessCache();
|
_roomsBox.clearQuickAccessCache();
|
||||||
|
_threadsBox.clearQuickAccessCache();
|
||||||
_preloadRoomStateBox.clearQuickAccessCache();
|
_preloadRoomStateBox.clearQuickAccessCache();
|
||||||
_nonPreloadRoomStateBox.clearQuickAccessCache();
|
_nonPreloadRoomStateBox.clearQuickAccessCache();
|
||||||
_roomMembersBox.clearQuickAccessCache();
|
_roomMembersBox.clearQuickAccessCache();
|
||||||
|
|
@ -383,6 +389,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
@override
|
@override
|
||||||
Future<void> clearCache() => transaction(() async {
|
Future<void> clearCache() => transaction(() async {
|
||||||
await _roomsBox.clear();
|
await _roomsBox.clear();
|
||||||
|
await _threadsBox.clear();
|
||||||
await _accountDataBox.clear();
|
await _accountDataBox.clear();
|
||||||
await _roomAccountDataBox.clear();
|
await _roomAccountDataBox.clear();
|
||||||
await _preloadRoomStateBox.clear();
|
await _preloadRoomStateBox.clear();
|
||||||
|
|
@ -532,6 +539,46 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return await _getEventsByIds(eventIds.cast<String>(), room);
|
return await _getEventsByIds(eventIds.cast<String>(), room);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<List<Event>> getThreadEventList(
|
||||||
|
Thread thread, {
|
||||||
|
int start = 0,
|
||||||
|
bool onlySending = false,
|
||||||
|
int? limit,
|
||||||
|
}) =>
|
||||||
|
runBenchmarked<List<Event>>('Get event list', () async {
|
||||||
|
// Get the synced event IDs from the store
|
||||||
|
final timelineKey =
|
||||||
|
TupleKey(thread.room.id, '', thread.rootEvent.eventId).toString();
|
||||||
|
final timelineEventIds =
|
||||||
|
(await _timelineFragmentsBox.get(timelineKey) ?? []);
|
||||||
|
|
||||||
|
// Get the local stored SENDING events from the store
|
||||||
|
late final List sendingEventIds;
|
||||||
|
if (start != 0) {
|
||||||
|
sendingEventIds = [];
|
||||||
|
} else {
|
||||||
|
final sendingTimelineKey =
|
||||||
|
TupleKey(thread.room.id, 'SENDING', thread.rootEvent.eventId)
|
||||||
|
.toString();
|
||||||
|
sendingEventIds =
|
||||||
|
(await _timelineFragmentsBox.get(sendingTimelineKey) ?? []);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Combine those two lists while respecting the start and limit parameters.
|
||||||
|
final end = min(
|
||||||
|
timelineEventIds.length,
|
||||||
|
start + (limit ?? timelineEventIds.length),
|
||||||
|
);
|
||||||
|
final eventIds = [
|
||||||
|
...sendingEventIds,
|
||||||
|
if (!onlySending && start < timelineEventIds.length)
|
||||||
|
...timelineEventIds.getRange(start, end),
|
||||||
|
];
|
||||||
|
|
||||||
|
return await _getEventsByIds(eventIds.cast<String>(), thread.room);
|
||||||
|
});
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<StoredInboundGroupSession?> getInboundGroupSession(
|
Future<StoredInboundGroupSession?> getInboundGroupSession(
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
@ -1053,6 +1100,22 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> setThreadPrevBatch(
|
||||||
|
String? prevBatch,
|
||||||
|
String roomId,
|
||||||
|
String threadRootEventId,
|
||||||
|
Client client,
|
||||||
|
) async {
|
||||||
|
final raw =
|
||||||
|
await _threadsBox.get(TupleKey(roomId, threadRootEventId).toString());
|
||||||
|
if (raw == null) return;
|
||||||
|
final thread = Thread.fromJson(copyMap(raw), client);
|
||||||
|
thread.prev_batch = prevBatch;
|
||||||
|
await _threadsBox.put(roomId, thread.toJson());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> setVerifiedUserCrossSigningKey(
|
Future<void> setVerifiedUserCrossSigningKey(
|
||||||
bool verified,
|
bool verified,
|
||||||
|
|
@ -1302,6 +1365,43 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<List<Thread>> getThreadList(String roomId, Client client) async {
|
||||||
|
final allThreadsKeys = await _threadsBox.getAllKeys();
|
||||||
|
final threadsKeys = <String>{};
|
||||||
|
// TERRIBLE implementation. Better to create another box (String[roomId]->List<string>[event ids])
|
||||||
|
for (final key in allThreadsKeys) {
|
||||||
|
if (key.startsWith(roomId)) threadsKeys.add(key);
|
||||||
|
}
|
||||||
|
final threads = <Thread>{};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
return threads.toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> storeThread(
|
||||||
|
String roomId,
|
||||||
|
Event threadRootEvent,
|
||||||
|
Event? lastEvent,
|
||||||
|
bool currentUserParticipated,
|
||||||
|
int count,
|
||||||
|
Client client,
|
||||||
|
) async {
|
||||||
|
final key = TupleKey(roomId, threadRootEvent.eventId).toString();
|
||||||
|
// final currentRawThread = await _threadsBox.get(key);
|
||||||
|
await _threadsBox.put(
|
||||||
|
key,
|
||||||
|
Thread(
|
||||||
|
room: Room(id: roomId, client: client),
|
||||||
|
rootEvent: threadRootEvent,
|
||||||
|
client: client,
|
||||||
|
currentUserParticipated: currentUserParticipated,
|
||||||
|
count: count,
|
||||||
|
).toJson());
|
||||||
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> storeRoomUpdate(
|
Future<void> storeRoomUpdate(
|
||||||
String roomId,
|
String roomId,
|
||||||
|
|
@ -1314,6 +1414,7 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
await forgetRoom(roomId);
|
await forgetRoom(roomId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final membership = roomUpdate is LeftRoomUpdate
|
final membership = roomUpdate is LeftRoomUpdate
|
||||||
? Membership.leave
|
? Membership.leave
|
||||||
: roomUpdate is InvitedRoomUpdate
|
: roomUpdate is InvitedRoomUpdate
|
||||||
|
|
@ -1376,6 +1477,12 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
||||||
Future<void> deleteTimelineForRoom(String roomId) =>
|
Future<void> deleteTimelineForRoom(String roomId) =>
|
||||||
_timelineFragmentsBox.delete(TupleKey(roomId, '').toString());
|
_timelineFragmentsBox.delete(TupleKey(roomId, '').toString());
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> deleteTimelineForThread(
|
||||||
|
String roomId, String threadRootEventId) =>
|
||||||
|
_timelineFragmentsBox
|
||||||
|
.delete(TupleKey(roomId, '', threadRootEventId).toString());
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> storeSSSSCache(
|
Future<void> storeSSSSCache(
|
||||||
String type,
|
String type,
|
||||||
|
|
|
||||||
|
|
@ -1649,7 +1649,7 @@ class Room {
|
||||||
/// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks.
|
/// [onChange], [onRemove], [onInsert] and the [onHistoryReceived] callbacks.
|
||||||
/// This method can also retrieve the timeline at a specific point by setting
|
/// This method can also retrieve the timeline at a specific point by setting
|
||||||
/// the [eventContextId]
|
/// the [eventContextId]
|
||||||
Future<Timeline> getTimeline({
|
Future<RoomTimeline> getTimeline({
|
||||||
void Function(int index)? onChange,
|
void Function(int index)? onChange,
|
||||||
void Function(int index)? onRemove,
|
void Function(int index)? onRemove,
|
||||||
void Function(int insertID)? onInsert,
|
void Function(int insertID)? onInsert,
|
||||||
|
|
|
||||||
|
|
@ -436,6 +436,9 @@ class RoomTimeline extends Timeline {
|
||||||
try {
|
try {
|
||||||
if (event.roomId != room.id) return;
|
if (event.roomId != room.id) return;
|
||||||
|
|
||||||
|
// This will be handled by ThreadTimeline
|
||||||
|
if (event.relationshipType == RelationshipTypes.thread) return;
|
||||||
|
|
||||||
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,20 +1,37 @@
|
||||||
import 'package:matrix/matrix.dart';
|
import 'package:matrix/matrix.dart';
|
||||||
|
import 'package:matrix/matrix_api_lite/generated/internal.dart';
|
||||||
|
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||||
|
|
||||||
class Thread {
|
class Thread {
|
||||||
final Room room;
|
final Room room;
|
||||||
final Event rootEvent;
|
final Event rootEvent;
|
||||||
Event? lastEvent;
|
Event? lastEvent;
|
||||||
String? prev_batch;
|
String? prev_batch;
|
||||||
|
bool currentUserParticipated;
|
||||||
|
int count;
|
||||||
final Client client;
|
final Client client;
|
||||||
|
|
||||||
Thread({
|
Thread({
|
||||||
required this.room,
|
required this.room,
|
||||||
required this.rootEvent,
|
required this.rootEvent,
|
||||||
required this.client,
|
required this.client,
|
||||||
|
required this.currentUserParticipated,
|
||||||
|
required this.count,
|
||||||
this.prev_batch,
|
this.prev_batch,
|
||||||
this.lastEvent,
|
this.lastEvent,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Map<String, dynamic> toJson() => {
|
||||||
|
...rootEvent.toJson(),
|
||||||
|
'unsigned': {
|
||||||
|
'm.thread': {
|
||||||
|
'latest_event': lastEvent?.toJson(),
|
||||||
|
'count': count,
|
||||||
|
'current_user_participated': currentUserParticipated,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
factory Thread.fromJson(Map<String, dynamic> json, Client client) {
|
factory Thread.fromJson(Map<String, dynamic> json, Client client) {
|
||||||
final room = client.getRoomById(json['room_id']);
|
final room = client.getRoomById(json['room_id']);
|
||||||
if (room == null) throw Error();
|
if (room == null) throw Error();
|
||||||
|
|
@ -36,6 +53,9 @@ class Thread {
|
||||||
room,
|
room,
|
||||||
),
|
),
|
||||||
lastEvent: lastEvent,
|
lastEvent: lastEvent,
|
||||||
|
count: json['unsigned']?['m.relations']?['m.thread']?['count'],
|
||||||
|
currentUserParticipated: json['unsigned']?['m.relations']?['m.thread']
|
||||||
|
?['current_user_participated'],
|
||||||
);
|
);
|
||||||
return thread;
|
return thread;
|
||||||
}
|
}
|
||||||
|
|
@ -95,6 +115,120 @@ class Thread {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<TimelineChunk?> getEventContext(String eventId) async {
|
||||||
|
// TODO: probably find events with relationship
|
||||||
|
final resp = await client.getEventContext(
|
||||||
|
room.id, eventId,
|
||||||
|
limit: Room.defaultHistoryCount,
|
||||||
|
// filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
|
||||||
|
);
|
||||||
|
|
||||||
|
final events = [
|
||||||
|
if (resp.eventsAfter != null) ...resp.eventsAfter!.reversed,
|
||||||
|
if (resp.event != null) resp.event!,
|
||||||
|
if (resp.eventsBefore != null) ...resp.eventsBefore!,
|
||||||
|
].map((e) => Event.fromMatrixEvent(e, room)).toList();
|
||||||
|
|
||||||
|
// Try again to decrypt encrypted events but don't update the database.
|
||||||
|
if (room.encrypted && client.encryptionEnabled) {
|
||||||
|
for (var i = 0; i < events.length; i++) {
|
||||||
|
if (events[i].type == EventTypes.Encrypted &&
|
||||||
|
events[i].content['can_request_session'] == true) {
|
||||||
|
events[i] = await client.encryption!.decryptRoomEvent(events[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final chunk = TimelineChunk(
|
||||||
|
nextBatch: resp.end ?? '',
|
||||||
|
prevBatch: resp.start ?? '',
|
||||||
|
events: events,
|
||||||
|
);
|
||||||
|
|
||||||
|
return chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<ThreadTimeline> getTimeline({
|
||||||
|
void Function(int index)? onChange,
|
||||||
|
void Function(int index)? onRemove,
|
||||||
|
void Function(int insertID)? onInsert,
|
||||||
|
void Function()? onNewEvent,
|
||||||
|
void Function()? onUpdate,
|
||||||
|
String? eventContextId,
|
||||||
|
int? limit = Room.defaultHistoryCount,
|
||||||
|
}) async {
|
||||||
|
// await postLoad();
|
||||||
|
|
||||||
|
var events = <Event>[];
|
||||||
|
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
events = await client.database.getThreadEventList(
|
||||||
|
this,
|
||||||
|
limit: limit,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
var chunk = TimelineChunk(events: events);
|
||||||
|
// Load the timeline arround eventContextId if set
|
||||||
|
if (eventContextId != null) {
|
||||||
|
if (!events.any((Event event) => event.eventId == eventContextId)) {
|
||||||
|
chunk =
|
||||||
|
await getEventContext(eventContextId) ?? TimelineChunk(events: []);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final timeline = ThreadTimeline(
|
||||||
|
thread: this,
|
||||||
|
chunk: chunk,
|
||||||
|
onChange: onChange,
|
||||||
|
onRemove: onRemove,
|
||||||
|
onInsert: onInsert,
|
||||||
|
onNewEvent: onNewEvent,
|
||||||
|
onUpdate: onUpdate,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Fetch all users from database we have got here.
|
||||||
|
if (eventContextId == null) {
|
||||||
|
final userIds = events.map((event) => event.senderId).toSet();
|
||||||
|
for (final userId in userIds) {
|
||||||
|
if (room.getState(EventTypes.RoomMember, userId) != null) continue;
|
||||||
|
final dbUser = await client.database.getUser(userId, room);
|
||||||
|
if (dbUser != null) room.setState(dbUser);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try again to decrypt encrypted events and update the database.
|
||||||
|
if (room.encrypted && client.encryptionEnabled) {
|
||||||
|
// decrypt messages
|
||||||
|
for (var i = 0; i < chunk.events.length; i++) {
|
||||||
|
if (chunk.events[i].type == EventTypes.Encrypted) {
|
||||||
|
if (eventContextId != null) {
|
||||||
|
// for the fragmented timeline, we don't cache the decrypted
|
||||||
|
//message in the database
|
||||||
|
chunk.events[i] = await client.encryption!.decryptRoomEvent(
|
||||||
|
chunk.events[i],
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
// else, we need the database
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
for (var i = 0; i < chunk.events.length; i++) {
|
||||||
|
if (chunk.events[i].content['can_request_session'] == true) {
|
||||||
|
chunk.events[i] = await client.encryption!.decryptRoomEvent(
|
||||||
|
chunk.events[i],
|
||||||
|
store: !room.isArchived,
|
||||||
|
updateType: EventUpdateType.history,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return timeline;
|
||||||
|
}
|
||||||
|
|
||||||
Future<String?> sendTextEvent(
|
Future<String?> sendTextEvent(
|
||||||
String message, {
|
String message, {
|
||||||
String? txid,
|
String? txid,
|
||||||
|
|
@ -170,4 +304,56 @@ class Thread {
|
||||||
threadRootEventId: rootEvent.eventId,
|
threadRootEventId: rootEvent.eventId,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Future<void> setReadMarker({String? eventId, bool? public}) async {
|
||||||
|
if (eventId == null) return null;
|
||||||
|
return await client.postReceipt(
|
||||||
|
room.id,
|
||||||
|
(public ?? client.receiptsPublicByDefault)
|
||||||
|
? ReceiptType.mRead
|
||||||
|
: ReceiptType.mReadPrivate,
|
||||||
|
eventId,
|
||||||
|
threadId: rootEvent.eventId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<int> requestHistory({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
void Function()? onHistoryReceived,
|
||||||
|
direction = Direction.b,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
final prev_batch = this.prev_batch;
|
||||||
|
|
||||||
|
final storeInDatabase = !room.isArchived;
|
||||||
|
|
||||||
|
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
||||||
|
filter ??= StateFilter(lazyLoadMembers: true);
|
||||||
|
filter.lazyLoadMembers ??= true;
|
||||||
|
|
||||||
|
if (prev_batch == null) {
|
||||||
|
throw 'Tried to request history without a prev_batch token';
|
||||||
|
}
|
||||||
|
|
||||||
|
final resp = await client.getRelatingEventsWithRelType(
|
||||||
|
room.id,
|
||||||
|
rootEvent.eventId,
|
||||||
|
RelationshipTypes.thread,
|
||||||
|
from: prev_batch,
|
||||||
|
limit: historyCount,
|
||||||
|
dir: direction,
|
||||||
|
recurse: true,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (onHistoryReceived != null) onHistoryReceived();
|
||||||
|
|
||||||
|
await client.database.transaction(() async {
|
||||||
|
if (storeInDatabase && direction == Direction.b) {
|
||||||
|
this.prev_batch = resp.prevBatch;
|
||||||
|
await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return resp.chunk.length;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
|
import 'dart:convert';
|
||||||
|
|
||||||
import 'package:matrix/matrix.dart';
|
import 'package:matrix/matrix.dart';
|
||||||
import 'package:matrix/src/models/timeline_chunk.dart';
|
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||||
|
|
@ -22,6 +23,17 @@ class ThreadTimeline extends Timeline {
|
||||||
StreamSubscription<String>? sessionIdReceivedSub;
|
StreamSubscription<String>? sessionIdReceivedSub;
|
||||||
StreamSubscription<String>? cancelSendEventSub;
|
StreamSubscription<String>? cancelSendEventSub;
|
||||||
|
|
||||||
|
bool isRequestingHistory = false;
|
||||||
|
bool isFragmentedTimeline = false;
|
||||||
|
|
||||||
|
final Map<String, Event> _eventCache = {};
|
||||||
|
|
||||||
|
bool _fetchedAllDatabaseEvents = false;
|
||||||
|
|
||||||
|
bool allowNewEvent = true;
|
||||||
|
|
||||||
|
bool _collectHistoryUpdates = false;
|
||||||
|
|
||||||
ThreadTimeline({
|
ThreadTimeline({
|
||||||
required this.thread,
|
required this.thread,
|
||||||
required this.chunk,
|
required this.chunk,
|
||||||
|
|
@ -59,6 +71,23 @@ class ThreadTimeline extends Timeline {
|
||||||
onNewEvent?.call();
|
onNewEvent?.call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final status = event.status;
|
||||||
|
final i = _findEvent(
|
||||||
|
event_id: event.eventId,
|
||||||
|
unsigned_txid: event.transactionId,
|
||||||
|
);
|
||||||
|
if (i < events.length) {
|
||||||
|
// if the old status is larger than the new one, we also want to preserve the old status
|
||||||
|
final oldStatus = events[i].status;
|
||||||
|
events[i] = event;
|
||||||
|
// do we preserve the status? we should allow 0 -> -1 updates and status increases
|
||||||
|
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
|
||||||
|
!(status.isError && oldStatus.isSending)) {
|
||||||
|
events[i].status = oldStatus;
|
||||||
|
}
|
||||||
|
addAggregatedEvent(events[i]);
|
||||||
|
onChange?.call(i);
|
||||||
|
} else {
|
||||||
if (type == EventUpdateType.history &&
|
if (type == EventUpdateType.history &&
|
||||||
events.indexWhere((e) => e.eventId == event.eventId) != -1) {
|
events.indexWhere((e) => e.eventId == event.eventId) != -1) {
|
||||||
return;
|
return;
|
||||||
|
|
@ -73,6 +102,7 @@ class ThreadTimeline extends Timeline {
|
||||||
onInsert?.call(index);
|
onInsert?.call(index);
|
||||||
|
|
||||||
addAggregatedEvent(event);
|
addAggregatedEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
// Handle redaction events
|
// Handle redaction events
|
||||||
if (event.type == EventTypes.Redaction) {
|
if (event.type == EventTypes.Redaction) {
|
||||||
|
|
@ -103,6 +133,184 @@ class ThreadTimeline extends Timeline {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Request more previous events from the server.
|
||||||
|
Future<int> getThreadEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
direction = Direction.b,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
// Ensure stateFilter is not null and set lazyLoadMembers to true if not already set
|
||||||
|
filter ??= StateFilter(lazyLoadMembers: true);
|
||||||
|
filter.lazyLoadMembers ??= true;
|
||||||
|
|
||||||
|
final resp = await thread.client.getRelatingEventsWithRelType(
|
||||||
|
thread.room.id,
|
||||||
|
thread.rootEvent.eventId,
|
||||||
|
RelationshipTypes.thread,
|
||||||
|
dir: direction,
|
||||||
|
from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
|
||||||
|
limit: historyCount,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (resp.nextBatch == null) {
|
||||||
|
Logs().w('We reached the end of the timeline');
|
||||||
|
}
|
||||||
|
|
||||||
|
final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch;
|
||||||
|
final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch;
|
||||||
|
|
||||||
|
final type = direction == Direction.b
|
||||||
|
? EventUpdateType.history
|
||||||
|
: EventUpdateType.timeline;
|
||||||
|
|
||||||
|
// I dont know what this piece of code does
|
||||||
|
// if ((resp.state?.length ?? 0) == 0 &&
|
||||||
|
// resp.start != resp.end &&
|
||||||
|
// newPrevBatch != null &&
|
||||||
|
// newNextBatch != null) {
|
||||||
|
// if (type == EventUpdateType.history) {
|
||||||
|
// Logs().w(
|
||||||
|
// '[nav] we can still request history prevBatch: $type $newPrevBatch',
|
||||||
|
// );
|
||||||
|
// } else {
|
||||||
|
// Logs().w(
|
||||||
|
// '[nav] we can still request timeline nextBatch: $type $newNextBatch',
|
||||||
|
// );
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
final newEvents =
|
||||||
|
resp.chunk.map((e) => Event.fromMatrixEvent(e, thread.room)).toList();
|
||||||
|
|
||||||
|
if (!allowNewEvent) {
|
||||||
|
if (resp.prevBatch == resp.nextBatch ||
|
||||||
|
(resp.nextBatch == null && direction == Direction.f)) {
|
||||||
|
allowNewEvent = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allowNewEvent) {
|
||||||
|
Logs().d('We now allow sync update into the timeline.');
|
||||||
|
newEvents.addAll(
|
||||||
|
await thread.client.database.getThreadEventList(thread, onlySending: true),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to decrypt encrypted events but don't update the database.
|
||||||
|
if (thread.room.encrypted && thread.client.encryptionEnabled) {
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
if (newEvents[i].type == EventTypes.Encrypted) {
|
||||||
|
newEvents[i] = await thread.client.encryption!.decryptRoomEvent(
|
||||||
|
newEvents[i],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update chunk anchors
|
||||||
|
if (type == EventUpdateType.history) {
|
||||||
|
chunk.prevBatch = newPrevBatch ?? '';
|
||||||
|
|
||||||
|
final offset = chunk.events.length;
|
||||||
|
|
||||||
|
chunk.events.addAll(newEvents);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i + offset);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
chunk.nextBatch = newNextBatch ?? '';
|
||||||
|
chunk.events.insertAll(0, newEvents.reversed);
|
||||||
|
|
||||||
|
for (var i = 0; i < newEvents.length; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (onUpdate != null) {
|
||||||
|
onUpdate!();
|
||||||
|
}
|
||||||
|
return resp.chunk.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _requestEvents({
|
||||||
|
int historyCount = Room.defaultHistoryCount,
|
||||||
|
required Direction direction,
|
||||||
|
StateFilter? filter,
|
||||||
|
}) async {
|
||||||
|
onUpdate?.call();
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Look up for events in the database first. With fragmented view, we should delete the database cache
|
||||||
|
final eventsFromStore = isFragmentedTimeline
|
||||||
|
? null
|
||||||
|
: await thread.client.database.getThreadEventList(
|
||||||
|
thread,
|
||||||
|
start: events.length,
|
||||||
|
limit: historyCount,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
|
||||||
|
for (final e in eventsFromStore) {
|
||||||
|
addAggregatedEvent(e);
|
||||||
|
}
|
||||||
|
// Fetch all users from database we have got here.
|
||||||
|
for (final event in events) {
|
||||||
|
if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
final dbUser =
|
||||||
|
await thread.client.database.getUser(event.senderId, thread.room);
|
||||||
|
if (dbUser != null) thread.room.setState(dbUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (direction == Direction.b) {
|
||||||
|
events.addAll(eventsFromStore);
|
||||||
|
final startIndex = events.length - eventsFromStore.length;
|
||||||
|
final endIndex = events.length;
|
||||||
|
for (var i = startIndex; i < endIndex; i++) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
events.insertAll(0, eventsFromStore);
|
||||||
|
final startIndex = eventsFromStore.length;
|
||||||
|
final endIndex = 0;
|
||||||
|
for (var i = startIndex; i > endIndex; i--) {
|
||||||
|
onInsert?.call(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_fetchedAllDatabaseEvents = true;
|
||||||
|
Logs().i('No more events found in the store. Request from server...');
|
||||||
|
|
||||||
|
if (isFragmentedTimeline) {
|
||||||
|
await getThreadEvents(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
if (thread.prev_batch == null) {
|
||||||
|
Logs().i('No more events to request from server...');
|
||||||
|
} else {
|
||||||
|
await thread.requestHistory(
|
||||||
|
historyCount: historyCount,
|
||||||
|
direction: direction,
|
||||||
|
onHistoryReceived: () {
|
||||||
|
_collectHistoryUpdates = true;
|
||||||
|
},
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
_collectHistoryUpdates = false;
|
||||||
|
isRequestingHistory = false;
|
||||||
|
onUpdate?.call();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Add an event to the aggregation tree
|
/// Add an event to the aggregation tree
|
||||||
void addAggregatedEvent(Event event) {
|
void addAggregatedEvent(Event event) {
|
||||||
final relationshipType = event.relationshipType;
|
final relationshipType = event.relationshipType;
|
||||||
|
|
@ -173,9 +381,15 @@ class ThreadTimeline extends Timeline {
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<Event?> getEventById(String id) {
|
Future<Event?> getEventById(String id) async {
|
||||||
// TODO: implement getEventById
|
for (final event in events) {
|
||||||
throw UnimplementedError();
|
if (event.eventId == id) return event;
|
||||||
|
}
|
||||||
|
if (_eventCache.containsKey(id)) return _eventCache[id];
|
||||||
|
final requestedEvent = await thread.room.getEventById(id);
|
||||||
|
if (requestedEvent == null) return null;
|
||||||
|
_eventCache[id] = requestedEvent;
|
||||||
|
return _eventCache[id];
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
|
@ -187,9 +401,15 @@ class ThreadTimeline extends Timeline {
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> requestHistory(
|
Future<void> requestHistory(
|
||||||
{int historyCount = Room.defaultHistoryCount, StateFilter? filter}) {
|
{int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async {
|
||||||
// TODO: implement requestHistory
|
if (isRequestingHistory) return;
|
||||||
throw UnimplementedError();
|
isRequestingHistory = true;
|
||||||
|
await _requestEvents(
|
||||||
|
direction: Direction.b,
|
||||||
|
historyCount: historyCount,
|
||||||
|
filter: filter,
|
||||||
|
);
|
||||||
|
isRequestingHistory = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
|
@ -200,8 +420,10 @@ class ThreadTimeline extends Timeline {
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> setReadMarker({String? eventId, bool? public}) {
|
Future<void> setReadMarker({String? eventId, bool? public}) {
|
||||||
// TODO: implement setReadMarker
|
return thread.setReadMarker(
|
||||||
throw UnimplementedError();
|
eventId: eventId,
|
||||||
|
public: public,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue