Merge branch 'nico/large-files-fix' into 'main'

fix: timeout when sending large files

Closes famedly-web#540

See merge request famedly/company/frontend/famedlysdk!1125
This commit is contained in:
Nicolas Werner 2022-09-09 10:03:08 +00:00
commit b0239074bf
2 changed files with 4 additions and 59 deletions

View File

@ -189,8 +189,8 @@ class Client extends MatrixApi {
? NativeImplementationsIsolate(compute) ? NativeImplementationsIsolate(compute)
: nativeImplementations, : nativeImplementations,
super( super(
httpClient: httpClient: FixedTimeoutHttpClient(
VariableTimeoutHttpClient(httpClient ?? http.Client())) { httpClient ?? http.Client(), Duration(seconds: 35))) {
if (logLevel != null) Logs().level = logLevel; if (logLevel != null) Logs().level = logLevel;
importantStateEvents.addAll([ importantStateEvents.addAll([
EventTypes.RoomName, EventTypes.RoomName,

View File

@ -20,19 +20,6 @@ import 'dart:async';
import 'package:http/http.dart' as http; import 'package:http/http.dart' as http;
import 'package:matrix/matrix.dart';
/// Stream.timeout fails if no progress is made in timeLimit.
/// In contrast, streamTotalTimeout fails if the stream isn't completed
/// until timeoutFuture.
Stream<T> streamTotalTimeout<T>(
Stream<T> stream, Future<Never> timeoutFuture) async* {
final si = StreamIterator(stream);
while (await Future.any([si.moveNext(), timeoutFuture])) {
yield si.current;
}
}
http.StreamedResponse replaceStream( http.StreamedResponse replaceStream(
http.StreamedResponse base, Stream<List<int>> stream) => http.StreamedResponse base, Stream<List<int>> stream) =>
http.StreamedResponse( http.StreamedResponse(
@ -57,10 +44,8 @@ abstract class TimeoutHttpClient extends http.BaseClient {
@override @override
Future<http.StreamedResponse> send(http.BaseRequest request) async { Future<http.StreamedResponse> send(http.BaseRequest request) async {
final timeoutFuture = Completer<Never>().future.timeout(timeout); final response = await inner.send(request);
final response = await Future.any([inner.send(request), timeoutFuture]); return replaceStream(response, response.stream.timeout(timeout));
return replaceStream(
response, streamTotalTimeout(response.stream, timeoutFuture));
} }
} }
@ -69,43 +54,3 @@ class FixedTimeoutHttpClient extends TimeoutHttpClient {
@override @override
Duration timeout; Duration timeout;
} }
class VariableTimeoutHttpClient extends TimeoutHttpClient {
/// Matrix synchronisation is done with https long polling. This needs a
/// timeout which is usually 30 seconds.
int syncTimeoutSec;
int _timeoutFactor = 1;
@override
Duration get timeout =>
Duration(seconds: _timeoutFactor * syncTimeoutSec + 5);
VariableTimeoutHttpClient(http.Client inner, [this.syncTimeoutSec = 30])
: super(inner);
@override
Future<http.StreamedResponse> send(http.BaseRequest request) async {
try {
final response = await super.send(request);
return replaceStream(response, (() async* {
try {
await for (final chunk in response.stream) {
yield chunk;
}
_timeoutFactor = 1;
} on TimeoutException catch (e, s) {
_timeoutFactor *= 2;
throw MatrixConnectionException(e, s);
} catch (e, s) {
throw MatrixConnectionException(e, s);
}
})());
} on TimeoutException catch (e, s) {
_timeoutFactor *= 2;
throw MatrixConnectionException(e, s);
} catch (e, s) {
throw MatrixConnectionException(e, s);
}
}
}