Merge pull request #1788 from famedly/krille/better-delete-failed-send-event

refactor: delete not sent events without eventupdate stream workaround
This commit is contained in:
Krille-chan 2024-05-08 12:11:38 +02:00 committed by GitHub
commit e8c6eca7f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 78 additions and 75 deletions

View File

@ -1312,6 +1312,9 @@ class Client extends MatrixApi {
final CachedStreamController<Event> onGroupMember = CachedStreamController(); final CachedStreamController<Event> onGroupMember = CachedStreamController();
final CachedStreamController<String> onCancelSendEvent =
CachedStreamController();
/// When a state in a room has been updated this will return the room ID /// When a state in a room has been updated this will return the room ID
/// and the state event. /// and the state event.
final CachedStreamController<({String roomId, StrippedStateEvent state})> final CachedStreamController<({String roomId, StrippedStateEvent state})>

View File

@ -322,27 +322,26 @@ class Event extends MatrixEvent {
return receiptsList; return receiptsList;
} }
/// Removes this event if the status is [sending], [error] or [removed]. @Deprecated('Use [cancelSend()] instead.')
/// This event will just be removed from the database and the timelines.
/// Returns [false] if not removed.
Future<bool> remove() async { Future<bool> remove() async {
final room = this.room; try {
await cancelSend();
if (!status.isSent) {
await room.client.database?.removeEvent(eventId, room.id);
room.client.onEvent.add(EventUpdate(
roomID: room.id,
type: EventUpdateType.timeline,
content: {
'event_id': eventId,
'status': EventStatus.removed.intValue,
'content': {'body': 'Removed...'}
},
));
return true; return true;
} catch (_) {
return false;
} }
return false; }
/// Removes an unsent or yet-to-send event from the database and timeline.
/// These are events marked with the status `SENDING` or `ERROR`.
/// Throws an exception if used for an already sent event!
Future<void> cancelSend() async {
if (status.isSent) {
throw Exception('Can only delete events which are not sent yet!');
}
await room.client.database?.removeEvent(eventId, room.id);
room.client.onCancelSendEvent.add(eventId);
} }
/// Try to send this event again. Only works with events of status -1. /// Try to send this event again. Only works with events of status -1.
@ -358,7 +357,7 @@ class Event extends MatrixEvent {
}.contains(messageType)) { }.contains(messageType)) {
final file = room.sendingFilePlaceholders[eventId]; final file = room.sendingFilePlaceholders[eventId];
if (file == null) { if (file == null) {
await remove(); await cancelSend();
throw Exception('Can not try to send again. File is no longer cached.'); throw Exception('Can not try to send again. File is no longer cached.');
} }
final thumbnail = room.sendingFileThumbnails[eventId]; final thumbnail = room.sendingFileThumbnails[eventId];

View File

@ -6,7 +6,6 @@
/// - synced: (event came from sync loop) /// - synced: (event came from sync loop)
/// - roomState /// - roomState
enum EventStatus { enum EventStatus {
removed,
error, error,
sending, sending,
sent, sent,
@ -33,7 +32,6 @@ EventStatus latestEventStatus(EventStatus status1, EventStatus status2) =>
extension EventStatusExtension on EventStatus { extension EventStatusExtension on EventStatus {
/// Returns int value of the event status. /// Returns int value of the event status.
/// ///
/// - -2 == removed;
/// - -1 == error; /// - -1 == error;
/// - 0 == sending; /// - 0 == sending;
/// - 1 == sent; /// - 1 == sent;
@ -41,9 +39,6 @@ extension EventStatusExtension on EventStatus {
/// - 3 == roomState; /// - 3 == roomState;
int get intValue => (index - 2); int get intValue => (index - 2);
/// Return `true` if the `EventStatus` equals `removed`.
bool get isRemoved => this == EventStatus.removed;
/// Return `true` if the `EventStatus` equals `error`. /// Return `true` if the `EventStatus` equals `error`.
bool get isError => this == EventStatus.error; bool get isError => this == EventStatus.error;

View File

@ -44,6 +44,7 @@ class Timeline {
StreamSubscription<EventUpdate>? sub; StreamSubscription<EventUpdate>? sub;
StreamSubscription<SyncUpdate>? roomSub; StreamSubscription<SyncUpdate>? roomSub;
StreamSubscription<String>? sessionIdReceivedSub; StreamSubscription<String>? sessionIdReceivedSub;
StreamSubscription<String>? cancelSendEventSub;
bool isRequestingHistory = false; bool isRequestingHistory = false;
bool isRequestingFuture = false; bool isRequestingFuture = false;
@ -278,6 +279,8 @@ class Timeline {
sessionIdReceivedSub = sessionIdReceivedSub =
room.onSessionKeyReceived.stream.listen(_sessionKeyReceived); room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
cancelSendEventSub =
room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
// we want to populate our aggregated events // we want to populate our aggregated events
for (final e in events) { for (final e in events) {
@ -291,6 +294,16 @@ class Timeline {
} }
} }
void _cleanUpCancelledEvent(String eventId) {
final i = _findEvent(event_id: eventId);
if (i < events.length) {
removeAggregatedEvent(events[i]);
events.removeAt(i);
onRemove?.call(i);
onUpdate?.call();
}
}
/// Removes all entries from [events] which are not in this SyncUpdate. /// Removes all entries from [events] which are not in this SyncUpdate.
void _removeEventsNotInThisSync(SyncUpdate sync) { void _removeEventsNotInThisSync(SyncUpdate sync) {
final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? []; final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
@ -306,6 +319,8 @@ class Timeline {
roomSub?.cancel(); roomSub?.cancel();
// ignore: discarded_futures // ignore: discarded_futures
sessionIdReceivedSub?.cancel(); sessionIdReceivedSub?.cancel();
// ignore: discarded_futures
cancelSendEventSub?.cancel();
} }
void _sessionKeyReceived(String sessionId) async { void _sessionKeyReceived(String sessionId) async {
@ -462,55 +477,46 @@ class Timeline {
: null) ?? : null) ??
EventStatus.synced.intValue); EventStatus.synced.intValue);
if (status.isRemoved) { final i = _findEvent(
final i = _findEvent(event_id: eventUpdate.content['event_id']); event_id: eventUpdate.content['event_id'],
if (i < events.length) { unsigned_txid: eventUpdate.content['unsigned'] is Map
removeAggregatedEvent(events[i]); ? eventUpdate.content['unsigned']['transaction_id']
events.removeAt(i); : null);
onRemove?.call(i);
if (i < events.length) {
// if the old status is larger than the new one, we also want to preserve the old status
final oldStatus = events[i].status;
events[i] = Event.fromJson(
eventUpdate.content,
room,
);
// do we preserve the status? we should allow 0 -> -1 updates and status increases
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
!(status.isError && oldStatus.isSending)) {
events[i].status = oldStatus;
} }
addAggregatedEvent(events[i]);
onChange?.call(i);
} else { } else {
final i = _findEvent( final newEvent = Event.fromJson(
event_id: eventUpdate.content['event_id'], eventUpdate.content,
unsigned_txid: eventUpdate.content['unsigned'] is Map room,
? eventUpdate.content['unsigned']['transaction_id'] );
: null);
if (i < events.length) { if (eventUpdate.type == EventUpdateType.history &&
// if the old status is larger than the new one, we also want to preserve the old status events.indexWhere(
final oldStatus = events[i].status; (e) => e.eventId == eventUpdate.content['event_id']) !=
events[i] = Event.fromJson( -1) return;
eventUpdate.content, var index = events.length;
room, if (eventUpdate.type == EventUpdateType.history) {
); events.add(newEvent);
// do we preserve the status? we should allow 0 -> -1 updates and status increases
if ((latestEventStatus(status, oldStatus) == oldStatus) &&
!(status.isError && oldStatus.isSending)) {
events[i].status = oldStatus;
}
addAggregatedEvent(events[i]);
onChange?.call(i);
} else { } else {
final newEvent = Event.fromJson( index = events.firstIndexWhereNotError;
eventUpdate.content, events.insert(index, newEvent);
room,
);
if (eventUpdate.type == EventUpdateType.history &&
events.indexWhere(
(e) => e.eventId == eventUpdate.content['event_id']) !=
-1) return;
var index = events.length;
if (eventUpdate.type == EventUpdateType.history) {
events.add(newEvent);
} else {
index = events.firstIndexWhereNotError;
events.insert(index, newEvent);
}
onInsert?.call(index);
addAggregatedEvent(newEvent);
} }
onInsert?.call(index);
addAggregatedEvent(newEvent);
} }
// Handle redaction events // Handle redaction events

View File

@ -242,12 +242,12 @@ void main() {
test('remove', () async { test('remove', () async {
final event = Event.fromJson( final event = Event.fromJson(
jsonObj, Room(id: '1234', client: Client('testclient'))); jsonObj,
final removed1 = await event.remove(); Room(id: '1234', client: Client('testclient')),
);
expect(() async => await event.cancelSend(), throwsException);
event.status = EventStatus.sending; event.status = EventStatus.sending;
final removed2 = await event.remove(); await event.cancelSend();
expect(removed1, false);
expect(removed2, true);
}); });
test('sendAgain', () async { test('sendAgain', () async {

View File

@ -270,7 +270,7 @@ void main() {
)); ));
await waitForCount(7); await waitForCount(7);
await timeline.events[0].remove(); await timeline.events[0].cancelSend();
await waitForCount(8); await waitForCount(8);
expect(updateCount, 8); expect(updateCount, 8);

View File

@ -507,7 +507,7 @@ void main() {
)); ));
await waitForCount(1); await waitForCount(1);
await timeline.events[0].remove(); await timeline.events[0].cancelSend();
await waitForCount(2); await waitForCount(2);