From 9628095ac94c09dc5f01f15197b3fd4a76da3b5a Mon Sep 17 00:00:00 2001 From: Christian Pauly Date: Thu, 30 Jun 2022 09:22:53 +0200 Subject: [PATCH] feat: Implement CachedStreamController This makes it possible to access the last value of a stream at any time. --- lib/src/client.dart | 103 +++++++++----------- lib/src/room.dart | 7 +- lib/src/utils/cached_stream_controller.dart | 26 +++++ lib/src/voip/call.dart | 33 ++++--- lib/src/voip/group_call.dart | 21 ++-- lib/src/voip/voip.dart | 6 +- test/client_test.dart | 4 +- 7 files changed, 109 insertions(+), 91 deletions(-) create mode 100644 lib/src/utils/cached_stream_controller.dart diff --git a/lib/src/client.dart b/lib/src/client.dart index def88969..e007ddb0 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -22,6 +22,7 @@ import 'dart:core'; import 'dart:typed_data'; import 'package:http/http.dart' as http; +import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/run_in_root.dart'; import 'package:matrix/src/utils/sync_update_item_count.dart'; import 'package:mime/mime.dart'; @@ -176,7 +177,6 @@ class Client extends MatrixApi { roomPreviewLastEvents = roomPreviewLastEvents ??= {}, supportedLoginTypes = supportedLoginTypes ?? {AuthenticationTypes.password}, - __loginState = LoginState.loggedOut, verificationMethods = verificationMethods ?? {}, super( httpClient: @@ -228,13 +228,9 @@ class Client extends MatrixApi { String? _groupCallSessionId; /// Returns the current login state. - LoginState get loginState => __loginState; - LoginState __loginState; - - set _loginState(LoginState state) { - __loginState = state; - onLoginStateChanged.add(state); - } + @Deprecated('Use [onLoginStateChanged.value] instead') + LoginState get loginState => + onLoginStateChanged.value ?? LoginState.loggedOut; bool isLogged() => accessToken != null; @@ -952,97 +948,96 @@ class Client extends MatrixApi { /// the app receives a new synchronization, this event is called for every signal /// to update the GUI. For example, for a new message, it is called: /// onRoomEvent( "m.room.message", "!chat_id:server.com", "timeline", {sender: "@bob:server.com", body: "Hello world"} ) - final StreamController onEvent = StreamController.broadcast(); + final CachedStreamController onEvent = CachedStreamController(); /// The onToDeviceEvent is called when there comes a new to device event. It is /// already decrypted if necessary. - final StreamController onToDeviceEvent = - StreamController.broadcast(); + final CachedStreamController onToDeviceEvent = + CachedStreamController(); /// Called when the login state e.g. user gets logged out. - final StreamController onLoginStateChanged = - StreamController.broadcast(); + final CachedStreamController onLoginStateChanged = + CachedStreamController(); /// Called when the local cache is reset - final StreamController onCacheCleared = StreamController.broadcast(); + final CachedStreamController onCacheCleared = CachedStreamController(); /// Encryption errors are coming here. - final StreamController onEncryptionError = - StreamController.broadcast(); - - /// This is called once, when the first sync has been processed. - final StreamController onFirstSync = StreamController.broadcast(); + final CachedStreamController onEncryptionError = + CachedStreamController(); /// When a new sync response is coming in, this gives the complete payload. - final StreamController onSync = StreamController.broadcast(); + final CachedStreamController onSync = CachedStreamController(); /// This gives the current status of the synchronization - final StreamController onSyncStatus = - StreamController.broadcast(); + final CachedStreamController onSyncStatus = + CachedStreamController(); /// Callback will be called on presences. @Deprecated( 'Deprecated, use onPresenceChanged instead which has a timestamp.') - final StreamController onPresence = StreamController.broadcast(); + final CachedStreamController onPresence = CachedStreamController(); /// Callback will be called on presence updates. - final StreamController onPresenceChanged = - StreamController.broadcast(); + final CachedStreamController onPresenceChanged = + CachedStreamController(); /// Callback will be called on account data updates. - final StreamController onAccountData = - StreamController.broadcast(); + final CachedStreamController onAccountData = + CachedStreamController(); /// Will be called on call invites. - final StreamController onCallInvite = StreamController.broadcast(); + final CachedStreamController onCallInvite = CachedStreamController(); /// Will be called on call hangups. - final StreamController onCallHangup = StreamController.broadcast(); + final CachedStreamController onCallHangup = CachedStreamController(); /// Will be called on call candidates. - final StreamController onCallCandidates = StreamController.broadcast(); + final CachedStreamController onCallCandidates = + CachedStreamController(); /// Will be called on call answers. - final StreamController onCallAnswer = StreamController.broadcast(); + final CachedStreamController onCallAnswer = CachedStreamController(); /// Will be called on call replaces. - final StreamController onCallReplaces = StreamController.broadcast(); + final CachedStreamController onCallReplaces = CachedStreamController(); /// Will be called on select answers. - final StreamController onCallSelectAnswer = - StreamController.broadcast(); + final CachedStreamController onCallSelectAnswer = + CachedStreamController(); /// Will be called on rejects. - final StreamController onCallReject = StreamController.broadcast(); + final CachedStreamController onCallReject = CachedStreamController(); /// Will be called on negotiates. - final StreamController onCallNegotiate = StreamController.broadcast(); + final CachedStreamController onCallNegotiate = + CachedStreamController(); /// Will be called on Asserted Identity received. - final StreamController onAssertedIdentityReceived = - StreamController.broadcast(); + final CachedStreamController onAssertedIdentityReceived = + CachedStreamController(); /// Will be called on SDPStream Metadata changed. - final StreamController onSDPStreamMetadataChangedReceived = - StreamController.broadcast(); + final CachedStreamController onSDPStreamMetadataChangedReceived = + CachedStreamController(); /// Will be called when another device is requesting session keys for a room. - final StreamController onRoomKeyRequest = - StreamController.broadcast(); + final CachedStreamController onRoomKeyRequest = + CachedStreamController(); /// Will be called when another device is requesting verification with this device. - final StreamController onKeyVerificationRequest = - StreamController.broadcast(); + final CachedStreamController onKeyVerificationRequest = + CachedStreamController(); /// When the library calls an endpoint that needs UIA the `UiaRequest` is passed down this screen. /// The client can open a UIA prompt based on this. - final StreamController onUiaRequest = - StreamController.broadcast(); + final CachedStreamController onUiaRequest = + CachedStreamController(); - final StreamController onGroupCallRequest = - StreamController.broadcast(); + final CachedStreamController onGroupCallRequest = + CachedStreamController(); - final StreamController onGroupMember = StreamController.broadcast(); + final CachedStreamController onGroupMember = CachedStreamController(); /// How long should the app wait until it retrys the synchronisation after /// an error? @@ -1317,7 +1312,7 @@ class Client extends MatrixApi { // we aren't logged in encryption?.dispose(); encryption = null; - _loginState = LoginState.loggedOut; + onLoginStateChanged.add(LoginState.loggedOut); Logs().i('User is not logged in.'); _initLock = false; return; @@ -1376,7 +1371,7 @@ class Client extends MatrixApi { } } _initLock = false; - _loginState = LoginState.loggedIn; + onLoginStateChanged.add(LoginState.loggedIn); Logs().i( 'Successfully connected as ${userID?.localpart} with ${homeserver.toString()}', ); @@ -1428,7 +1423,7 @@ class Client extends MatrixApi { await databaseDestroyer(this); _database = null; } - _loginState = LoginState.loggedOut; + onLoginStateChanged.add(LoginState.loggedOut); } bool _backgroundSync = true; @@ -1530,10 +1525,6 @@ class Client extends MatrixApi { await _handleSync(syncResp, direction: Direction.f); } if (_disposed || _aborted) return; - if (prevBatch == null) { - onFirstSync.add(true); - prevBatch = syncResp.nextBatch; - } prevBatch = syncResp.nextBatch; // ignore: unawaited_futures database?.deleteOldFiles( diff --git a/lib/src/room.dart b/lib/src/room.dart index 3e243f0a..b2de11fa 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -25,6 +25,7 @@ import 'package:html_unescape/html_unescape.dart'; import 'package:matrix/src/models/timeline_chunk.dart'; import 'package:matrix/src/utils/crypto/crypto.dart'; import 'package:matrix/src/utils/file_send_request_credentials.dart'; +import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/space_child.dart'; import '../matrix.dart'; @@ -202,12 +203,12 @@ class Room { /// If something changes, this callback will be triggered. Will return the /// room id. - final StreamController onUpdate = StreamController.broadcast(); + final CachedStreamController onUpdate = CachedStreamController(); /// If there is a new session key received, this will be triggered with /// the session ID. - final StreamController onSessionKeyReceived = - StreamController.broadcast(); + final CachedStreamController onSessionKeyReceived = + CachedStreamController(); /// The name of the room if set by a participant. String get name { diff --git a/lib/src/utils/cached_stream_controller.dart b/lib/src/utils/cached_stream_controller.dart new file mode 100644 index 00000000..3c898409 --- /dev/null +++ b/lib/src/utils/cached_stream_controller.dart @@ -0,0 +1,26 @@ +import 'dart:async'; + +class CachedStreamController { + T? _value; + Object? _lastError; + final StreamController _streamController = StreamController.broadcast(); + + CachedStreamController([T? value]) : _value = value; + + T? get value => _value; + Object? get lastError => _lastError; + Stream get stream => _streamController.stream; + + void add(T value) { + _value = value; + _streamController.add(value); + } + + void addError(Object error, [StackTrace? stackTrace]) { + _lastError = value; + _streamController.addError(error, stackTrace); + } + + Future close() => _streamController.close(); + bool get isClosed => _streamController.isClosed; +} diff --git a/lib/src/voip/call.dart b/lib/src/voip/call.dart index a1f9970c..4c2ed37e 100644 --- a/lib/src/voip/call.dart +++ b/lib/src/voip/call.dart @@ -19,6 +19,7 @@ import 'dart:async'; import 'dart:core'; +import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:webrtc_interface/webrtc_interface.dart'; import '../../matrix.dart'; @@ -52,8 +53,8 @@ class WrappedMediaStream { String get title => '$displayName:$purpose:a[$audioMuted]:v[$videoMuted]'; bool stopped = false; - final StreamController onMuteStateChanged = - StreamController.broadcast(); + final CachedStreamController onMuteStateChanged = + CachedStreamController(); void Function(MediaStream stream)? onNewStream; @@ -320,26 +321,26 @@ class CallSession { bool waitForLocalAVStream = false; int toDeviceSeq = 0; - final StreamController onCallStreamsChanged = - StreamController.broadcast(); + final CachedStreamController onCallStreamsChanged = + CachedStreamController(); - final StreamController onCallReplaced = - StreamController.broadcast(); + final CachedStreamController onCallReplaced = + CachedStreamController(); - final StreamController onCallHangup = - StreamController.broadcast(); + final CachedStreamController onCallHangup = + CachedStreamController(); - final StreamController onCallStateChanged = - StreamController.broadcast(); + final CachedStreamController onCallStateChanged = + CachedStreamController(); - final StreamController onCallEventChanged = - StreamController.broadcast(); + final CachedStreamController onCallEventChanged = + CachedStreamController(); - final StreamController onStreamAdd = - StreamController.broadcast(); + final CachedStreamController onStreamAdd = + CachedStreamController(); - final StreamController onStreamRemoved = - StreamController.broadcast(); + final CachedStreamController onStreamRemoved = + CachedStreamController(); SDPStreamMetadata? remoteSDPStreamMetadata; List usermediaSenders = []; diff --git a/lib/src/voip/group_call.dart b/lib/src/voip/group_call.dart index 7666e542..3646509c 100644 --- a/lib/src/voip/group_call.dart +++ b/lib/src/voip/group_call.dart @@ -20,6 +20,7 @@ import 'dart:async'; import 'dart:core'; import 'package:matrix/matrix.dart'; +import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:webrtc_interface/webrtc_interface.dart'; /// TODO(@duan): Need to add voice activity detection mechanism @@ -198,20 +199,20 @@ class GroupCall { Timer? retryCallLoopTimeout; Map retryCallCounts = {}; - final StreamController onGroupCallFeedsChanged = - StreamController.broadcast(); + final CachedStreamController onGroupCallFeedsChanged = + CachedStreamController(); - final StreamController onGroupCallState = - StreamController.broadcast(); + final CachedStreamController onGroupCallState = + CachedStreamController(); - final StreamController onGroupCallEvent = - StreamController.broadcast(); + final CachedStreamController onGroupCallEvent = + CachedStreamController(); - final StreamController onStreamAdd = - StreamController.broadcast(); + final CachedStreamController onStreamAdd = + CachedStreamController(); - final StreamController onStreamRemoved = - StreamController.broadcast(); + final CachedStreamController onStreamRemoved = + CachedStreamController(); GroupCall({ String? groupCallId, diff --git a/lib/src/voip/voip.dart b/lib/src/voip/voip.dart index 90eb6f47..5db996a8 100644 --- a/lib/src/voip/voip.dart +++ b/lib/src/voip/voip.dart @@ -1,6 +1,6 @@ -import 'dart:async'; import 'dart:core'; +import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:webrtc_interface/webrtc_interface.dart'; import 'package:sdp_transform/sdp_transform.dart' as sdp_transform; @@ -29,8 +29,8 @@ class VoIP { TurnServerCredentials? _turnServerCredentials; Map calls = {}; Map groupCalls = {}; - final StreamController onIncomingCall = - StreamController.broadcast(); + final CachedStreamController onIncomingCall = + CachedStreamController(); String? currentCID; String? currentGroupCID; String? get localPartyId => client.deviceID; diff --git a/test/client_test.dart b/test/client_test.dart index e61a9777..38019cdc 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -94,7 +94,6 @@ void main() { expect(available, true); final loginStateFuture = matrix.onLoginStateChanged.stream.first; - final firstSyncFuture = matrix.onFirstSync.stream.first; final syncFuture = matrix.onSync.stream.first; await matrix.init( @@ -109,11 +108,10 @@ void main() { await Future.delayed(Duration(milliseconds: 50)); final loginState = await loginStateFuture; - final firstSync = await firstSyncFuture; final sync = await syncFuture; expect(loginState, LoginState.loggedIn); - expect(firstSync, true); + expect(matrix.onSync.value != null, true); expect(matrix.encryptionEnabled, olmEnabled); if (olmEnabled) { expect(matrix.identityKey, identityKey);