Merge branch 'td/ohno' into 'main'

fix: hasActiveGroup call now checks all group calls

See merge request famedly/company/frontend/famedlysdk!1236
This commit is contained in:
Nicolas Werner 2023-02-14 11:26:52 +00:00
commit 0e26e44d8c
9 changed files with 414 additions and 180 deletions

View File

@ -34,6 +34,7 @@ export 'src/voip/voip.dart';
export 'src/voip/voip_content.dart';
export 'src/voip/conn_tester.dart';
export 'src/voip/utils.dart';
export 'src/voip/voip_room_extension.dart';
export 'src/room.dart';
export 'src/timeline.dart';
export 'src/user.dart';

View File

@ -1417,7 +1417,8 @@ class Client extends MatrixApi {
await olm.init();
olm.get_library_version();
encryption = Encryption(client: this);
} catch (_) {
} catch (e) {
Logs().e('Error initializing encryption $e');
await encryption?.dispose();
encryption = null;
}

View File

@ -91,6 +91,9 @@ class Room {
/// Key-Value store for private account data only visible for this user.
Map<String, BasicRoomEvent> roomAccountData = {};
/// stores stale group call checking timers for rooms.
Map<String, Timer> staleGroupCallsTimer = {};
final _sendingQueue = <Completer>[];
Map<String, dynamic> toJson() => {
@ -140,6 +143,7 @@ class Room {
}
}
partial = false;
startStaleCallsChecker(id);
}
/// Returns the [Event] for the given [typeKey] and optional [stateKey].

View File

