feat: Implement sync status stream

This new sync status stream gives the current status of the sync to make it possible
to display in the UI where the sync currently hangs and
what the progress is while updating 1000 rooms. So the app can display a
progress bar.
This commit is contained in:
Christian Pauly 2021-06-20 15:59:59 +02:00
parent c734dc5b9f
commit 768b0623da
1 changed files with 37 additions and 8 deletions

View File

@ -730,9 +730,6 @@ class Client extends MatrixApi {
/// Called when the local cache is reset
final StreamController<bool> onCacheCleared = StreamController.broadcast();
/// Synchronization errors are coming here.
final StreamController<SdkError> onSyncError = StreamController.broadcast();
/// Encryption errors are coming here.
final StreamController<SdkError> onEncryptionError =
StreamController.broadcast();
@ -743,6 +740,10 @@ class Client extends MatrixApi {
/// When a new sync response is coming in, this gives the complete payload.
final StreamController<SyncUpdate> onSync = StreamController.broadcast();
/// This gives the current status of the synchronization
final StreamController<SyncStatusUpdate> onSyncStatus =
StreamController.broadcast();
/// Callback will be called on presences.
final StreamController<Presence> onPresence = StreamController.broadcast();
@ -1043,7 +1044,9 @@ class Client extends MatrixApi {
return null;
});
_currentSyncId = syncRequest.hashCode;
onSyncStatus.add(SyncStatusUpdate(SyncStatus.waitingForResponse));
final syncResp = await syncRequest;
onSyncStatus.add(SyncStatusUpdate(SyncStatus.processing));
if (syncResp == null) throw syncError ?? 'Unknown sync error';
if (_currentSyncId != syncRequest.hashCode) {
Logs()
@ -1058,6 +1061,7 @@ class Client extends MatrixApi {
}
});
await _currentTransaction;
onSyncStatus.add(SyncStatusUpdate(SyncStatus.cleaningUp));
} else {
await handleSync(syncResp);
}
@ -1068,7 +1072,8 @@ class Client extends MatrixApi {
_sortRooms();
}
prevBatch = syncResp.nextBatch;
await database?.deleteOldFiles(
// ignore: unawaited_futures
database?.deleteOldFiles(
DateTime.now().subtract(Duration(days: 30)).millisecondsSinceEpoch);
await _updateUserDeviceKeys();
if (encryptionEnabled) {
@ -1081,20 +1086,24 @@ class Client extends MatrixApi {
} catch (_) {} // we want to dispose any errors this throws
_retryDelay = Future.value();
onSyncStatus.add(SyncStatusUpdate(SyncStatus.finished));
} on MatrixException catch (e, s) {
onSyncError.add(SdkError(exception: e, stackTrace: s));
onSyncStatus.add(SyncStatusUpdate(SyncStatus.error,
error: SdkError(exception: e, stackTrace: s)));
if (e.error == MatrixError.M_UNKNOWN_TOKEN) {
Logs().w('The user has been logged out!');
await clear();
}
} on MatrixConnectionException catch (e, s) {
Logs().w('Synchronization connection failed');
onSyncError.add(SdkError(exception: e, stackTrace: s));
onSyncStatus.add(SyncStatusUpdate(SyncStatus.error,
error: SdkError(exception: e, stackTrace: s)));
} catch (e, s) {
if (!isLogged() || _disposed || _aborted) return;
Logs().e('Error during processing events', e, s);
onSyncError.add(SdkError(
exception: e is Exception ? e : Exception(e), stackTrace: s));
onSyncStatus.add(SyncStatusUpdate(SyncStatus.error,
error: SdkError(
exception: e is Exception ? e : Exception(e), stackTrace: s)));
}
}
@ -1185,7 +1194,12 @@ class Client extends MatrixApi {
Future<void> _handleRooms(
Map<String, SyncRoomUpdate> rooms, Membership membership,
{bool sortAtTheEnd = false}) async {
var handledRooms = 0;
for (final entry in rooms.entries) {
onSyncStatus.add(SyncStatusUpdate(
SyncStatus.processing,
progress: ++handledRooms / rooms.length,
));
final id = entry.key;
final room = entry.value;
@ -2181,6 +2195,21 @@ class SdkError {
SdkError({this.exception, this.stackTrace});
}
class SyncStatusUpdate {
final SyncStatus status;
final SdkError error;
final double progress;
const SyncStatusUpdate(this.status, {this.error, this.progress});
}
enum SyncStatus {
waitingForResponse,
processing,
cleaningUp,
finished,
error,
}
class BadServerVersionsException implements Exception {
final Set<String> serverVersions, supportedVersions;
BadServerVersionsException(this.serverVersions, this.supportedVersions);