Merge pull request #2082 from famedly/karthi/event-status-fix

fix: incorrect Event status update in constructor
This commit is contained in:
Karthikeyan Sankaran 2025-05-08 19:22:11 +05:30 committed by GitHub
commit d398ade831
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 144 additions and 8 deletions

View File

@ -126,7 +126,17 @@ class Event extends MatrixEvent {
originServerTs.millisecondsSinceEpoch;
final room = this.room;
if (age > room.client.sendTimelineEventTimeout.inMilliseconds) {
if (
// We don't want to mark the event as failed if it's the lastEvent in the room
// since that would be a race condition (with the same event from timeline)
// The `room.lastEvent` is null at the time this constructor is called for it,
// there's no other way to check this.
room.lastEvent?.eventId != null &&
// If the event is in the sending queue, then we don't mess with it.
!room.sendingQueueEventsByTxId.contains(transactionId) &&
// Else, if the event is older than the timeout, then we mark it as failed.
age > room.client.sendTimelineEventTimeout.inMilliseconds) {
// Update this event in database and open timelines
final json = toJson();
json['unsigned'] ??= <String, dynamic>{};

View File

@ -72,7 +72,12 @@ class Room {
/// Key-Value store for private account data only visible for this user.
Map<String, BasicEvent> roomAccountData = {};
final _sendingQueue = <Completer>[];
/// Queue of sending events
/// NOTE: This shouldn't be modified directly, use [sendEvent] instead. This is only used for testing.
final sendingQueue = <Completer>[];
/// List of transaction IDs of events that are currently queued to be sent
final sendingQueueEventsByTxId = <String>[];
Timer? _clearTypingIndicatorTimer;
@ -1106,11 +1111,14 @@ class Room {
},
),
);
// we need to add the transaction ID to the set of events that are currently queued to be sent
// even before the fake sync is called, so that the event constructor can check if the event is in the sending state
sendingQueueEventsByTxId.add(messageID);
await _handleFakeSync(syncUpdate);
final completer = Completer();
_sendingQueue.add(completer);
while (_sendingQueue.first != completer) {
await _sendingQueue.first.future;
sendingQueue.add(completer);
while (sendingQueue.first != completer) {
await sendingQueue.first.future;
}
final timeoutDate = DateTime.now().add(client.sendTimelineEventTimeout);
@ -1142,7 +1150,8 @@ class Room {
.unsigned![messageSendingStatusKey] = EventStatus.error.intValue;
await _handleFakeSync(syncUpdate);
completer.complete();
_sendingQueue.remove(completer);
sendingQueue.remove(completer);
sendingQueueEventsByTxId.remove(messageID);
if (e is EventTooLarge ||
(e is MatrixException && e.error == MatrixError.M_FORBIDDEN)) {
rethrow;
@ -1160,8 +1169,8 @@ class Room {
syncUpdate.rooms!.join!.values.first.timeline!.events!.first.eventId = res;
await _handleFakeSync(syncUpdate);
completer.complete();
_sendingQueue.remove(completer);
sendingQueue.remove(completer);
sendingQueueEventsByTxId.remove(messageID);
return res;
}

View File

@ -0,0 +1,117 @@
import 'dart:async';
import 'dart:io';
import 'package:path/path.dart';
import 'package:test/test.dart';
import 'package:matrix/matrix.dart';
import 'fake_client.dart';
void main() {
group('Event timeout tests', () {
late Client client;
late Room room;
setUp(() async {
client = await getClient(
sendTimelineEventTimeout: const Duration(seconds: 5),
databasePath: join(Directory.current.path, 'test.sqlite'),
);
room = Room(
id: '!1234:example.com',
client: client,
roomAccountData: {},
);
client.rooms.add(room);
});
tearDown(() async {
await client.logout();
await client.dispose(closeDatabase: true);
});
test('Event constructor correctly checks timeout from originServerTs',
() async {
final completer = Completer();
room.sendingQueue.add(completer); // to block the events from being sent
String? eventId;
// we don't await this because the actual sending will only be done after
// `sendingQueue` is unblocked.
// but the fake sync will be called with this event in sending state right away
unawaited(
room.sendTextEvent('test', txid: '1234').then((value) {
eventId = value;
}),
);
// do the timeout
final timeout =
Duration(seconds: client.sendTimelineEventTimeout.inSeconds + 2);
await Future.delayed(timeout);
// this will trigger the check in the Event constructor to see if the
// event is in error state (and call fake sync with updated error status)
await client.oneShotSync();
Timeline timeline = await room.getTimeline();
expect(timeline.events.length, 1);
expect(timeline.events.first.status, EventStatus.sending);
// fake sync would have been triggered by now (if there was one), which shouldn't happen
await client.oneShotSync();
timeline = await room.getTimeline();
expect(timeline.events.length, 1);
expect(timeline.events.first.status, EventStatus.sending);
// now we unblock the sending queue and this will make `sendTextEvent`
// actually send the event and the fake sync that's used to update the
// event status to sent
completer.complete();
room.sendingQueue.remove(completer);
await FakeMatrixApi.firstWhere(
(a) => a.startsWith(
'/client/v3/rooms/!1234%3Aexample.com/send/m.room.message/1234',
),
);
await Future.delayed(const Duration(seconds: 1));
expect(eventId, isNotNull);
// now the event should be in sent state after the fake sync is called
await client.oneShotSync();
timeline = await room.getTimeline();
expect(timeline.events.length, 1);
expect(timeline.events.first.status, EventStatus.sent);
// simulate the event being synced from server
await client.handleSync(
SyncUpdate(
nextBatch: '1',
rooms: RoomsUpdate(
join: {
room.id: JoinedRoomUpdate(
timeline: TimelineUpdate(
events: [
Event(
eventId: eventId!,
content: {'msgtype': 'm.text', 'body': 'test'},
type: 'm.room.message',
senderId: '@test:example.com',
originServerTs: DateTime.now(),
room: room,
unsigned: {
'transaction_id': '1234',
},
),
],
),
),
},
),
),
);
timeline = await room.getTimeline();
expect(timeline.events.length, 1);
expect(timeline.events.first.status, EventStatus.synced);
});
});
}