From a6999255d6fe3d7b2320245e2e363c16d54e026f Mon Sep 17 00:00:00 2001 From: Karthikeyan S Date: Thu, 8 May 2025 19:17:53 +0530 Subject: [PATCH] fix: incorrect Event status update in constructor --- lib/src/event.dart | 12 +++- lib/src/room.dart | 23 ++++--- test/event_timeout_test.dart | 117 +++++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+), 8 deletions(-) create mode 100644 test/event_timeout_test.dart diff --git a/lib/src/event.dart b/lib/src/event.dart index f08a383a..885e28ba 100644 --- a/lib/src/event.dart +++ b/lib/src/event.dart @@ -126,7 +126,17 @@ class Event extends MatrixEvent { originServerTs.millisecondsSinceEpoch; final room = this.room; - if (age > room.client.sendTimelineEventTimeout.inMilliseconds) { + + if ( + // We don't want to mark the event as failed if it's the lastEvent in the room + // since that would be a race condition (with the same event from timeline) + // The `room.lastEvent` is null at the time this constructor is called for it, + // there's no other way to check this. + room.lastEvent?.eventId != null && + // If the event is in the sending queue, then we don't mess with it. + !room.sendingQueueEventsByTxId.contains(transactionId) && + // Else, if the event is older than the timeout, then we mark it as failed. + age > room.client.sendTimelineEventTimeout.inMilliseconds) { // Update this event in database and open timelines final json = toJson(); json['unsigned'] ??= {}; diff --git a/lib/src/room.dart b/lib/src/room.dart index 8bfeff94..e66fad1d 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -72,7 +72,12 @@ class Room { /// Key-Value store for private account data only visible for this user. Map roomAccountData = {}; - final _sendingQueue = []; + /// Queue of sending events + /// NOTE: This shouldn't be modified directly, use [sendEvent] instead. This is only used for testing. + final sendingQueue = []; + + /// List of transaction IDs of events that are currently queued to be sent + final sendingQueueEventsByTxId = []; Timer? _clearTypingIndicatorTimer; @@ -1106,11 +1111,14 @@ class Room { }, ), ); + // we need to add the transaction ID to the set of events that are currently queued to be sent + // even before the fake sync is called, so that the event constructor can check if the event is in the sending state + sendingQueueEventsByTxId.add(messageID); await _handleFakeSync(syncUpdate); final completer = Completer(); - _sendingQueue.add(completer); - while (_sendingQueue.first != completer) { - await _sendingQueue.first.future; + sendingQueue.add(completer); + while (sendingQueue.first != completer) { + await sendingQueue.first.future; } final timeoutDate = DateTime.now().add(client.sendTimelineEventTimeout); @@ -1142,7 +1150,8 @@ class Room { .unsigned![messageSendingStatusKey] = EventStatus.error.intValue; await _handleFakeSync(syncUpdate); completer.complete(); - _sendingQueue.remove(completer); + sendingQueue.remove(completer); + sendingQueueEventsByTxId.remove(messageID); if (e is EventTooLarge || (e is MatrixException && e.error == MatrixError.M_FORBIDDEN)) { rethrow; @@ -1160,8 +1169,8 @@ class Room { syncUpdate.rooms!.join!.values.first.timeline!.events!.first.eventId = res; await _handleFakeSync(syncUpdate); completer.complete(); - _sendingQueue.remove(completer); - + sendingQueue.remove(completer); + sendingQueueEventsByTxId.remove(messageID); return res; } diff --git a/test/event_timeout_test.dart b/test/event_timeout_test.dart new file mode 100644 index 00000000..62cb2f74 --- /dev/null +++ b/test/event_timeout_test.dart @@ -0,0 +1,117 @@ +import 'dart:async'; +import 'dart:io'; + +import 'package:path/path.dart'; +import 'package:test/test.dart'; + +import 'package:matrix/matrix.dart'; +import 'fake_client.dart'; + +void main() { + group('Event timeout tests', () { + late Client client; + late Room room; + + setUp(() async { + client = await getClient( + sendTimelineEventTimeout: const Duration(seconds: 5), + databasePath: join(Directory.current.path, 'test.sqlite'), + ); + room = Room( + id: '!1234:example.com', + client: client, + roomAccountData: {}, + ); + client.rooms.add(room); + }); + + tearDown(() async { + await client.logout(); + await client.dispose(closeDatabase: true); + }); + + test('Event constructor correctly checks timeout from originServerTs', + () async { + final completer = Completer(); + room.sendingQueue.add(completer); // to block the events from being sent + + String? eventId; + // we don't await this because the actual sending will only be done after + // `sendingQueue` is unblocked. + // but the fake sync will be called with this event in sending state right away + unawaited( + room.sendTextEvent('test', txid: '1234').then((value) { + eventId = value; + }), + ); + + // do the timeout + final timeout = + Duration(seconds: client.sendTimelineEventTimeout.inSeconds + 2); + await Future.delayed(timeout); + + // this will trigger the check in the Event constructor to see if the + // event is in error state (and call fake sync with updated error status) + await client.oneShotSync(); + Timeline timeline = await room.getTimeline(); + expect(timeline.events.length, 1); + expect(timeline.events.first.status, EventStatus.sending); + + // fake sync would have been triggered by now (if there was one), which shouldn't happen + await client.oneShotSync(); + timeline = await room.getTimeline(); + expect(timeline.events.length, 1); + expect(timeline.events.first.status, EventStatus.sending); + + // now we unblock the sending queue and this will make `sendTextEvent` + // actually send the event and the fake sync that's used to update the + // event status to sent + completer.complete(); + room.sendingQueue.remove(completer); + await FakeMatrixApi.firstWhere( + (a) => a.startsWith( + '/client/v3/rooms/!1234%3Aexample.com/send/m.room.message/1234', + ), + ); + await Future.delayed(const Duration(seconds: 1)); + expect(eventId, isNotNull); + + // now the event should be in sent state after the fake sync is called + await client.oneShotSync(); + timeline = await room.getTimeline(); + expect(timeline.events.length, 1); + expect(timeline.events.first.status, EventStatus.sent); + + // simulate the event being synced from server + await client.handleSync( + SyncUpdate( + nextBatch: '1', + rooms: RoomsUpdate( + join: { + room.id: JoinedRoomUpdate( + timeline: TimelineUpdate( + events: [ + Event( + eventId: eventId!, + content: {'msgtype': 'm.text', 'body': 'test'}, + type: 'm.room.message', + senderId: '@test:example.com', + originServerTs: DateTime.now(), + room: room, + unsigned: { + 'transaction_id': '1234', + }, + ), + ], + ), + ), + }, + ), + ), + ); + timeline = await room.getTimeline(); + expect(timeline.events.length, 1); + expect(timeline.events.first.status, EventStatus.synced); + }); + }); +}