diff --git a/lib/matrix.dart b/lib/matrix.dart index 426992c3..5a09bc14 100644 --- a/lib/matrix.dart +++ b/lib/matrix.dart @@ -33,6 +33,7 @@ export 'src/utils/sync_update_extension.dart'; export 'src/utils/to_device_event.dart'; export 'src/utils/uia_request.dart'; export 'src/utils/commands_extension.dart'; +export 'src/utils/http_timeout.dart'; export 'src/client.dart'; export 'src/event.dart'; export 'src/room.dart'; diff --git a/lib/src/client.dart b/lib/src/client.dart index ad650669..20858763 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -34,6 +34,7 @@ import 'user.dart'; import 'utils/commands_extension.dart'; import 'utils/device_keys_list.dart'; import 'utils/event_update.dart'; +import 'utils/http_timeout.dart'; import 'utils/matrix_file.dart'; import 'utils/room_update.dart'; import 'utils/to_device_event.dart'; @@ -170,7 +171,9 @@ class Client extends MatrixApi { state: StateFilter(lazyLoadMembers: true), ), ), - super(httpClient: httpClient) { + super( + httpClient: + VariableTimeoutHttpClient(httpClient ?? http.Client())) { supportedLoginTypes ??= {AuthenticationTypes.password}; verificationMethods ??= {}; importantStateEvents ??= {}; diff --git a/lib/src/utils/http_timeout.dart b/lib/src/utils/http_timeout.dart new file mode 100644 index 00000000..9ebf4bce --- /dev/null +++ b/lib/src/utils/http_timeout.dart @@ -0,0 +1,116 @@ +/* + * Famedly Matrix SDK + * Copyright (C) 2021 Famedly GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +import 'dart:async'; + +import 'package:http/http.dart' as http; + +import '../../matrix.dart'; + +/// Stream.timeout fails if no progress is made in timeLimit. +/// In contrast, streamTotalTimeout fails if the stream isn't completed +/// until timeoutFuture. +Stream streamTotalTimeout( + Stream stream, Future timeoutFuture) async* { + final si = StreamIterator(stream); + while (await Future.any([si.moveNext(), timeoutFuture])) { + yield si.current; + } +} + +http.StreamedResponse replaceStream( + http.StreamedResponse base, Stream> stream) => + http.StreamedResponse( + http.ByteStream(stream), + base.statusCode, + contentLength: base.contentLength, + request: base.request, + headers: base.headers, + isRedirect: base.isRedirect, + persistentConnection: base.persistentConnection, + reasonPhrase: base.reasonPhrase, + ); + +/// Http Client that enforces a timeout on requests. +/// Timeout calculation is done in a subclass. +abstract class TimeoutHttpClient extends http.BaseClient { + TimeoutHttpClient(this.inner); + + http.Client inner; + + Duration get timeout; + + @override + Future send(http.BaseRequest request) async { + final timeoutFuture = Completer().future.timeout(timeout); + final response = await Future.any([inner.send(request), timeoutFuture]); + return replaceStream( + response, streamTotalTimeout(response.stream, timeoutFuture)); + } +} + +class FixedTimeoutHttpClient extends TimeoutHttpClient { + FixedTimeoutHttpClient(http.Client inner, this.timeout) : super(inner); + @override + Duration timeout; + + @override + Future send(http.BaseRequest request) => + super.send(request); +} + +class VariableTimeoutHttpClient extends TimeoutHttpClient { + /// Matrix synchronisation is done with https long polling. This needs a + /// timeout which is usually 30 seconds. + int syncTimeoutSec; + + int _timeoutFactor = 1; + + @override + Duration get timeout => + Duration(seconds: _timeoutFactor * syncTimeoutSec + 5); + + VariableTimeoutHttpClient(http.Client inner, [this.syncTimeoutSec = 30]) + : super(inner); + + @override + Future send(http.BaseRequest request, + {Duration timeout}) async { + try { + final response = await super.send(request); + return replaceStream(response, (() async* { + try { + await for (final chunk in response.stream) { + yield chunk; + } + _timeoutFactor = 1; + } on TimeoutException catch (e, s) { + _timeoutFactor *= 2; + throw MatrixConnectionException(e, s); + } catch (e, s) { + throw MatrixConnectionException(e, s); + } + })()); + } on TimeoutException catch (e, s) { + _timeoutFactor *= 2; + throw MatrixConnectionException(e, s); + } catch (e, s) { + throw MatrixConnectionException(e, s); + } + } +}