fix: incorrect Event status update in constructor
This commit is contained in:
parent
86d17e6415
commit
a6999255d6
|
|
@ -126,7 +126,17 @@ class Event extends MatrixEvent {
|
||||||
originServerTs.millisecondsSinceEpoch;
|
originServerTs.millisecondsSinceEpoch;
|
||||||
|
|
||||||
final room = this.room;
|
final room = this.room;
|
||||||
if (age > room.client.sendTimelineEventTimeout.inMilliseconds) {
|
|
||||||
|
if (
|
||||||
|
// We don't want to mark the event as failed if it's the lastEvent in the room
|
||||||
|
// since that would be a race condition (with the same event from timeline)
|
||||||
|
// The `room.lastEvent` is null at the time this constructor is called for it,
|
||||||
|
// there's no other way to check this.
|
||||||
|
room.lastEvent?.eventId != null &&
|
||||||
|
// If the event is in the sending queue, then we don't mess with it.
|
||||||
|
!room.sendingQueueEventsByTxId.contains(transactionId) &&
|
||||||
|
// Else, if the event is older than the timeout, then we mark it as failed.
|
||||||
|
age > room.client.sendTimelineEventTimeout.inMilliseconds) {
|
||||||
// Update this event in database and open timelines
|
// Update this event in database and open timelines
|
||||||
final json = toJson();
|
final json = toJson();
|
||||||
json['unsigned'] ??= <String, dynamic>{};
|
json['unsigned'] ??= <String, dynamic>{};
|
||||||
|
|
|
||||||
|
|
@ -72,7 +72,12 @@ class Room {
|
||||||
/// Key-Value store for private account data only visible for this user.
|
/// Key-Value store for private account data only visible for this user.
|
||||||
Map<String, BasicEvent> roomAccountData = {};
|
Map<String, BasicEvent> roomAccountData = {};
|
||||||
|
|
||||||
final _sendingQueue = <Completer>[];
|
/// Queue of sending events
|
||||||
|
/// NOTE: This shouldn't be modified directly, use [sendEvent] instead. This is only used for testing.
|
||||||
|
final sendingQueue = <Completer>[];
|
||||||
|
|
||||||
|
/// List of transaction IDs of events that are currently queued to be sent
|
||||||
|
final sendingQueueEventsByTxId = <String>[];
|
||||||
|
|
||||||
Timer? _clearTypingIndicatorTimer;
|
Timer? _clearTypingIndicatorTimer;
|
||||||
|
|
||||||
|
|
@ -1106,11 +1111,14 @@ class Room {
|
||||||
},
|
},
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
// we need to add the transaction ID to the set of events that are currently queued to be sent
|
||||||
|
// even before the fake sync is called, so that the event constructor can check if the event is in the sending state
|
||||||
|
sendingQueueEventsByTxId.add(messageID);
|
||||||
await _handleFakeSync(syncUpdate);
|
await _handleFakeSync(syncUpdate);
|
||||||
final completer = Completer();
|
final completer = Completer();
|
||||||
_sendingQueue.add(completer);
|
sendingQueue.add(completer);
|
||||||
while (_sendingQueue.first != completer) {
|
while (sendingQueue.first != completer) {
|
||||||
await _sendingQueue.first.future;
|
await sendingQueue.first.future;
|
||||||
}
|
}
|
||||||
|
|
||||||
final timeoutDate = DateTime.now().add(client.sendTimelineEventTimeout);
|
final timeoutDate = DateTime.now().add(client.sendTimelineEventTimeout);
|
||||||
|
|
@ -1142,7 +1150,8 @@ class Room {
|
||||||
.unsigned![messageSendingStatusKey] = EventStatus.error.intValue;
|
.unsigned![messageSendingStatusKey] = EventStatus.error.intValue;
|
||||||
await _handleFakeSync(syncUpdate);
|
await _handleFakeSync(syncUpdate);
|
||||||
completer.complete();
|
completer.complete();
|
||||||
_sendingQueue.remove(completer);
|
sendingQueue.remove(completer);
|
||||||
|
sendingQueueEventsByTxId.remove(messageID);
|
||||||
if (e is EventTooLarge ||
|
if (e is EventTooLarge ||
|
||||||
(e is MatrixException && e.error == MatrixError.M_FORBIDDEN)) {
|
(e is MatrixException && e.error == MatrixError.M_FORBIDDEN)) {
|
||||||
rethrow;
|
rethrow;
|
||||||
|
|
@ -1160,8 +1169,8 @@ class Room {
|
||||||
syncUpdate.rooms!.join!.values.first.timeline!.events!.first.eventId = res;
|
syncUpdate.rooms!.join!.values.first.timeline!.events!.first.eventId = res;
|
||||||
await _handleFakeSync(syncUpdate);
|
await _handleFakeSync(syncUpdate);
|
||||||
completer.complete();
|
completer.complete();
|
||||||
_sendingQueue.remove(completer);
|
sendingQueue.remove(completer);
|
||||||
|
sendingQueueEventsByTxId.remove(messageID);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,117 @@
|
||||||
|
import 'dart:async';
|
||||||
|
import 'dart:io';
|
||||||
|
|
||||||
|
import 'package:path/path.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
|
import 'package:matrix/matrix.dart';
|
||||||
|
import 'fake_client.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
group('Event timeout tests', () {
|
||||||
|
late Client client;
|
||||||
|
late Room room;
|
||||||
|
|
||||||
|
setUp(() async {
|
||||||
|
client = await getClient(
|
||||||
|
sendTimelineEventTimeout: const Duration(seconds: 5),
|
||||||
|
databasePath: join(Directory.current.path, 'test.sqlite'),
|
||||||
|
);
|
||||||
|
room = Room(
|
||||||
|
id: '!1234:example.com',
|
||||||
|
client: client,
|
||||||
|
roomAccountData: {},
|
||||||
|
);
|
||||||
|
client.rooms.add(room);
|
||||||
|
});
|
||||||
|
|
||||||
|
tearDown(() async {
|
||||||
|
await client.logout();
|
||||||
|
await client.dispose(closeDatabase: true);
|
||||||
|
});
|
||||||
|
|
||||||
|
test('Event constructor correctly checks timeout from originServerTs',
|
||||||
|
() async {
|
||||||
|
final completer = Completer();
|
||||||
|
room.sendingQueue.add(completer); // to block the events from being sent
|
||||||
|
|
||||||
|
String? eventId;
|
||||||
|
// we don't await this because the actual sending will only be done after
|
||||||
|
// `sendingQueue` is unblocked.
|
||||||
|
// but the fake sync will be called with this event in sending state right away
|
||||||
|
unawaited(
|
||||||
|
room.sendTextEvent('test', txid: '1234').then((value) {
|
||||||
|
eventId = value;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
// do the timeout
|
||||||
|
final timeout =
|
||||||
|
Duration(seconds: client.sendTimelineEventTimeout.inSeconds + 2);
|
||||||
|
await Future.delayed(timeout);
|
||||||
|
|
||||||
|
// this will trigger the check in the Event constructor to see if the
|
||||||
|
// event is in error state (and call fake sync with updated error status)
|
||||||
|
await client.oneShotSync();
|
||||||
|
Timeline timeline = await room.getTimeline();
|
||||||
|
expect(timeline.events.length, 1);
|
||||||
|
expect(timeline.events.first.status, EventStatus.sending);
|
||||||
|
|
||||||
|
// fake sync would have been triggered by now (if there was one), which shouldn't happen
|
||||||
|
await client.oneShotSync();
|
||||||
|
timeline = await room.getTimeline();
|
||||||
|
expect(timeline.events.length, 1);
|
||||||
|
expect(timeline.events.first.status, EventStatus.sending);
|
||||||
|
|
||||||
|
// now we unblock the sending queue and this will make `sendTextEvent`
|
||||||
|
// actually send the event and the fake sync that's used to update the
|
||||||
|
// event status to sent
|
||||||
|
completer.complete();
|
||||||
|
room.sendingQueue.remove(completer);
|
||||||
|
await FakeMatrixApi.firstWhere(
|
||||||
|
(a) => a.startsWith(
|
||||||
|
'/client/v3/rooms/!1234%3Aexample.com/send/m.room.message/1234',
|
||||||
|
),
|
||||||
|
);
|
||||||
|
await Future.delayed(const Duration(seconds: 1));
|
||||||
|
expect(eventId, isNotNull);
|
||||||
|
|
||||||
|
// now the event should be in sent state after the fake sync is called
|
||||||
|
await client.oneShotSync();
|
||||||
|
timeline = await room.getTimeline();
|
||||||
|
expect(timeline.events.length, 1);
|
||||||
|
expect(timeline.events.first.status, EventStatus.sent);
|
||||||
|
|
||||||
|
// simulate the event being synced from server
|
||||||
|
await client.handleSync(
|
||||||
|
SyncUpdate(
|
||||||
|
nextBatch: '1',
|
||||||
|
rooms: RoomsUpdate(
|
||||||
|
join: {
|
||||||
|
room.id: JoinedRoomUpdate(
|
||||||
|
timeline: TimelineUpdate(
|
||||||
|
events: [
|
||||||
|
Event(
|
||||||
|
eventId: eventId!,
|
||||||
|
content: {'msgtype': 'm.text', 'body': 'test'},
|
||||||
|
type: 'm.room.message',
|
||||||
|
senderId: '@test:example.com',
|
||||||
|
originServerTs: DateTime.now(),
|
||||||
|
room: room,
|
||||||
|
unsigned: {
|
||||||
|
'transaction_id': '1234',
|
||||||
|
},
|
||||||
|
),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
timeline = await room.getTimeline();
|
||||||
|
expect(timeline.events.length, 1);
|
||||||
|
expect(timeline.events.first.status, EventStatus.synced);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue