feat: Implement CachedStreamController

This makes it possible to access the last
value of a stream at any time.
This commit is contained in:
Christian Pauly 2022-06-30 09:22:53 +02:00
parent 248aba1199
commit 9628095ac9
7 changed files with 109 additions and 91 deletions

View File

@ -22,6 +22,7 @@ import 'dart:core';
import 'dart:typed_data';
import 'package:http/http.dart' as http;
import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:matrix/src/utils/run_in_root.dart';
import 'package:matrix/src/utils/sync_update_item_count.dart';
import 'package:mime/mime.dart';
@ -176,7 +177,6 @@ class Client extends MatrixApi {
roomPreviewLastEvents = roomPreviewLastEvents ??= {},
supportedLoginTypes =
supportedLoginTypes ?? {AuthenticationTypes.password},
__loginState = LoginState.loggedOut,
verificationMethods = verificationMethods ?? <KeyVerificationMethod>{},
super(
httpClient:
@ -228,13 +228,9 @@ class Client extends MatrixApi {
String? _groupCallSessionId;
/// Returns the current login state.
LoginState get loginState => __loginState;
LoginState __loginState;
set _loginState(LoginState state) {
__loginState = state;
onLoginStateChanged.add(state);
}
@Deprecated('Use [onLoginStateChanged.value] instead')
LoginState get loginState =>
onLoginStateChanged.value ?? LoginState.loggedOut;
bool isLogged() => accessToken != null;
@ -952,97 +948,96 @@ class Client extends MatrixApi {
/// the app receives a new synchronization, this event is called for every signal
/// to update the GUI. For example, for a new message, it is called:
/// onRoomEvent( "m.room.message", "!chat_id:server.com", "timeline", {sender: "@bob:server.com", body: "Hello world"} )
final StreamController<EventUpdate> onEvent = StreamController.broadcast();
final CachedStreamController<EventUpdate> onEvent = CachedStreamController();
/// The onToDeviceEvent is called when there comes a new to device event. It is
/// already decrypted if necessary.
final StreamController<ToDeviceEvent> onToDeviceEvent =
StreamController.broadcast();
final CachedStreamController<ToDeviceEvent> onToDeviceEvent =
CachedStreamController();
/// Called when the login state e.g. user gets logged out.
final StreamController<LoginState> onLoginStateChanged =
StreamController.broadcast();
final CachedStreamController<LoginState> onLoginStateChanged =
CachedStreamController();
/// Called when the local cache is reset
final StreamController<bool> onCacheCleared = StreamController.broadcast();
final CachedStreamController<bool> onCacheCleared = CachedStreamController();
/// Encryption errors are coming here.
final StreamController<SdkError> onEncryptionError =
StreamController.broadcast();
/// This is called once, when the first sync has been processed.
final StreamController<bool> onFirstSync = StreamController.broadcast();
final CachedStreamController<SdkError> onEncryptionError =
CachedStreamController();
/// When a new sync response is coming in, this gives the complete payload.
final StreamController<SyncUpdate> onSync = StreamController.broadcast();
final CachedStreamController<SyncUpdate> onSync = CachedStreamController();
/// This gives the current status of the synchronization
final StreamController<SyncStatusUpdate> onSyncStatus =
StreamController.broadcast();
final CachedStreamController<SyncStatusUpdate> onSyncStatus =
CachedStreamController();
/// Callback will be called on presences.
@Deprecated(
'Deprecated, use onPresenceChanged instead which has a timestamp.')
final StreamController<Presence> onPresence = StreamController.broadcast();
final CachedStreamController<Presence> onPresence = CachedStreamController();
/// Callback will be called on presence updates.
final StreamController<CachedPresence> onPresenceChanged =
StreamController.broadcast();
final CachedStreamController<CachedPresence> onPresenceChanged =
CachedStreamController();
/// Callback will be called on account data updates.
final StreamController<BasicEvent> onAccountData =
StreamController.broadcast();
final CachedStreamController<BasicEvent> onAccountData =
CachedStreamController();
/// Will be called on call invites.
final StreamController<Event> onCallInvite = StreamController.broadcast();
final CachedStreamController<Event> onCallInvite = CachedStreamController();
/// Will be called on call hangups.
final StreamController<Event> onCallHangup = StreamController.broadcast();
final CachedStreamController<Event> onCallHangup = CachedStreamController();
/// Will be called on call candidates.
final StreamController<Event> onCallCandidates = StreamController.broadcast();
final CachedStreamController<Event> onCallCandidates =
CachedStreamController();
/// Will be called on call answers.
final StreamController<Event> onCallAnswer = StreamController.broadcast();
final CachedStreamController<Event> onCallAnswer = CachedStreamController();
/// Will be called on call replaces.
final StreamController<Event> onCallReplaces = StreamController.broadcast();
final CachedStreamController<Event> onCallReplaces = CachedStreamController();
/// Will be called on select answers.
final StreamController<Event> onCallSelectAnswer =
StreamController.broadcast();
final CachedStreamController<Event> onCallSelectAnswer =
CachedStreamController();
/// Will be called on rejects.
final StreamController<Event> onCallReject = StreamController.broadcast();
final CachedStreamController<Event> onCallReject = CachedStreamController();
/// Will be called on negotiates.
final StreamController<Event> onCallNegotiate = StreamController.broadcast();
final CachedStreamController<Event> onCallNegotiate =
CachedStreamController();
/// Will be called on Asserted Identity received.
final StreamController<Event> onAssertedIdentityReceived =
StreamController.broadcast();
final CachedStreamController<Event> onAssertedIdentityReceived =
CachedStreamController();
/// Will be called on SDPStream Metadata changed.
final StreamController<Event> onSDPStreamMetadataChangedReceived =
StreamController.broadcast();
final CachedStreamController<Event> onSDPStreamMetadataChangedReceived =
CachedStreamController();
/// Will be called when another device is requesting session keys for a room.
final StreamController<RoomKeyRequest> onRoomKeyRequest =
StreamController.broadcast();
final CachedStreamController<RoomKeyRequest> onRoomKeyRequest =
CachedStreamController();
/// Will be called when another device is requesting verification with this device.
final StreamController<KeyVerification> onKeyVerificationRequest =
StreamController.broadcast();
final CachedStreamController<KeyVerification> onKeyVerificationRequest =
CachedStreamController();
/// When the library calls an endpoint that needs UIA the `UiaRequest` is passed down this screen.
/// The client can open a UIA prompt based on this.
final StreamController<UiaRequest> onUiaRequest =
StreamController.broadcast();
final CachedStreamController<UiaRequest> onUiaRequest =
CachedStreamController();
final StreamController<Event> onGroupCallRequest =
StreamController.broadcast();
final CachedStreamController<Event> onGroupCallRequest =
CachedStreamController();
final StreamController<Event> onGroupMember = StreamController.broadcast();
final CachedStreamController<Event> onGroupMember = CachedStreamController();
/// How long should the app wait until it retrys the synchronisation after
/// an error?
@ -1317,7 +1312,7 @@ class Client extends MatrixApi {
// we aren't logged in
encryption?.dispose();
encryption = null;
_loginState = LoginState.loggedOut;
onLoginStateChanged.add(LoginState.loggedOut);
Logs().i('User is not logged in.');
_initLock = false;
return;
@ -1376,7 +1371,7 @@ class Client extends MatrixApi {
}
}
_initLock = false;
_loginState = LoginState.loggedIn;
onLoginStateChanged.add(LoginState.loggedIn);
Logs().i(
'Successfully connected as ${userID?.localpart} with ${homeserver.toString()}',
);
@ -1428,7 +1423,7 @@ class Client extends MatrixApi {
await databaseDestroyer(this);
_database = null;
}
_loginState = LoginState.loggedOut;
onLoginStateChanged.add(LoginState.loggedOut);
}
bool _backgroundSync = true;
@ -1530,10 +1525,6 @@ class Client extends MatrixApi {
await _handleSync(syncResp, direction: Direction.f);
}
if (_disposed || _aborted) return;
if (prevBatch == null) {
onFirstSync.add(true);
prevBatch = syncResp.nextBatch;
}
prevBatch = syncResp.nextBatch;
// ignore: unawaited_futures
database?.deleteOldFiles(

View File

@ -25,6 +25,7 @@ import 'package:html_unescape/html_unescape.dart';
import 'package:matrix/src/models/timeline_chunk.dart';
import 'package:matrix/src/utils/crypto/crypto.dart';
import 'package:matrix/src/utils/file_send_request_credentials.dart';
import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:matrix/src/utils/space_child.dart';
import '../matrix.dart';
@ -202,12 +203,12 @@ class Room {
/// If something changes, this callback will be triggered. Will return the
/// room id.
final StreamController<String> onUpdate = StreamController.broadcast();
final CachedStreamController<String> onUpdate = CachedStreamController();
/// If there is a new session key received, this will be triggered with
/// the session ID.
final StreamController<String> onSessionKeyReceived =
StreamController.broadcast();
final CachedStreamController<String> onSessionKeyReceived =
CachedStreamController();
/// The name of the room if set by a participant.
String get name {

View File

@ -0,0 +1,26 @@
import 'dart:async';
class CachedStreamController<T> {
T? _value;
Object? _lastError;
final StreamController<T> _streamController = StreamController.broadcast();
CachedStreamController([T? value]) : _value = value;
T? get value => _value;
Object? get lastError => _lastError;
Stream<T> get stream => _streamController.stream;
void add(T value) {
_value = value;
_streamController.add(value);
}
void addError(Object error, [StackTrace? stackTrace]) {
_lastError = value;
_streamController.addError(error, stackTrace);
}
Future close() => _streamController.close();
bool get isClosed => _streamController.isClosed;
}

View File

@ -19,6 +19,7 @@
import 'dart:async';
import 'dart:core';
import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:webrtc_interface/webrtc_interface.dart';
import '../../matrix.dart';
@ -52,8 +53,8 @@ class WrappedMediaStream {
String get title => '$displayName:$purpose:a[$audioMuted]:v[$videoMuted]';
bool stopped = false;
final StreamController<WrappedMediaStream> onMuteStateChanged =
StreamController.broadcast();
final CachedStreamController<WrappedMediaStream> onMuteStateChanged =
CachedStreamController();
void Function(MediaStream stream)? onNewStream;
@ -320,26 +321,26 @@ class CallSession {
bool waitForLocalAVStream = false;
int toDeviceSeq = 0;
final StreamController<CallSession> onCallStreamsChanged =
StreamController.broadcast();
final CachedStreamController<CallSession> onCallStreamsChanged =
CachedStreamController();
final StreamController<CallSession> onCallReplaced =
StreamController.broadcast();
final CachedStreamController<CallSession> onCallReplaced =
CachedStreamController();
final StreamController<CallSession> onCallHangup =
StreamController.broadcast();
final CachedStreamController<CallSession> onCallHangup =
CachedStreamController();
final StreamController<CallState> onCallStateChanged =
StreamController.broadcast();
final CachedStreamController<CallState> onCallStateChanged =
CachedStreamController();
final StreamController<CallEvent> onCallEventChanged =
StreamController.broadcast();
final CachedStreamController<CallEvent> onCallEventChanged =
CachedStreamController();
final StreamController<WrappedMediaStream> onStreamAdd =
StreamController.broadcast();
final CachedStreamController<WrappedMediaStream> onStreamAdd =
CachedStreamController();
final StreamController<WrappedMediaStream> onStreamRemoved =
StreamController.broadcast();
final CachedStreamController<WrappedMediaStream> onStreamRemoved =
CachedStreamController();
SDPStreamMetadata? remoteSDPStreamMetadata;
List<RTCRtpSender> usermediaSenders = [];

View File

@ -20,6 +20,7 @@ import 'dart:async';
import 'dart:core';
import 'package:matrix/matrix.dart';
import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:webrtc_interface/webrtc_interface.dart';
/// TODO(@duan): Need to add voice activity detection mechanism
@ -198,20 +199,20 @@ class GroupCall {
Timer? retryCallLoopTimeout;
Map<String, num> retryCallCounts = {};
final StreamController<GroupCall> onGroupCallFeedsChanged =
StreamController.broadcast();
final CachedStreamController<GroupCall> onGroupCallFeedsChanged =
CachedStreamController();
final StreamController<GroupCallState> onGroupCallState =
StreamController.broadcast();
final CachedStreamController<GroupCallState> onGroupCallState =
CachedStreamController();
final StreamController<String> onGroupCallEvent =
StreamController.broadcast();
final CachedStreamController<String> onGroupCallEvent =
CachedStreamController();
final StreamController<WrappedMediaStream> onStreamAdd =
StreamController.broadcast();
final CachedStreamController<WrappedMediaStream> onStreamAdd =
CachedStreamController();
final StreamController<WrappedMediaStream> onStreamRemoved =
StreamController.broadcast();
final CachedStreamController<WrappedMediaStream> onStreamRemoved =
CachedStreamController();
GroupCall({
String? groupCallId,

View File

@ -1,6 +1,6 @@
import 'dart:async';
import 'dart:core';
import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:webrtc_interface/webrtc_interface.dart';
import 'package:sdp_transform/sdp_transform.dart' as sdp_transform;
@ -29,8 +29,8 @@ class VoIP {
TurnServerCredentials? _turnServerCredentials;
Map<String, CallSession> calls = <String, CallSession>{};
Map<String, GroupCall> groupCalls = <String, GroupCall>{};
final StreamController<CallSession> onIncomingCall =
StreamController.broadcast();
final CachedStreamController<CallSession> onIncomingCall =
CachedStreamController();
String? currentCID;
String? currentGroupCID;
String? get localPartyId => client.deviceID;

View File

@ -94,7 +94,6 @@ void main() {
expect(available, true);
final loginStateFuture = matrix.onLoginStateChanged.stream.first;
final firstSyncFuture = matrix.onFirstSync.stream.first;
final syncFuture = matrix.onSync.stream.first;
await matrix.init(
@ -109,11 +108,10 @@ void main() {
await Future.delayed(Duration(milliseconds: 50));
final loginState = await loginStateFuture;
final firstSync = await firstSyncFuture;
final sync = await syncFuture;
expect(loginState, LoginState.loggedIn);
expect(firstSync, true);
expect(matrix.onSync.value != null, true);
expect(matrix.encryptionEnabled, olmEnabled);
if (olmEnabled) {
expect(matrix.identityKey, identityKey);