Merge pull request #1956 from famedly/karthi/register-listener-callback

fix: BREAKING! missed initial updates for stream listener callbacks in P2P & mesh calls
This commit is contained in:
td 2024-12-17 14:06:03 +01:00 committed by GitHub
commit 9ae238403e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 127 additions and 34 deletions

View File

@ -119,14 +119,25 @@ class MyVoipApp implements WebRTCDelegate {
VideoRenderer createRenderer() => RTCVideoRenderer(); VideoRenderer createRenderer() => RTCVideoRenderer();
@override @override
void playRingtone(){ Future<void> playRingtone() async {
// play ringtone // play ringtone
} }
void stopRingtone() { Future<void> stopRingtone() async {
// stop ringtone // stop ringtone
} }
void handleNewCall(CallSession session) { Future<void> registerListeners(CallSession session) async {
// register all listeners here
session.onCallStreamsChanged.stream.listen((CallStateChange event) async {});
session.onCallReplaced.stream.listen((CallStateChange event) async {});
session.onCallHangupNotifierForGroupCalls.stream.listen((CallStateChange event) async {});
session.onCallStateChanged.stream.listen((CallStateChange event) async {});
session.onCallEventChanged.stream.listen((CallStateChange event) async {});
session.onStreamAdd.stream.listen((CallStateChange event) async {});
session.onStreamRemoved.stream.listen((CallStateChange event) async {});
}
Future<void> handleNewCall(CallSession session) async {
// handle new call incoming or outgoing // handle new call incoming or outgoing
switch(session.direction) { switch(session.direction) {
case CallDirection.kIncoming: case CallDirection.kIncoming:
@ -138,7 +149,7 @@ class MyVoipApp implements WebRTCDelegate {
} }
} }
void handleCallEnded(CallSession session) { Future<void> handleCallEnded(CallSession session) async {
// handle call ended by local or remote // handle call ended by local or remote
} }
} }
@ -170,7 +181,7 @@ newCall.onCallStateChanged.stream.listen((state) {
/// Then you can pop up the incoming call window at MyVoipApp.handleNewCall. /// Then you can pop up the incoming call window at MyVoipApp.handleNewCall.
class MyVoipApp implements WebRTCDelegate { class MyVoipApp implements WebRTCDelegate {
... ...
void handleNewCall(CallSession session) { Future<void> handleNewCall(CallSession session) async {
switch(session.direction) { switch(session.direction) {
case CallDirection.kOutgoing: case CallDirection.kOutgoing:
// show outgoing call window // show outgoing call window
@ -185,13 +196,13 @@ newCall.hangup();
### 4.Answer a incoming call ### 4.Answer a incoming call
When a new incoming call comes in, handleNewCall will be called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state. When a new incoming call comes in, registerListeners will be called right before handleNewCall is called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state.
The incoming call window need display `answer` and `reject` buttons, by calling `newCall.answer();` or `newCall.reject();` to decide whether to connect the call. The incoming call window need display `answer` and `reject` buttons, by calling `newCall.answer();` or `newCall.reject();` to decide whether to connect the call.
```dart ```dart
... ...
void handleNewCall(CallSession newCall) { Future<void> registerListeners(CallSession newCall) async {
switch(newCall.direction) { switch(newCall.direction) {
case CallDirection.kIncoming: case CallDirection.kIncoming:
/// show incoming call window /// show incoming call window

View File

@ -20,7 +20,11 @@ class MeshBackend extends CallBackend {
/// participant:volume /// participant:volume
final Map<CallParticipant, double> _audioLevelsMap = {}; final Map<CallParticipant, double> _audioLevelsMap = {};
StreamSubscription<CallSession>? _callSubscription; /// The stream is used to prepare for incoming peer calls like registering listeners
StreamSubscription<CallSession>? _callSetupSubscription;
/// The stream is used to signal the start of an incoming peer call
StreamSubscription<CallSession>? _callStartSubscription;
Timer? _activeSpeakerLoopTimeout; Timer? _activeSpeakerLoopTimeout;
@ -109,14 +113,32 @@ class MeshBackend extends CallBackend {
); );
} }
/// Register listeners for a peer call to use for the group calls, that is
/// needed before even call is added to `_callSessions`.
/// We do this here for onStreamAdd and onStreamRemoved to make sure we don't
/// miss any events that happen before the call is completely started.
void _registerListenersBeforeCallAdd(CallSession call) {
call.onStreamAdd.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamAdd.add(stream);
}
});
call.onStreamRemoved.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamRemoved.add(stream);
}
});
}
Future<void> _addCall(GroupCallSession groupCall, CallSession call) async { Future<void> _addCall(GroupCallSession groupCall, CallSession call) async {
_callSessions.add(call); _callSessions.add(call);
await _initCall(groupCall, call); _initCall(groupCall, call);
groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged); groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged);
} }
/// init a peer call from group calls. /// init a peer call from group calls.
Future<void> _initCall(GroupCallSession groupCall, CallSession call) async { void _initCall(GroupCallSession groupCall, CallSession call) {
if (call.remoteUserId == null) { if (call.remoteUserId == null) {
throw MatrixSDKVoipException( throw MatrixSDKVoipException(
'Cannot init call without proper invitee user and device Id', 'Cannot init call without proper invitee user and device Id',
@ -141,18 +163,6 @@ class MeshBackend extends CallBackend {
call.onCallHangupNotifierForGroupCalls.stream.listen((event) async { call.onCallHangupNotifierForGroupCalls.stream.listen((event) async {
await _onCallHangup(groupCall, call); await _onCallHangup(groupCall, call);
}); });
call.onStreamAdd.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamAdd.add(stream);
}
});
call.onStreamRemoved.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamRemoved.add(stream);
}
});
} }
Future<void> _replaceCall( Future<void> _replaceCall(
@ -171,7 +181,8 @@ class MeshBackend extends CallBackend {
_callSessions.add(replacementCall); _callSessions.add(replacementCall);
await _disposeCall(groupCall, existingCall, CallErrorCode.replaced); await _disposeCall(groupCall, existingCall, CallErrorCode.replaced);
await _initCall(groupCall, replacementCall); _registerListenersBeforeCallAdd(replacementCall);
_initCall(groupCall, replacementCall);
groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged); groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged);
} }
@ -657,7 +668,49 @@ class MeshBackend extends CallBackend {
return; return;
} }
Future<void> _onIncomingCall( void _onIncomingCallInMeshSetup(
GroupCallSession groupCall,
CallSession newCall,
) {
// The incoming calls may be for another room, which we will ignore.
if (newCall.room.id != groupCall.room.id) return;
if (newCall.state != CallState.kRinging) {
Logs().v(
'[_onIncomingCallInMeshSetup] Incoming call no longer in ringing state. Ignoring.',
);
return;
}
if (newCall.groupCallId == null ||
newCall.groupCallId != groupCall.groupCallId) {
Logs().v(
'[_onIncomingCallInMeshSetup] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call',
);
return;
}
final existingCall = _getCallForParticipant(
groupCall,
CallParticipant(
groupCall.voip,
userId: newCall.remoteUserId!,
deviceId: newCall.remoteDeviceId,
),
);
// if it's an existing call, `_registerListenersForCall` will be called in
// `_replaceCall` that is used in `_onIncomingCallStart`.
if (existingCall != null) return;
Logs().v(
'[_onIncomingCallInMeshSetup] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}',
);
_registerListenersBeforeCallAdd(newCall);
}
Future<void> _onIncomingCallInMeshStart(
GroupCallSession groupCall, GroupCallSession groupCall,
CallSession newCall, CallSession newCall,
) async { ) async {
@ -667,14 +720,16 @@ class MeshBackend extends CallBackend {
} }
if (newCall.state != CallState.kRinging) { if (newCall.state != CallState.kRinging) {
Logs().w('Incoming call no longer in ringing state. Ignoring.'); Logs().v(
'[_onIncomingCallInMeshStart] Incoming call no longer in ringing state. Ignoring.',
);
return; return;
} }
if (newCall.groupCallId == null || if (newCall.groupCallId == null ||
newCall.groupCallId != groupCall.groupCallId) { newCall.groupCallId != groupCall.groupCallId) {
Logs().v( Logs().v(
'Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call', '[_onIncomingCallInMeshStart] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call',
); );
await newCall.reject(); await newCall.reject();
return; return;
@ -694,7 +749,7 @@ class MeshBackend extends CallBackend {
} }
Logs().v( Logs().v(
'GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}', '[_onIncomingCallInMeshStart] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}',
); );
// Check if the user calling has an existing call and use this call instead. // Check if the user calling has an existing call and use this call instead.
@ -800,7 +855,8 @@ class MeshBackend extends CallBackend {
_activeSpeaker = null; _activeSpeaker = null;
_activeSpeakerLoopTimeout?.cancel(); _activeSpeakerLoopTimeout?.cancel();
await _callSubscription?.cancel(); await _callSetupSubscription?.cancel();
await _callStartSubscription?.cancel();
} }
@override @override
@ -826,11 +882,16 @@ class MeshBackend extends CallBackend {
GroupCallSession groupCall, GroupCallSession groupCall,
) async { ) async {
for (final call in _callSessions) { for (final call in _callSessions) {
await _onIncomingCall(groupCall, call); _onIncomingCallInMeshSetup(groupCall, call);
await _onIncomingCallInMeshStart(groupCall, call);
} }
_callSubscription = groupCall.voip.onIncomingCall.stream.listen( _callSetupSubscription = groupCall.voip.onIncomingCallSetup.stream.listen(
(newCall) => _onIncomingCall(groupCall, newCall), (newCall) => _onIncomingCallInMeshSetup(groupCall, newCall),
);
_callStartSubscription = groupCall.voip.onIncomingCallStart.stream.listen(
(newCall) => _onIncomingCallInMeshStart(groupCall, newCall),
); );
_onActiveSpeakerLoop(groupCall); _onActiveSpeakerLoop(groupCall);
@ -883,6 +944,8 @@ class MeshBackend extends CallBackend {
// party id set to when answered // party id set to when answered
newCall.remoteSessionId = mem.membershipId; newCall.remoteSessionId = mem.membershipId;
_registerListenersBeforeCallAdd(newCall);
await newCall.placeCallWithStreams( await newCall.placeCallWithStreams(
_getLocalStreams(), _getLocalStreams(),
requestScreenSharing: mem.feeds?.any( requestScreenSharing: mem.feeds?.any(

View File

@ -11,6 +11,7 @@ abstract class WebRTCDelegate {
]); ]);
Future<void> playRingtone(); Future<void> playRingtone();
Future<void> stopRingtone(); Future<void> stopRingtone();
Future<void> registerListeners(CallSession session);
Future<void> handleNewCall(CallSession session); Future<void> handleNewCall(CallSession session);
Future<void> handleCallEnded(CallSession session); Future<void> handleCallEnded(CallSession session);
Future<void> handleMissedCall(CallSession session); Future<void> handleMissedCall(CallSession session);

View File

@ -36,7 +36,13 @@ class VoIP {
Map<VoipId, GroupCallSession> get groupCalls => _groupCalls; Map<VoipId, GroupCallSession> get groupCalls => _groupCalls;
final Map<VoipId, GroupCallSession> _groupCalls = {}; final Map<VoipId, GroupCallSession> _groupCalls = {};
final CachedStreamController<CallSession> onIncomingCall = /// The stream is used to prepare for incoming peer calls in a mesh call
/// For example, registering listeners
final CachedStreamController<CallSession> onIncomingCallSetup =
CachedStreamController();
/// The stream is used to signal the start of an incoming peer call in a mesh call
final CachedStreamController<CallSession> onIncomingCallStart =
CachedStreamController(); CachedStreamController();
VoipId? currentCID; VoipId? currentCID;
@ -479,6 +485,12 @@ class VoIP {
// by terminate. // by terminate.
currentCID = VoipId(roomId: room.id, callId: callId); currentCID = VoipId(roomId: room.id, callId: callId);
if (confId == null) {
await delegate.registerListeners(newCall);
} else {
onIncomingCallSetup.add(newCall);
}
await newCall.initWithInvite( await newCall.initWithInvite(
callType, callType,
offer, offer,
@ -493,8 +505,7 @@ class VoIP {
} }
if (confId != null) { if (confId != null) {
// the stream is used to monitor incoming peer calls in a mesh call onIncomingCallStart.add(newCall);
onIncomingCall.add(newCall);
} }
} }
@ -768,6 +779,8 @@ class VoIP {
newCall.remoteUserId = userId; newCall.remoteUserId = userId;
newCall.remoteDeviceId = deviceId; newCall.remoteDeviceId = deviceId;
await delegate.registerListeners(newCall);
currentCID = VoipId(roomId: roomId, callId: callId); currentCID = VoipId(roomId: roomId, callId: callId);
await newCall.initOutboundCall(type).then((_) { await newCall.initOutboundCall(type).then((_) {
delegate.handleNewCall(newCall); delegate.handleNewCall(newCall);

View File

@ -15,6 +15,11 @@ class MockWebRTCDelegate implements WebRTCDelegate {
]) async => ]) async =>
MockRTCPeerConnection(); MockRTCPeerConnection();
@override
Future<void> registerListeners(CallSession session) async {
Logs().i('registerListeners called in MockWebRTCDelegate');
}
@override @override
Future<void> handleCallEnded(CallSession session) async { Future<void> handleCallEnded(CallSession session) async {
Logs().i('handleCallEnded called in MockWebRTCDelegate'); Logs().i('handleCallEnded called in MockWebRTCDelegate');