diff --git a/lib/src/client.dart b/lib/src/client.dart index cea838e5..c79a3a50 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -37,6 +37,7 @@ import 'package:matrix/src/models/timeline_chunk.dart'; import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/client_init_exception.dart'; import 'package:matrix/src/utils/multilock.dart'; +import 'package:matrix/src/utils/multipart_request_progress.dart'; import 'package:matrix/src/utils/run_benchmarked.dart'; import 'package:matrix/src/utils/run_in_root.dart'; import 'package:matrix/src/utils/sync_update_item_count.dart'; @@ -1523,6 +1524,10 @@ class Client extends MatrixApi { Uint8List file, { String? filename, String? contentType, + + /// Callback which gets triggered on progress containing the amount of + /// uploaded bytes. + void Function(int)? onProgress, }) async { final mediaConfig = await getConfig(); final maxMediaSize = mediaConfig.mUploadSize; @@ -1531,8 +1536,31 @@ class Client extends MatrixApi { } contentType ??= lookupMimeType(filename ?? '', headerBytes: file); - final mxc = await super - .uploadContent(file, filename: filename, contentType: contentType); + + final requestUri = Uri( + path: '_matrix/media/v3/upload', + queryParameters: { + if (filename != null) 'filename': filename, + }, + ); + final request = MultipartRequest( + 'POST', + baseUri!.resolveUri(requestUri), + onProgress: onProgress, + ); + request.headers['authorization'] = 'Bearer ${bearerToken!}'; + if (contentType != null) request.headers['content-type'] = contentType; + request.files.add( + http.MultipartFile.fromBytes('file', file, filename: filename), + ); + final response = await httpClient.send(request); + final responseBody = await response.stream.toBytes(); + if (response.statusCode != 200) unexpectedResponse(response, responseBody); + final responseString = utf8.decode(responseBody); + final json = jsonDecode(responseString); + final mxc = ((json['content_uri'] as String).startsWith('mxc://') + ? Uri.parse(json['content_uri'] as String) + : throw Exception('Uri not an mxc URI')); final database = this.database; if (file.length <= database.maxFileSize) { @@ -1606,7 +1634,10 @@ class Client extends MatrixApi { /// Uploads a new user avatar for this user. Leave file null to remove the /// current avatar. - Future setAvatar(MatrixFile? file) async { + Future setAvatar( + MatrixFile? file, { + void Function(int)? onUploadProgress, + }) async { if (file == null) { // We send an empty String to remove the avatar. Sending Null **should** // work but it doesn't with Synapse. See: @@ -1617,6 +1648,7 @@ class Client extends MatrixApi { file.bytes, filename: file.name, contentType: file.mimeType, + onProgress: onUploadProgress, ); await setAvatarUrl(userID!, uploadResp); return; diff --git a/lib/src/event.dart b/lib/src/event.dart index 14b49e9a..60c39583 100644 --- a/lib/src/event.dart +++ b/lib/src/event.dart @@ -16,17 +16,20 @@ * along with this program. If not, see . */ +import 'dart:async'; import 'dart:convert'; import 'dart:typed_data'; import 'package:collection/collection.dart'; import 'package:html/parser.dart'; +import 'package:http/http.dart' as http; import 'package:mime/mime.dart'; import 'package:matrix/matrix.dart'; import 'package:matrix/src/utils/file_send_request_credentials.dart'; import 'package:matrix/src/utils/html_to_text.dart'; import 'package:matrix/src/utils/markdown.dart'; +import 'package:matrix/src/utils/multipart_request_progress.dart'; abstract class RelationshipTypes { static const String reply = 'm.in_reply_to'; @@ -747,6 +750,10 @@ class Event extends MatrixEvent { bool getThumbnail = false, Future Function(Uri)? downloadCallback, bool fromLocalStoreOnly = false, + + /// Callback which gets triggered on progress containing the amount of + /// downloaded bytes. + void Function(int)? onDownloadProgress, }) async { if (![EventTypes.Message, EventTypes.Sticker].contains(type)) { throw ("This event has the type '$type' and so it can't contain an attachment."); @@ -781,11 +788,14 @@ class Event extends MatrixEvent { final canDownloadFileFromServer = uint8list == null && !fromLocalStoreOnly; if (canDownloadFileFromServer) { final httpClient = room.client.httpClient; - downloadCallback ??= (Uri url) async => (await httpClient.get( - url, - headers: {'authorization': 'Bearer ${room.client.accessToken}'}, - )) - .bodyBytes; + downloadCallback ??= (Uri url) async { + final request = http.Request('GET', url); + request.headers['authorization'] = 'Bearer ${room.client.accessToken}'; + + final response = await httpClient.send(request); + + return await response.stream.toBytesWithProgress(onDownloadProgress); + }; uint8list = await downloadCallback(await mxcUrl.getDownloadUri(room.client)); storeable = storeable && uint8list.lengthInBytes < database.maxFileSize; diff --git a/lib/src/room.dart b/lib/src/room.dart index e6e92327..0238db48 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -838,6 +838,11 @@ class Room { Map? extraContent, String? threadRootEventId, String? threadLastEventId, + + /// Callback which gets triggered on progress containing the amount of + /// uploaded bytes. + void Function(int)? onUploadProgress, + void Function(int)? onThumbnailUploadProgress, }) async { txid ??= client.generateUniqueTransactionId(); sendingFilePlaceholders[txid] = file; @@ -955,12 +960,14 @@ class Room { uploadFile.bytes, filename: uploadFile.name, contentType: uploadFile.mimeType, + onProgress: onUploadProgress, ); thumbnailUploadResp = uploadThumbnail != null ? await client.uploadContent( uploadThumbnail.bytes, filename: uploadThumbnail.name, contentType: uploadThumbnail.mimeType, + onProgress: onThumbnailUploadProgress, ) : null; } on MatrixException catch (_) { @@ -2104,10 +2111,17 @@ class Room { /// Uploads a new avatar for this room. Returns the event ID of the new /// m.room.avatar event. Insert null to remove the current avatar. - Future setAvatar(MatrixFile? file) async { + Future setAvatar( + MatrixFile? file, { + void Function(int)? onUploadProgress, + }) async { final uploadResp = file == null ? null - : await client.uploadContent(file.bytes, filename: file.name); + : await client.uploadContent( + file.bytes, + filename: file.name, + onProgress: onUploadProgress, + ); return await client.setRoomStateWithKey( id, EventTypes.RoomAvatar, diff --git a/lib/src/utils/multipart_request_progress.dart b/lib/src/utils/multipart_request_progress.dart new file mode 100644 index 00000000..7fb9335d --- /dev/null +++ b/lib/src/utils/multipart_request_progress.dart @@ -0,0 +1,57 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:http/http.dart' as http; + +class MultipartRequest extends http.MultipartRequest { + MultipartRequest( + super.method, + super.url, { + this.onProgress, + }); + + final void Function(int bytes)? onProgress; + + @override + http.ByteStream finalize() { + final byteStream = super.finalize(); + if (onProgress == null) return byteStream; + + final total = contentLength; + int bytes = 0; + + final t = StreamTransformer.fromHandlers( + handleData: (List data, EventSink> sink) { + bytes += data.length; + onProgress?.call(bytes); + if (total >= bytes) { + sink.add(data); + } + }, + ); + final stream = byteStream.transform(t); + return http.ByteStream(stream); + } +} + +extension ToBytesWithProgress on http.ByteStream { + /// Collects the data of this stream in a [Uint8List]. + Future toBytesWithProgress(void Function(int)? onProgress) { + var length = 0; + final completer = Completer(); + final sink = ByteConversionSink.withCallback( + (bytes) => completer.complete(Uint8List.fromList(bytes)), + ); + listen( + (bytes) { + sink.add(bytes); + onProgress?.call(length += bytes.length); + }, + onError: completer.completeError, + onDone: sink.close, + cancelOnError: true, + ); + return completer.future; + } +} diff --git a/test/client_test.dart b/test/client_test.dart index 19773a1e..7bc747ff 100644 --- a/test/client_test.dart +++ b/test/client_test.dart @@ -1488,13 +1488,18 @@ void main() { }); test('upload', () async { final client = await getClient(); - final response = - await client.uploadContent(Uint8List(0), filename: 'file.jpeg'); + final onProgressMap = []; + final response = await client.uploadContent( + Uint8List(0), + filename: 'file.jpeg', + onProgress: onProgressMap.add, + ); expect(response.toString(), 'mxc://example.com/AQwafuaFswefuhsfAFAgsw'); expect( await client.database.getFile(response) != null, client.database.supportsFileStoring, ); + expect(onProgressMap, [74, 183, 183, 185, 261]); await client.dispose(closeDatabase: true); }); diff --git a/test/event_test.dart b/test/event_test.dart index e3edae90..0bfb297f 100644 --- a/test/event_test.dart +++ b/test/event_test.dart @@ -2485,6 +2485,30 @@ void main() async { await room.client.dispose(closeDatabase: true); }, ); + + test('downloadAndDecryptAttachment from server', () async { + final client = await getClient(); + final event = Event( + room: client.rooms.first, + eventId: 'test', + originServerTs: DateTime.now(), + senderId: client.userID!, + content: { + 'body': 'ascii.txt', + 'filename': 'ascii.txt', + 'info': {'mimetype': 'application/msword', 'size': 6}, + 'msgtype': 'm.file', + 'url': 'mxc://example.org/abcd1234ascii', + }, + type: EventTypes.Message, + ); + final progressList = []; + await event.downloadAndDecryptAttachment( + onDownloadProgress: progressList.add, + ); + await client.dispose(); + expect(progressList, [112]); + }); test('downloadAndDecryptAttachment store', tags: 'olm', () async { final FILE_BUFF = Uint8List.fromList([0]); var serverHits = 0;