Merge branch 'td/staleGroupCalls' into 'main'
feat: implement expire_ts in group calls and provide methods to terminate stale calls Closes #332 See merge request famedly/company/frontend/famedlysdk!1137
This commit is contained in:
commit
157b488255
|
|
@ -141,12 +141,17 @@ class IGroupCallRoomMemberCallState {
|
||||||
}
|
}
|
||||||
|
|
||||||
class IGroupCallRoomMemberState {
|
class IGroupCallRoomMemberState {
|
||||||
|
final DEFAULT_EXPIRE_TS = Duration(seconds: 300);
|
||||||
|
late int expireTs;
|
||||||
List<IGroupCallRoomMemberCallState> calls = [];
|
List<IGroupCallRoomMemberCallState> calls = [];
|
||||||
IGroupCallRoomMemberState.fromJson(Map<String, dynamic> json) {
|
IGroupCallRoomMemberState.fromJson(MatrixEvent event) {
|
||||||
if (json['m.calls'] != null) {
|
if (event.content['m.calls'] != null) {
|
||||||
(json['m.calls'] as List<dynamic>).forEach(
|
(event.content['m.calls'] as List<dynamic>).forEach(
|
||||||
(call) => calls.add(IGroupCallRoomMemberCallState.formJson(call)));
|
(call) => calls.add(IGroupCallRoomMemberCallState.formJson(call)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expireTs = event.content['m.expires_ts'] ??
|
||||||
|
event.originServerTs.add(DEFAULT_EXPIRE_TS).millisecondsSinceEpoch;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -168,6 +173,10 @@ abstract class ICallHandlers {
|
||||||
|
|
||||||
class GroupCall {
|
class GroupCall {
|
||||||
// Config
|
// Config
|
||||||
|
|
||||||
|
static const updateExpireTsTimerDuration = Duration(seconds: 15);
|
||||||
|
static const expireTsBumpDuration = Duration(seconds: 45);
|
||||||
|
|
||||||
var activeSpeakerInterval = 1000;
|
var activeSpeakerInterval = 1000;
|
||||||
var retryCallInterval = 5000;
|
var retryCallInterval = 5000;
|
||||||
var participantTimeout = 1000 * 15;
|
var participantTimeout = 1000 * 15;
|
||||||
|
|
@ -197,6 +206,8 @@ class GroupCall {
|
||||||
|
|
||||||
Timer? activeSpeakerLoopTimeout;
|
Timer? activeSpeakerLoopTimeout;
|
||||||
|
|
||||||
|
Timer? resendMemberStateEventTimer;
|
||||||
|
|
||||||
final CachedStreamController<GroupCall> onGroupCallFeedsChanged =
|
final CachedStreamController<GroupCall> onGroupCallFeedsChanged =
|
||||||
CachedStreamController();
|
CachedStreamController();
|
||||||
|
|
||||||
|
|
@ -255,30 +266,32 @@ class GroupCall {
|
||||||
return room.unsafeGetUserFromMemoryOrFallback(client.userID!);
|
return room.unsafeGetUserFromMemoryOrFallback(client.userID!);
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<List<MatrixEvent>> getStateEventsList(String type) async {
|
bool callMemberStateIsExpired(MatrixEvent event) {
|
||||||
|
final callMemberState = IGroupCallRoomMemberState.fromJson(event);
|
||||||
|
return callMemberState.expireTs < DateTime.now().millisecondsSinceEpoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
Event? getMemberStateEvent(String userId) {
|
||||||
|
final event = room.getState(EventTypes.GroupCallMemberPrefix, userId);
|
||||||
|
if (event != null) {
|
||||||
|
return callMemberStateIsExpired(event) ? null : event;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<List<MatrixEvent>> getAllMemberStateEvents() async {
|
||||||
|
final List<MatrixEvent> events = [];
|
||||||
final roomStates = await client.getRoomState(room.id);
|
final roomStates = await client.getRoomState(room.id);
|
||||||
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
|
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
|
||||||
final events = <MatrixEvent>[];
|
roomStates.forEach((value) {
|
||||||
roomStates.forEach((evt) {
|
if (value.type == EventTypes.GroupCallMemberPrefix &&
|
||||||
if (evt.type == type) {
|
!callMemberStateIsExpired(value)) {
|
||||||
events.add(evt);
|
events.add(value);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<MatrixEvent?> getStateEvent(String type, [String? userId]) async {
|
|
||||||
final roomStates = await client.getRoomState(room.id);
|
|
||||||
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
|
|
||||||
MatrixEvent? event;
|
|
||||||
roomStates.forEach((value) {
|
|
||||||
if (value.type == type && (userId == null || value.senderId == userId)) {
|
|
||||||
event = value;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return event;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setState(String newState) {
|
void setState(String newState) {
|
||||||
state = newState;
|
state = newState;
|
||||||
onGroupCallEvent.add(GroupCallEvent.GroupCallStateChanged);
|
onGroupCallEvent.add(GroupCallEvent.GroupCallStateChanged);
|
||||||
|
|
@ -420,8 +433,7 @@ class GroupCall {
|
||||||
// Set up participants for the members currently in the room.
|
// Set up participants for the members currently in the room.
|
||||||
// Other members will be picked up by the RoomState.members event.
|
// Other members will be picked up by the RoomState.members event.
|
||||||
|
|
||||||
final memberStateEvents =
|
final memberStateEvents = await getAllMemberStateEvents();
|
||||||
await getStateEventsList(EventTypes.GroupCallMemberPrefix);
|
|
||||||
|
|
||||||
memberStateEvents.forEach((stateEvent) {
|
memberStateEvents.forEach((stateEvent) {
|
||||||
onMemberStateChanged(stateEvent);
|
onMemberStateChanged(stateEvent);
|
||||||
|
|
@ -470,33 +482,34 @@ class GroupCall {
|
||||||
setState(GroupCallState.LocalCallFeedUninitialized);
|
setState(GroupCallState.LocalCallFeedUninitialized);
|
||||||
voip.currentGroupCID = null;
|
voip.currentGroupCID = null;
|
||||||
voip.delegate.handleGroupCallEnded(this);
|
voip.delegate.handleGroupCallEnded(this);
|
||||||
|
final justLeftGroupCall = voip.groupCalls.tryGet<GroupCall>(room.id);
|
||||||
|
// terminate group call if empty
|
||||||
|
if (justLeftGroupCall != null &&
|
||||||
|
justLeftGroupCall.intent != 'm.room' &&
|
||||||
|
justLeftGroupCall.participants.isEmpty &&
|
||||||
|
room.canCreateGroupCall) {
|
||||||
|
terminate();
|
||||||
|
} else {
|
||||||
|
Logs().d(
|
||||||
|
'[VOIP] left group call but cannot terminate. participants: ${participants.length}, pl: ${room.canCreateGroupCall}');
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// terminate group call.
|
/// terminate group call.
|
||||||
void terminate({bool emitStateEvent = true}) async {
|
void terminate({bool emitStateEvent = true}) async {
|
||||||
|
final existingStateEvent =
|
||||||
|
room.getState(EventTypes.GroupCallPrefix, groupCallId);
|
||||||
dispose();
|
dispose();
|
||||||
|
|
||||||
participants = [];
|
participants = [];
|
||||||
//TODO(duan): remove this
|
|
||||||
/* client.removeListener(
|
|
||||||
'RoomState.members',
|
|
||||||
onMemberStateChanged,
|
|
||||||
);
|
|
||||||
*/
|
|
||||||
voip.groupCalls.remove(room.id);
|
voip.groupCalls.remove(room.id);
|
||||||
voip.groupCalls.remove(groupCallId);
|
voip.groupCalls.remove(groupCallId);
|
||||||
|
|
||||||
if (emitStateEvent) {
|
if (emitStateEvent) {
|
||||||
final existingStateEvent = await getStateEvent(
|
|
||||||
EventTypes.GroupCallPrefix,
|
|
||||||
groupCallId,
|
|
||||||
);
|
|
||||||
|
|
||||||
await client.setRoomStateWithKey(
|
await client.setRoomStateWithKey(
|
||||||
room.id, EventTypes.GroupCallPrefix, groupCallId, {
|
room.id, EventTypes.GroupCallPrefix, groupCallId, {
|
||||||
...existingStateEvent!.content,
|
...existingStateEvent!.content,
|
||||||
'm.terminated': GroupCallTerminationReason.CallEnded,
|
'm.terminated': GroupCallTerminationReason.CallEnded,
|
||||||
});
|
});
|
||||||
|
Logs().d('[VOIP] Group call $groupCallId was killed');
|
||||||
}
|
}
|
||||||
voip.delegate.handleGroupCallEnded(this);
|
voip.delegate.handleGroupCallEnded(this);
|
||||||
setState(GroupCallState.Ended);
|
setState(GroupCallState.Ended);
|
||||||
|
|
@ -665,9 +678,9 @@ class GroupCall {
|
||||||
newCall.answerWithStreams(getLocalStreams());
|
newCall.answerWithStreams(getLocalStreams());
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> sendMemberStateEvent() {
|
Future<void> sendMemberStateEvent() async {
|
||||||
final deviceId = client.deviceID;
|
final deviceId = client.deviceID;
|
||||||
return updateMemberCallState(IGroupCallRoomMemberCallState.formJson({
|
await updateMemberCallState(IGroupCallRoomMemberCallState.formJson({
|
||||||
'm.call_id': groupCallId,
|
'm.call_id': groupCallId,
|
||||||
'm.devices': [
|
'm.devices': [
|
||||||
{
|
{
|
||||||
|
|
@ -683,9 +696,23 @@ class GroupCall {
|
||||||
],
|
],
|
||||||
// TODO 'm.foci'
|
// TODO 'm.foci'
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
if (resendMemberStateEventTimer != null) {
|
||||||
|
resendMemberStateEventTimer!.cancel();
|
||||||
|
}
|
||||||
|
resendMemberStateEventTimer =
|
||||||
|
Timer.periodic(updateExpireTsTimerDuration, ((timer) async {
|
||||||
|
Logs().d('updating member event with timer');
|
||||||
|
return await sendMemberStateEvent();
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> removeMemberStateEvent() {
|
Future<void> removeMemberStateEvent() {
|
||||||
|
if (resendMemberStateEventTimer != null) {
|
||||||
|
Logs().d('resend member event timer cancelled');
|
||||||
|
resendMemberStateEventTimer!.cancel();
|
||||||
|
resendMemberStateEventTimer = null;
|
||||||
|
}
|
||||||
return updateMemberCallState();
|
return updateMemberCallState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -693,13 +720,13 @@ class GroupCall {
|
||||||
[IGroupCallRoomMemberCallState? memberCallState]) async {
|
[IGroupCallRoomMemberCallState? memberCallState]) async {
|
||||||
final localUserId = client.userID;
|
final localUserId = client.userID;
|
||||||
|
|
||||||
final currentStateEvent =
|
final currentStateEvent = getMemberStateEvent(localUserId!);
|
||||||
await getStateEvent(EventTypes.GroupCallMemberPrefix, localUserId);
|
|
||||||
final eventContent = currentStateEvent?.content ?? {};
|
final eventContent = currentStateEvent?.content ?? {};
|
||||||
var calls = <IGroupCallRoomMemberCallState>[];
|
var calls = <IGroupCallRoomMemberCallState>[];
|
||||||
|
|
||||||
if (currentStateEvent != null) {
|
if (currentStateEvent != null) {
|
||||||
final memberStateEvent = IGroupCallRoomMemberState.fromJson(eventContent);
|
final memberStateEvent =
|
||||||
|
IGroupCallRoomMemberState.fromJson(currentStateEvent);
|
||||||
calls = memberStateEvent.calls;
|
calls = memberStateEvent.calls;
|
||||||
final existingCallIndex =
|
final existingCallIndex =
|
||||||
calls.indexWhere((element) => groupCallId == element.call_id);
|
calls.indexWhere((element) => groupCallId == element.call_id);
|
||||||
|
|
@ -716,13 +743,15 @@ class GroupCall {
|
||||||
} else if (memberCallState != null) {
|
} else if (memberCallState != null) {
|
||||||
calls.add(memberCallState);
|
calls.add(memberCallState);
|
||||||
}
|
}
|
||||||
|
|
||||||
final content = {
|
final content = {
|
||||||
'm.calls': calls.map((e) => e.toJson()).toList(),
|
'm.calls': calls.map((e) => e.toJson()).toList(),
|
||||||
|
'm.expires_ts': calls.isEmpty
|
||||||
|
? eventContent.tryGet('m.expires_ts')
|
||||||
|
: DateTime.now().add(expireTsBumpDuration).millisecondsSinceEpoch
|
||||||
};
|
};
|
||||||
|
|
||||||
await client.setRoomStateWithKey(
|
await client.setRoomStateWithKey(
|
||||||
room.id, EventTypes.GroupCallMemberPrefix, localUserId!, content);
|
room.id, EventTypes.GroupCallMemberPrefix, localUserId, content);
|
||||||
}
|
}
|
||||||
|
|
||||||
void onMemberStateChanged(MatrixEvent event) async {
|
void onMemberStateChanged(MatrixEvent event) async {
|
||||||
|
|
@ -737,7 +766,7 @@ class GroupCall {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final callsState = IGroupCallRoomMemberState.fromJson(event.content);
|
final callsState = IGroupCallRoomMemberState.fromJson(event);
|
||||||
|
|
||||||
if (callsState is List) {
|
if (callsState is List) {
|
||||||
Logs()
|
Logs()
|
||||||
|
|
@ -843,14 +872,12 @@ class GroupCall {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<IGroupCallRoomMemberDevice?> getDeviceForMember(String userId) async {
|
Future<IGroupCallRoomMemberDevice?> getDeviceForMember(String userId) async {
|
||||||
final memberStateEvent =
|
final memberStateEvent = getMemberStateEvent(userId);
|
||||||
await getStateEvent(EventTypes.GroupCallMemberPrefix, userId);
|
|
||||||
if (memberStateEvent == null) {
|
if (memberStateEvent == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final memberState =
|
final memberState = IGroupCallRoomMemberState.fromJson(memberStateEvent);
|
||||||
IGroupCallRoomMemberState.fromJson(memberStateEvent.content);
|
|
||||||
|
|
||||||
final memberGroupCallState =
|
final memberGroupCallState =
|
||||||
memberState.calls.where(((call) => call.call_id == groupCallId));
|
memberState.calls.where(((call) => call.call_id == groupCallId));
|
||||||
|
|
|
||||||
|
|
@ -757,4 +757,110 @@ class VoIP {
|
||||||
groupCall.onMemberStateChanged(event);
|
groupCall.onMemberStateChanged(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool hasActiveCall(Room room) {
|
||||||
|
final groupCallStates =
|
||||||
|
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
|
||||||
|
if (groupCallStates != null) {
|
||||||
|
groupCallStates.values
|
||||||
|
.toList()
|
||||||
|
.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
|
||||||
|
final latestGroupCallEvent = groupCallStates.values.last;
|
||||||
|
if (!latestGroupCallEvent.content.containsKey('m.terminated')) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future sendGroupCallTerminateEvent(Room room, String groupCallId) async {
|
||||||
|
try {
|
||||||
|
Logs().d('[VOIP] running sendterminator');
|
||||||
|
final existingStateEvent =
|
||||||
|
room.getState(EventTypes.GroupCallPrefix, groupCallId);
|
||||||
|
if (existingStateEvent == null) {
|
||||||
|
Logs().e('could not find group call with id $groupCallId');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await client.setRoomStateWithKey(
|
||||||
|
room.id, EventTypes.GroupCallPrefix, groupCallId, {
|
||||||
|
...existingStateEvent.content,
|
||||||
|
'm.terminated': GroupCallTerminationReason.CallEnded,
|
||||||
|
});
|
||||||
|
Logs().d('[VOIP] Group call $groupCallId was killed uwu');
|
||||||
|
} catch (e) {
|
||||||
|
Logs().i('killing stale call $groupCallId failed. reason: $e');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Timer> staleGroupCallsTimer = {};
|
||||||
|
|
||||||
|
/// stops the stale call checker timer
|
||||||
|
void stopStaleCallsChecker(String roomId) {
|
||||||
|
if (staleGroupCallsTimer.tryGet(roomId) != null) {
|
||||||
|
staleGroupCallsTimer[roomId]!.cancel();
|
||||||
|
} else {
|
||||||
|
Logs().w('[VOIP] no stale call checker for room found');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static const staleCallCheckerDuration = Duration(seconds: 30);
|
||||||
|
|
||||||
|
/// checks for stale calls in a room and sends `m.terminated` if all the
|
||||||
|
/// expires_ts are expired. Call when opening a room
|
||||||
|
void startStaleCallsChecker(String roomId) async {
|
||||||
|
staleGroupCallsTimer[roomId] = Timer.periodic(
|
||||||
|
staleCallCheckerDuration,
|
||||||
|
(timer) {
|
||||||
|
final room = client.getRoomById(roomId);
|
||||||
|
if (room == null) {
|
||||||
|
Logs().w('[VOIP] stale call checker got incorrect room id');
|
||||||
|
} else {
|
||||||
|
Logs().d('checking for stale group calls.');
|
||||||
|
final copyGroupCallIds =
|
||||||
|
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
|
||||||
|
if (copyGroupCallIds == null) return;
|
||||||
|
copyGroupCallIds.forEach(
|
||||||
|
(groupCallId, groupCallEvent) async {
|
||||||
|
if (groupCallEvent.content.tryGet('m.intent') == 'm.room') return;
|
||||||
|
if (!groupCallEvent.content.containsKey('m.terminated')) {
|
||||||
|
if (groupCallId != null) {
|
||||||
|
Logs().i(
|
||||||
|
'found non terminated group call with id $groupCallId');
|
||||||
|
// call is not empty but check for stale participants (gone offline)
|
||||||
|
// with expire_ts
|
||||||
|
final Map<String, int> participants = {};
|
||||||
|
final callMemberEvents = room.states.tryGetMap<String, Event>(
|
||||||
|
EventTypes.GroupCallMemberPrefix);
|
||||||
|
Logs().e(
|
||||||
|
'callmemeberEvents length ${callMemberEvents?.length}');
|
||||||
|
if (callMemberEvents != null) {
|
||||||
|
callMemberEvents.forEach((userId, memberEvent) async {
|
||||||
|
final callMemberEvent = groupCallEvent.room.getState(
|
||||||
|
EventTypes.GroupCallMemberPrefix,
|
||||||
|
userId,
|
||||||
|
);
|
||||||
|
if (callMemberEvent != null) {
|
||||||
|
final event =
|
||||||
|
IGroupCallRoomMemberState.fromJson(callMemberEvent);
|
||||||
|
participants[userId] = event.expireTs;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Logs().e(participants.toString());
|
||||||
|
if (!participants.values.any((expire_ts) =>
|
||||||
|
expire_ts > DateTime.now().millisecondsSinceEpoch)) {
|
||||||
|
Logs().i(
|
||||||
|
'Group call with expired timestamps detected, terminating');
|
||||||
|
await sendGroupCallTerminateEvent(room, groupCallId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue