feat: implement expire_ts in group calls and provide methods to terminate stale calls

This commit is contained in:
td 2022-10-07 15:08:38 +05:30
parent ddb7cf8e30
commit da4174c91a
No known key found for this signature in database
GPG Key ID: F6D9E9BF14C7D103
2 changed files with 180 additions and 47 deletions

View File

@ -141,12 +141,17 @@ class IGroupCallRoomMemberCallState {
} }
class IGroupCallRoomMemberState { class IGroupCallRoomMemberState {
final DEFAULT_EXPIRE_TS = Duration(seconds: 300);
late int expireTs;
List<IGroupCallRoomMemberCallState> calls = []; List<IGroupCallRoomMemberCallState> calls = [];
IGroupCallRoomMemberState.fromJson(Map<String, dynamic> json) { IGroupCallRoomMemberState.fromJson(MatrixEvent event) {
if (json['m.calls'] != null) { if (event.content['m.calls'] != null) {
(json['m.calls'] as List<dynamic>).forEach( (event.content['m.calls'] as List<dynamic>).forEach(
(call) => calls.add(IGroupCallRoomMemberCallState.formJson(call))); (call) => calls.add(IGroupCallRoomMemberCallState.formJson(call)));
} }
expireTs = event.content['m.expires_ts'] ??
event.originServerTs.add(DEFAULT_EXPIRE_TS).millisecondsSinceEpoch;
} }
} }
@ -168,6 +173,10 @@ abstract class ICallHandlers {
class GroupCall { class GroupCall {
// Config // Config
static const updateExpireTsTimerDuration = Duration(seconds: 15);
static const expireTsBumpDuration = Duration(seconds: 45);
var activeSpeakerInterval = 1000; var activeSpeakerInterval = 1000;
var retryCallInterval = 5000; var retryCallInterval = 5000;
var participantTimeout = 1000 * 15; var participantTimeout = 1000 * 15;
@ -197,6 +206,8 @@ class GroupCall {
Timer? activeSpeakerLoopTimeout; Timer? activeSpeakerLoopTimeout;
Timer? resendMemberStateEventTimer;
final CachedStreamController<GroupCall> onGroupCallFeedsChanged = final CachedStreamController<GroupCall> onGroupCallFeedsChanged =
CachedStreamController(); CachedStreamController();
@ -255,30 +266,32 @@ class GroupCall {
return room.unsafeGetUserFromMemoryOrFallback(client.userID!); return room.unsafeGetUserFromMemoryOrFallback(client.userID!);
} }
Future<List<MatrixEvent>> getStateEventsList(String type) async { bool callMemberStateIsExpired(MatrixEvent event) {
final callMemberState = IGroupCallRoomMemberState.fromJson(event);
return callMemberState.expireTs < DateTime.now().millisecondsSinceEpoch;
}
Event? getMemberStateEvent(String userId) {
final event = room.getState(EventTypes.GroupCallMemberPrefix, userId);
if (event != null) {
return callMemberStateIsExpired(event) ? null : event;
}
return null;
}
Future<List<MatrixEvent>> getAllMemberStateEvents() async {
final List<MatrixEvent> events = [];
final roomStates = await client.getRoomState(room.id); final roomStates = await client.getRoomState(room.id);
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs)); roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
final events = <MatrixEvent>[]; roomStates.forEach((value) {
roomStates.forEach((evt) { if (value.type == EventTypes.GroupCallMemberPrefix &&
if (evt.type == type) { !callMemberStateIsExpired(value)) {
events.add(evt); events.add(value);
} }
}); });
return events; return events;
} }
Future<MatrixEvent?> getStateEvent(String type, [String? userId]) async {
final roomStates = await client.getRoomState(room.id);
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
MatrixEvent? event;
roomStates.forEach((value) {
if (value.type == type && (userId == null || value.senderId == userId)) {
event = value;
}
});
return event;
}
void setState(String newState) { void setState(String newState) {
state = newState; state = newState;
onGroupCallEvent.add(GroupCallEvent.GroupCallStateChanged); onGroupCallEvent.add(GroupCallEvent.GroupCallStateChanged);
@ -420,8 +433,7 @@ class GroupCall {
// Set up participants for the members currently in the room. // Set up participants for the members currently in the room.
// Other members will be picked up by the RoomState.members event. // Other members will be picked up by the RoomState.members event.
final memberStateEvents = final memberStateEvents = await getAllMemberStateEvents();
await getStateEventsList(EventTypes.GroupCallMemberPrefix);
memberStateEvents.forEach((stateEvent) { memberStateEvents.forEach((stateEvent) {
onMemberStateChanged(stateEvent); onMemberStateChanged(stateEvent);
@ -470,33 +482,34 @@ class GroupCall {
setState(GroupCallState.LocalCallFeedUninitialized); setState(GroupCallState.LocalCallFeedUninitialized);
voip.currentGroupCID = null; voip.currentGroupCID = null;
voip.delegate.handleGroupCallEnded(this); voip.delegate.handleGroupCallEnded(this);
final justLeftGroupCall = voip.groupCalls.tryGet<GroupCall>(room.id);
// terminate group call if empty
if (justLeftGroupCall != null &&
justLeftGroupCall.intent != 'm.room' &&
justLeftGroupCall.participants.isEmpty &&
room.canCreateGroupCall) {
terminate();
} else {
Logs().d(
'[VOIP] left group call but cannot terminate. participants: ${participants.length}, pl: ${room.canCreateGroupCall}');
}
} }
/// terminate group call. /// terminate group call.
void terminate({bool emitStateEvent = true}) async { void terminate({bool emitStateEvent = true}) async {
final existingStateEvent =
room.getState(EventTypes.GroupCallPrefix, groupCallId);
dispose(); dispose();
participants = []; participants = [];
//TODO(duan): remove this
/* client.removeListener(
'RoomState.members',
onMemberStateChanged,
);
*/
voip.groupCalls.remove(room.id); voip.groupCalls.remove(room.id);
voip.groupCalls.remove(groupCallId); voip.groupCalls.remove(groupCallId);
if (emitStateEvent) { if (emitStateEvent) {
final existingStateEvent = await getStateEvent(
EventTypes.GroupCallPrefix,
groupCallId,
);
await client.setRoomStateWithKey( await client.setRoomStateWithKey(
room.id, EventTypes.GroupCallPrefix, groupCallId, { room.id, EventTypes.GroupCallPrefix, groupCallId, {
...existingStateEvent!.content, ...existingStateEvent!.content,
'm.terminated': GroupCallTerminationReason.CallEnded, 'm.terminated': GroupCallTerminationReason.CallEnded,
}); });
Logs().d('[VOIP] Group call $groupCallId was killed');
} }
voip.delegate.handleGroupCallEnded(this); voip.delegate.handleGroupCallEnded(this);
setState(GroupCallState.Ended); setState(GroupCallState.Ended);
@ -665,9 +678,9 @@ class GroupCall {
newCall.answerWithStreams(getLocalStreams()); newCall.answerWithStreams(getLocalStreams());
} }
Future<void> sendMemberStateEvent() { Future<void> sendMemberStateEvent() async {
final deviceId = client.deviceID; final deviceId = client.deviceID;
return updateMemberCallState(IGroupCallRoomMemberCallState.formJson({ await updateMemberCallState(IGroupCallRoomMemberCallState.formJson({
'm.call_id': groupCallId, 'm.call_id': groupCallId,
'm.devices': [ 'm.devices': [
{ {
@ -683,9 +696,23 @@ class GroupCall {
], ],
// TODO 'm.foci' // TODO 'm.foci'
})); }));
if (resendMemberStateEventTimer != null) {
resendMemberStateEventTimer!.cancel();
}
resendMemberStateEventTimer =
Timer.periodic(updateExpireTsTimerDuration, ((timer) async {
Logs().d('updating member event with timer');
return await sendMemberStateEvent();
}));
} }
Future<void> removeMemberStateEvent() { Future<void> removeMemberStateEvent() {
if (resendMemberStateEventTimer != null) {
Logs().d('resend member event timer cancelled');
resendMemberStateEventTimer!.cancel();
resendMemberStateEventTimer = null;
}
return updateMemberCallState(); return updateMemberCallState();
} }
@ -693,13 +720,13 @@ class GroupCall {
[IGroupCallRoomMemberCallState? memberCallState]) async { [IGroupCallRoomMemberCallState? memberCallState]) async {
final localUserId = client.userID; final localUserId = client.userID;
final currentStateEvent = final currentStateEvent = getMemberStateEvent(localUserId!);
await getStateEvent(EventTypes.GroupCallMemberPrefix, localUserId);
final eventContent = currentStateEvent?.content ?? {}; final eventContent = currentStateEvent?.content ?? {};
var calls = <IGroupCallRoomMemberCallState>[]; var calls = <IGroupCallRoomMemberCallState>[];
if (currentStateEvent != null) { if (currentStateEvent != null) {
final memberStateEvent = IGroupCallRoomMemberState.fromJson(eventContent); final memberStateEvent =
IGroupCallRoomMemberState.fromJson(currentStateEvent);
calls = memberStateEvent.calls; calls = memberStateEvent.calls;
final existingCallIndex = final existingCallIndex =
calls.indexWhere((element) => groupCallId == element.call_id); calls.indexWhere((element) => groupCallId == element.call_id);
@ -716,13 +743,15 @@ class GroupCall {
} else if (memberCallState != null) { } else if (memberCallState != null) {
calls.add(memberCallState); calls.add(memberCallState);
} }
final content = { final content = {
'm.calls': calls.map((e) => e.toJson()).toList(), 'm.calls': calls.map((e) => e.toJson()).toList(),
'm.expires_ts': calls.isEmpty
? eventContent.tryGet('m.expires_ts')
: DateTime.now().add(expireTsBumpDuration).millisecondsSinceEpoch
}; };
await client.setRoomStateWithKey( await client.setRoomStateWithKey(
room.id, EventTypes.GroupCallMemberPrefix, localUserId!, content); room.id, EventTypes.GroupCallMemberPrefix, localUserId, content);
} }
void onMemberStateChanged(MatrixEvent event) async { void onMemberStateChanged(MatrixEvent event) async {
@ -737,7 +766,7 @@ class GroupCall {
return; return;
} }
final callsState = IGroupCallRoomMemberState.fromJson(event.content); final callsState = IGroupCallRoomMemberState.fromJson(event);
if (callsState is List) { if (callsState is List) {
Logs() Logs()
@ -843,14 +872,12 @@ class GroupCall {
} }
Future<IGroupCallRoomMemberDevice?> getDeviceForMember(String userId) async { Future<IGroupCallRoomMemberDevice?> getDeviceForMember(String userId) async {
final memberStateEvent = final memberStateEvent = getMemberStateEvent(userId);
await getStateEvent(EventTypes.GroupCallMemberPrefix, userId);
if (memberStateEvent == null) { if (memberStateEvent == null) {
return null; return null;
} }
final memberState = final memberState = IGroupCallRoomMemberState.fromJson(memberStateEvent);
IGroupCallRoomMemberState.fromJson(memberStateEvent.content);
final memberGroupCallState = final memberGroupCallState =
memberState.calls.where(((call) => call.call_id == groupCallId)); memberState.calls.where(((call) => call.call_id == groupCallId));

View File

@ -757,4 +757,110 @@ class VoIP {
groupCall.onMemberStateChanged(event); groupCall.onMemberStateChanged(event);
} }
} }
bool hasActiveCall(Room room) {
final groupCallStates =
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
if (groupCallStates != null) {
groupCallStates.values
.toList()
.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
final latestGroupCallEvent = groupCallStates.values.last;
if (!latestGroupCallEvent.content.containsKey('m.terminated')) {
return true;
}
}
return false;
}
Future sendGroupCallTerminateEvent(Room room, String groupCallId) async {
try {
Logs().d('[VOIP] running sendterminator');
final existingStateEvent =
room.getState(EventTypes.GroupCallPrefix, groupCallId);
if (existingStateEvent == null) {
Logs().e('could not find group call with id $groupCallId');
return;
}
await client.setRoomStateWithKey(
room.id, EventTypes.GroupCallPrefix, groupCallId, {
...existingStateEvent.content,
'm.terminated': GroupCallTerminationReason.CallEnded,
});
Logs().d('[VOIP] Group call $groupCallId was killed uwu');
} catch (e) {
Logs().i('killing stale call $groupCallId failed. reason: $e');
}
}
Map<String, Timer> staleGroupCallsTimer = {};
/// stops the stale call checker timer
void stopStaleCallsChecker(String roomId) {
if (staleGroupCallsTimer.tryGet(roomId) != null) {
staleGroupCallsTimer[roomId]!.cancel();
} else {
Logs().w('[VOIP] no stale call checker for room found');
}
}
static const staleCallCheckerDuration = Duration(seconds: 30);
/// checks for stale calls in a room and sends `m.terminated` if all the
/// expires_ts are expired. Call when opening a room
void startStaleCallsChecker(String roomId) async {
staleGroupCallsTimer[roomId] = Timer.periodic(
staleCallCheckerDuration,
(timer) {
final room = client.getRoomById(roomId);
if (room == null) {
Logs().w('[VOIP] stale call checker got incorrect room id');
} else {
Logs().d('checking for stale group calls.');
final copyGroupCallIds =
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
if (copyGroupCallIds == null) return;
copyGroupCallIds.forEach(
(groupCallId, groupCallEvent) async {
if (groupCallEvent.content.tryGet('m.intent') == 'm.room') return;
if (!groupCallEvent.content.containsKey('m.terminated')) {
if (groupCallId != null) {
Logs().i(
'found non terminated group call with id $groupCallId');
// call is not empty but check for stale participants (gone offline)
// with expire_ts
final Map<String, int> participants = {};
final callMemberEvents = room.states.tryGetMap<String, Event>(
EventTypes.GroupCallMemberPrefix);
Logs().e(
'callmemeberEvents length ${callMemberEvents?.length}');
if (callMemberEvents != null) {
callMemberEvents.forEach((userId, memberEvent) async {
final callMemberEvent = groupCallEvent.room.getState(
EventTypes.GroupCallMemberPrefix,
userId,
);
if (callMemberEvent != null) {
final event =
IGroupCallRoomMemberState.fromJson(callMemberEvent);
participants[userId] = event.expireTs;
}
});
}
Logs().e(participants.toString());
if (!participants.values.any((expire_ts) =>
expire_ts > DateTime.now().millisecondsSinceEpoch)) {
Logs().i(
'Group call with expired timestamps detected, terminating');
await sendGroupCallTerminateEvent(room, groupCallId);
}
}
}
},
);
}
},
);
}
} }