From 7899f598a2c5573c067e6b8badad6b4412defec7 Mon Sep 17 00:00:00 2001 From: Karthikeyan S Date: Tue, 18 Mar 2025 22:48:59 +0530 Subject: [PATCH] feat: export timeline events --- lib/matrix.dart | 1 + .../timeline_export.dart | 286 ++++++++++++++ test/msc_extensions/timeline_export_test.dart | 352 ++++++++++++++++++ 3 files changed, 639 insertions(+) create mode 100644 lib/msc_extensions/extension_timeline_export/timeline_export.dart create mode 100644 test/msc_extensions/timeline_export_test.dart diff --git a/lib/matrix.dart b/lib/matrix.dart index 0795f49e..bf31f0c1 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -80,6 +80,7 @@ export 'msc_extensions/msc_3935_cute_events/msc_3935_cute_events.dart'; export 'msc_extensions/msc_1236_widgets/msc_1236_widgets.dart'; export 'msc_extensions/msc_2835_uia_login/msc_2835_uia_login.dart'; export 'msc_extensions/msc_3814_dehydrated_devices/msc_3814_dehydrated_devices.dart'; +export 'msc_extensions/extension_timeline_export/timeline_export.dart'; export 'src/utils/web_worker/web_worker_stub.dart' if (dart.library.html) 'src/utils/web_worker/web_worker.dart'; diff --git a/lib/msc_extensions/extension_timeline_export/timeline_export.dart b/lib/msc_extensions/extension_timeline_export/timeline_export.dart new file mode 100644 index 00000000..2f9cb4d9 --- /dev/null +++ b/lib/msc_extensions/extension_timeline_export/timeline_export.dart @@ -0,0 +1,286 @@ +import 'dart:convert'; + +import 'package:matrix/matrix_api_lite.dart'; +import 'package:matrix/src/event.dart'; +import 'package:matrix/src/timeline.dart'; + +extension TimelineExportExtension on Timeline { + /// Exports timeline events from a Matrix room within a specified date range. + /// + /// The export process provides progress updates through the returned stream with the following information: + /// - Total number of events exported + /// - Count of unable-to-decrypt (UTD) events + /// - Count of media events (images, audio, video, files) + /// - Number of unique users involved + /// + /// ```dart + /// // Example usage: + /// final timeline = room.timeline; + /// final oneWeekAgo = DateTime.now().subtract(Duration(days: 7)); + /// + /// // Export last week's messages, excluding encrypted events + /// await for (final result in timeline.export( + /// from: oneWeekAgo, + /// filter: (event) => event?.type != EventTypes.Encrypted, + /// )) { + /// if (result is ExportProgress) { + /// print('Progress: ${result.totalEvents} events exported'); + /// } else if (result is ExportComplete) { + /// print('Export completed with ${result.events.length} events'); + /// } else if (result is ExportError) { + /// print('Export failed: ${result.error}'); + /// } + /// } + /// ``` + /// + /// [from] Optional start date to filter events. If null, exports from the beginning. + /// [until] Optional end date to filter events. If null, exports up to the latest event. + /// [filter] Optional function to filter events. Return true to include the event. + /// [requestHistoryCount] Optional. The number of events to request from the server at once. + /// + /// Returns a [Stream] of [ExportResult] which can be: + /// - [ExportProgress]: Provides progress updates during export + /// - [ExportComplete]: Contains the final list of exported events + /// - [ExportError]: Contains error information if export fails + Stream export({ + DateTime? from, + DateTime? until, + bool Function(Event)? filter, + int requestHistoryCount = 500, + }) async* { + final eventsToExport = []; + var utdEventsCount = 0; + var mediaEventsCount = 0; + final users = {}; + + try { + yield ExportProgress( + source: ExportSource.timeline, + totalEvents: 0, + utdEvents: 0, + mediaEvents: 0, + users: 0, + ); + + void exportEvent(Event event) { + eventsToExport.add(event); + + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted) { + utdEventsCount++; + } else if (event.type == EventTypes.Message && + { + MessageTypes.Sticker, + MessageTypes.Image, + MessageTypes.Audio, + MessageTypes.Video, + MessageTypes.File, + }.contains(event.messageType)) { + mediaEventsCount++; + } + users.add(event.senderId); + } + + // From the timeline + if (until == null || events.last.originServerTs.isBefore(until)) { + for (final event in events) { + if (from != null && event.originServerTs.isBefore(from)) break; + if (until != null && event.originServerTs.isAfter(until)) continue; + if (filter != null && !filter(event)) continue; + exportEvent(event); + } + } + yield ExportProgress( + source: ExportSource.timeline, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + + if (from != null && events.last.originServerTs.isBefore(from)) { + yield ExportComplete( + events: eventsToExport, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + return; + } + + // From the database + final eventsFromStore = await room.client.database + ?.getEventList(room, start: events.length) ?? + []; + if (eventsFromStore.isNotEmpty) { + if (until == null || + eventsFromStore.last.originServerTs.isBefore(until)) { + for (final event in eventsFromStore) { + if (from != null && event.originServerTs.isBefore(from)) break; + if (until != null && event.originServerTs.isAfter(until)) continue; + if (filter != null && !filter(event)) continue; + exportEvent(event); + } + } + yield ExportProgress( + source: ExportSource.database, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + + if (from != null && + eventsFromStore.last.originServerTs.isBefore(from)) { + yield ExportComplete( + events: eventsToExport, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + return; + } + } + + // From the server + var prevBatch = room.prev_batch; + final encryption = room.client.encryption; + do { + if (prevBatch == null) break; + try { + final resp = await room.client.getRoomEvents( + room.id, + Direction.b, + from: prevBatch, + limit: requestHistoryCount, + filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()), + ); + if (resp.chunk.isEmpty) break; + + for (final matrixEvent in resp.chunk) { + var event = Event.fromMatrixEvent(matrixEvent, room); + if (event.type == EventTypes.Encrypted && encryption != null) { + event = await encryption.decryptRoomEvent(event); + if (event.type == EventTypes.Encrypted && + event.messageType == MessageTypes.BadEncrypted && + event.content['can_request_session'] == true) { + // Await requestKey() here to ensure decrypted message bodies + await event.requestKey().catchError((_) {}); + } + } + if (from != null && event.originServerTs.isBefore(from)) break; + if (until != null && event.originServerTs.isAfter(until)) continue; + if (filter != null && !filter(event)) continue; + exportEvent(event); + } + yield ExportProgress( + source: ExportSource.server, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + + prevBatch = resp.end; + if (resp.chunk.length < requestHistoryCount) break; + + if (from != null && resp.chunk.last.originServerTs.isBefore(from)) { + break; + } + } on MatrixException catch (e) { + // We have no permission anymore to request the history, so we stop here + // and return the events we have so far + if (e.error == MatrixError.M_FORBIDDEN) { + break; + } + // If it's not a forbidden error, we yield an [ExportError] + rethrow; + } + } while (true); + + yield ExportComplete( + events: eventsToExport, + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + } catch (e) { + yield ExportError( + error: e.toString(), + totalEvents: eventsToExport.length, + utdEvents: utdEventsCount, + mediaEvents: mediaEventsCount, + users: users.length, + ); + } + } +} + +/// Base class for export results +sealed class ExportResult { + /// Total events count + final int totalEvents; + + /// Unable-to-decrypt events count + final int utdEvents; + + /// Media events count + final int mediaEvents; + + /// Users count + final int users; + + ExportResult({ + required this.totalEvents, + required this.utdEvents, + required this.mediaEvents, + required this.users, + }); +} + +enum ExportSource { + timeline, + database, + server, +} + +/// Represents progress during export +final class ExportProgress extends ExportResult { + /// Export source + final ExportSource source; + + ExportProgress({ + required this.source, + required super.totalEvents, + required super.utdEvents, + required super.mediaEvents, + required super.users, + }); +} + +/// Represents successful completion with exported events +final class ExportComplete extends ExportResult { + final List events; + ExportComplete({ + required this.events, + required super.totalEvents, + required super.utdEvents, + required super.mediaEvents, + required super.users, + }); +} + +/// Represents an error during export +final class ExportError extends ExportResult { + final String error; + ExportError({ + required this.error, + required super.totalEvents, + required super.utdEvents, + required super.mediaEvents, + required super.users, + }); +} diff --git a/test/msc_extensions/timeline_export_test.dart b/test/msc_extensions/timeline_export_test.dart new file mode 100644 index 00000000..f0709ed4 --- /dev/null +++ b/test/msc_extensions/timeline_export_test.dart @@ -0,0 +1,352 @@ +import 'package:test/test.dart'; + +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/models/timeline_chunk.dart'; + +// Mock implementations + +// MockClient: Simulates Matrix server responses for `getRoomEvents` calls that +// are used by the `TimelineExportExtension.export` method. +// This also allows testing error scenarios by throwing exceptions on demand. +class MockClient extends Client { + List serverEvents, dbEvents; + final bool throwError; + + MockClient( + super.name, { + this.serverEvents = const [], + this.dbEvents = const [], + this.throwError = false, + }); + + @override + Future getRoomEvents( + String roomId, + Direction direction, { + String? from, + String? to, + int? limit, + String? filter, + }) async { + if (throwError) { + throw MatrixException.fromJson({'errcode': 'M_FORBIDDEN'}); + } + + final chunk = serverEvents + .skip(int.parse(from ?? '0')) + .take(limit ?? serverEvents.length) + .toList(); + return GetRoomEventsResponse( + chunk: chunk, + start: from ?? '0', + end: chunk.isEmpty + ? '0' + : (serverEvents.indexOf(chunk.last) + 1).toString(), + state: [], + ); + } + + @override + DatabaseApi? get database => MockDatabase(dbEvents); +} + +// MockDatabase: Simulates database access for the `TimelineExportExtension.export` +// method. +class MockDatabase implements DatabaseApi { + final List dbEvents; + + MockDatabase(this.dbEvents); + + @override + Future> getEventList( + Room room, { + int start = 0, + int? limit, + bool onlySending = false, + }) async { + if (start >= dbEvents.length) return []; + return dbEvents.skip(start).take(limit ?? 50).toList(); + } + + @override + dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation); +} + +// Test helpers +Event createTestEvent({ + required String eventId, + required String type, + required String msgtype, + required DateTime timestamp, + required Room room, +}) { + return Event( + eventId: eventId, + type: type, + content: { + 'msgtype': msgtype, + 'body': 'Test message $eventId', + }, + senderId: '@user:example.com', + originServerTs: timestamp, + room: room, + status: EventStatus.synced, + ); +} + +List createMockEvents({ + required int count, + required DateTime startTime, + required Room room, +}) { + return List.generate( + count, + (i) => createTestEvent( + eventId: 'event$i', + type: i % 3 == 0 ? EventTypes.Message : EventTypes.Encrypted, + msgtype: i % 3 == 0 ? MessageTypes.Text : MessageTypes.BadEncrypted, + timestamp: startTime.subtract(Duration(hours: i)), + room: room, + ), + ); +} + +void main() { + group('TimelineExportExtension', () { + late DateTime now; + late MockClient client; + late Room room; + late Timeline timeline; + + setUp(() { + now = DateTime.now(); + client = MockClient('testclient'); + room = Room(id: '!testroom:example.com', client: client); + timeline = Timeline(room: room, chunk: TimelineChunk(events: [])); + }); + + group('basic export functionality', () { + late List mockEvents; + + setUp(() { + mockEvents = createMockEvents(count: 20, startTime: now, room: room); + + // Set up initial state + timeline.events.addAll(mockEvents.take(5)); + client.dbEvents = mockEvents.take(10).toList(); + client.serverEvents = mockEvents; + room.prev_batch = '10'; + }); + + test('exports events from all sources in correct order', () async { + final results = []; + await for (final result in timeline.export()) { + results.add(result); + } + + expect(results.whereType().length, greaterThan(1)); + expect(results.first, isA()); + expect(results.last, isA()); + + final complete = results.last as ExportComplete; + expect(complete.events.length, mockEvents.length); + expect( + complete.events.map((e) => e.eventId).toSet(), + mockEvents.map((e) => e.eventId).toSet(), + ); + + // Verify events are in chronological order + for (int i = 1; i < complete.events.length; i++) { + expect( + complete.events[i].originServerTs + .isBefore(complete.events[i - 1].originServerTs) || + complete.events[i].originServerTs + .isAtSameMomentAs(complete.events[i - 1].originServerTs), + isTrue, + reason: 'Events should be in reverse chronological order', + ); + } + }); + + test('filters events by date range correctly', () async { + final from = now.subtract(const Duration(hours: 8, seconds: 1)); + final until = now.subtract(const Duration(hours: 3, seconds: 1)); + + final results = []; + await for (final result in timeline.export(from: from, until: until)) { + results.add(result); + } + + final complete = results.last as ExportComplete; + + expect( + complete.events.every( + (e) => + e.originServerTs.isAfter(from) && + e.originServerTs.isBefore(until), + ), + isTrue, + ); + }); + }); + + group('pagination handling', () { + test('handles server pagination with large event sets', () async { + final manyServerEvents = createMockEvents( + count: 150, + startTime: now.subtract(const Duration(hours: 10)), + room: room, + ); + + client = MockClient('testclient', serverEvents: manyServerEvents); + room = Room(id: '!testroom:example.com', client: client); + timeline = Timeline( + room: room, + chunk: TimelineChunk(events: manyServerEvents.take(10).toList()), + ); + room.prev_batch = '10'; + + final results = []; + var serverProgressUpdates = 0; + var lastTotalEvents = 0; + + await for (final result in timeline.export(requestHistoryCount: 100)) { + results.add(result); + if (result is ExportProgress && + result.source == ExportSource.server) { + serverProgressUpdates++; + expect(result.totalEvents, greaterThanOrEqualTo(lastTotalEvents)); + lastTotalEvents = result.totalEvents; + } + } + + final complete = results.last as ExportComplete; + expect(complete.events.length, 150); + expect(serverProgressUpdates, equals(2)); + expect( + complete.events.map((e) => e.eventId).toSet(), + manyServerEvents.map((e) => e.eventId).toSet(), + ); + }); + }); + + group('error handling', () { + test('continues export when server returns error', () async { + client = MockClient('testclient', throwError: true); + room = Room(id: '!testroom:example.com', client: client); + final initialEvents = + createMockEvents(count: 5, startTime: now, room: room); + client.dbEvents = initialEvents; + client.serverEvents = initialEvents; + timeline = Timeline( + room: room, + chunk: TimelineChunk(events: initialEvents), + ); + room.prev_batch = '5'; + + final results = []; + + await for (final result in timeline.export()) { + results.add(result); + + if (result is ExportProgress) { + switch (result.source) { + case ExportSource.timeline: + expect( + result.totalEvents == 0 || result.totalEvents == 5, + isTrue, + ); + break; + case ExportSource.database: + break; + case ExportSource.server: + // Should not see server progress due to error + fail( + 'Should not receive server progress updates when server throws error', + ); + } + } + } + + final complete = results.last as ExportComplete; + expect(complete.events.length, 5); + expect( + complete.events.map((e) => e.eventId).toSet(), + initialEvents.map((e) => e.eventId).toSet(), + ); + }); + }); + + group('event type counting', () { + test('correctly counts media and UTD events', () async { + final mixedEvents = [ + createTestEvent( + eventId: 'image1', + type: EventTypes.Message, + msgtype: MessageTypes.Image, + timestamp: now, + room: room, + ), + createTestEvent( + eventId: 'video1', + type: EventTypes.Message, + msgtype: MessageTypes.Video, + timestamp: now, + room: room, + ), + createTestEvent( + eventId: 'text1', + type: EventTypes.Message, + msgtype: MessageTypes.Text, + timestamp: now, + room: room, + ), + createTestEvent( + eventId: 'utd1', + type: EventTypes.Encrypted, + msgtype: MessageTypes.BadEncrypted, + timestamp: now, + room: room, + ), + ]; + + client = MockClient('testclient', serverEvents: mixedEvents); + room = Room(id: '!testroom:example.com', client: client); + timeline = Timeline(room: room, chunk: TimelineChunk(events: [])); + room.prev_batch = '0'; + + final results = []; + await for (final result in timeline.export()) { + results.add(result); + } + + final complete = results.last as ExportComplete; + expect(complete.mediaEvents, 2); + expect(complete.utdEvents, 1); + expect(complete.events.length, mixedEvents.length); + + final mediaEvents = complete.events.where( + (e) => + e.type == EventTypes.Message && + (e.messageType == MessageTypes.Image || + e.messageType == MessageTypes.Video), + ); + expect(mediaEvents.length, 2); + + final utdEvents = complete.events.where( + (e) => + e.type == EventTypes.Encrypted && + e.messageType == MessageTypes.BadEncrypted, + ); + expect(utdEvents.length, 1); + + final textEvents = complete.events.where( + (e) => + e.type == EventTypes.Message && + e.messageType == MessageTypes.Text, + ); + expect(textEvents.length, 1); + }); + }); + }); +}