working hard
This commit is contained in:
parent
74c39f91fe
commit
98031bbb3d
|
|
@ -1392,7 +1392,6 @@ class MatrixSdkDatabase extends DatabaseApi with DatabaseFileStorage {
|
|||
final key = TupleKey(roomId, threadRootEventId).toString();
|
||||
final thread = await _threadsBox.get(key);
|
||||
if (thread == null) return null;
|
||||
Logs().w(thread.toString());
|
||||
return Thread.fromJson(thread.cast<String, dynamic>(), client);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -136,6 +136,8 @@ class Room {
|
|||
partial = false;
|
||||
}
|
||||
|
||||
Map<String, Thread> threads = <String, Thread>{};
|
||||
|
||||
Future<void> _loadThreadsFromServer() async {
|
||||
try {
|
||||
final response = await client.getThreadRoots(id);
|
||||
|
|
@ -151,6 +153,7 @@ class Room {
|
|||
1, // count
|
||||
client,
|
||||
);
|
||||
threads[event.eventId] = (await client.database.getThread(id, event.eventId, client))!;
|
||||
}
|
||||
} catch (e) {
|
||||
Logs().w('Failed to load threads from server', e);
|
||||
|
|
@ -161,7 +164,8 @@ class Room {
|
|||
// This should be called from the client's sync handling
|
||||
// when a thread-related event is received
|
||||
|
||||
if (event.relationshipType == RelationshipTypes.thread && event.relationshipEventId != null) {
|
||||
if (event.relationshipType == RelationshipTypes.thread &&
|
||||
event.relationshipEventId != null) {
|
||||
// Update thread metadata in database
|
||||
final root = await getEventById(event.relationshipEventId!);
|
||||
if (root == null) return;
|
||||
|
|
@ -226,6 +230,18 @@ class Room {
|
|||
return dict;
|
||||
}
|
||||
|
||||
Future<Thread> getThread(Event rootEvent) async {
|
||||
final threads = await getThreads();
|
||||
if (threads.containsKey(rootEvent.eventId)) return threads[rootEvent.eventId]!;
|
||||
return Thread(
|
||||
room: this,
|
||||
rootEvent: rootEvent,
|
||||
client: client,
|
||||
currentUserParticipated: false,
|
||||
count: 0,
|
||||
);
|
||||
}
|
||||
|
||||
/// ID of the fully read marker event.
|
||||
String get fullyRead =>
|
||||
roomAccountData['m.fully_read']?.content.tryGet<String>('event_id') ?? '';
|
||||
|
|
|
|||
|
|
@ -37,9 +37,13 @@ class RoomTimeline extends Timeline {
|
|||
StreamSubscription<String>? sessionIdReceivedSub;
|
||||
StreamSubscription<String>? cancelSendEventSub;
|
||||
|
||||
@override
|
||||
bool isRequestingHistory = false;
|
||||
@override
|
||||
bool isRequestingFuture = false;
|
||||
@override
|
||||
bool allowNewEvent = true;
|
||||
@override
|
||||
bool isFragmentedTimeline = false;
|
||||
|
||||
final Map<String, Event> _eventCache = {};
|
||||
|
|
|
|||
|
|
@ -45,6 +45,16 @@ class Thread {
|
|||
room,
|
||||
);
|
||||
}
|
||||
if (json['unsigned']?['m.thread']?['latest_event'] != null) {
|
||||
lastEvent = Event.fromMatrixEvent(
|
||||
MatrixEvent.fromJson(
|
||||
json['unsigned']?['m.thread']?['latest_event'],
|
||||
),
|
||||
room,
|
||||
);
|
||||
}
|
||||
// Although I was making this part according to specification, it's a bit off
|
||||
// I have no clue why
|
||||
final thread = Thread(
|
||||
room: room,
|
||||
client: client,
|
||||
|
|
@ -334,7 +344,7 @@ class Thread {
|
|||
if (prev_batch == null) {
|
||||
throw 'Tried to request history without a prev_batch token';
|
||||
}
|
||||
|
||||
|
||||
final resp = await client.getRelatingEventsWithRelType(
|
||||
room.id,
|
||||
rootEvent.eventId,
|
||||
|
|
@ -350,7 +360,8 @@ class Thread {
|
|||
await client.database.transaction(() async {
|
||||
if (storeInDatabase && direction == Direction.b) {
|
||||
this.prev_batch = resp.prevBatch;
|
||||
await client.database.setThreadPrevBatch(resp.prevBatch, room.id, rootEvent.eventId, client);
|
||||
await client.database.setThreadPrevBatch(
|
||||
resp.prevBatch, room.id, rootEvent.eventId, client);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:matrix/matrix.dart';
|
||||
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||
|
|
@ -23,17 +22,18 @@ class ThreadTimeline extends Timeline {
|
|||
StreamSubscription<String>? sessionIdReceivedSub;
|
||||
StreamSubscription<String>? cancelSendEventSub;
|
||||
|
||||
@override
|
||||
bool isRequestingHistory = false;
|
||||
|
||||
@override
|
||||
bool isFragmentedTimeline = false;
|
||||
|
||||
final Map<String, Event> _eventCache = {};
|
||||
|
||||
bool _fetchedAllDatabaseEvents = false;
|
||||
|
||||
|
||||
@override
|
||||
bool allowNewEvent = true;
|
||||
|
||||
bool _collectHistoryUpdates = false;
|
||||
|
||||
|
||||
@override
|
||||
bool isRequestingFuture = false;
|
||||
|
||||
ThreadTimeline({
|
||||
|
|
@ -52,6 +52,17 @@ class ThreadTimeline extends Timeline {
|
|||
historySub = room.client.onHistoryEvent.stream.listen(
|
||||
(event) => _handleEventUpdate(event, EventUpdateType.history),
|
||||
);
|
||||
|
||||
// we want to populate our aggregated events
|
||||
for (final e in events) {
|
||||
addAggregatedEvent(e);
|
||||
}
|
||||
|
||||
// we are using a fragmented timeline
|
||||
if (chunk.nextBatch != '') {
|
||||
allowNewEvent = false;
|
||||
isFragmentedTimeline = true;
|
||||
}
|
||||
}
|
||||
|
||||
void _handleEventUpdate(
|
||||
|
|
@ -63,7 +74,9 @@ class ThreadTimeline extends Timeline {
|
|||
if (event.roomId != thread.room.id) return;
|
||||
// Ignore events outside of this thread
|
||||
if (event.relationshipType != RelationshipTypes.thread ||
|
||||
event.relationshipEventId != thread.rootEvent.eventId) return;
|
||||
event.relationshipEventId != thread.rootEvent.eventId) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (type != EventUpdateType.timeline && type != EventUpdateType.history) {
|
||||
return;
|
||||
|
|
@ -152,14 +165,21 @@ class ThreadTimeline extends Timeline {
|
|||
dir: direction,
|
||||
from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
|
||||
limit: historyCount,
|
||||
recurse: true,
|
||||
);
|
||||
|
||||
Logs().w(
|
||||
'Loading thread events from server ${resp.chunk.length} ${resp.prevBatch}',
|
||||
);
|
||||
|
||||
if (resp.nextBatch == null) {
|
||||
Logs().w('We reached the end of the timeline');
|
||||
}
|
||||
|
||||
final newNextBatch = direction == Direction.b ? resp.prevBatch : resp.nextBatch;
|
||||
final newPrevBatch = direction == Direction.b ? resp.nextBatch : resp.prevBatch;
|
||||
final newNextBatch =
|
||||
direction == Direction.b ? resp.prevBatch : resp.nextBatch;
|
||||
final newPrevBatch =
|
||||
direction == Direction.b ? resp.nextBatch : resp.prevBatch;
|
||||
|
||||
final type = direction == Direction.b
|
||||
? EventUpdateType.history
|
||||
|
|
@ -193,7 +213,8 @@ class ThreadTimeline extends Timeline {
|
|||
if (allowNewEvent) {
|
||||
Logs().d('We now allow sync update into the timeline.');
|
||||
newEvents.addAll(
|
||||
await thread.client.database.getThreadEventList(thread, onlySending: true),
|
||||
await thread.client.database
|
||||
.getThreadEventList(thread, onlySending: true),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -258,7 +279,8 @@ class ThreadTimeline extends Timeline {
|
|||
}
|
||||
// Fetch all users from database we have got here.
|
||||
for (final event in events) {
|
||||
if (thread.room.getState(EventTypes.RoomMember, event.senderId) != null) {
|
||||
if (thread.room.getState(EventTypes.RoomMember, event.senderId) !=
|
||||
null) {
|
||||
continue;
|
||||
}
|
||||
final dbUser =
|
||||
|
|
@ -282,7 +304,6 @@ class ThreadTimeline extends Timeline {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
_fetchedAllDatabaseEvents = true;
|
||||
Logs().i('No more events found in the store. Request from server...');
|
||||
|
||||
if (isFragmentedTimeline) {
|
||||
|
|
@ -298,16 +319,13 @@ class ThreadTimeline extends Timeline {
|
|||
await thread.requestHistory(
|
||||
historyCount: historyCount,
|
||||
direction: direction,
|
||||
onHistoryReceived: () {
|
||||
_collectHistoryUpdates = true;
|
||||
},
|
||||
onHistoryReceived: () {},
|
||||
filter: filter,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
_collectHistoryUpdates = false;
|
||||
isRequestingHistory = false;
|
||||
onUpdate?.call();
|
||||
}
|
||||
|
|
@ -387,8 +405,10 @@ class ThreadTimeline extends Timeline {
|
|||
}
|
||||
|
||||
@override
|
||||
Future<void> requestHistory(
|
||||
{int historyCount = Room.defaultHistoryCount, StateFilter? filter}) async {
|
||||
Future<void> requestHistory({
|
||||
int historyCount = Room.defaultHistoryCount,
|
||||
StateFilter? filter,
|
||||
}) async {
|
||||
if (isRequestingHistory) return;
|
||||
isRequestingHistory = true;
|
||||
await _requestEvents(
|
||||
|
|
@ -421,45 +441,45 @@ class ThreadTimeline extends Timeline {
|
|||
}
|
||||
|
||||
@override
|
||||
bool get canRequestFuture => chunk.nextBatch != null && chunk.nextBatch!.isNotEmpty;
|
||||
bool get canRequestFuture => chunk.nextBatch.isNotEmpty;
|
||||
|
||||
@override
|
||||
bool get canRequestHistory => chunk.prevBatch != null && chunk.prevBatch!.isNotEmpty;
|
||||
@override
|
||||
bool get canRequestHistory => chunk.prevBatch.isNotEmpty;
|
||||
|
||||
@override
|
||||
Future<void> requestFuture({
|
||||
int historyCount = Room.defaultHistoryCount,
|
||||
StateFilter? filter,
|
||||
}) async {
|
||||
if (isRequestingFuture || !canRequestFuture) return;
|
||||
isRequestingFuture = true;
|
||||
|
||||
try {
|
||||
await getThreadEvents(
|
||||
historyCount: historyCount,
|
||||
direction: Direction.f,
|
||||
filter: filter,
|
||||
);
|
||||
} finally {
|
||||
isRequestingFuture = false;
|
||||
@override
|
||||
Future<void> requestFuture({
|
||||
int historyCount = Room.defaultHistoryCount,
|
||||
StateFilter? filter,
|
||||
}) async {
|
||||
if (isRequestingFuture || !canRequestFuture) return;
|
||||
isRequestingFuture = true;
|
||||
|
||||
try {
|
||||
await getThreadEvents(
|
||||
historyCount: historyCount,
|
||||
direction: Direction.f,
|
||||
filter: filter,
|
||||
);
|
||||
} finally {
|
||||
isRequestingFuture = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@override
|
||||
void requestKeys({
|
||||
bool tryOnlineBackup = true,
|
||||
bool onlineKeyBackupOnly = true,
|
||||
}) {
|
||||
for (final event in events) {
|
||||
if (event.type == EventTypes.Encrypted &&
|
||||
event.messageType == MessageTypes.BadEncrypted &&
|
||||
event.content['can_request_session'] == true) {
|
||||
final sessionId = event.content.tryGet<String>('session_id');
|
||||
final senderKey = event.content.tryGet<String>('sender_key');
|
||||
if (sessionId != null && senderKey != null) {
|
||||
thread.room.requestSessionKey(sessionId, senderKey);
|
||||
@override
|
||||
void requestKeys({
|
||||
bool tryOnlineBackup = true,
|
||||
bool onlineKeyBackupOnly = true,
|
||||
}) {
|
||||
for (final event in events) {
|
||||
if (event.type == EventTypes.Encrypted &&
|
||||
event.messageType == MessageTypes.BadEncrypted &&
|
||||
event.content['can_request_session'] == true) {
|
||||
final sessionId = event.content.tryGet<String>('session_id');
|
||||
final senderKey = event.content.tryGet<String>('sender_key');
|
||||
if (sessionId != null && senderKey != null) {
|
||||
thread.room.requestSessionKey(sessionId, senderKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,10 @@ abstract class Timeline {
|
|||
|
||||
bool get canRequestHistory;
|
||||
bool get canRequestFuture;
|
||||
bool get allowNewEvent;
|
||||
bool get isRequestingFuture;
|
||||
bool get isRequestingHistory;
|
||||
bool get isFragmentedTimeline;
|
||||
|
||||
Timeline({
|
||||
this.onUpdate,
|
||||
|
|
|
|||
Loading…
Reference in New Issue