@ -184,8 +184,6 @@ class GroupCall {
final Room room;
final String intent;
final String type;
final bool dataChannelsEnabled;
final RTCDataChannelInit? dataChannelOptions;
String state = GroupCallState.LocalCallFeedUninitialized;
StreamSubscription<CallSession>? _callSubscription;
final Map<String, double> audioLevelsMap = {};
@ -229,8 +227,6 @@ class GroupCall {
required this.room,
required this.type,
required this.intent,
required this.dataChannelsEnabled,
required this.dataChannelOptions,
}) {
this.groupCallId = groupCallId ?? genCallID();
}
@ -246,16 +242,19 @@ class GroupCall {
{
'm.intent': intent,
'm.type': type,
// TODO: Specify datachannels
'dataChannelsEnabled': dataChannelsEnabled,
'dataChannelOptions': dataChannelOptions?.toMap() ?? {},
'groupCallId': groupCallId,
},
);
return this;
}
bool get terminated =>
room
.getState(EventTypes.GroupCallPrefix, groupCallId)
?.content
.containsKey('m.terminated') ??
false;
String get avatarName =>
getUser().calcDisplayname(mxidLocalPartFallback: false);
@ -268,7 +267,7 @@ class GroupCall {
Event? getMemberStateEvent(String userId) {
final event = room.getState(EventTypes.GroupCallMemberPrefix, userId);
if (event != null) {
return voip.callMemberStateIsExpired(event, groupCallId) ? null : event;
return room.callMemberStateIsExpired(event, groupCallId) ? null : event;
}
return null;
}
@ -279,7 +278,7 @@ class GroupCall {
roomStates.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
roomStates.forEach((value) {
if (value.type == EventTypes.GroupCallMemberPrefix &&
!voip.callMemberStateIsExpired(value, groupCallId)) {
!room.callMemberStateIsExpired(value, groupCallId)) {
events.add(value);
}
});
@ -868,10 +867,6 @@ class GroupCall {
await newCall.placeCallWithStreams(
getLocalStreams(), requestScreenshareFeed);
if (dataChannelsEnabled) {
newCall.createDataChannel('datachannel', dataChannelOptions!);
}
addCall(newCall);
}

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:random_string/random_string.dart';
import 'package:webrtc_interface/webrtc_interface.dart';

View File

@ -1,7 +1,6 @@
import 'dart:async';
import 'dart:core';
import 'package:collection/collection.dart';
import 'package:sdp_transform/sdp_transform.dart' as sdp_transform;
import 'package:webrtc_interface/webrtc_interface.dart';
@ -71,15 +70,17 @@ class VoIP {
client.onAssertedIdentityReceived.stream
.listen((event) => _handleEvent(event, onAssertedIdentityReceived));
client.onRoomState.stream.listen((event) {
if ([
EventTypes.GroupCallPrefix,
EventTypes.GroupCallMemberPrefix,
].contains(event.type)) {
Logs().v('[VOIP] onRoomState: type ${event.toJson()}.');
onRoomStateChanged(event);
}
});
client.onRoomState.stream.listen(
(event) {
if ([
EventTypes.GroupCallPrefix,
EventTypes.GroupCallMemberPrefix,
].contains(event.type)) {
Logs().v('[VOIP] onRoomState: type ${event.toJson()}.');
onRoomStateChanged(event);
}
},
);
client.onToDeviceEvent.stream.listen((event) {
Logs().v('[VOIP] onToDeviceEvent: type ${event.toJson()}.');
@ -134,6 +135,16 @@ class VoIP {
});
delegate.mediaDevices.ondevicechange = _onDeviceChange;
// to populate groupCalls with already present calls
client.rooms.forEach((room) {
if (room.activeGroupCallEvents.isNotEmpty) {
room.activeGroupCallEvents.forEach((element) {
createGroupCallFromRoomStateEvent(element,
emitHandleNewGroupCall: false);
});
}
});
}
Future<void> _onDeviceChange(dynamic _) async {
@ -566,13 +577,8 @@ class VoIP {
/// [type] The type of call to be made.
///
/// [intent] The intent of the call.
///
/// [dataChannelsEnabled] Whether data channels are enabled.
///
/// [dataChannelOptions] The data channel options.
Future<GroupCall?> newGroupCall(String roomId, String type, String intent,
[bool? dataChannelsEnabled,
RTCDataChannelInit? dataChannelOptions]) async {
Future<GroupCall?> newGroupCall(
String roomId, String type, String intent) async {
if (getGroupCallForRoom(roomId) != null) {
Logs().e('[VOIP] [$roomId] already has an existing group call.');
return null;
@ -590,8 +596,6 @@ class VoIP {
room: room,
type: type,
intent: intent,
dataChannelsEnabled: dataChannelsEnabled ?? false,
dataChannelOptions: dataChannelOptions ?? RTCDataChannelInit(),
).create();
groupCalls[groupId] = groupCall;
groupCalls[roomId] = groupCall;
@ -684,8 +688,8 @@ class VoIP {
}
/// Create a new group call from a room state event.
Future<GroupCall?> createGroupCallFromRoomStateEvent(
MatrixEvent event) async {
Future<GroupCall?> createGroupCallFromRoomStateEvent(MatrixEvent event,
{bool emitHandleNewGroupCall = true}) async {
final roomId = event.roomId;
final content = event.content;
@ -715,36 +719,22 @@ class VoIP {
return null;
}
final dataChannelOptionsMap = content['m.data_channel_options'];
var dataChannelsEnabled = false;
final dataChannelOptions = RTCDataChannelInit();
if (dataChannelOptionsMap != null) {
dataChannelsEnabled =
dataChannelOptionsMap['dataChannelsEnabled'] as bool;
dataChannelOptions.ordered = dataChannelOptionsMap['ordered'] as bool;
dataChannelOptions.maxRetransmits =
dataChannelOptionsMap['maxRetransmits'] as int;
dataChannelOptions.maxRetransmits =
dataChannelOptionsMap['maxRetransmits'] as int;
dataChannelOptions.protocol = dataChannelOptionsMap['protocol'] as String;
}
final groupCall = GroupCall(
client: client,
voip: this,
room: room,
groupCallId: groupCallId,
type: callType,
intent: callIntent,
dataChannelsEnabled: dataChannelsEnabled,
dataChannelOptions: dataChannelOptions);
client: client,
voip: this,
room: room,
groupCallId: groupCallId,
type: callType,
intent: callIntent,
);
groupCalls[groupCallId!] = groupCall;
groupCalls[room.id] = groupCall;
onIncomingGroupCall.add(groupCall);
delegate.handleNewGroupCall(groupCall);
if (emitHandleNewGroupCall) {
delegate.handleNewGroupCall(groupCall);
}
return groupCall;
}
@ -752,7 +742,7 @@ class VoIP {
final eventType = event.type;
final roomId = event.roomId;
if (eventType == EventTypes.GroupCallPrefix) {
final groupCallId = event.content['groupCallId'];
final groupCallId = event.stateKey;
final content = event.content;
final currentGroupCall = groupCalls[groupCallId];
if (currentGroupCall == null && content['m.terminated'] == null) {
@ -781,122 +771,6 @@ class VoIP {
}
}
bool hasActiveCall(Room room) {
final groupCallStates =
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
if (groupCallStates != null) {
groupCallStates.values
.toList()
.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
final latestGroupCallEvent = groupCallStates.values.last;
if (!latestGroupCallEvent.content.containsKey('m.terminated')) {
return true;
}
}
return false;
}
Future sendGroupCallTerminateEvent(Room room, String groupCallId) async {
try {
Logs().d('[VOIP] running sendterminator');
final existingStateEvent =
room.getState(EventTypes.GroupCallPrefix, groupCallId);
if (existingStateEvent == null) {
Logs().e('could not find group call with id $groupCallId');
return;
}
await client.setRoomStateWithKey(
room.id, EventTypes.GroupCallPrefix, groupCallId, {
...existingStateEvent.content,
'm.terminated': GroupCallTerminationReason.CallEnded,
});
Logs().d('[VOIP] Group call $groupCallId was killed uwu');
} catch (e) {
Logs().i('killing stale call $groupCallId failed. reason: $e');
}
}
Map<String, Timer> staleGroupCallsTimer = {};
/// stops the stale call checker timer
void stopStaleCallsChecker(String roomId) {
if (staleGroupCallsTimer.tryGet(roomId) != null) {
staleGroupCallsTimer[roomId]!.cancel();
} else {
Logs().w('[VOIP] no stale call checker for room found');
}
}
static const staleCallCheckerDuration = Duration(seconds: 30);
bool callMemberStateIsExpired(
MatrixEvent groupCallMemberStateEvent, String groupCallId) {
final callMemberState =
IGroupCallRoomMemberState.fromJson(groupCallMemberStateEvent);
final calls = callMemberState.calls;
if (calls.isNotEmpty) {
final call =
calls.singleWhereOrNull((call) => call.call_id == groupCallId);
if (call != null) {
return call.devices.where((device) => device.expires_ts != null).every(
(device) =>
device.expires_ts! < DateTime.now().millisecondsSinceEpoch);
}
}
return true;
}
/// checks for stale calls in a room and sends `m.terminated` if all the
/// expires_ts are expired. Call when opening a room
void startStaleCallsChecker(String roomId) async {
staleGroupCallsTimer[roomId] = Timer.periodic(
staleCallCheckerDuration,
(timer) {
final room = client.getRoomById(roomId);
if (room == null) {
Logs().w('[VOIP] stale call checker got incorrect room id');
} else {
Logs().d('checking for stale group calls.');
final copyGroupCallIds =
room.states.tryGetMap<dynamic, Event>(EventTypes.GroupCallPrefix);
if (copyGroupCallIds == null) return;
copyGroupCallIds.forEach(
(groupCallId, groupCallEvent) async {
if (groupCallEvent.content.tryGet('m.intent') == 'm.room') return;
if (!groupCallEvent.content.containsKey('m.terminated')) {
if (groupCallId != null) {
Logs().i(
'found non terminated group call with id $groupCallId');
// call is not empty but check for stale participants (gone offline)
// with expire_ts
bool callExpired = true; // assume call is expired
final callMemberEvents = room.states.tryGetMap<String, Event>(
EventTypes.GroupCallMemberPrefix);
if (callMemberEvents != null) {
for (var i = 0; i < callMemberEvents.length; i++) {
final groupCallMemberEventMap =
callMemberEvents.entries.toList()[i];
final groupCallMemberEvent =
groupCallMemberEventMap.value;
callExpired = callMemberStateIsExpired(
groupCallMemberEvent, groupCallId);
// no need to iterate further even if one participant says call isn't expired
if (!callExpired) break;
}
}
if (callExpired) {
Logs().i(
'Group call with only expired timestamps detected, terminating');
await sendGroupCallTerminateEvent(room, groupCallId);
}
}
}
},
);
}
},
);
}
@Deprecated('Call `hasActiveGroupCall` on the room directly instead')
bool hasActiveCall(Room room) => room.hasActiveGroupCall;
}

View File

@ -0,0 +1,147 @@
import 'dart:async';
import 'package:collection/collection.dart';
import 'package:matrix/matrix.dart';
extension GroupCallUtils on Room {
/// returns the user count (not sessions, yet) for the group call with id: `groupCallId`.
/// returns 0 if group call not found
int? groupCallParticipantCount(String groupCallId) {
int participantCount = 0;
final groupCallMemberStates =
states.tryGetMap<String, Event>(EventTypes.GroupCallMemberPrefix);
if (groupCallMemberStates != null) {
groupCallMemberStates.forEach((userId, memberStateEvent) {
if (!callMemberStateIsExpired(memberStateEvent, groupCallId)) {
participantCount++;
}
});
}
return participantCount;
}
bool get hasActiveGroupCall {
if (activeGroupCallEvents.isNotEmpty) {
return true;
}
return false;
}
/// list of active group calls
List<Event> get activeGroupCallEvents {
final groupCallStates =
states.tryGetMap<String, Event>(EventTypes.GroupCallPrefix);
if (groupCallStates != null) {
groupCallStates.values
.toList()
.sort((a, b) => a.originServerTs.compareTo(b.originServerTs));
return groupCallStates.values
.where((element) => !element.content.containsKey('m.terminated'))
.toList();
}
return [];
}
/// stops the stale call checker timer
void stopStaleCallsChecker(String roomId) {
if (staleGroupCallsTimer.tryGet(roomId) != null) {
staleGroupCallsTimer[roomId]!.cancel();
} else {
Logs().w('[VOIP] no stale call checker for room found');
}
}
static const staleCallCheckerDuration = Duration(seconds: 30);
bool callMemberStateIsExpired(
MatrixEvent groupCallMemberStateEvent, String groupCallId) {
final callMemberState =
IGroupCallRoomMemberState.fromJson(groupCallMemberStateEvent);
final calls = callMemberState.calls;
if (calls.isNotEmpty) {
final call =
calls.singleWhereOrNull((call) => call.call_id == groupCallId);
if (call != null) {
return call.devices.where((device) => device.expires_ts != null).every(
(device) =>
device.expires_ts! < DateTime.now().millisecondsSinceEpoch);
}
}
return true;
}
/// checks for stale calls in a room and sends `m.terminated` if all the
/// expires_ts are expired. Call when opening a room
void startStaleCallsChecker(String roomId) async {
stopStaleCallsChecker(roomId);
await singleShotStaleCallCheckerOnRoom();
staleGroupCallsTimer[roomId] = Timer.periodic(
staleCallCheckerDuration,
(timer) async => await singleShotStaleCallCheckerOnRoom(),
);
}
Future<void> singleShotStaleCallCheckerOnRoom() async {
Logs().d('checking for stale group calls in room $id');
final copyGroupCallIds =
states.tryGetMap<String, Event>(EventTypes.GroupCallPrefix);
if (copyGroupCallIds == null) return;
copyGroupCallIds.forEach(
(groupCallId, groupCallEvent) async {
if (groupCallEvent.content.tryGet('m.intent') == 'm.room') return;
if (!groupCallEvent.content.containsKey('m.terminated')) {
Logs().i('found non terminated group call with id $groupCallId');
// call is not empty but check for stale participants (gone offline)
// with expire_ts
bool callExpired = true; // assume call is expired
final callMemberEvents =
states.tryGetMap<String, Event>(EventTypes.GroupCallMemberPrefix);
if (callMemberEvents != null) {
for (var i = 0; i < callMemberEvents.length; i++) {
final groupCallMemberEventMap =
callMemberEvents.entries.toList()[i];
final groupCallMemberEvent = groupCallMemberEventMap.value;
callExpired =
callMemberStateIsExpired(groupCallMemberEvent, groupCallId);
// no need to iterate further even if one participant says call isn't expired
if (!callExpired) break;
}
}
if (callExpired) {
Logs().i(
'Group call with only expired timestamps detected, terminating');
await sendGroupCallTerminateEvent(groupCallId);
}
}
},
);
}
/// returns the event_id if successful
Future<String?> sendGroupCallTerminateEvent(String groupCallId) async {
try {
Logs().d('[VOIP] running sendterminator');
final existingStateEvent =
getState(EventTypes.GroupCallPrefix, groupCallId);
if (existingStateEvent == null) {
Logs().e('could not find group call with id $groupCallId');
return null;
}
final req = await client
.setRoomStateWithKey(id, EventTypes.GroupCallPrefix, groupCallId, {
...existingStateEvent.content,
'm.terminated': GroupCallTerminationReason.CallEnded,
});
Logs().d('[VOIP] Group call $groupCallId was killed uwu');
return req;
} catch (e) {
Logs().i('killing stale call $groupCallId failed. reason: $e');
return null;
}
}
}

View File

@ -2535,6 +2535,10 @@ class FakeMatrixApi extends BaseClient {
'/client/unstable/org.matrix.msc3814.v1/dehydrated_device': (var _) => {
'device_id': 'DEHYDDEV',
},
'/client/v3/rooms/${Uri.encodeComponent("!localpart:server.abc")}/state/${Uri.encodeComponent("org.matrix.msc3401.call")}/${Uri.encodeComponent("1675856324414gzczMtfzTk0DKgEw")}':
(var req) => {
'event_id': 'groupCall',
},
},
'DELETE': {
'/unknown/token': (var req) => {'errcode': 'M_UNKNOWN_TOKEN'},

View File

@ -1359,6 +1359,212 @@ void main() {
expect(matrixToLink.toString(),
'https://matrix.to/#/!localpart%3Aserver.abc?via=example.org&via=example.com&via=test.abc');
});
test('callMemberStateIsExpired', () {
expect(
room.callMemberStateIsExpired(
Event(
senderId: '@test:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '1231234124',
content: {
'm.calls': [
{
'm.call_id': '1674811248673789288k7d60n5976',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'cbAtVZdLBnJq',
'm.expires_ts': 1674813039415,
'feeds': [
{'purpose': 'm.usermedia'}
]
}
]
},
],
},
originServerTs: DateTime.now(),
stateKey: ''),
'1674811248673789288k7d60n5976'),
true);
expect(
room.callMemberStateIsExpired(
Event(
senderId: '@test:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '1231234124',
content: {
'm.calls': [
{
'm.call_id': '1674811256006mfqnmsAbzqxjYtWZ',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'fhovqxwcasdfr',
'expires_ts': DateTime.now()
.add(Duration(minutes: 1))
.millisecondsSinceEpoch,
'feeds': [
{'purpose': 'm.usermedia'}
]
}
]
}
],
},
originServerTs: DateTime.now(),
stateKey: ''),
'1674811256006mfqnmsAbzqxjYtWZ'),
false);
});
test('stale call checker and terminator', () async {
room.setState(Event(
content: {'m.intent': 'm.prompt', 'm.type': 'm.video'},
type: EventTypes.GroupCallPrefix,
eventId: 'asdfasdf',
senderId: '@test:example.com',
originServerTs: DateTime.now(),
room: room,
stateKey: '1675856324414gzczMtfzTk0DKgEw'));
expect(room.hasActiveGroupCall, true);
expect(room.activeGroupCallEvents.length, 1);
expect(
await room
.sendGroupCallTerminateEvent('1675856324414gzczMtfzTk0DKgEw'),
'groupCall');
room.setState(Event(
content: {
'm.intent': 'm.prompt',
'm.type': 'm.video',
'm.terminated': 'call_ended'
},
type: EventTypes.GroupCallPrefix,
eventId: 'asdfasdf',
senderId: '@test:example.com',
originServerTs: DateTime.now(),
room: room,
stateKey: '1675856324414gzczMtfzTk0DKgEw'));
expect(room.hasActiveGroupCall, false);
expect(room.activeGroupCallEvents.length, 0);
});
test('group call participants count', () {
room.setState(
Event(
senderId: '@test:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '1234177',
content: {
'm.calls': [
{
'm.call_id': '1674811256006mfqnmsAbzqxjYtWZ',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'fhovqxwcasdfr',
'expires_ts': DateTime.now()
.add(Duration(minutes: 1))
.millisecondsSinceEpoch,
'feeds': [
{'purpose': 'm.usermedia'}
]
},
]
}
],
},
originServerTs: DateTime.now(),
stateKey: '@test:example.com'),
);
room.setState(
Event(
senderId: '@test0:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '1234177',
content: {
'm.calls': [
{
'm.call_id': '1674811256006mfqnmsAbzqxjYtWZ',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'fhovqxwcasdfr',
'expires_ts': DateTime.now()
.add(Duration(minutes: 2))
.millisecondsSinceEpoch,
'feeds': [
{'purpose': 'm.usermedia'}
]
},
]
}
],
},
originServerTs: DateTime.now(),
stateKey: '@test0:example.com'),
);
room.setState(
Event(
senderId: '@test2:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '1231234124123',
content: {
'm.calls': [
{
'm.call_id': '1674811256006mfqnmsAbzqxjYtWZ',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'fhovqxwcasdfr',
'feeds': [
{'purpose': 'm.usermedia'}
]
},
]
}
],
},
originServerTs: DateTime.now(),
stateKey: '@test2:example.com'),
);
room.setState(
Event(
senderId: '@test3:example.com',
type: EventTypes.GroupCallMemberPrefix,
room: room,
eventId: '123123412445',
content: {
'm.calls': [
{
'm.call_id': '1674811256006mfqnmsAbzqxjYtWZ',
'm.devices': [
{
'device_id': 'ZEEGCGPTGI',
'session_id': 'fhovqxwcasdfr',
'expires_ts': DateTime.now()
.subtract(Duration(minutes: 1))
.millisecondsSinceEpoch,
'feeds': [
{'purpose': 'm.usermedia'}
]
},
]
}
],
},
originServerTs: DateTime.now(),
stateKey: '@test3:example.com'),
);
expect(
room.groupCallParticipantCount('1674811256006mfqnmsAbzqxjYtWZ'), 2);
});
test('logout', () async {
await matrix.logout();
});