Merge branch 'soru/lazy-send-keys' into 'main'

feat: Lazy-send room keys, chunked and sorted by importance

Closes #125

See merge request famedly/famedlysdk!580
This commit is contained in:
Sorunome 2020-12-29 13:50:39 +00:00
commit bbcc883777
8 changed files with 234 additions and 45 deletions

View File

@ -384,7 +384,7 @@ class KeyManager {
sess.outboundGroupSession.session_id());
}
// send out the key
await client.sendToDeviceEncrypted(
await client.sendToDeviceEncryptedChunked(
devicesToReceive, EventTypes.RoomKey, rawSession);
}
} catch (e, s) {
@ -484,7 +484,7 @@ class KeyManager {
key: client.userID,
);
try {
await client.sendToDeviceEncrypted(
await client.sendToDeviceEncryptedChunked(
deviceKeys, EventTypes.RoomKey, rawSession);
await storeOutboundGroupSession(roomId, sess);
_outboundGroupSessions[roomId] = sess;

View File

@ -227,7 +227,7 @@ class OlmManager {
if (client.database == null) {
return;
}
_olmSessions[session.identityKey] ??= [];
_olmSessions[session.identityKey] ??= <OlmSession>[];
final ix = _olmSessions[session.identityKey]
.indexWhere((s) => s.sessionId == session.sessionId);
if (ix == -1) {
@ -262,10 +262,22 @@ class OlmManager {
if (type != 0 && type != 1) {
throw ('Unknown message type');
}
final device = client.userDeviceKeys[event.sender]?.deviceKeys?.values
?.firstWhere((d) => d.curve25519Key == senderKey, orElse: () => null);
final existingSessions = olmSessions[senderKey];
final updateSessionUsage = (OlmSession session) => runInRoot(() {
final updateSessionUsage = ([OlmSession session]) => runInRoot(() {
if (session != null) {
session.lastReceived = DateTime.now();
storeOlmSession(session);
}
if (device != null) {
device.lastActive = DateTime.now();
client.database?.setLastActiveUserDeviceKey(
device.lastActive.millisecondsSinceEpoch,
client.id,
device.userId,
device.deviceId);
}
});
if (existingSessions != null) {
for (var session in existingSessions) {
@ -302,6 +314,7 @@ class OlmManager {
session: newSession,
lastReceived: DateTime.now(),
)));
updateSessionUsage();
} catch (_) {
newSession?.free();
rethrow;
@ -416,7 +429,6 @@ class OlmManager {
Future<void> startOutgoingOlmSessions(List<DeviceKeys> deviceKeys) async {
Logs().v(
'[OlmManager] Starting session with ${deviceKeys.length} devices...');
var requestingKeysFrom = <String, Map<String, String>>{};
for (var device in deviceKeys) {
if (requestingKeysFrom[device.userId] == null) {
@ -468,7 +480,6 @@ class OlmManager {
if (sess.isEmpty) {
throw ('No olm session found for ${device.userId}:${device.deviceId}');
}
final fullPayload = {
'type': type,
'content': payload,

View File

@ -1426,8 +1426,8 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
final deviceId = rawDeviceKeyEntry.key;
// Set the new device key for this device
final entry =
DeviceKeys.fromMatrixDeviceKeys(rawDeviceKeyEntry.value, this);
final entry = DeviceKeys.fromMatrixDeviceKeys(
rawDeviceKeyEntry.value, this, oldKeys[deviceId]?.lastActive);
if (entry.isValid) {
// is this a new key or the same one as an old one?
// better store an update - the signatures might have changed!
@ -1453,6 +1453,7 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
json.encode(entry.toJson()),
entry.directVerified,
entry.blocked,
entry.lastActive.millisecondsSinceEpoch,
));
}
} else if (oldKeys.containsKey(deviceId)) {
@ -1621,6 +1622,43 @@ sort order of ${prevState.sortOrder}. This should never happen...''');
eventType, messageId ?? generateUniqueTransactionId(), data);
}
/// Sends an encrypted [message] of this [type] to these [deviceKeys]. This request happens
/// partly in the background and partly in the foreground. It automatically chunks sending
/// to device keys based on activity
Future<void> sendToDeviceEncryptedChunked(
List<DeviceKeys> deviceKeys,
String eventType,
Map<String, dynamic> message,
) async {
if (!encryptionEnabled) return;
deviceKeys.removeWhere((DeviceKeys k) =>
k.blocked || (k.userId == userID && k.deviceId == deviceID));
if (deviceKeys.isEmpty) return;
message =
json.decode(json.encode(message)); // make sure we deep-copy the message
// make sure all the olm sessions are loaded from database
Logs().v('Sending to device chunked... (${deviceKeys.length} devices)');
// sort so that devices we last received messages from get our message first
deviceKeys.sort((keyA, keyB) => keyB.lastActive.compareTo(keyA.lastActive));
Logs().v(deviceKeys.map((k) => '${k.userId}:${k.deviceId}').toList());
// and now send out in chunks of 20
const chunkSize = 20;
for (var i = 0; i < deviceKeys.length; i += chunkSize) {
Logs().v('Sending chunk $i...');
final chunk = deviceKeys.sublist(
i,
i + chunkSize > deviceKeys.length
? deviceKeys.length
: i + chunkSize);
// and send
Logs().v(chunk.map((k) => '${k.userId}:${k.deviceId}').toList());
final future = sendToDeviceEncrypted(chunk, eventType, message);
if (i == 0) {
await future;
}
}
}
/// Whether all push notifications are muted using the [.m.rule.master]
/// rule of the push rules: https://matrix.org/docs/spec/client_server/r0.6.0#m-rule-master
bool get allPushNotificationsMuted {

View File

@ -54,7 +54,7 @@ class Database extends _$Database {
Database.connect(DatabaseConnection connection) : super.connect(connection);
@override
int get schemaVersion => 8;
int get schemaVersion => 9;
int get maxFileSize => 1 * 1024 * 1024;
@ -143,6 +143,11 @@ class Database extends _$Database {
inboundGroupSessions, inboundGroupSessions.allowedAtIndex);
from++;
}
if (from == 8) {
await m.addColumnIfNotExists(
userDeviceKeysKey, userDeviceKeysKey.lastActive);
from++;
}
} catch (e, s) {
Logs().e('Database migration failed', e, s);
onError.add(SdkError(exception: e, stackTrace: s));

View File

@ -768,13 +768,15 @@ class DbUserDeviceKeysKey extends DataClass
final String content;
final bool verified;
final bool blocked;
final int lastActive;
DbUserDeviceKeysKey(
{@required this.clientId,
@required this.userId,
@required this.deviceId,
@required this.content,
this.verified,
this.blocked});
this.blocked,
this.lastActive});
factory DbUserDeviceKeysKey.fromData(
Map<String, dynamic> data, GeneratedDatabase db,
{String prefix}) {
@ -795,6 +797,8 @@ class DbUserDeviceKeysKey extends DataClass
boolType.mapFromDatabaseResponse(data['${effectivePrefix}verified']),
blocked:
boolType.mapFromDatabaseResponse(data['${effectivePrefix}blocked']),
lastActive: intType
.mapFromDatabaseResponse(data['${effectivePrefix}last_active']),
);
}
@override
@ -818,6 +822,9 @@ class DbUserDeviceKeysKey extends DataClass
if (!nullToAbsent || blocked != null) {
map['blocked'] = Variable<bool>(blocked);
}
if (!nullToAbsent || lastActive != null) {
map['last_active'] = Variable<int>(lastActive);
}
return map;
}
@ -840,6 +847,9 @@ class DbUserDeviceKeysKey extends DataClass
blocked: blocked == null && nullToAbsent
? const Value.absent()
: Value(blocked),
lastActive: lastActive == null && nullToAbsent
? const Value.absent()
: Value(lastActive),
);
}
@ -853,6 +863,7 @@ class DbUserDeviceKeysKey extends DataClass
content: serializer.fromJson<String>(json['content']),
verified: serializer.fromJson<bool>(json['verified']),
blocked: serializer.fromJson<bool>(json['blocked']),
lastActive: serializer.fromJson<int>(json['last_active']),
);
}
@override
@ -865,6 +876,7 @@ class DbUserDeviceKeysKey extends DataClass
'content': serializer.toJson<String>(content),
'verified': serializer.toJson<bool>(verified),
'blocked': serializer.toJson<bool>(blocked),
'last_active': serializer.toJson<int>(lastActive),
};
}
@ -874,7 +886,8 @@ class DbUserDeviceKeysKey extends DataClass
String deviceId,
String content,
bool verified,
bool blocked}) =>
bool blocked,
int lastActive}) =>
DbUserDeviceKeysKey(
clientId: clientId ?? this.clientId,
userId: userId ?? this.userId,
@ -882,6 +895,7 @@ class DbUserDeviceKeysKey extends DataClass
content: content ?? this.content,
verified: verified ?? this.verified,
blocked: blocked ?? this.blocked,
lastActive: lastActive ?? this.lastActive,
);
@override
String toString() {
@ -891,7 +905,8 @@ class DbUserDeviceKeysKey extends DataClass
..write('deviceId: $deviceId, ')
..write('content: $content, ')
..write('verified: $verified, ')
..write('blocked: $blocked')
..write('blocked: $blocked, ')
..write('lastActive: $lastActive')
..write(')'))
.toString();
}
@ -903,8 +918,10 @@ class DbUserDeviceKeysKey extends DataClass
userId.hashCode,
$mrjc(
deviceId.hashCode,
$mrjc(content.hashCode,
$mrjc(verified.hashCode, blocked.hashCode))))));
$mrjc(
content.hashCode,
$mrjc(verified.hashCode,
$mrjc(blocked.hashCode, lastActive.hashCode)))))));
@override
bool operator ==(dynamic other) =>
identical(this, other) ||
@ -914,7 +931,8 @@ class DbUserDeviceKeysKey extends DataClass
other.deviceId == this.deviceId &&
other.content == this.content &&
other.verified == this.verified &&
other.blocked == this.blocked);
other.blocked == this.blocked &&
other.lastActive == this.lastActive);
}
class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
@ -924,6 +942,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
final Value<String> content;
final Value<bool> verified;
final Value<bool> blocked;
final Value<int> lastActive;
const UserDeviceKeysKeyCompanion({
this.clientId = const Value.absent(),
this.userId = const Value.absent(),
@ -931,6 +950,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
this.content = const Value.absent(),
this.verified = const Value.absent(),
this.blocked = const Value.absent(),
this.lastActive = const Value.absent(),
});
UserDeviceKeysKeyCompanion.insert({
@required int clientId,
@ -939,6 +959,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
@required String content,
this.verified = const Value.absent(),
this.blocked = const Value.absent(),
this.lastActive = const Value.absent(),
}) : clientId = Value(clientId),
userId = Value(userId),
deviceId = Value(deviceId),
@ -950,6 +971,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
Expression<String> content,
Expression<bool> verified,
Expression<bool> blocked,
Expression<int> lastActive,
}) {
return RawValuesInsertable({
if (clientId != null) 'client_id': clientId,
@ -958,6 +980,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
if (content != null) 'content': content,
if (verified != null) 'verified': verified,
if (blocked != null) 'blocked': blocked,
if (lastActive != null) 'last_active': lastActive,
});
}
@ -967,7 +990,8 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
Value<String> deviceId,
Value<String> content,
Value<bool> verified,
Value<bool> blocked}) {
Value<bool> blocked,
Value<int> lastActive}) {
return UserDeviceKeysKeyCompanion(
clientId: clientId ?? this.clientId,
userId: userId ?? this.userId,
@ -975,6 +999,7 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
content: content ?? this.content,
verified: verified ?? this.verified,
blocked: blocked ?? this.blocked,
lastActive: lastActive ?? this.lastActive,
);
}
@ -999,6 +1024,9 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
if (blocked.present) {
map['blocked'] = Variable<bool>(blocked.value);
}
if (lastActive.present) {
map['last_active'] = Variable<int>(lastActive.value);
}
return map;
}
@ -1010,7 +1038,8 @@ class UserDeviceKeysKeyCompanion extends UpdateCompanion<DbUserDeviceKeysKey> {
..write('deviceId: $deviceId, ')
..write('content: $content, ')
..write('verified: $verified, ')
..write('blocked: $blocked')
..write('blocked: $blocked, ')
..write('lastActive: $lastActive')
..write(')'))
.toString();
}
@ -1071,9 +1100,17 @@ class UserDeviceKeysKey extends Table
defaultValue: const CustomExpression<bool>('false'));
}
final VerificationMeta _lastActiveMeta = const VerificationMeta('lastActive');
GeneratedIntColumn _lastActive;
GeneratedIntColumn get lastActive => _lastActive ??= _constructLastActive();
GeneratedIntColumn _constructLastActive() {
return GeneratedIntColumn('last_active', $tableName, true,
$customConstraints: '');
}
@override
List<GeneratedColumn> get $columns =>
[clientId, userId, deviceId, content, verified, blocked];
[clientId, userId, deviceId, content, verified, blocked, lastActive];
@override
UserDeviceKeysKey get asDslTable => this;
@override
@ -1118,6 +1155,12 @@ class UserDeviceKeysKey extends Table
context.handle(_blockedMeta,
blocked.isAcceptableOrUnknown(data['blocked'], _blockedMeta));
}
if (data.containsKey('last_active')) {
context.handle(
_lastActiveMeta,
lastActive.isAcceptableOrUnknown(
data['last_active'], _lastActiveMeta));
}
return context;
}
@ -6462,17 +6505,24 @@ abstract class _$Database extends GeneratedDatabase {
);
}
Future<int> storeUserDeviceKey(int client_id, String user_id,
String device_id, String content, bool verified, bool blocked) {
Future<int> storeUserDeviceKey(
int client_id,
String user_id,
String device_id,
String content,
bool verified,
bool blocked,
int last_active) {
return customInsert(
'INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked)',
'INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked, last_active) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked, :last_active)',
variables: [
Variable.withInt(client_id),
Variable.withString(user_id),
Variable.withString(device_id),
Variable.withString(content),
Variable.withBool(verified),
Variable.withBool(blocked)
Variable.withBool(blocked),
Variable.withInt(last_active)
],
updates: {userDeviceKeysKey},
);
@ -6492,6 +6542,21 @@ abstract class _$Database extends GeneratedDatabase {
);
}
Future<int> setLastActiveUserDeviceKey(
int last_active, int client_id, String user_id, String device_id) {
return customUpdate(
'UPDATE user_device_keys_key SET last_active = :last_active WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id',
variables: [
Variable.withInt(last_active),
Variable.withInt(client_id),
Variable.withString(user_id),
Variable.withString(device_id)
],
updates: {userDeviceKeysKey},
updateKind: UpdateKind.update,
);
}
Future<int> setVerifiedUserCrossSigningKey(
bool verified, int client_id, String user_id, String public_key) {
return customUpdate(

View File

@ -28,6 +28,7 @@ CREATE TABLE user_device_keys_key (
content TEXT NOT NULL,
verified BOOLEAN DEFAULT false,
blocked BOOLEAN DEFAULT false,
last_active BIGINT,
UNIQUE(client_id, user_id, device_id)
) as DbUserDeviceKeysKey;
CREATE INDEX user_device_keys_key_index ON user_device_keys_key(client_id);
@ -199,8 +200,9 @@ markInboundGroupSessionsAsNeedingUpload: UPDATE inbound_group_sessions SET uploa
storeUserDeviceKeysInfo: INSERT OR REPLACE INTO user_device_keys (client_id, user_id, outdated) VALUES (:client_id, :user_id, :outdated);
setVerifiedUserDeviceKey: UPDATE user_device_keys_key SET verified = :verified WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id;
setBlockedUserDeviceKey: UPDATE user_device_keys_key SET blocked = :blocked WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id;
storeUserDeviceKey: INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked);
storeUserDeviceKey: INSERT OR REPLACE INTO user_device_keys_key (client_id, user_id, device_id, content, verified, blocked, last_active) VALUES (:client_id, :user_id, :device_id, :content, :verified, :blocked, :last_active);
removeUserDeviceKey: DELETE FROM user_device_keys_key WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id;
setLastActiveUserDeviceKey: UPDATE user_device_keys_key SET last_active = :last_active WHERE client_id = :client_id AND user_id = :user_id AND device_id = :device_id;
setVerifiedUserCrossSigningKey: UPDATE user_cross_signing_keys SET verified = :verified WHERE client_id = :client_id AND user_id = :user_id AND public_key = :public_key;
setBlockedUserCrossSigningKey: UPDATE user_cross_signing_keys SET blocked = :blocked WHERE client_id = :client_id AND user_id = :user_id AND public_key = :public_key;
storeUserCrossSigningKey: INSERT OR REPLACE INTO user_cross_signing_keys (client_id, user_id, public_key, content, verified, blocked) VALUES (:client_id, :user_id, :public_key, :content, :verified, :blocked);

View File

@ -363,6 +363,7 @@ class CrossSigningKey extends SignableKey {
class DeviceKeys extends SignableKey {
String get deviceId => identifier;
List<String> algorithms;
DateTime lastActive;
String get curve25519Key => keys['curve25519:$deviceId'];
String get deviceDisplayName =>
@ -407,11 +408,13 @@ class DeviceKeys extends SignableKey {
?.setBlockedUserDeviceKey(newBlocked, client.id, userId, deviceId);
}
DeviceKeys.fromMatrixDeviceKeys(MatrixDeviceKeys k, Client cl)
DeviceKeys.fromMatrixDeviceKeys(MatrixDeviceKeys k, Client cl,
[DateTime lastActiveTs])
: super.fromJson(k.toJson().copy(), cl) {
final json = toJson();
identifier = k.deviceId;
algorithms = json['algorithms'].cast<String>();
lastActive = lastActiveTs ?? DateTime.now();
}
DeviceKeys.fromDb(DbUserDeviceKeysKey dbEntry, Client cl)
@ -421,6 +424,7 @@ class DeviceKeys extends SignableKey {
algorithms = json['algorithms'].cast<String>();
_verified = dbEntry.verified;
blocked = dbEntry.blocked;
lastActive = DateTime.fromMillisecondsSinceEpoch(dbEntry.lastActive ?? 0);
}
DeviceKeys.fromJson(Map<String, dynamic> json, Client cl)
@ -428,6 +432,7 @@ class DeviceKeys extends SignableKey {
final json = toJson();
identifier = json['device_id'];
algorithms = json['algorithms'].cast<String>();
lastActive = DateTime.fromMillisecondsSinceEpoch(0);
}
KeyVerification startVerification() {

View File

@ -17,6 +17,7 @@
*/
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:famedlysdk/famedlysdk.dart';
@ -29,6 +30,7 @@ import 'package:logger/logger.dart';
import 'package:famedlysdk/src/utils/room_update.dart';
import 'package:olm/olm.dart' as olm;
import 'package:test/test.dart';
import 'package:canonical_json/canonical_json.dart';
import 'fake_client.dart';
import 'fake_database.dart';
@ -359,32 +361,93 @@ void main() {
'mxc://example.org/SEsfnsuifSDFSSEF');
expect(aliceProfile.displayname, 'Alice Margatroid');
});
var deviceKeys = DeviceKeys.fromJson({
'user_id': '@alice:example.com',
'device_id': 'JLAFKJWSCS',
'algorithms': [
AlgorithmTypes.olmV1Curve25519AesSha2,
AlgorithmTypes.megolmV1AesSha2
],
'keys': {
'curve25519:JLAFKJWSCS': '3C5BFWi2Y8MaVvjM8M22DBmh24PmgR0nPvJOIArzgyI',
'ed25519:JLAFKJWSCS': 'lEuiRJBit0IG6nUf5pUzWTUEsRVVe/HJkoKuEww9ULI'
},
'signatures': {
'@alice:example.com': {
'ed25519:JLAFKJWSCS':
'dSO80A01XiigH3uBiDVx/EjzaoycHcjq9lfQX0uWsqxl2giMIiSPR8a4d291W1ihKJL/a+myXS367WT6NAIcBA'
}
}
}, matrix);
test('sendToDeviceEncrypted', () async {
if (!olmEnabled) {
return;
}
FakeMatrixApi.calledEndpoints.clear();
await matrix.sendToDeviceEncrypted(
[deviceKeys],
matrix.userDeviceKeys['@alice:example.com'].deviceKeys.values
.toList(),
'm.message',
{
'msgtype': 'm.text',
'body': 'Hello world',
});
expect(
FakeMatrixApi.calledEndpoints.keys.any(
(k) => k.startsWith('/client/r0/sendToDevice/m.room.encrypted')),
true);
});
test('sendToDeviceEncryptedChunked', () async {
if (!olmEnabled) {
return;
}
FakeMatrixApi.calledEndpoints.clear();
await matrix.sendToDeviceEncryptedChunked(
matrix.userDeviceKeys['@alice:example.com'].deviceKeys.values
.toList(),
'm.message',
{
'msgtype': 'm.text',
'body': 'Hello world',
});
await Future.delayed(Duration(milliseconds: 50));
expect(
FakeMatrixApi.calledEndpoints.keys
.where((k) =>
k.startsWith('/client/r0/sendToDevice/m.room.encrypted'))
.length,
1);
final deviceKeys = <DeviceKeys>[];
for (var i = 0; i < 30; i++) {
final account = olm.Account();
account.create();
final keys = json.decode(account.identity_keys());
final userId = '@testuser:example.org';
final deviceId = 'DEVICE$i';
final keyObj = {
'user_id': userId,
'device_id': deviceId,
'algorithms': [
'm.olm.v1.curve25519-aes-sha2',
'm.megolm.v1.aes-sha2',
],
'keys': {
'curve25519:$deviceId': keys['curve25519'],
'ed25519:$deviceId': keys['ed25519'],
},
};
final signature =
account.sign(String.fromCharCodes(canonicalJson.encode(keyObj)));
keyObj['signatures'] = {
userId: {
'ed25519:$deviceId': signature,
},
};
account.free();
deviceKeys.add(DeviceKeys.fromJson(keyObj, matrix));
}
FakeMatrixApi.calledEndpoints.clear();
await matrix.sendToDeviceEncryptedChunked(deviceKeys, 'm.message', {
'msgtype': 'm.text',
'body': 'Hello world',
});
// it should send the first chunk right away
expect(
FakeMatrixApi.calledEndpoints.keys
.where((k) =>
k.startsWith('/client/r0/sendToDevice/m.room.encrypted'))
.length,
1);
await Future.delayed(Duration(milliseconds: 50));
expect(
FakeMatrixApi.calledEndpoints.keys
.where((k) =>
k.startsWith('/client/r0/sendToDevice/m.room.encrypted'))
.length,
2);
});
test('Test the fake store api', () async {
final database = await getDatabase(null);