Merge branch 'soru/better-chunked-sending' into 'main'

fix: Smoothen up sending to rooms with extremely many devices

See merge request famedly/famedlysdk!614

commit 4ca6560607
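For readers skimming the diff below: the change stops awaiting every chunk of send-to-device messages and instead awaits only the first chunk, pushing the remaining chunks into a background loop with a short delay between them. What follows is a minimal, self-contained Dart sketch of that pattern, not the SDK's actual API: `Device`, `sendChunk` and `sendChunked` are hypothetical stand-ins, and `unawaited` is taken from `dart:async` (available on recent Dart SDKs).

import 'dart:async';

class Device {
  final String id;
  Device(this.id);
}

Future<void> sendChunk(List<Device> chunk) async {
  // Pretend network call.
  await Future.delayed(Duration(milliseconds: 10));
  print('sent to ${chunk.map((d) => d.id).toList()}');
}

Future<void> sendChunked(List<Device> devices, {int chunkSize = 20}) async {
  var i = 0;
  // Await only the first chunk so the caller gets immediate feedback/errors.
  for (; i < devices.length && i <= 0; i += chunkSize) {
    final end = i + chunkSize > devices.length ? devices.length : i + chunkSize;
    await sendChunk(devices.sublist(i, end));
  }
  // Remaining chunks go out in the background, spaced out to keep the UI responsive.
  if (i < devices.length) {
    unawaited(() async {
      for (; i < devices.length; i += chunkSize) {
        await Future.delayed(Duration(milliseconds: 50));
        final end =
            i + chunkSize > devices.length ? devices.length : i + chunkSize;
        unawaited(sendChunk(devices.sublist(i, end)));
      }
    }());
  }
}

Future<void> main() async {
  final devices = List.generate(65, (n) => Device('device$n'));
  await sendChunked(devices); // returns after the first chunk of 20
  // Keep this demo alive long enough for the background chunks to finish.
  await Future.delayed(Duration(seconds: 1));
}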
@@ -1635,7 +1635,6 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
     bool onlyVerified = false,
   }) async {
     if (!encryptionEnabled) return;
-    Logs().v('Sending to device message... (${deviceKeys.length} pre-devices)');
     // Don't send this message to blocked devices, and if specified onlyVerified
     // then only send it to verified devices
     if (deviceKeys.isNotEmpty) {
@@ -1645,8 +1644,6 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
           (onlyVerified && !deviceKeys.verified));
       if (deviceKeys.isEmpty) return;
     }
-    Logs().v('Sending to device message... (${deviceKeys.length} pre-devices)');
-    Logs().v(deviceKeys.map((k) => '${k.userId}:${k.deviceId}').toList());

     // Send with send-to-device messaging
     var data = <String, Map<String, Map<String, dynamic>>>{};
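The context lines above show the filter that sendToDeviceEncrypted applies before sending: blocked devices, the sending device itself and, when `onlyVerified` is set, unverified devices are all dropped. A standalone sketch of that predicate, using a hypothetical `Key` class in place of the SDK's `DeviceKeys`:

class Key {
  final String userId, deviceId;
  final bool blocked, verified;
  Key(this.userId, this.deviceId, {this.blocked = false, this.verified = false});
}

List<Key> filterTargets(
    List<Key> keys, String ownUserId, String ownDeviceId,
    {bool onlyVerified = false}) {
  // Keep only devices we are actually allowed and willing to message.
  return keys
      .where((k) =>
          !k.blocked &&
          !(k.userId == ownUserId && k.deviceId == ownDeviceId) &&
          (!onlyVerified || k.verified))
      .toList();
}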
@@ -1666,19 +1663,24 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
     Map<String, dynamic> message,
   ) async {
     if (!encryptionEnabled) return;
+    // be sure to copy our device keys list
+    deviceKeys = List<DeviceKeys>.from(deviceKeys);
     deviceKeys.removeWhere((DeviceKeys k) =>
         k.blocked || (k.userId == userID && k.deviceId == deviceID));
     if (deviceKeys.isEmpty) return;
-    message =
-        json.decode(json.encode(message)); // make sure we deep-copy the message
+    message = message.copy(); // make sure we deep-copy the message
     // make sure all the olm sessions are loaded from database
     Logs().v('Sending to device chunked... (${deviceKeys.length} devices)');
     // sort so that devices we last received messages from get our message first
     deviceKeys.sort((keyA, keyB) => keyB.lastActive.compareTo(keyA.lastActive));
-    Logs().v(deviceKeys.map((k) => '${k.userId}:${k.deviceId}').toList());
     // and now send out in chunks of 20
     const chunkSize = 20;
-    for (var i = 0; i < deviceKeys.length; i += chunkSize) {
+
+    // first we send out all the chunks that we await
+    var i = 0;
+    // we leave this in a for-loop for now, so that we can easily adjust the break condition
+    // based on other things, if we want to hard-`await` more devices in the future
+    for (; i < deviceKeys.length && i <= 0; i += chunkSize) {
       Logs().v('Sending chunk $i...');
       final chunk = deviceKeys.sublist(
           i,
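Two smaller changes in this hunk: the device keys list is copied before it is mutated, and the deep copy of the message switches from a `json.decode(json.encode(...))` round-trip to a `.copy()` call, avoiding serialising a potentially large payload. The `.copy()` extension itself is not part of this diff; a simplified, hypothetical shape for such an extension could look like the following (nested maps copied recursively, lists copied shallowly here):

extension MapCopy on Map<String, dynamic> {
  Map<String, dynamic> copy() {
    final result = <String, dynamic>{};
    for (final entry in entries) {
      final value = entry.value;
      if (value is Map<String, dynamic>) {
        result[entry.key] = value.copy(); // recurse into nested maps
      } else if (value is List) {
        result[entry.key] = List.of(value); // shallow copy of lists
      } else {
        result[entry.key] = value;
      }
    }
    return result;
  }
}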
@@ -1686,11 +1688,24 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
               ? deviceKeys.length
               : i + chunkSize);
       // and send
-      Logs().v(chunk.map((k) => '${k.userId}:${k.deviceId}').toList());
-      final future = sendToDeviceEncrypted(chunk, eventType, message);
-      if (i == 0) {
-        await future;
-      }
+      await sendToDeviceEncrypted(chunk, eventType, message);
+    }
+    // now send out the background chunks
+    if (i < deviceKeys.length) {
+      unawaited(() async {
+        for (; i < deviceKeys.length; i += chunkSize) {
+          // wait 50ms to not freeze the UI
+          await Future.delayed(Duration(milliseconds: 50));
+          Logs().v('Sending chunk $i...');
+          final chunk = deviceKeys.sublist(
+              i,
+              i + chunkSize > deviceKeys.length
+                  ? deviceKeys.length
+                  : i + chunkSize);
+          // and send
+          unawaited(sendToDeviceEncrypted(chunk, eventType, message));
+        }
+      }());
+    }
     }
   }

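The background loop above wraps its sends in `unawaited(...)`, which simply marks a `Future` as intentionally not awaited so the `unawaited_futures` lint stays quiet. On current Dart SDKs `unawaited` ships with `dart:async`; at the time of this merge request it was commonly imported from `package:pedantic`. A minimal usage sketch:

import 'dart:async';

void fireAndForget(Future<void> work) {
  // Explicitly discard the future instead of silently ignoring it.
  unawaited(work);
}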
@@ -391,7 +391,7 @@ void main() {
         'msgtype': 'm.text',
         'body': 'Hello world',
       });
-      await Future.delayed(Duration(milliseconds: 50));
+      await Future.delayed(Duration(milliseconds: 100));
       expect(
           FakeMatrixApi.calledEndpoints.keys
               .where((k) =>
@@ -440,7 +440,7 @@ void main() {
                   k.startsWith('/client/r0/sendToDevice/m.room.encrypted'))
               .length,
           1);
-      await Future.delayed(Duration(milliseconds: 50));
+      await Future.delayed(Duration(milliseconds: 100));
       expect(
           FakeMatrixApi.calledEndpoints.keys
               .where((k) =>
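The two timing tweaks in the test hunks above follow from the new sending behaviour: only the first chunk is awaited and each background chunk is delayed by a further 50ms, so a test that counts the `/sendToDevice/m.room.encrypted` calls right after sending has to wait longer (100ms instead of 50ms) before the expected requests have actually gone out.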