From 6db019ae231bde795d8e44302ca60fb4ddf66572 Mon Sep 17 00:00:00 2001 From: Christian Pauly Date: Sun, 28 Aug 2022 08:40:05 +0200 Subject: [PATCH] feat: Implement new Matrix Dart SDK Database fix: Edit last event breaks db feat: Add native sqflite indexeddb database feat: Split up preload and nonpreload room state boxes --- .github/workflows/app.yml | 2 +- .gitlab-ci.yml | 2 +- lib/matrix.dart | 1 + lib/src/database/indexeddb_box.dart | 216 +++ lib/src/database/matrix_sdk_database.dart | 1610 +++++++++++++++++++++ lib/src/database/sqflite_box.dart | 343 +++++ pubspec.yaml | 2 + test/box_test.dart | 80 + test/database_api_test.dart | 6 +- test/fake_database.dart | 9 + 10 files changed, 2266 insertions(+), 5 deletions(-) create mode 100644 lib/src/database/indexeddb_box.dart create mode 100644 lib/src/database/matrix_sdk_database.dart create mode 100644 lib/src/database/sqflite_box.dart create mode 100644 test/box_test.dart diff --git a/.github/workflows/app.yml b/.github/workflows/app.yml index 95e7d19f..a73b9605 100644 --- a/.github/workflows/app.yml +++ b/.github/workflows/app.yml @@ -64,7 +64,7 @@ jobs: - uses: actions/checkout@v3 - name: Run tests run: | - apt-get update && apt-get install --no-install-recommends --no-install-suggests -y curl lcov python3 python3-distutils + apt-get update && apt-get install --no-install-recommends --no-install-suggests -y curl lcov python3 python3-distutils libsqlite3-dev curl -o /bin/lcov_cobertura.py https://raw.githubusercontent.com/eriwen/lcov-to-cobertura-xml/master/lcov_cobertura/lcov_cobertura.py && sed 's/env python/env python3/' -i /bin/lcov_cobertura.py && chmod +x /bin/lcov_cobertura.py dart pub get ./scripts/test.sh diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 03683437..65580ac5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -46,7 +46,7 @@ coverage_without_olm: variables: NO_OLM: 1 before_script: - - apt-get update && apt-get install --no-install-recommends --no-install-suggests -y curl lcov python3 python3-distutils + - apt-get update && apt-get install --no-install-recommends --no-install-suggests -y curl lcov python3 python3-distutils libsqlite3-dev - curl -o /bin/lcov_cobertura.py https://raw.githubusercontent.com/eriwen/lcov-to-cobertura-xml/master/lcov_cobertura/lcov_cobertura.py && sed 's/env python/env python3/' -i /bin/lcov_cobertura.py && chmod +x /bin/lcov_cobertura.py script: - dart pub get diff --git a/lib/matrix.dart b/lib/matrix.dart index 786894ff..ead7ac1c 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -24,6 +24,7 @@ export 'package:matrix_api_lite/matrix_api_lite.dart'; export 'src/client.dart'; export 'src/database/database_api.dart'; export 'src/database/hive_database.dart'; +export 'src/database/matrix_sdk_database.dart'; export 'src/database/hive_collections_database.dart'; export 'src/event.dart'; export 'src/presence.dart'; diff --git a/lib/src/database/indexeddb_box.dart b/lib/src/database/indexeddb_box.dart new file mode 100644 index 00000000..b67c1106 --- /dev/null +++ b/lib/src/database/indexeddb_box.dart @@ -0,0 +1,216 @@ +import 'dart:async'; +import 'dart:html'; +import 'dart:indexed_db'; + +/// Key-Value store abstraction over IndexedDB so that the sdk database can use +/// a single interface for all platforms. API is inspired by Hive. +class BoxCollection { + final Database _db; + final Set boxNames; + + BoxCollection(this._db, this.boxNames); + + static Future open( + String name, + Set boxNames, { + Object? sqfliteDatabase, + IdbFactory? idbFactory, + }) async { + idbFactory ??= window.indexedDB!; + final db = await idbFactory.open(name, version: 1, + onUpgradeNeeded: (VersionChangeEvent event) { + final db = event.target.result; + for (final name in boxNames) { + db.createObjectStore(name, autoIncrement: true); + } + }); + return BoxCollection(db, boxNames); + } + + Box openBox(String name) { + if (!boxNames.contains(name)) { + throw ('Box with name $name is not in the known box names of this collection.'); + } + return Box(name, this); + } + + List Function(Transaction txn)>? _txnCache; + + Future transaction( + Future Function() action, { + List? boxNames, + bool readOnly = false, + }) async { + boxNames ??= _db.objectStoreNames!.toList(); + _txnCache = []; + await action(); + final cache = List Function(Transaction txn)>.from(_txnCache!); + _txnCache = null; + if (cache.isEmpty) return; + final txn = _db.transaction(boxNames, readOnly ? 'readonly' : 'readwrite'); + for (final fun in cache) { + // The IDB methods return a Future in Dart but must not be awaited in + // order to have an actual transaction. They must only be performed and + // then the transaction object must call `txn.completed;` which then + // returns the actual future. + // https://developer.mozilla.org/en-US/docs/Web/API/IDBTransaction + unawaited(fun(txn)); + } + await txn.completed; + return; + } + + Future clear() async { + for (final name in boxNames) { + _db.deleteObjectStore(name); + } + } + + Future close() async { + assert(_txnCache == null, 'Database closed while in transaction!'); + return _db.close(); + } +} + +class Box { + final String name; + final BoxCollection boxCollection; + final Map _cache = {}; + + /// _cachedKeys is only used to make sure that if you fetch all keys from a + /// box, you do not need to have an expensive read operation twice. There is + /// no other usage for this at the moment. So the cache is never partial. + /// Once the keys are cached, they need to be updated when changed in put and + /// delete* so that the cache does not become outdated. + Set? _cachedKeys; + + bool get _keysCached => _cachedKeys != null; + + Box(this.name, this.boxCollection); + + Future> getAllKeys([Transaction? txn]) async { + if (_keysCached) return _cachedKeys!.toList(); + txn ??= boxCollection._db.transaction(name, 'readonly'); + final store = txn.objectStore(name); + final request = store.getAllKeys(null); + await request.onSuccess.first; + final keys = request.result.cast(); + _cachedKeys = keys.toSet(); + return keys; + } + + Future> getAllValues([Transaction? txn]) async { + txn ??= boxCollection._db.transaction(name, 'readonly'); + final store = txn.objectStore(name); + final map = {}; + final cursorStream = store.openCursor(autoAdvance: true); + await for (final cursor in cursorStream) { + map[cursor.key as String] = _fromValue(cursor.value) as V; + } + return map; + } + + Future get(String key, [Transaction? txn]) async { + if (_cache.containsKey(key)) return _cache[key]; + txn ??= boxCollection._db.transaction(name, 'readonly'); + final store = txn.objectStore(name); + _cache[key] = await store.getObject(key).then(_fromValue); + return _cache[key]; + } + + Future> getAll(List keys, [Transaction? txn]) async { + if (keys.every((key) => _cache.containsKey(key))) { + return keys.map((key) => _cache[key]).toList(); + } + txn ??= boxCollection._db.transaction(name, 'readonly'); + final store = txn.objectStore(name); + final list = await Future.wait( + keys.map((key) => store.getObject(key).then(_fromValue))); + for (var i = 0; i < keys.length; i++) { + _cache[keys[i]] = list[i]; + } + return list; + } + + Future put(String key, V val, [Transaction? txn]) async { + if (boxCollection._txnCache != null) { + boxCollection._txnCache!.add((txn) => put(key, val, txn)); + _cache[key] = val; + _cachedKeys?.add(key); + return; + } + + txn ??= boxCollection._db.transaction(name, 'readwrite'); + final store = txn.objectStore(name); + await store.put(val as Object, key); + _cache[key] = val; + _cachedKeys?.add(key); + return; + } + + Future delete(String key, [Transaction? txn]) async { + if (boxCollection._txnCache != null) { + boxCollection._txnCache!.add((txn) => delete(key, txn)); + _cache.remove(key); + _cachedKeys?.remove(key); + return; + } + + txn ??= boxCollection._db.transaction(name, 'readwrite'); + final store = txn.objectStore(name); + await store.delete(key); + _cache.remove(key); + _cachedKeys?.remove(key); + return; + } + + Future deleteAll(List keys, [Transaction? txn]) async { + if (boxCollection._txnCache != null) { + boxCollection._txnCache!.add((txn) => deleteAll(keys, txn)); + keys.forEach(_cache.remove); + _cachedKeys?.removeAll(keys); + return; + } + + txn ??= boxCollection._db.transaction(name, 'readwrite'); + final store = txn.objectStore(name); + for (final key in keys) { + await store.delete(key); + _cache.remove(key); + _cachedKeys?.remove(key); + } + return; + } + + Future clear([Transaction? txn]) async { + if (boxCollection._txnCache != null) { + boxCollection._txnCache!.add((txn) => clear(txn)); + _cache.clear(); + _cachedKeys = null; + return; + } + + txn ??= boxCollection._db.transaction(name, 'readwrite'); + final store = txn.objectStore(name); + await store.clear(); + _cache.clear(); + _cachedKeys = null; + return; + } + + V? _fromValue(Object? value) { + if (value == null) return null; + switch (V) { + case const (List): + return List.unmodifiable(value as List) as V; + case const (Map): + return Map.unmodifiable(value as Map) as V; + case const (int): + case const (double): + case const (bool): + case const (String): + default: + return value as V; + } + } +} diff --git a/lib/src/database/matrix_sdk_database.dart b/lib/src/database/matrix_sdk_database.dart new file mode 100644 index 00000000..67c5e83f --- /dev/null +++ b/lib/src/database/matrix_sdk_database.dart @@ -0,0 +1,1610 @@ +/* + * Famedly Matrix SDK + * Copyright (C) 2019, 2020, 2021 Famedly GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:math'; +import 'dart:typed_data'; + +import 'package:sqflite_common/sqflite.dart'; + +import 'package:matrix/encryption/utils/olm_session.dart'; +import 'package:matrix/encryption/utils/outbound_group_session.dart'; +import 'package:matrix/encryption/utils/ssss_cache.dart'; +import 'package:matrix/encryption/utils/stored_inbound_group_session.dart'; +import 'package:matrix/matrix.dart'; +import 'package:matrix/src/utils/queued_to_device_event.dart'; +import 'package:matrix/src/utils/run_benchmarked.dart'; + +import 'package:matrix/src/database/indexeddb_box.dart' + if (dart.library.io) 'package:matrix/src/database/sqflite_box.dart'; + +class MatrixSdkDatabase extends DatabaseApi { + static const int version = 6; + final String name; + late BoxCollection _collection; + late Box _clientBox; + late Box _accountDataBox; + late Box _roomsBox; + late Box _toDeviceQueueBox; + + /// Key is a tuple as TupleKey(roomId, type) where stateKey can be + /// an empty string. Must contain only states of type + /// client.importantRoomStates. + late Box _preloadRoomStateBox; + + /// Key is a tuple as TupleKey(roomId, type) where stateKey can be + /// an empty string. Must NOT contain states of a type from + /// client.importantRoomStates. + late Box _nonPreloadRoomStateBox; + + /// Key is a tuple as TupleKey(roomId, userId) + late Box _roomMembersBox; + + /// Key is a tuple as TupleKey(roomId, type) + late Box _roomAccountDataBox; + late Box _inboundGroupSessionsBox; + late Box _outboundGroupSessionsBox; + late Box _olmSessionsBox; + + /// Key is a tuple as TupleKey(userId, deviceId) + late Box _userDeviceKeysBox; + + /// Key is the user ID as a String + late Box _userDeviceKeysOutdatedBox; + + /// Key is a tuple as TupleKey(userId, publicKey) + late Box _userCrossSigningKeysBox; + late Box _ssssCacheBox; + late Box _presencesBox; + + /// Key is a tuple as Multikey(roomId, fragmentId) while the default + /// fragmentId is an empty String + late Box _timelineFragmentsBox; + + /// Key is a tuple as TupleKey(roomId, eventId) + late Box _eventsBox; + + /// Key is a tuple as TupleKey(userId, deviceId) + late Box _seenDeviceIdsBox; + + late Box _seenDeviceKeysBox; + @override + bool get supportsFileStoring => fileStoragePath != null; + @override + final int maxFileSize; + final Directory? fileStoragePath; + final Duration? deleteFilesAfterDuration; + + static const String _clientBoxName = 'box_client'; + + static const String _accountDataBoxName = 'box_account_data'; + + static const String _roomsBoxName = 'box_rooms'; + + static const String _toDeviceQueueBoxName = 'box_to_device_queue'; + + static const String _preloadRoomStateBoxName = 'box_preload_room_states'; + + static const String _nonPreloadRoomStateBoxName = + 'box_non_preload_room_states'; + + static const String _roomMembersBoxName = 'box_room_members'; + + static const String _roomAccountDataBoxName = 'box_room_account_data'; + + static const String _inboundGroupSessionsBoxName = + 'box_inbound_group_session'; + + static const String _outboundGroupSessionsBoxName = + 'box_outbound_group_session'; + + static const String _olmSessionsBoxName = 'box_olm_session'; + + static const String _userDeviceKeysBoxName = 'box_user_device_keys'; + + static const String _userDeviceKeysOutdatedBoxName = + 'box_user_device_keys_outdated'; + + static const String _userCrossSigningKeysBoxName = 'box_cross_signing_keys'; + + static const String _ssssCacheBoxName = 'box_ssss_cache'; + + static const String _presencesBoxName = 'box_presences'; + + static const String _timelineFragmentsBoxName = 'box_timeline_fragments'; + + static const String _eventsBoxName = 'box_events'; + + static const String _seenDeviceIdsBoxName = 'box_seen_device_ids'; + + static const String _seenDeviceKeysBoxName = 'box_seen_device_keys'; + + Database? database; + + /// Custom IdbFactory used to create the indexedDB. On IO platforms it would + /// lead to an error to import "dart:indexed_db" so this is dynamically + /// typed. + final dynamic idbFactory; + + MatrixSdkDatabase( + this.name, { + this.database, + this.idbFactory, + this.maxFileSize = 0, + this.fileStoragePath, + this.deleteFilesAfterDuration, + }); + + Future open() async { + _collection = await BoxCollection.open( + name, + { + _clientBoxName, + _accountDataBoxName, + _roomsBoxName, + _toDeviceQueueBoxName, + _preloadRoomStateBoxName, + _nonPreloadRoomStateBoxName, + _roomMembersBoxName, + _roomAccountDataBoxName, + _inboundGroupSessionsBoxName, + _outboundGroupSessionsBoxName, + _olmSessionsBoxName, + _userDeviceKeysBoxName, + _userDeviceKeysOutdatedBoxName, + _userCrossSigningKeysBoxName, + _ssssCacheBoxName, + _presencesBoxName, + _timelineFragmentsBoxName, + _eventsBoxName, + _seenDeviceIdsBoxName, + _seenDeviceKeysBoxName, + }, + sqfliteDatabase: database, + idbFactory: idbFactory, + ); + _clientBox = _collection.openBox( + _clientBoxName, + ); + _accountDataBox = _collection.openBox( + _accountDataBoxName, + ); + _roomsBox = _collection.openBox( + _roomsBoxName, + ); + _preloadRoomStateBox = _collection.openBox( + _preloadRoomStateBoxName, + ); + _nonPreloadRoomStateBox = _collection.openBox( + _nonPreloadRoomStateBoxName, + ); + _roomMembersBox = _collection.openBox( + _roomMembersBoxName, + ); + _toDeviceQueueBox = _collection.openBox( + _toDeviceQueueBoxName, + ); + _roomAccountDataBox = _collection.openBox( + _roomAccountDataBoxName, + ); + _inboundGroupSessionsBox = _collection.openBox( + _inboundGroupSessionsBoxName, + ); + _outboundGroupSessionsBox = _collection.openBox( + _outboundGroupSessionsBoxName, + ); + _olmSessionsBox = _collection.openBox( + _olmSessionsBoxName, + ); + _userDeviceKeysBox = _collection.openBox( + _userDeviceKeysBoxName, + ); + _userDeviceKeysOutdatedBox = _collection.openBox( + _userDeviceKeysOutdatedBoxName, + ); + _userCrossSigningKeysBox = _collection.openBox( + _userCrossSigningKeysBoxName, + ); + _ssssCacheBox = _collection.openBox( + _ssssCacheBoxName, + ); + _presencesBox = _collection.openBox( + _presencesBoxName, + ); + _timelineFragmentsBox = _collection.openBox( + _timelineFragmentsBoxName, + ); + _eventsBox = _collection.openBox( + _eventsBoxName, + ); + _seenDeviceIdsBox = _collection.openBox( + _seenDeviceIdsBoxName, + ); + _seenDeviceKeysBox = _collection.openBox( + _seenDeviceKeysBoxName, + ); + + // Check version and check if we need a migration + final currentVersion = int.tryParse(await _clientBox.get('version') ?? ''); + if (currentVersion == null) { + await _clientBox.put('version', version.toString()); + } else if (currentVersion != version) { + await _migrateFromVersion(currentVersion); + } + + return; + } + + Future _migrateFromVersion(int currentVersion) async { + Logs().i('Migrate store database from version $currentVersion to $version'); + await clearCache(); + await _clientBox.put('version', version.toString()); + } + + @override + Future clear() => _collection.clear(); + + @override + Future clearCache() => transaction(() async { + await _roomsBox.clear(); + await _accountDataBox.clear(); + await _preloadRoomStateBox.clear(); + await _nonPreloadRoomStateBox.clear(); + await _roomMembersBox.clear(); + await _eventsBox.clear(); + await _timelineFragmentsBox.clear(); + await _outboundGroupSessionsBox.clear(); + await _presencesBox.clear(); + await _clientBox.delete('prev_batch'); + }); + + @override + Future clearSSSSCache() => _ssssCacheBox.clear(); + + @override + Future close() async => _collection.close(); + + @override + Future deleteFromToDeviceQueue(int id) async { + await _toDeviceQueueBox.delete(id.toString()); + return; + } + + @override + Future deleteOldFiles(int savedAt) async { + final dir = fileStoragePath; + final deleteFilesAfterDuration = this.deleteFilesAfterDuration; + if (!supportsFileStoring || + dir == null || + deleteFilesAfterDuration == null) { + return; + } + final entities = await dir.list().toList(); + for (final file in entities) { + if (file is! File) continue; + final stat = await file.stat(); + if (DateTime.now().difference(stat.modified) > deleteFilesAfterDuration) { + Logs().v('Delete old file', file.path); + await file.delete(); + } + } + } + + @override + Future forgetRoom(String roomId) async { + await _timelineFragmentsBox.delete(TupleKey(roomId, '').toString()); + final eventsBoxKeys = await _eventsBox.getAllKeys(); + for (final key in eventsBoxKeys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + await _eventsBox.delete(key); + } + final preloadRoomStateBoxKeys = await _preloadRoomStateBox.getAllKeys(); + for (final key in preloadRoomStateBoxKeys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + await _preloadRoomStateBox.delete(key); + } + final nonPreloadRoomStateBoxKeys = + await _nonPreloadRoomStateBox.getAllKeys(); + for (final key in nonPreloadRoomStateBoxKeys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + await _nonPreloadRoomStateBox.delete(key); + } + final roomMembersBoxKeys = await _roomMembersBox.getAllKeys(); + for (final key in roomMembersBoxKeys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + await _roomMembersBox.delete(key); + } + final roomAccountDataBoxKeys = await _roomAccountDataBox.getAllKeys(); + for (final key in roomAccountDataBoxKeys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + await _roomAccountDataBox.delete(key); + } + await _roomsBox.delete(roomId); + } + + @override + Future> getAccountData() => + runBenchmarked>('Get all account data from store', + () async { + final accountData = {}; + final raws = await _accountDataBox.getAllValues(); + for (final entry in raws.entries) { + accountData[entry.key] = BasicEvent( + type: entry.key, + content: makeJson(entry.value), + ); + } + return accountData; + }); + + @override + Future?> getClient(String name) => + runBenchmarked('Get Client from store', () async { + final map = {}; + final keys = await _clientBox.getAllKeys(); + for (final key in keys) { + if (key == 'version') continue; + final value = await _clientBox.get(key); + if (value != null) map[key] = value; + } + if (map.isEmpty) return null; + return map; + }); + + @override + Future getEventById(String eventId, Room room) async { + final raw = await _eventsBox.get(TupleKey(room.id, eventId).toString()); + if (raw == null) return null; + return Event.fromJson(makeJson(raw), room); + } + + /// Loads a whole list of events at once from the store for a specific room + Future> _getEventsByIds(List eventIds, Room room) async { + final keys = eventIds + .map( + (eventId) => TupleKey(room.id, eventId).toString(), + ) + .toList(); + final rawEvents = await _eventsBox.getAll(keys); + return rawEvents + .whereType() + .map((rawEvent) => Event.fromJson(makeJson(rawEvent), room)) + .toList(); + } + + @override + Future> getEventList( + Room room, { + int start = 0, + bool onlySending = false, + int? limit, + }) => + runBenchmarked>('Get event list', () async { + // Get the synced event IDs from the store + final timelineKey = TupleKey(room.id, '').toString(); + final timelineEventIds = + (await _timelineFragmentsBox.get(timelineKey) ?? []); + + // Get the local stored SENDING events from the store + late final List sendingEventIds; + if (start != 0) { + sendingEventIds = []; + } else { + final sendingTimelineKey = TupleKey(room.id, 'SENDING').toString(); + sendingEventIds = + (await _timelineFragmentsBox.get(sendingTimelineKey) ?? []); + } + + // Combine those two lists while respecting the start and limit parameters. + final end = min(timelineEventIds.length, + start + (limit ?? timelineEventIds.length)); + final eventIds = sendingEventIds + + (start < timelineEventIds.length && !onlySending + ? timelineEventIds.getRange(start, end).toList() + : []); + + return await _getEventsByIds(eventIds.cast(), room); + }); + + @override + Future getFile(Uri mxcUri) async { + final fileStoragePath = this.fileStoragePath; + if (!supportsFileStoring || fileStoragePath == null) return null; + + final file = + File('${fileStoragePath.path}/${mxcUri.toString().split('/').last}'); + + if (await file.exists()) return await file.readAsBytes(); + return null; + } + + @override + Future getInboundGroupSession( + String roomId, + String sessionId, + ) async { + final raw = await _inboundGroupSessionsBox.get(sessionId); + if (raw == null) return null; + return StoredInboundGroupSession.fromJson(makeJson(raw)); + } + + @override + Future> + getInboundGroupSessionsToUpload() async { + final sessions = (await _inboundGroupSessionsBox.getAllValues()) + .values + .where((rawSession) => rawSession['uploaded'] == false) + .take(50) + .map( + (json) => StoredInboundGroupSession.fromJson( + makeJson(json), + ), + ) + .toList(); + return sessions; + } + + @override + Future> getLastSentMessageUserDeviceKey( + String userId, String deviceId) async { + final raw = + await _userDeviceKeysBox.get(TupleKey(userId, deviceId).toString()); + if (raw == null) return []; + return [raw['last_sent_message']]; + } + + @override + Future storeOlmSession(String identityKey, String sessionId, + String pickle, int lastReceived) async { + final rawSessions = + makeJson((await _olmSessionsBox.get(identityKey)) ?? {}, true); + rawSessions[sessionId] = { + 'identity_key': identityKey, + 'pickle': pickle, + 'session_id': sessionId, + 'last_received': lastReceived, + }; + await _olmSessionsBox.put(identityKey, rawSessions); + return; + } + + @override + Future> getOlmSessions( + String identityKey, String userId) async { + final rawSessions = await _olmSessionsBox.get(identityKey); + if (rawSessions == null || rawSessions.isEmpty) return []; + return rawSessions.values + .map((json) => OlmSession.fromJson(makeJson(json), userId)) + .toList(); + } + + @override + Future> getAllOlmSessions() => + _olmSessionsBox.getAllValues(); + + @override + Future> getOlmSessionsForDevices( + List identityKeys, String userId) async { + final sessions = await Future.wait( + identityKeys.map((identityKey) => getOlmSessions(identityKey, userId))); + return [for (final sublist in sessions) ...sublist]; + } + + @override + Future getOutboundGroupSession( + String roomId, String userId) async { + final raw = await _outboundGroupSessionsBox.get(roomId); + if (raw == null) return null; + return OutboundGroupSession.fromJson(makeJson(raw), userId); + } + + @override + Future getSingleRoom(Client client, String roomId, + {bool loadImportantStates = true}) async { + // Get raw room from database: + final roomData = await _roomsBox.get(roomId); + if (roomData == null) return null; + final room = Room.fromJson(makeJson(roomData), client); + + // Get important states: + if (loadImportantStates) { + final dbKeys = client.importantStateEvents + .map((state) => TupleKey(roomId, state).toString()) + .toList(); + final rawStates = await _preloadRoomStateBox.getAll(dbKeys); + for (final rawState in rawStates) { + if (rawState == null || rawState[''] == null) continue; + room.setState(Event.fromJson(makeJson(rawState['']), room)); + } + } + + return room; + } + + @override + Future> getRoomList(Client client) => + runBenchmarked>('Get room list from store', () async { + final rooms = {}; + + final rawRooms = await _roomsBox.getAllValues(); + + for (final raw in rawRooms.values) { + // Get the room + final room = Room.fromJson(makeJson(raw), client); + + // Add to the list and continue. + rooms[room.id] = room; + } + + final roomStatesDataRaws = await _preloadRoomStateBox.getAllValues(); + for (final entry in roomStatesDataRaws.entries) { + final keys = TupleKey.fromString(entry.key); + final roomId = keys.parts.first; + final room = rooms[roomId]; + if (room == null) { + Logs().w('Found event in store for unknown room', entry.value); + continue; + } + final states = entry.value; + final stateEvents = states.values + .map((raw) => Event.fromJson(makeJson(raw), room)) + .toList(); + for (final state in stateEvents) { + room.setState(state); + } + } + + // Get the room account data + final roomAccountDataRaws = await _roomAccountDataBox.getAllValues(); + for (final entry in roomAccountDataRaws.entries) { + final keys = TupleKey.fromString(entry.key); + final basicRoomEvent = BasicRoomEvent.fromJson( + makeJson(entry.value), + ); + final roomId = keys.parts.first; + if (rooms.containsKey(roomId)) { + rooms[roomId]!.roomAccountData[basicRoomEvent.type] = + basicRoomEvent; + } else { + Logs().w( + 'Found account data for unknown room $roomId. Delete now...'); + await _roomAccountDataBox + .delete(TupleKey(roomId, basicRoomEvent.type).toString()); + } + } + + return rooms.values.toList(); + }); + + @override + Future getSSSSCache(String type) async { + final raw = await _ssssCacheBox.get(type); + if (raw == null) return null; + return SSSSCache.fromJson(makeJson(raw)); + } + + @override + Future> getToDeviceEventQueue() async { + final raws = await _toDeviceQueueBox.getAllValues(); + final copiedRaws = raws.entries.map((entry) { + final copiedRaw = makeJson(entry.value, true); + copiedRaw['id'] = int.parse(entry.key); + copiedRaw['content'] = jsonDecode(copiedRaw['content'] as String); + return copiedRaw; + }).toList(); + return copiedRaws.map((raw) => QueuedToDeviceEvent.fromJson(raw)).toList(); + } + + @override + Future> getUnimportantRoomEventStatesForRoom( + List events, Room room) async { + final keys = (await _nonPreloadRoomStateBox.getAllKeys()).where((key) { + final tuple = TupleKey.fromString(key); + return tuple.parts.first == room.id && !events.contains(tuple.parts[1]); + }); + + final unimportantEvents = []; + for (final key in keys) { + final states = await _nonPreloadRoomStateBox.get(key); + if (states == null) continue; + unimportantEvents.addAll( + states.values.map((raw) => Event.fromJson(makeJson(raw), room))); + } + return unimportantEvents; + } + + @override + Future getUser(String userId, Room room) async { + final state = + await _roomMembersBox.get(TupleKey(room.id, userId).toString()); + if (state == null) return null; + return Event.fromJson(makeJson(state), room).asUser; + } + + @override + Future> getUserDeviceKeys(Client client) => + runBenchmarked>( + 'Get all user device keys from store', () async { + final deviceKeysOutdated = + await _userDeviceKeysOutdatedBox.getAllValues(); + if (deviceKeysOutdated.isEmpty) { + return {}; + } + final res = {}; + final userDeviceKeys = await _userDeviceKeysBox.getAllValues(); + final userCrossSigningKeys = + await _userCrossSigningKeysBox.getAllValues(); + for (final userId in deviceKeysOutdated.keys) { + final deviceKeysBoxKeys = userDeviceKeys.keys.where((tuple) { + final tupleKey = TupleKey.fromString(tuple); + return tupleKey.parts.first == userId; + }); + final crossSigningKeysBoxKeys = + userCrossSigningKeys.keys.where((tuple) { + final tupleKey = TupleKey.fromString(tuple); + return tupleKey.parts.first == userId; + }); + final childEntries = deviceKeysBoxKeys.map( + (key) { + final userDeviceKey = userDeviceKeys[key]; + if (userDeviceKey == null) return null; + return makeJson(userDeviceKey); + }, + ); + final crossSigningEntries = crossSigningKeysBoxKeys.map( + (key) { + final crossSigningKey = userCrossSigningKeys[key]; + if (crossSigningKey == null) return null; + return makeJson(crossSigningKey); + }, + ); + res[userId] = DeviceKeysList.fromDbJson( + { + 'client_id': client.id, + 'user_id': userId, + 'outdated': deviceKeysOutdated[userId], + }, + childEntries + .where((c) => c != null) + .toList() + .cast>(), + crossSigningEntries + .where((c) => c != null) + .toList() + .cast>(), + client); + } + return res; + }); + + @override + Future> getUsers(Room room) async { + final users = []; + final keys = (await _roomMembersBox.getAllKeys()) + .where((key) => TupleKey.fromString(key).parts.first == room.id) + .toList(); + final states = await _roomMembersBox.getAll(keys); + states.removeWhere((state) => state == null); + for (final state in states) { + users.add(Event.fromJson(makeJson(state!), room).asUser); + } + + return users; + } + + @override + Future insertClient( + String name, + String homeserverUrl, + String token, + String userId, + String? deviceId, + String? deviceName, + String? prevBatch, + String? olmAccount) async { + await transaction(() async { + await _clientBox.put('homeserver_url', homeserverUrl); + await _clientBox.put('token', token); + await _clientBox.put('user_id', userId); + if (deviceId == null) { + await _clientBox.delete('device_id'); + } else { + await _clientBox.put('device_id', deviceId); + } + if (deviceName == null) { + await _clientBox.delete('device_name'); + } else { + await _clientBox.put('device_name', deviceName); + } + if (prevBatch == null) { + await _clientBox.delete('prev_batch'); + } else { + await _clientBox.put('prev_batch', prevBatch); + } + if (olmAccount == null) { + await _clientBox.delete('olm_account'); + } else { + await _clientBox.put('olm_account', olmAccount); + } + await _clientBox.delete('sync_filter_id'); + }); + return 0; + } + + @override + Future insertIntoToDeviceQueue( + String type, String txnId, String content) async { + final id = DateTime.now().millisecondsSinceEpoch; + await _toDeviceQueueBox.put(id.toString(), { + 'type': type, + 'txn_id': txnId, + 'content': content, + }); + return id; + } + + @override + Future markInboundGroupSessionAsUploaded( + String roomId, String sessionId) async { + final raw = makeJson( + await _inboundGroupSessionsBox.get(sessionId) ?? {}, + true, + ); + if (raw.isEmpty) { + Logs().w( + 'Tried to mark inbound group session as uploaded which was not found in the database!'); + return; + } + raw['uploaded'] = true; + await _inboundGroupSessionsBox.put(sessionId, raw); + return; + } + + @override + Future markInboundGroupSessionsAsNeedingUpload() async { + final keys = await _inboundGroupSessionsBox.getAllKeys(); + for (final sessionId in keys) { + final raw = makeJson( + await _inboundGroupSessionsBox.get(sessionId) ?? {}, + true, + ); + if (raw.isEmpty) continue; + raw['uploaded'] = false; + await _inboundGroupSessionsBox.put(sessionId, raw); + } + return; + } + + @override + Future removeEvent(String eventId, String roomId) async { + await _eventsBox.delete(TupleKey(roomId, eventId).toString()); + final keys = await _timelineFragmentsBox.getAllKeys(); + for (final key in keys) { + final multiKey = TupleKey.fromString(key); + if (multiKey.parts.first != roomId) continue; + final eventIds = await _timelineFragmentsBox.get(key) ?? []; + final prevLength = eventIds.length; + eventIds.removeWhere((id) => id == eventId); + if (eventIds.length < prevLength) { + await _timelineFragmentsBox.put(key, eventIds); + } + } + return; + } + + @override + Future removeOutboundGroupSession(String roomId) async { + await _outboundGroupSessionsBox.delete(roomId); + return; + } + + @override + Future removeUserCrossSigningKey( + String userId, String publicKey) async { + await _userCrossSigningKeysBox + .delete(TupleKey(userId, publicKey).toString()); + return; + } + + @override + Future removeUserDeviceKey(String userId, String deviceId) async { + await _userDeviceKeysBox.delete(TupleKey(userId, deviceId).toString()); + return; + } + + @override + Future setBlockedUserCrossSigningKey( + bool blocked, String userId, String publicKey) async { + final raw = makeJson( + await _userCrossSigningKeysBox + .get(TupleKey(userId, publicKey).toString()) ?? + {}, + true, + ); + raw['blocked'] = blocked; + await _userCrossSigningKeysBox.put( + TupleKey(userId, publicKey).toString(), + raw, + ); + return; + } + + @override + Future setBlockedUserDeviceKey( + bool blocked, String userId, String deviceId) async { + final raw = makeJson( + await _userDeviceKeysBox.get(TupleKey(userId, deviceId).toString()) ?? + {}, + true); + raw['blocked'] = blocked; + await _userDeviceKeysBox.put( + TupleKey(userId, deviceId).toString(), + raw, + ); + return; + } + + @override + Future setLastActiveUserDeviceKey( + int lastActive, String userId, String deviceId) async { + final raw = makeJson( + await _userDeviceKeysBox.get(TupleKey(userId, deviceId).toString()) ?? + {}, + true); + + raw['last_active'] = lastActive; + await _userDeviceKeysBox.put( + TupleKey(userId, deviceId).toString(), + raw, + ); + } + + @override + Future setLastSentMessageUserDeviceKey( + String lastSentMessage, String userId, String deviceId) async { + final raw = makeJson( + await _userDeviceKeysBox.get(TupleKey(userId, deviceId).toString()) ?? {}, + true, + ); + raw['last_sent_message'] = lastSentMessage; + await _userDeviceKeysBox.put( + TupleKey(userId, deviceId).toString(), + raw, + ); + } + + @override + Future setRoomPrevBatch( + String? prevBatch, String roomId, Client client) async { + final raw = await _roomsBox.get(roomId); + if (raw == null) return; + final room = Room.fromJson(makeJson(raw), client); + room.prev_batch = prevBatch; + await _roomsBox.put(roomId, room.toJson()); + return; + } + + @override + Future setVerifiedUserCrossSigningKey( + bool verified, String userId, String publicKey) async { + final raw = makeJson( + (await _userCrossSigningKeysBox + .get(TupleKey(userId, publicKey).toString())) ?? + {}, + true, + ); + raw['verified'] = verified; + await _userCrossSigningKeysBox.put( + TupleKey(userId, publicKey).toString(), + raw, + ); + return; + } + + @override + Future setVerifiedUserDeviceKey( + bool verified, String userId, String deviceId) async { + final raw = makeJson( + await _userDeviceKeysBox.get(TupleKey(userId, deviceId).toString()) ?? {}, + true, + ); + raw['verified'] = verified; + await _userDeviceKeysBox.put( + TupleKey(userId, deviceId).toString(), + raw, + ); + return; + } + + @override + Future storeAccountData(String type, String content) async { + await _accountDataBox.put(type, makeJson(jsonDecode(content))); + return; + } + + @override + Future storeEventUpdate(EventUpdate eventUpdate, Client client) async { + // Ephemerals should not be stored + if (eventUpdate.type == EventUpdateType.ephemeral) return; + final tmpRoom = client.getRoomById(eventUpdate.roomID) ?? + Room(id: eventUpdate.roomID, client: client); + + // In case of this is a redaction event + if (eventUpdate.content['type'] == EventTypes.Redaction) { + final eventId = eventUpdate.content.tryGet('redacts'); + final event = + eventId != null ? await getEventById(eventId, tmpRoom) : null; + if (event != null) { + event.setRedactionEvent(Event.fromJson(eventUpdate.content, tmpRoom)); + await _eventsBox.put( + TupleKey(eventUpdate.roomID, event.eventId).toString(), + event.toJson()); + + if (tmpRoom.lastEvent?.eventId == event.eventId) { + if (client.importantStateEvents.contains(event.type)) { + await _preloadRoomStateBox.put( + TupleKey(eventUpdate.roomID, event.type).toString(), + {'': event.toJson()}, + ); + } else { + await _nonPreloadRoomStateBox.put( + TupleKey(eventUpdate.roomID, event.type).toString(), + {'': event.toJson()}, + ); + } + } + } + } + + // Store a common message event + if ({EventUpdateType.timeline, EventUpdateType.history} + .contains(eventUpdate.type)) { + final eventId = eventUpdate.content['event_id']; + // Is this ID already in the store? + final prevEvent = await _eventsBox + .get(TupleKey(eventUpdate.roomID, eventId).toString()); + final prevStatus = prevEvent == null + ? null + : () { + final json = makeJson(prevEvent); + final statusInt = json.tryGet('status') ?? + json + .tryGetMap('unsigned') + ?.tryGet(messageSendingStatusKey); + return statusInt == null ? null : eventStatusFromInt(statusInt); + }(); + + // calculate the status + final newStatus = eventStatusFromInt( + eventUpdate.content.tryGet('status') ?? + eventUpdate.content + .tryGetMap('unsigned') + ?.tryGet(messageSendingStatusKey) ?? + EventStatus.synced.intValue, + ); + + // Is this the response to a sending event which is already synced? Then + // there is nothing to do here. + if (!newStatus.isSynced && prevStatus != null && prevStatus.isSynced) { + return; + } + + final status = newStatus.isError || prevStatus == null + ? newStatus + : latestEventStatus( + prevStatus, + newStatus, + ); + + // Add the status and the sort order to the content so it get stored + eventUpdate.content['unsigned'] ??= {}; + eventUpdate.content['unsigned'][messageSendingStatusKey] = + eventUpdate.content['status'] = status.intValue; + + // In case this event has sent from this account we have a transaction ID + final transactionId = eventUpdate.content + .tryGetMap('unsigned') + ?.tryGet('transaction_id'); + await _eventsBox.put(TupleKey(eventUpdate.roomID, eventId).toString(), + eventUpdate.content); + + // Update timeline fragments + final key = TupleKey(eventUpdate.roomID, status.isSent ? '' : 'SENDING') + .toString(); + + final eventIds = + List.from(await _timelineFragmentsBox.get(key) ?? []); + + if (!eventIds.contains(eventId)) { + if (eventUpdate.type == EventUpdateType.history) { + eventIds.add(eventId); + } else { + eventIds.insert(0, eventId); + } + await _timelineFragmentsBox.put(key, eventIds); + } else if (status.isSynced && + prevStatus != null && + prevStatus.isSent && + eventUpdate.type != EventUpdateType.history) { + // Status changes from 1 -> 2? Make sure event is correctly sorted. + eventIds.remove(eventId); + eventIds.insert(0, eventId); + } + + // If event comes from server timeline, remove sending events with this ID + if (status.isSent) { + final key = TupleKey(eventUpdate.roomID, 'SENDING').toString(); + final eventIds = + List.from(await _timelineFragmentsBox.get(key) ?? []); + final i = eventIds.indexWhere((id) => id == eventId); + if (i != -1) { + await _timelineFragmentsBox.put(key, eventIds..removeAt(i)); + } + } + + // Is there a transaction id? Then delete the event with this id. + if (!status.isError && !status.isSending && transactionId != null) { + await removeEvent(transactionId, eventUpdate.roomID); + } + } + + final stateKey = + client.roomPreviewLastEvents.contains(eventUpdate.content['type']) + ? '' + : eventUpdate.content['state_key']; + // Store a common state event + if ({ + EventUpdateType.timeline, + EventUpdateType.state, + EventUpdateType.inviteState + }.contains(eventUpdate.type) && + stateKey != null) { + if (eventUpdate.content['type'] == EventTypes.RoomMember) { + await _roomMembersBox.put( + TupleKey( + eventUpdate.roomID, + eventUpdate.content['state_key'], + ).toString(), + eventUpdate.content); + } else { + final type = eventUpdate.content['type'] as String; + final roomStateBox = client.importantStateEvents.contains(type) + ? _preloadRoomStateBox + : _nonPreloadRoomStateBox; + final key = TupleKey( + eventUpdate.roomID, + type, + ).toString(); + final stateMap = makeJson(await roomStateBox.get(key) ?? {}, true); + // store state events and new messages, that either are not an edit or an edit of the lastest message + // An edit is an event, that has an edit relation to the latest event. In some cases for the second edit, we need to compare if both have an edit relation to the same event instead. + if (eventUpdate.content + .tryGetMap('content') + ?.tryGetMap('m.relates_to') == + null) { + stateMap[stateKey] = eventUpdate.content; + await roomStateBox.put(key, stateMap); + } else { + final editedEventRelationshipEventId = eventUpdate.content + .tryGetMap('content') + ?.tryGetMap('m.relates_to') + ?.tryGet('event_id'); + + final tmpRoom = client.getRoomById(eventUpdate.roomID) ?? + Room(id: eventUpdate.roomID, client: client); + + if (eventUpdate.content['type'] != + EventTypes + .Message || // send anything other than a message + eventUpdate.content + .tryGetMap('content') + ?.tryGetMap('m.relates_to') + ?.tryGet('rel_type') != + RelationshipTypes + .edit || // replies are always latest anyway + editedEventRelationshipEventId == + tmpRoom.lastEvent + ?.eventId || // edit of latest (original event) event + (tmpRoom.lastEvent?.relationshipType == + RelationshipTypes.edit && + editedEventRelationshipEventId == + tmpRoom.lastEvent + ?.relationshipEventId) // edit of latest (edited event) event + ) { + stateMap[stateKey] = eventUpdate.content; + await roomStateBox.put(key, stateMap); + } + } + } + } + + // Store a room account data event + if (eventUpdate.type == EventUpdateType.accountData) { + await _roomAccountDataBox.put( + TupleKey( + eventUpdate.roomID, + eventUpdate.content['type'], + ).toString(), + eventUpdate.content, + ); + } + } + + @override + Future storeFile(Uri mxcUri, Uint8List bytes, int time) async { + final fileStoragePath = this.fileStoragePath; + if (!supportsFileStoring || fileStoragePath == null) return; + + final file = + File('${fileStoragePath.path}/${mxcUri.toString().split('/').last}'); + + if (await file.exists()) return; + await file.writeAsBytes(bytes); + } + + @override + Future storeInboundGroupSession( + String roomId, + String sessionId, + String pickle, + String content, + String indexes, + String allowedAtIndex, + String senderKey, + String senderClaimedKey) async { + await _inboundGroupSessionsBox.put( + sessionId, + StoredInboundGroupSession( + roomId: roomId, + sessionId: sessionId, + pickle: pickle, + content: content, + indexes: indexes, + allowedAtIndex: allowedAtIndex, + senderKey: senderKey, + senderClaimedKeys: senderClaimedKey, + uploaded: false, + ).toJson()); + return; + } + + @override + Future storeOutboundGroupSession( + String roomId, String pickle, String deviceIds, int creationTime) async { + await _outboundGroupSessionsBox.put(roomId, { + 'room_id': roomId, + 'pickle': pickle, + 'device_ids': deviceIds, + 'creation_time': creationTime, + }); + return; + } + + @override + Future storePrevBatch( + String prevBatch, + ) async { + if ((await _clientBox.getAllKeys()).isEmpty) return; + await _clientBox.put('prev_batch', prevBatch); + return; + } + + @override + Future storeRoomUpdate( + String roomId, SyncRoomUpdate roomUpdate, Client client) async { + // Leave room if membership is leave + if (roomUpdate is LeftRoomUpdate) { + await forgetRoom(roomId); + return; + } + final membership = roomUpdate is LeftRoomUpdate + ? Membership.leave + : roomUpdate is InvitedRoomUpdate + ? Membership.invite + : Membership.join; + // Make sure room exists + final currentRawRoom = await _roomsBox.get(roomId); + if (currentRawRoom == null) { + await _roomsBox.put( + roomId, + roomUpdate is JoinedRoomUpdate + ? Room( + client: client, + id: roomId, + membership: membership, + highlightCount: + roomUpdate.unreadNotifications?.highlightCount?.toInt() ?? + 0, + notificationCount: roomUpdate + .unreadNotifications?.notificationCount + ?.toInt() ?? + 0, + prev_batch: roomUpdate.timeline?.prevBatch, + summary: roomUpdate.summary, + ).toJson() + : Room( + client: client, + id: roomId, + membership: membership, + ).toJson()); + } else if (roomUpdate is JoinedRoomUpdate) { + final currentRoom = Room.fromJson(makeJson(currentRawRoom), client); + await _roomsBox.put( + roomId, + Room( + client: client, + id: roomId, + membership: membership, + highlightCount: + roomUpdate.unreadNotifications?.highlightCount?.toInt() ?? + currentRoom.highlightCount, + notificationCount: + roomUpdate.unreadNotifications?.notificationCount?.toInt() ?? + currentRoom.notificationCount, + prev_batch: + roomUpdate.timeline?.prevBatch ?? currentRoom.prev_batch, + summary: RoomSummary.fromJson(currentRoom.summary.toJson() + ..addAll(roomUpdate.summary?.toJson() ?? {})), + ).toJson()); + } + + // Is the timeline limited? Then all previous messages should be + // removed from the database! + if (roomUpdate is JoinedRoomUpdate && + roomUpdate.timeline?.limited == true) { + await _timelineFragmentsBox.delete(TupleKey(roomId, '').toString()); + } + } + + @override + Future storeSSSSCache( + String type, String keyId, String ciphertext, String content) async { + await _ssssCacheBox.put( + type, + SSSSCache( + type: type, + keyId: keyId, + ciphertext: ciphertext, + content: content, + ).toJson()); + } + + @override + Future storeSyncFilterId( + String syncFilterId, + ) async { + await _clientBox.put('sync_filter_id', syncFilterId); + } + + @override + Future storeUserCrossSigningKey(String userId, String publicKey, + String content, bool verified, bool blocked) async { + await _userCrossSigningKeysBox.put( + TupleKey(userId, publicKey).toString(), + { + 'user_id': userId, + 'public_key': publicKey, + 'content': content, + 'verified': verified, + 'blocked': blocked, + }, + ); + } + + @override + Future storeUserDeviceKey(String userId, String deviceId, + String content, bool verified, bool blocked, int lastActive) async { + await _userDeviceKeysBox.put(TupleKey(userId, deviceId).toString(), { + 'user_id': userId, + 'device_id': deviceId, + 'content': content, + 'verified': verified, + 'blocked': blocked, + 'last_active': lastActive, + 'last_sent_message': '', + }); + return; + } + + @override + Future storeUserDeviceKeysInfo(String userId, bool outdated) async { + await _userDeviceKeysOutdatedBox.put(userId, outdated); + return; + } + + @override + Future transaction(Future Function() action) => + _collection.transaction(action); + + @override + Future updateClient( + String homeserverUrl, + String token, + String userId, + String? deviceId, + String? deviceName, + String? prevBatch, + String? olmAccount, + ) async { + await transaction(() async { + await _clientBox.put('homeserver_url', homeserverUrl); + await _clientBox.put('token', token); + await _clientBox.put('user_id', userId); + if (deviceId == null) { + await _clientBox.delete('device_id'); + } else { + await _clientBox.put('device_id', deviceId); + } + if (deviceName == null) { + await _clientBox.delete('device_name'); + } else { + await _clientBox.put('device_name', deviceName); + } + if (prevBatch == null) { + await _clientBox.delete('prev_batch'); + } else { + await _clientBox.put('prev_batch', prevBatch); + } + if (olmAccount == null) { + await _clientBox.delete('olm_account'); + } else { + await _clientBox.put('olm_account', olmAccount); + } + }); + return; + } + + @override + Future updateClientKeys( + String olmAccount, + ) async { + await _clientBox.put('olm_account', olmAccount); + return; + } + + @override + Future updateInboundGroupSessionAllowedAtIndex( + String allowedAtIndex, String roomId, String sessionId) async { + final raw = await _inboundGroupSessionsBox.get(sessionId); + if (raw == null) { + Logs().w( + 'Tried to update inbound group session as uploaded which wasnt found in the database!'); + return; + } + raw['allowed_at_index'] = allowedAtIndex; + await _inboundGroupSessionsBox.put(sessionId, raw); + return; + } + + @override + Future updateInboundGroupSessionIndexes( + String indexes, String roomId, String sessionId) async { + final raw = await _inboundGroupSessionsBox.get(sessionId); + if (raw == null) { + Logs().w( + 'Tried to update inbound group session indexes of a session which was not found in the database!'); + return; + } + final json = makeJson(raw, true); + json['indexes'] = indexes; + await _inboundGroupSessionsBox.put(sessionId, json); + return; + } + + @override + Future> getAllInboundGroupSessions() async { + final rawSessions = await _inboundGroupSessionsBox.getAllValues(); + return rawSessions.values + .map((raw) => StoredInboundGroupSession.fromJson(makeJson(raw))) + .toList(); + } + + @override + Future addSeenDeviceId( + String userId, + String deviceId, + String publicKeys, + ) => + _seenDeviceIdsBox.put(TupleKey(userId, deviceId).toString(), publicKeys); + + @override + Future addSeenPublicKey( + String publicKey, + String deviceId, + ) => + _seenDeviceKeysBox.put(publicKey, deviceId); + + @override + Future deviceIdSeen(userId, deviceId) async { + final raw = + await _seenDeviceIdsBox.get(TupleKey(userId, deviceId).toString()); + if (raw == null) return null; + return raw; + } + + @override + Future publicKeySeen(String publicKey) async { + final raw = await _seenDeviceKeysBox.get(publicKey); + if (raw == null) return null; + return raw; + } + + @override + Future exportDump() async { + final dataMap = { + _clientBoxName: await _clientBox.getAllValues(), + _accountDataBoxName: await _accountDataBox.getAllValues(), + _roomsBoxName: await _roomsBox.getAllValues(), + _preloadRoomStateBoxName: await _preloadRoomStateBox.getAllValues(), + _nonPreloadRoomStateBoxName: await _nonPreloadRoomStateBox.getAllValues(), + _roomMembersBoxName: await _roomMembersBox.getAllValues(), + _toDeviceQueueBoxName: await _toDeviceQueueBox.getAllValues(), + _roomAccountDataBoxName: await _roomAccountDataBox.getAllValues(), + _inboundGroupSessionsBoxName: + await _inboundGroupSessionsBox.getAllValues(), + _outboundGroupSessionsBoxName: + await _outboundGroupSessionsBox.getAllValues(), + _olmSessionsBoxName: await _olmSessionsBox.getAllValues(), + _userDeviceKeysBoxName: await _userDeviceKeysBox.getAllValues(), + _userDeviceKeysOutdatedBoxName: + await _userDeviceKeysOutdatedBox.getAllValues(), + _userCrossSigningKeysBoxName: + await _userCrossSigningKeysBox.getAllValues(), + _ssssCacheBoxName: await _ssssCacheBox.getAllValues(), + _presencesBoxName: await _presencesBox.getAllValues(), + _timelineFragmentsBoxName: await _timelineFragmentsBox.getAllValues(), + _eventsBoxName: await _eventsBox.getAllValues(), + _seenDeviceIdsBoxName: await _seenDeviceIdsBox.getAllValues(), + _seenDeviceKeysBoxName: await _seenDeviceKeysBox.getAllValues(), + }; + final json = jsonEncode(dataMap); + await clear(); + return json; + } + + @override + Future importDump(String export) async { + try { + await clear(); + await open(); + final json = Map.from(jsonDecode(export)).cast(); + for (final key in json[_clientBoxName]!.keys) { + await _clientBox.put(key, json[_clientBoxName]![key]); + } + for (final key in json[_accountDataBoxName]!.keys) { + await _accountDataBox.put(key, json[_accountDataBoxName]![key]); + } + for (final key in json[_roomsBoxName]!.keys) { + await _roomsBox.put(key, json[_roomsBoxName]![key]); + } + for (final key in json[_preloadRoomStateBoxName]!.keys) { + await _preloadRoomStateBox.put( + key, json[_preloadRoomStateBoxName]![key]); + } + for (final key in json[_nonPreloadRoomStateBoxName]!.keys) { + await _nonPreloadRoomStateBox.put( + key, json[_nonPreloadRoomStateBoxName]![key]); + } + for (final key in json[_roomMembersBoxName]!.keys) { + await _roomMembersBox.put(key, json[_roomMembersBoxName]![key]); + } + for (final key in json[_toDeviceQueueBoxName]!.keys) { + await _toDeviceQueueBox.put(key, json[_toDeviceQueueBoxName]![key]); + } + for (final key in json[_roomAccountDataBoxName]!.keys) { + await _roomAccountDataBox.put(key, json[_roomAccountDataBoxName]![key]); + } + for (final key in json[_inboundGroupSessionsBoxName]!.keys) { + await _inboundGroupSessionsBox.put( + key, json[_inboundGroupSessionsBoxName]![key]); + } + for (final key in json[_outboundGroupSessionsBoxName]!.keys) { + await _outboundGroupSessionsBox.put( + key, json[_outboundGroupSessionsBoxName]![key]); + } + for (final key in json[_olmSessionsBoxName]!.keys) { + await _olmSessionsBox.put(key, json[_olmSessionsBoxName]![key]); + } + for (final key in json[_userDeviceKeysBoxName]!.keys) { + await _userDeviceKeysBox.put(key, json[_userDeviceKeysBoxName]![key]); + } + for (final key in json[_userDeviceKeysOutdatedBoxName]!.keys) { + await _userDeviceKeysOutdatedBox.put( + key, json[_userDeviceKeysOutdatedBoxName]![key]); + } + for (final key in json[_userCrossSigningKeysBoxName]!.keys) { + await _userCrossSigningKeysBox.put( + key, json[_userCrossSigningKeysBoxName]![key]); + } + for (final key in json[_ssssCacheBoxName]!.keys) { + await _ssssCacheBox.put(key, json[_ssssCacheBoxName]![key]); + } + for (final key in json[_presencesBoxName]!.keys) { + await _presencesBox.put(key, json[_presencesBoxName]![key]); + } + for (final key in json[_timelineFragmentsBoxName]!.keys) { + await _timelineFragmentsBox.put( + key, json[_timelineFragmentsBoxName]![key]); + } + for (final key in json[_seenDeviceIdsBoxName]!.keys) { + await _seenDeviceIdsBox.put(key, json[_seenDeviceIdsBoxName]![key]); + } + for (final key in json[_seenDeviceKeysBoxName]!.keys) { + await _seenDeviceKeysBox.put(key, json[_seenDeviceKeysBoxName]![key]); + } + return true; + } catch (e, s) { + Logs().e('Database import error: ', e, s); + return false; + } + } + + @override + Future> getEventIdList( + Room room, { + int start = 0, + bool includeSending = false, + int? limit, + }) => + runBenchmarked>('Get event id list', () async { + // Get the synced event IDs from the store + final timelineKey = TupleKey(room.id, '').toString(); + final timelineEventIds = List.from( + (await _timelineFragmentsBox.get(timelineKey)) ?? []); + + // Get the local stored SENDING events from the store + late final List sendingEventIds; + if (!includeSending) { + sendingEventIds = []; + } else { + final sendingTimelineKey = TupleKey(room.id, 'SENDING').toString(); + sendingEventIds = List.from( + (await _timelineFragmentsBox.get(sendingTimelineKey)) ?? []); + } + + // Combine those two lists while respecting the start and limit parameters. + final eventIds = sendingEventIds + timelineEventIds; + if (limit != null && eventIds.length > limit) { + eventIds.removeRange(limit, eventIds.length); + } + + return eventIds; + }); + + @override + Future storePresence(String userId, CachedPresence presence) => + _presencesBox.put(userId, presence.toJson()); + + @override + Future getPresence(String userId) async { + final rawPresence = await _presencesBox.get(userId); + if (rawPresence == null) return null; + + return CachedPresence.fromJson(makeJson(rawPresence)); + } +} + +Map makeJson(Map json, [bool copy = false]) => + copy ? copyMap(json) : json.cast(); diff --git a/lib/src/database/sqflite_box.dart b/lib/src/database/sqflite_box.dart new file mode 100644 index 00000000..21053a38 --- /dev/null +++ b/lib/src/database/sqflite_box.dart @@ -0,0 +1,343 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:sqflite_common/sqlite_api.dart'; + +/// Key-Value store abstraction over Sqflite so that the sdk database can use +/// a single interface for all platforms. API is inspired by Hive. +class BoxCollection { + final Database _db; + final Set boxNames; + + BoxCollection(this._db, this.boxNames); + + static Future open( + String name, + Set boxNames, { + Object? sqfliteDatabase, + dynamic idbFactory, + }) async { + if (sqfliteDatabase is! Database) { + throw ('You must provide a Database `sqfliteDatabase` for FluffyBox on native.'); + } + final batch = sqfliteDatabase.batch(); + for (final name in boxNames) { + batch.execute( + 'CREATE TABLE IF NOT EXISTS $name (k TEXT PRIMARY KEY NOT NULL, v TEXT)', + ); + batch.execute('CREATE INDEX IF NOT EXISTS k_index ON $name (k)'); + } + await batch.commit(noResult: true); + return BoxCollection(sqfliteDatabase, boxNames); + } + + Box openBox(String name) { + if (!boxNames.contains(name)) { + throw ('Box with name $name is not in the known box names of this collection.'); + } + return Box(name, this); + } + + Batch? _activeBatch; + + Completer? _transactionLock; + final _transactionZones = {}; + + Future transaction( + Future Function() action, { + List? boxNames, + bool readOnly = false, + }) async { + // we want transactions to lock, however NOT if transactoins are run inside of each other. + // to be able to do this, we use dart zones (https://dart.dev/articles/archive/zones). + // _transactionZones holds a set of all zones which are currently running a transaction. + // _transactionLock holds the lock. + + // first we try to determine if we are inside of a transaction currently + var isInTransaction = false; + Zone? zone = Zone.current; + // for that we keep on iterating to the parent zone until there is either no zone anymore + // or we have found a zone inside of _transactionZones. + while (zone != null) { + if (_transactionZones.contains(zone)) { + isInTransaction = true; + break; + } + zone = zone.parent; + } + // if we are inside a transaction....just run the action + if (isInTransaction) { + return await action(); + } + // if we are *not* in a transaction, time to wait for the lock! + while (_transactionLock != null) { + await _transactionLock!.future; + } + // claim the lock + final lock = Completer(); + _transactionLock = lock; + try { + // run the action inside of a new zone + return await runZoned(() async { + try { + // don't forget to add the new zone to _transactionZones! + _transactionZones.add(Zone.current); + + final batch = _db.batch(); + _activeBatch = batch; + await action(); + _activeBatch = null; + await batch.commit(noResult: true); + return; + } finally { + // aaaand remove the zone from _transactionZones again + _transactionZones.remove(Zone.current); + } + }); + } finally { + // aaaand finally release the lock + _transactionLock = null; + lock.complete(); + } + } + + Future clear() => transaction( + () async { + for (final name in boxNames) { + await _db.delete(name); + } + }, + ); + + Future close() => _db.close(); +} + +class Box { + final String name; + final BoxCollection boxCollection; + final Map _cache = {}; + + /// _cachedKeys is only used to make sure that if you fetch all keys from a + /// box, you do not need to have an expensive read operation twice. There is + /// no other usage for this at the moment. So the cache is never partial. + /// Once the keys are cached, they need to be updated when changed in put and + /// delete* so that the cache does not become outdated. + Set? _cachedKeys; + bool get _keysCached => _cachedKeys != null; + + static const Set allowedValueTypes = { + List, + Map, + String, + int, + double, + bool, + }; + + Box(this.name, this.boxCollection) { + if (!allowedValueTypes.any((type) => V == type)) { + throw Exception( + 'Illegal value type for Box: "${V.toString()}". Must be one of $allowedValueTypes', + ); + } + } + + String? _toString(V? value) { + if (value == null) return null; + switch (V) { + case const (List): + case const (Map): + return jsonEncode(value); + case const (String): + case const (int): + case const (double): + case const (bool): + default: + return value.toString(); + } + } + + V? _fromString(Object? value) { + if (value == null) return null; + if (value is! String) { + throw Exception( + 'Wrong database type! Expected String but got one of type ${value.runtimeType}'); + } + switch (V) { + case const (int): + return int.parse(value) as V; + case const (double): + return double.parse(value) as V; + case const (bool): + return (value == 'true') as V; + case const (List): + return List.unmodifiable(jsonDecode(value)) as V; + case const (Map): + return Map.unmodifiable(jsonDecode(value)) as V; + case const (String): + default: + return value as V; + } + } + + Future> getAllKeys([Transaction? txn]) async { + if (_keysCached) return _cachedKeys!.toList(); + + final executor = txn ?? boxCollection._db; + + final result = await executor.query(name, columns: ['k']); + final keys = result.map((row) => row['k'] as String).toList(); + + _cachedKeys = keys.toSet(); + return keys; + } + + Future> getAllValues([Transaction? txn]) async { + final executor = txn ?? boxCollection._db; + + final result = await executor.query(name); + return Map.fromEntries( + result.map( + (row) => MapEntry( + row['k'] as String, + _fromString(row['v']) as V, + ), + ), + ); + } + + Future get(String key, [Transaction? txn]) async { + if (_cache.containsKey(key)) return _cache[key]; + + final executor = txn ?? boxCollection._db; + + final result = await executor.query( + name, + columns: ['v'], + where: 'k = ?', + whereArgs: [key], + ); + + final value = result.isEmpty ? null : _fromString(result.single['v']); + _cache[key] = value; + return value; + } + + Future> getAll(List keys, [Transaction? txn]) async { + if (!keys.any((key) => !_cache.containsKey(key))) { + return keys.map((key) => _cache[key]).toList(); + } + + // The SQL operation might fail with more than 1000 keys. We define some + // buffer here and half the amount of keys recursively for this situation. + const getAllMax = 800; + if (keys.length > getAllMax) { + final half = keys.length ~/ 2; + return [ + ...(await getAll(keys.sublist(0, half))), + ...(await getAll(keys.sublist(half))), + ]; + } + + final executor = txn ?? boxCollection._db; + + final list = []; + + final result = await executor.query( + name, + where: 'k IN (${keys.map((_) => '?').join(',')})', + whereArgs: keys, + ); + final resultMap = Map.fromEntries( + result.map((row) => MapEntry(row['k'] as String, _fromString(row['v']))), + ); + + // We want to make sure that they values are returnd in the exact same + // order than the given keys. That's why we do this instead of just return + // `resultMap.values`. + list.addAll(keys.map((key) => resultMap[key])); + + _cache.addAll(resultMap); + + return list; + } + + Future put(String key, V val) async { + final txn = boxCollection._activeBatch; + + final params = { + 'k': key, + 'v': _toString(val), + }; + if (txn == null) { + await boxCollection._db.insert( + name, + params, + conflictAlgorithm: ConflictAlgorithm.replace, + ); + } else { + txn.insert( + name, + params, + conflictAlgorithm: ConflictAlgorithm.replace, + ); + } + + _cache[key] = val; + _cachedKeys?.add(key); + return; + } + + Future delete(String key, [Batch? txn]) async { + txn ??= boxCollection._activeBatch; + + if (txn == null) { + await boxCollection._db.delete(name, where: 'k = ?', whereArgs: [key]); + } else { + txn.delete(name, where: 'k = ?', whereArgs: [key]); + } + + _cache.remove(key); + _cachedKeys?.remove(key); + return; + } + + Future deleteAll(List keys, [Batch? txn]) async { + txn ??= boxCollection._activeBatch; + + final placeholder = keys.map((_) => '?').join(','); + if (txn == null) { + await boxCollection._db.delete( + name, + where: 'k IN ($placeholder)', + whereArgs: keys, + ); + } else { + txn.delete( + name, + where: 'k IN ($placeholder)', + whereArgs: keys, + ); + } + + for (final key in keys) { + _cache.remove(key); + _cachedKeys?.removeAll(keys); + } + return; + } + + Future clear([Batch? txn]) async { + txn ??= boxCollection._activeBatch; + + if (txn == null) { + await boxCollection._db.delete(name); + } else { + txn.delete(name); + } + + _cache.clear(); + _cachedKeys = null; + return; + } +} diff --git a/pubspec.yaml b/pubspec.yaml index 33d4e9e7..ce6c2aa7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -28,6 +28,7 @@ dependencies: random_string: ^2.3.1 sdp_transform: ^0.3.2 slugify: ^2.0.0 + sqflite_common: ^2.4.5 typed_data: ^1.3.2 webrtc_interface: ^1.0.13 @@ -36,6 +37,7 @@ dev_dependencies: file: ">=6.1.1 <8.0.0" import_sorter: ^4.6.0 lints: ^3.0.0 + sqflite_common_ffi: ^2.2.5 test: ^1.15.7 #flutter_test: {sdk: flutter} #dependency_overrides: diff --git a/test/box_test.dart b/test/box_test.dart new file mode 100644 index 00000000..d58cf616 --- /dev/null +++ b/test/box_test.dart @@ -0,0 +1,80 @@ +import 'package:sqflite_common_ffi/sqflite_ffi.dart'; +import 'package:test/test.dart'; + +import 'package:matrix/src/database/sqflite_box.dart'; + +void main() { + group('Box tests', () { + late BoxCollection collection; + const Set boxNames = {'cats', 'dogs'}; + const data = {'name': 'Fluffy', 'age': 2}; + const data2 = {'name': 'Loki', 'age': 4}; + setUp(() async { + final db = await databaseFactoryFfi.openDatabase(':memory:'); + collection = await BoxCollection.open( + 'testbox', + boxNames, + sqfliteDatabase: db, + ); + }); + + test('Box.put and Box.get', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + expect(await box.get('fluffy'), data); + await box.clear(); + }); + + test('Box.getAll', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + expect(await box.getAll(['fluffy', 'loki']), [data, data2]); + await box.clear(); + }); + + test('Box.getAllKeys', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + expect(await box.getAllKeys(), ['fluffy', 'loki']); + await box.clear(); + }); + + test('Box.getAllValues', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + expect(await box.getAllValues(), {'fluffy': data, 'loki': data2}); + await box.clear(); + }); + + test('Box.delete', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + await box.delete('fluffy'); + expect(await box.get('fluffy'), null); + await box.clear(); + }); + + test('Box.deleteAll', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + await box.deleteAll(['fluffy', 'loki']); + expect(await box.get('fluffy'), null); + expect(await box.get('loki'), null); + await box.clear(); + }); + + test('Box.clear', () async { + final box = collection.openBox('cats'); + await box.put('fluffy', data); + await box.put('loki', data2); + await box.clear(); + expect(await box.get('fluffy'), null); + expect(await box.get('loki'), null); + }); + }); +} diff --git a/test/database_api_test.dart b/test/database_api_test.dart index ff172397..6e3c9999 100644 --- a/test/database_api_test.dart +++ b/test/database_api_test.dart @@ -27,11 +27,11 @@ import 'package:matrix/matrix.dart'; import 'fake_database.dart'; void main() { - group('HiveCollections Database Test', () { + group('Matrix SDK Database Test', () { late DatabaseApi database; late int toDeviceQueueIndex; - test('Open', () async { - database = await getHiveCollectionsDatabase(null); + test('Setup', () async { + database = await getMatrixSdkDatabase(null); }); test('transaction', () async { var counter = 0; diff --git a/test/fake_database.dart b/test/fake_database.dart index 779e9c5e..af428557 100644 --- a/test/fake_database.dart +++ b/test/fake_database.dart @@ -18,6 +18,7 @@ import 'package:file/local.dart'; import 'package:hive/hive.dart'; +import 'package:sqflite_common_ffi/sqflite_ffi.dart'; import 'package:matrix/matrix.dart'; @@ -40,6 +41,14 @@ Future getHiveCollectionsDatabase(Client? c) async { return db; } +// ignore: deprecated_member_use_from_same_package +Future getMatrixSdkDatabase(Client? c) async { + final database = await databaseFactoryFfi.openDatabase(':memory:'); + final db = MatrixSdkDatabase('unit_test.${c?.hashCode}', database: database); + await db.open(); + return db; +} + // ignore: deprecated_member_use_from_same_package Future getHiveDatabase(Client? c) async { if (!hiveInitialized) {