Merge pull request #2040 from famedly/karthi/export-chat
feat: export timeline events
This commit is contained in:
commit
d8b262b629
|
|
@ -80,6 +80,7 @@ export 'msc_extensions/msc_3935_cute_events/msc_3935_cute_events.dart';
|
|||
export 'msc_extensions/msc_1236_widgets/msc_1236_widgets.dart';
|
||||
export 'msc_extensions/msc_2835_uia_login/msc_2835_uia_login.dart';
|
||||
export 'msc_extensions/msc_3814_dehydrated_devices/msc_3814_dehydrated_devices.dart';
|
||||
export 'msc_extensions/extension_timeline_export/timeline_export.dart';
|
||||
|
||||
export 'src/utils/web_worker/web_worker_stub.dart'
|
||||
if (dart.library.html) 'src/utils/web_worker/web_worker.dart';
|
||||
|
|
|
|||
|
|
@ -0,0 +1,286 @@
|
|||
import 'dart:convert';
|
||||
|
||||
import 'package:matrix/matrix_api_lite.dart';
|
||||
import 'package:matrix/src/event.dart';
|
||||
import 'package:matrix/src/timeline.dart';
|
||||
|
||||
extension TimelineExportExtension on Timeline {
|
||||
/// Exports timeline events from a Matrix room within a specified date range.
|
||||
///
|
||||
/// The export process provides progress updates through the returned stream with the following information:
|
||||
/// - Total number of events exported
|
||||
/// - Count of unable-to-decrypt (UTD) events
|
||||
/// - Count of media events (images, audio, video, files)
|
||||
/// - Number of unique users involved
|
||||
///
|
||||
/// ```dart
|
||||
/// // Example usage:
|
||||
/// final timeline = room.timeline;
|
||||
/// final oneWeekAgo = DateTime.now().subtract(Duration(days: 7));
|
||||
///
|
||||
/// // Export last week's messages, excluding encrypted events
|
||||
/// await for (final result in timeline.export(
|
||||
/// from: oneWeekAgo,
|
||||
/// filter: (event) => event?.type != EventTypes.Encrypted,
|
||||
/// )) {
|
||||
/// if (result is ExportProgress) {
|
||||
/// print('Progress: ${result.totalEvents} events exported');
|
||||
/// } else if (result is ExportComplete) {
|
||||
/// print('Export completed with ${result.events.length} events');
|
||||
/// } else if (result is ExportError) {
|
||||
/// print('Export failed: ${result.error}');
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// [from] Optional start date to filter events. If null, exports from the beginning.
|
||||
/// [until] Optional end date to filter events. If null, exports up to the latest event.
|
||||
/// [filter] Optional function to filter events. Return true to include the event.
|
||||
/// [requestHistoryCount] Optional. The number of events to request from the server at once.
|
||||
///
|
||||
/// Returns a [Stream] of [ExportResult] which can be:
|
||||
/// - [ExportProgress]: Provides progress updates during export
|
||||
/// - [ExportComplete]: Contains the final list of exported events
|
||||
/// - [ExportError]: Contains error information if export fails
|
||||
Stream<ExportResult> export({
|
||||
DateTime? from,
|
||||
DateTime? until,
|
||||
bool Function(Event)? filter,
|
||||
int requestHistoryCount = 500,
|
||||
}) async* {
|
||||
final eventsToExport = <Event>[];
|
||||
var utdEventsCount = 0;
|
||||
var mediaEventsCount = 0;
|
||||
final users = <String>{};
|
||||
|
||||
try {
|
||||
yield ExportProgress(
|
||||
source: ExportSource.timeline,
|
||||
totalEvents: 0,
|
||||
utdEvents: 0,
|
||||
mediaEvents: 0,
|
||||
users: 0,
|
||||
);
|
||||
|
||||
void exportEvent(Event event) {
|
||||
eventsToExport.add(event);
|
||||
|
||||
if (event.type == EventTypes.Encrypted &&
|
||||
event.messageType == MessageTypes.BadEncrypted) {
|
||||
utdEventsCount++;
|
||||
} else if (event.type == EventTypes.Message &&
|
||||
{
|
||||
MessageTypes.Sticker,
|
||||
MessageTypes.Image,
|
||||
MessageTypes.Audio,
|
||||
MessageTypes.Video,
|
||||
MessageTypes.File,
|
||||
}.contains(event.messageType)) {
|
||||
mediaEventsCount++;
|
||||
}
|
||||
users.add(event.senderId);
|
||||
}
|
||||
|
||||
// From the timeline
|
||||
if (until == null || events.last.originServerTs.isBefore(until)) {
|
||||
for (final event in events) {
|
||||
if (from != null && event.originServerTs.isBefore(from)) break;
|
||||
if (until != null && event.originServerTs.isAfter(until)) continue;
|
||||
if (filter != null && !filter(event)) continue;
|
||||
exportEvent(event);
|
||||
}
|
||||
}
|
||||
yield ExportProgress(
|
||||
source: ExportSource.timeline,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
|
||||
if (from != null && events.last.originServerTs.isBefore(from)) {
|
||||
yield ExportComplete(
|
||||
events: eventsToExport,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// From the database
|
||||
final eventsFromStore = await room.client.database
|
||||
?.getEventList(room, start: events.length) ??
|
||||
[];
|
||||
if (eventsFromStore.isNotEmpty) {
|
||||
if (until == null ||
|
||||
eventsFromStore.last.originServerTs.isBefore(until)) {
|
||||
for (final event in eventsFromStore) {
|
||||
if (from != null && event.originServerTs.isBefore(from)) break;
|
||||
if (until != null && event.originServerTs.isAfter(until)) continue;
|
||||
if (filter != null && !filter(event)) continue;
|
||||
exportEvent(event);
|
||||
}
|
||||
}
|
||||
yield ExportProgress(
|
||||
source: ExportSource.database,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
|
||||
if (from != null &&
|
||||
eventsFromStore.last.originServerTs.isBefore(from)) {
|
||||
yield ExportComplete(
|
||||
events: eventsToExport,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// From the server
|
||||
var prevBatch = room.prev_batch;
|
||||
final encryption = room.client.encryption;
|
||||
do {
|
||||
if (prevBatch == null) break;
|
||||
try {
|
||||
final resp = await room.client.getRoomEvents(
|
||||
room.id,
|
||||
Direction.b,
|
||||
from: prevBatch,
|
||||
limit: requestHistoryCount,
|
||||
filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
|
||||
);
|
||||
if (resp.chunk.isEmpty) break;
|
||||
|
||||
for (final matrixEvent in resp.chunk) {
|
||||
var event = Event.fromMatrixEvent(matrixEvent, room);
|
||||
if (event.type == EventTypes.Encrypted && encryption != null) {
|
||||
event = await encryption.decryptRoomEvent(event);
|
||||
if (event.type == EventTypes.Encrypted &&
|
||||
event.messageType == MessageTypes.BadEncrypted &&
|
||||
event.content['can_request_session'] == true) {
|
||||
// Await requestKey() here to ensure decrypted message bodies
|
||||
await event.requestKey().catchError((_) {});
|
||||
}
|
||||
}
|
||||
if (from != null && event.originServerTs.isBefore(from)) break;
|
||||
if (until != null && event.originServerTs.isAfter(until)) continue;
|
||||
if (filter != null && !filter(event)) continue;
|
||||
exportEvent(event);
|
||||
}
|
||||
yield ExportProgress(
|
||||
source: ExportSource.server,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
|
||||
prevBatch = resp.end;
|
||||
if (resp.chunk.length < requestHistoryCount) break;
|
||||
|
||||
if (from != null && resp.chunk.last.originServerTs.isBefore(from)) {
|
||||
break;
|
||||
}
|
||||
} on MatrixException catch (e) {
|
||||
// We have no permission anymore to request the history, so we stop here
|
||||
// and return the events we have so far
|
||||
if (e.error == MatrixError.M_FORBIDDEN) {
|
||||
break;
|
||||
}
|
||||
// If it's not a forbidden error, we yield an [ExportError]
|
||||
rethrow;
|
||||
}
|
||||
} while (true);
|
||||
|
||||
yield ExportComplete(
|
||||
events: eventsToExport,
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
} catch (e) {
|
||||
yield ExportError(
|
||||
error: e.toString(),
|
||||
totalEvents: eventsToExport.length,
|
||||
utdEvents: utdEventsCount,
|
||||
mediaEvents: mediaEventsCount,
|
||||
users: users.length,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Base class for export results
|
||||
sealed class ExportResult {
|
||||
/// Total events count
|
||||
final int totalEvents;
|
||||
|
||||
/// Unable-to-decrypt events count
|
||||
final int utdEvents;
|
||||
|
||||
/// Media events count
|
||||
final int mediaEvents;
|
||||
|
||||
/// Users count
|
||||
final int users;
|
||||
|
||||
ExportResult({
|
||||
required this.totalEvents,
|
||||
required this.utdEvents,
|
||||
required this.mediaEvents,
|
||||
required this.users,
|
||||
});
|
||||
}
|
||||
|
||||
enum ExportSource {
|
||||
timeline,
|
||||
database,
|
||||
server,
|
||||
}
|
||||
|
||||
/// Represents progress during export
|
||||
final class ExportProgress extends ExportResult {
|
||||
/// Export source
|
||||
final ExportSource source;
|
||||
|
||||
ExportProgress({
|
||||
required this.source,
|
||||
required super.totalEvents,
|
||||
required super.utdEvents,
|
||||
required super.mediaEvents,
|
||||
required super.users,
|
||||
});
|
||||
}
|
||||
|
||||
/// Represents successful completion with exported events
|
||||
final class ExportComplete extends ExportResult {
|
||||
final List<Event> events;
|
||||
ExportComplete({
|
||||
required this.events,
|
||||
required super.totalEvents,
|
||||
required super.utdEvents,
|
||||
required super.mediaEvents,
|
||||
required super.users,
|
||||
});
|
||||
}
|
||||
|
||||
/// Represents an error during export
|
||||
final class ExportError extends ExportResult {
|
||||
final String error;
|
||||
ExportError({
|
||||
required this.error,
|
||||
required super.totalEvents,
|
||||
required super.utdEvents,
|
||||
required super.mediaEvents,
|
||||
required super.users,
|
||||
});
|
||||
}
|
||||
|
|
@ -0,0 +1,352 @@
|
|||
import 'package:test/test.dart';
|
||||
|
||||
import 'package:matrix/matrix.dart';
|
||||
import 'package:matrix/src/models/timeline_chunk.dart';
|
||||
|
||||
// Mock implementations
|
||||
|
||||
// MockClient: Simulates Matrix server responses for `getRoomEvents` calls that
|
||||
// are used by the `TimelineExportExtension.export` method.
|
||||
// This also allows testing error scenarios by throwing exceptions on demand.
|
||||
class MockClient extends Client {
|
||||
List<Event> serverEvents, dbEvents;
|
||||
final bool throwError;
|
||||
|
||||
MockClient(
|
||||
super.name, {
|
||||
this.serverEvents = const [],
|
||||
this.dbEvents = const [],
|
||||
this.throwError = false,
|
||||
});
|
||||
|
||||
@override
|
||||
Future<GetRoomEventsResponse> getRoomEvents(
|
||||
String roomId,
|
||||
Direction direction, {
|
||||
String? from,
|
||||
String? to,
|
||||
int? limit,
|
||||
String? filter,
|
||||
}) async {
|
||||
if (throwError) {
|
||||
throw MatrixException.fromJson({'errcode': 'M_FORBIDDEN'});
|
||||
}
|
||||
|
||||
final chunk = serverEvents
|
||||
.skip(int.parse(from ?? '0'))
|
||||
.take(limit ?? serverEvents.length)
|
||||
.toList();
|
||||
return GetRoomEventsResponse(
|
||||
chunk: chunk,
|
||||
start: from ?? '0',
|
||||
end: chunk.isEmpty
|
||||
? '0'
|
||||
: (serverEvents.indexOf(chunk.last) + 1).toString(),
|
||||
state: [],
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
DatabaseApi? get database => MockDatabase(dbEvents);
|
||||
}
|
||||
|
||||
// MockDatabase: Simulates database access for the `TimelineExportExtension.export`
|
||||
// method.
|
||||
class MockDatabase implements DatabaseApi {
|
||||
final List<Event> dbEvents;
|
||||
|
||||
MockDatabase(this.dbEvents);
|
||||
|
||||
@override
|
||||
Future<List<Event>> getEventList(
|
||||
Room room, {
|
||||
int start = 0,
|
||||
int? limit,
|
||||
bool onlySending = false,
|
||||
}) async {
|
||||
if (start >= dbEvents.length) return [];
|
||||
return dbEvents.skip(start).take(limit ?? 50).toList();
|
||||
}
|
||||
|
||||
@override
|
||||
dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
|
||||
}
|
||||
|
||||
// Test helpers
|
||||
Event createTestEvent({
|
||||
required String eventId,
|
||||
required String type,
|
||||
required String msgtype,
|
||||
required DateTime timestamp,
|
||||
required Room room,
|
||||
}) {
|
||||
return Event(
|
||||
eventId: eventId,
|
||||
type: type,
|
||||
content: {
|
||||
'msgtype': msgtype,
|
||||
'body': 'Test message $eventId',
|
||||
},
|
||||
senderId: '@user:example.com',
|
||||
originServerTs: timestamp,
|
||||
room: room,
|
||||
status: EventStatus.synced,
|
||||
);
|
||||
}
|
||||
|
||||
List<Event> createMockEvents({
|
||||
required int count,
|
||||
required DateTime startTime,
|
||||
required Room room,
|
||||
}) {
|
||||
return List.generate(
|
||||
count,
|
||||
(i) => createTestEvent(
|
||||
eventId: 'event$i',
|
||||
type: i % 3 == 0 ? EventTypes.Message : EventTypes.Encrypted,
|
||||
msgtype: i % 3 == 0 ? MessageTypes.Text : MessageTypes.BadEncrypted,
|
||||
timestamp: startTime.subtract(Duration(hours: i)),
|
||||
room: room,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
void main() {
|
||||
group('TimelineExportExtension', () {
|
||||
late DateTime now;
|
||||
late MockClient client;
|
||||
late Room room;
|
||||
late Timeline timeline;
|
||||
|
||||
setUp(() {
|
||||
now = DateTime.now();
|
||||
client = MockClient('testclient');
|
||||
room = Room(id: '!testroom:example.com', client: client);
|
||||
timeline = Timeline(room: room, chunk: TimelineChunk(events: []));
|
||||
});
|
||||
|
||||
group('basic export functionality', () {
|
||||
late List<Event> mockEvents;
|
||||
|
||||
setUp(() {
|
||||
mockEvents = createMockEvents(count: 20, startTime: now, room: room);
|
||||
|
||||
// Set up initial state
|
||||
timeline.events.addAll(mockEvents.take(5));
|
||||
client.dbEvents = mockEvents.take(10).toList();
|
||||
client.serverEvents = mockEvents;
|
||||
room.prev_batch = '10';
|
||||
});
|
||||
|
||||
test('exports events from all sources in correct order', () async {
|
||||
final results = <ExportResult>[];
|
||||
await for (final result in timeline.export()) {
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
expect(results.whereType<ExportProgress>().length, greaterThan(1));
|
||||
expect(results.first, isA<ExportProgress>());
|
||||
expect(results.last, isA<ExportComplete>());
|
||||
|
||||
final complete = results.last as ExportComplete;
|
||||
expect(complete.events.length, mockEvents.length);
|
||||
expect(
|
||||
complete.events.map((e) => e.eventId).toSet(),
|
||||
mockEvents.map((e) => e.eventId).toSet(),
|
||||
);
|
||||
|
||||
// Verify events are in chronological order
|
||||
for (int i = 1; i < complete.events.length; i++) {
|
||||
expect(
|
||||
complete.events[i].originServerTs
|
||||
.isBefore(complete.events[i - 1].originServerTs) ||
|
||||
complete.events[i].originServerTs
|
||||
.isAtSameMomentAs(complete.events[i - 1].originServerTs),
|
||||
isTrue,
|
||||
reason: 'Events should be in reverse chronological order',
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
test('filters events by date range correctly', () async {
|
||||
final from = now.subtract(const Duration(hours: 8, seconds: 1));
|
||||
final until = now.subtract(const Duration(hours: 3, seconds: 1));
|
||||
|
||||
final results = <ExportResult>[];
|
||||
await for (final result in timeline.export(from: from, until: until)) {
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
final complete = results.last as ExportComplete;
|
||||
|
||||
expect(
|
||||
complete.events.every(
|
||||
(e) =>
|
||||
e.originServerTs.isAfter(from) &&
|
||||
e.originServerTs.isBefore(until),
|
||||
),
|
||||
isTrue,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
group('pagination handling', () {
|
||||
test('handles server pagination with large event sets', () async {
|
||||
final manyServerEvents = createMockEvents(
|
||||
count: 150,
|
||||
startTime: now.subtract(const Duration(hours: 10)),
|
||||
room: room,
|
||||
);
|
||||
|
||||
client = MockClient('testclient', serverEvents: manyServerEvents);
|
||||
room = Room(id: '!testroom:example.com', client: client);
|
||||
timeline = Timeline(
|
||||
room: room,
|
||||
chunk: TimelineChunk(events: manyServerEvents.take(10).toList()),
|
||||
);
|
||||
room.prev_batch = '10';
|
||||
|
||||
final results = <ExportResult>[];
|
||||
var serverProgressUpdates = 0;
|
||||
var lastTotalEvents = 0;
|
||||
|
||||
await for (final result in timeline.export(requestHistoryCount: 100)) {
|
||||
results.add(result);
|
||||
if (result is ExportProgress &&
|
||||
result.source == ExportSource.server) {
|
||||
serverProgressUpdates++;
|
||||
expect(result.totalEvents, greaterThanOrEqualTo(lastTotalEvents));
|
||||
lastTotalEvents = result.totalEvents;
|
||||
}
|
||||
}
|
||||
|
||||
final complete = results.last as ExportComplete;
|
||||
expect(complete.events.length, 150);
|
||||
expect(serverProgressUpdates, equals(2));
|
||||
expect(
|
||||
complete.events.map((e) => e.eventId).toSet(),
|
||||
manyServerEvents.map((e) => e.eventId).toSet(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
group('error handling', () {
|
||||
test('continues export when server returns error', () async {
|
||||
client = MockClient('testclient', throwError: true);
|
||||
room = Room(id: '!testroom:example.com', client: client);
|
||||
final initialEvents =
|
||||
createMockEvents(count: 5, startTime: now, room: room);
|
||||
client.dbEvents = initialEvents;
|
||||
client.serverEvents = initialEvents;
|
||||
timeline = Timeline(
|
||||
room: room,
|
||||
chunk: TimelineChunk(events: initialEvents),
|
||||
);
|
||||
room.prev_batch = '5';
|
||||
|
||||
final results = <ExportResult>[];
|
||||
|
||||
await for (final result in timeline.export()) {
|
||||
results.add(result);
|
||||
|
||||
if (result is ExportProgress) {
|
||||
switch (result.source) {
|
||||
case ExportSource.timeline:
|
||||
expect(
|
||||
result.totalEvents == 0 || result.totalEvents == 5,
|
||||
isTrue,
|
||||
);
|
||||
break;
|
||||
case ExportSource.database:
|
||||
break;
|
||||
case ExportSource.server:
|
||||
// Should not see server progress due to error
|
||||
fail(
|
||||
'Should not receive server progress updates when server throws error',
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final complete = results.last as ExportComplete;
|
||||
expect(complete.events.length, 5);
|
||||
expect(
|
||||
complete.events.map((e) => e.eventId).toSet(),
|
||||
initialEvents.map((e) => e.eventId).toSet(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
group('event type counting', () {
|
||||
test('correctly counts media and UTD events', () async {
|
||||
final mixedEvents = [
|
||||
createTestEvent(
|
||||
eventId: 'image1',
|
||||
type: EventTypes.Message,
|
||||
msgtype: MessageTypes.Image,
|
||||
timestamp: now,
|
||||
room: room,
|
||||
),
|
||||
createTestEvent(
|
||||
eventId: 'video1',
|
||||
type: EventTypes.Message,
|
||||
msgtype: MessageTypes.Video,
|
||||
timestamp: now,
|
||||
room: room,
|
||||
),
|
||||
createTestEvent(
|
||||
eventId: 'text1',
|
||||
type: EventTypes.Message,
|
||||
msgtype: MessageTypes.Text,
|
||||
timestamp: now,
|
||||
room: room,
|
||||
),
|
||||
createTestEvent(
|
||||
eventId: 'utd1',
|
||||
type: EventTypes.Encrypted,
|
||||
msgtype: MessageTypes.BadEncrypted,
|
||||
timestamp: now,
|
||||
room: room,
|
||||
),
|
||||
];
|
||||
|
||||
client = MockClient('testclient', serverEvents: mixedEvents);
|
||||
room = Room(id: '!testroom:example.com', client: client);
|
||||
timeline = Timeline(room: room, chunk: TimelineChunk(events: []));
|
||||
room.prev_batch = '0';
|
||||
|
||||
final results = <ExportResult>[];
|
||||
await for (final result in timeline.export()) {
|
||||
results.add(result);
|
||||
}
|
||||
|
||||
final complete = results.last as ExportComplete;
|
||||
expect(complete.mediaEvents, 2);
|
||||
expect(complete.utdEvents, 1);
|
||||
expect(complete.events.length, mixedEvents.length);
|
||||
|
||||
final mediaEvents = complete.events.where(
|
||||
(e) =>
|
||||
e.type == EventTypes.Message &&
|
||||
(e.messageType == MessageTypes.Image ||
|
||||
e.messageType == MessageTypes.Video),
|
||||
);
|
||||
expect(mediaEvents.length, 2);
|
||||
|
||||
final utdEvents = complete.events.where(
|
||||
(e) =>
|
||||
e.type == EventTypes.Encrypted &&
|
||||
e.messageType == MessageTypes.BadEncrypted,
|
||||
);
|
||||
expect(utdEvents.length, 1);
|
||||
|
||||
final textEvents = complete.events.where(
|
||||
(e) =>
|
||||
e.type == EventTypes.Message &&
|
||||
e.messageType == MessageTypes.Text,
|
||||
);
|
||||
expect(textEvents.length, 1);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
Loading…
Reference in New Issue