Merge branch 'krille/remove-transaction-workaround' into 'main'
refactor: Remove database transaction workaround See merge request famedly/company/frontend/famedlysdk!1192
This commit is contained in:
commit
ed0ab3bdb9
|
|
@ -1569,10 +1569,11 @@ class Client extends MatrixApi {
|
||||||
}
|
}
|
||||||
dynamic syncError;
|
dynamic syncError;
|
||||||
await _checkSyncFilter();
|
await _checkSyncFilter();
|
||||||
|
timeout ??= const Duration(seconds: 30);
|
||||||
final syncRequest = sync(
|
final syncRequest = sync(
|
||||||
filter: syncFilterId,
|
filter: syncFilterId,
|
||||||
since: prevBatch,
|
since: prevBatch,
|
||||||
timeout: timeout?.inMilliseconds ?? 30000,
|
timeout: timeout.inMilliseconds,
|
||||||
setPresence: syncPresence,
|
setPresence: syncPresence,
|
||||||
).then((v) => Future<SyncUpdate?>.value(v)).catchError((e) {
|
).then((v) => Future<SyncUpdate?>.value(v)).catchError((e) {
|
||||||
syncError = e;
|
syncError = e;
|
||||||
|
|
@ -1580,7 +1581,8 @@ class Client extends MatrixApi {
|
||||||
});
|
});
|
||||||
_currentSyncId = syncRequest.hashCode;
|
_currentSyncId = syncRequest.hashCode;
|
||||||
onSyncStatus.add(SyncStatusUpdate(SyncStatus.waitingForResponse));
|
onSyncStatus.add(SyncStatusUpdate(SyncStatus.waitingForResponse));
|
||||||
final syncResp = await syncRequest;
|
final syncResp =
|
||||||
|
await syncRequest.timeout(timeout + const Duration(seconds: 10));
|
||||||
onSyncStatus.add(SyncStatusUpdate(SyncStatus.processing));
|
onSyncStatus.add(SyncStatusUpdate(SyncStatus.processing));
|
||||||
if (syncResp == null) throw syncError ?? 'Unknown sync error';
|
if (syncResp == null) throw syncError ?? 'Unknown sync error';
|
||||||
if (_currentSyncId != syncRequest.hashCode) {
|
if (_currentSyncId != syncRequest.hashCode) {
|
||||||
|
|
|
||||||
|
|
@ -308,7 +308,7 @@ abstract class DatabaseApi {
|
||||||
|
|
||||||
Future<dynamic> close();
|
Future<dynamic> close();
|
||||||
|
|
||||||
Future<T> transaction<T>(Future<T> Function() action);
|
Future<void> transaction(Future<void> Function() action);
|
||||||
|
|
||||||
Future<String> exportDump();
|
Future<String> exportDump();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1310,61 +1310,9 @@ class HiveCollectionsDatabase extends DatabaseApi {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Completer<void>? _transactionLock;
|
|
||||||
final _transactionZones = <Zone>{};
|
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<T> transaction<T>(Future<T> Function() action) async {
|
Future<void> transaction(Future<void> Function() action) =>
|
||||||
// we want transactions to lock, however NOT if transactoins are run inside of each other.
|
_collection.transaction(action);
|
||||||
// to be able to do this, we use dart zones (https://dart.dev/articles/archive/zones).
|
|
||||||
// _transactionZones holds a set of all zones which are currently running a transaction.
|
|
||||||
// _transactionLock holds the lock.
|
|
||||||
|
|
||||||
// first we try to determine if we are inside of a transaction currently
|
|
||||||
var isInTransaction = false;
|
|
||||||
Zone? zone = Zone.current;
|
|
||||||
// for that we keep on iterating to the parent zone until there is either no zone anymore
|
|
||||||
// or we have found a zone inside of _transactionZones.
|
|
||||||
while (zone != null) {
|
|
||||||
if (_transactionZones.contains(zone)) {
|
|
||||||
isInTransaction = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
zone = zone.parent;
|
|
||||||
}
|
|
||||||
// if we are inside a transaction....just run the action
|
|
||||||
if (isInTransaction) {
|
|
||||||
return await action();
|
|
||||||
}
|
|
||||||
// if we are *not* in a transaction, time to wait for the lock!
|
|
||||||
while (_transactionLock != null) {
|
|
||||||
await _transactionLock!.future;
|
|
||||||
}
|
|
||||||
// claim the lock
|
|
||||||
final lock = Completer<void>();
|
|
||||||
_transactionLock = lock;
|
|
||||||
try {
|
|
||||||
// run the action inside of a new zone
|
|
||||||
return await runZoned(() async {
|
|
||||||
try {
|
|
||||||
// don't forget to add the new zone to _transactionZones!
|
|
||||||
_transactionZones.add(Zone.current);
|
|
||||||
late final T result;
|
|
||||||
await _collection.transaction(() async {
|
|
||||||
result = await action();
|
|
||||||
});
|
|
||||||
return result;
|
|
||||||
} finally {
|
|
||||||
// aaaand remove the zone from _transactionZones again
|
|
||||||
_transactionZones.remove(Zone.current);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
// aaaand finally release the lock
|
|
||||||
_transactionLock = null;
|
|
||||||
lock.complete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<void> updateClient(
|
Future<void> updateClient(
|
||||||
|
|
|
||||||
|
|
@ -1255,7 +1255,7 @@ class FamedlySdkHiveDatabase extends DatabaseApi {
|
||||||
final _transactionZones = <Zone>{};
|
final _transactionZones = <Zone>{};
|
||||||
|
|
||||||
@override
|
@override
|
||||||
Future<T> transaction<T>(Future<T> Function() action) async {
|
Future<void> transaction(Future<void> Function() action) async {
|
||||||
// we want transactions to lock, however NOT if transactoins are run inside of each other.
|
// we want transactions to lock, however NOT if transactoins are run inside of each other.
|
||||||
// to be able to do this, we use dart zones (https://dart.dev/articles/archive/zones).
|
// to be able to do this, we use dart zones (https://dart.dev/articles/archive/zones).
|
||||||
// _transactionZones holds a set of all zones which are currently running a transaction.
|
// _transactionZones holds a set of all zones which are currently running a transaction.
|
||||||
|
|
|
||||||
|
|
@ -64,18 +64,6 @@ void testDatabase(
|
||||||
});
|
});
|
||||||
expect(counter++, 3);
|
expect(counter++, 3);
|
||||||
});
|
});
|
||||||
|
|
||||||
// we can't use Zone.root.run inside of tests so we abuse timers instead
|
|
||||||
Timer(Duration(milliseconds: 50), () async {
|
|
||||||
await database.transaction(() async {
|
|
||||||
expect(counter++, 6);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
await database.transaction(() async {
|
|
||||||
expect(counter++, 4);
|
|
||||||
await Future.delayed(Duration(milliseconds: 100));
|
|
||||||
expect(counter++, 5);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
test('insertIntoToDeviceQueue', () async {
|
test('insertIntoToDeviceQueue', () async {
|
||||||
toDeviceQueueIndex = await database.insertIntoToDeviceQueue(
|
toDeviceQueueIndex = await database.insertIntoToDeviceQueue(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue