feat: Add onProgress for upload and download methods

This commit is contained in:
Christian Kußowski 2025-09-18 15:30:38 +02:00
parent a44dfbe58f
commit 8735ecd378
No known key found for this signature in database
GPG Key ID: E067ECD60F1A0652
6 changed files with 154 additions and 12 deletions

View File

@ -37,6 +37,7 @@ import 'package:matrix/src/models/timeline_chunk.dart';
import 'package:matrix/src/utils/cached_stream_controller.dart'; import 'package:matrix/src/utils/cached_stream_controller.dart';
import 'package:matrix/src/utils/client_init_exception.dart'; import 'package:matrix/src/utils/client_init_exception.dart';
import 'package:matrix/src/utils/multilock.dart'; import 'package:matrix/src/utils/multilock.dart';
import 'package:matrix/src/utils/multipart_request_progress.dart';
import 'package:matrix/src/utils/run_benchmarked.dart'; import 'package:matrix/src/utils/run_benchmarked.dart';
import 'package:matrix/src/utils/run_in_root.dart'; import 'package:matrix/src/utils/run_in_root.dart';
import 'package:matrix/src/utils/sync_update_item_count.dart'; import 'package:matrix/src/utils/sync_update_item_count.dart';
@ -1523,6 +1524,10 @@ class Client extends MatrixApi {
Uint8List file, { Uint8List file, {
String? filename, String? filename,
String? contentType, String? contentType,
/// Callback which gets triggered on progress containing the amount of
/// uploaded bytes.
void Function(int)? onProgress,
}) async { }) async {
final mediaConfig = await getConfig(); final mediaConfig = await getConfig();
final maxMediaSize = mediaConfig.mUploadSize; final maxMediaSize = mediaConfig.mUploadSize;
@ -1531,8 +1536,31 @@ class Client extends MatrixApi {
} }
contentType ??= lookupMimeType(filename ?? '', headerBytes: file); contentType ??= lookupMimeType(filename ?? '', headerBytes: file);
final mxc = await super
.uploadContent(file, filename: filename, contentType: contentType); final requestUri = Uri(
path: '_matrix/media/v3/upload',
queryParameters: {
if (filename != null) 'filename': filename,
},
);
final request = MultipartRequest(
'POST',
baseUri!.resolveUri(requestUri),
onProgress: onProgress,
);
request.headers['authorization'] = 'Bearer ${bearerToken!}';
if (contentType != null) request.headers['content-type'] = contentType;
request.files.add(
http.MultipartFile.fromBytes('file', file, filename: filename),
);
final response = await httpClient.send(request);
final responseBody = await response.stream.toBytes();
if (response.statusCode != 200) unexpectedResponse(response, responseBody);
final responseString = utf8.decode(responseBody);
final json = jsonDecode(responseString);
final mxc = ((json['content_uri'] as String).startsWith('mxc://')
? Uri.parse(json['content_uri'] as String)
: throw Exception('Uri not an mxc URI'));
final database = this.database; final database = this.database;
if (file.length <= database.maxFileSize) { if (file.length <= database.maxFileSize) {
@ -1606,7 +1634,10 @@ class Client extends MatrixApi {
/// Uploads a new user avatar for this user. Leave file null to remove the /// Uploads a new user avatar for this user. Leave file null to remove the
/// current avatar. /// current avatar.
Future<void> setAvatar(MatrixFile? file) async { Future<void> setAvatar(
MatrixFile? file, {
void Function(int)? onUploadProgress,
}) async {
if (file == null) { if (file == null) {
// We send an empty String to remove the avatar. Sending Null **should** // We send an empty String to remove the avatar. Sending Null **should**
// work but it doesn't with Synapse. See: // work but it doesn't with Synapse. See:
@ -1617,6 +1648,7 @@ class Client extends MatrixApi {
file.bytes, file.bytes,
filename: file.name, filename: file.name,
contentType: file.mimeType, contentType: file.mimeType,
onProgress: onUploadProgress,
); );
await setAvatarUrl(userID!, uploadResp); await setAvatarUrl(userID!, uploadResp);
return; return;

View File

@ -16,17 +16,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>. * along with this program. If not, see <https://www.gnu.org/licenses/>.
*/ */
import 'dart:async';
import 'dart:convert'; import 'dart:convert';
import 'dart:typed_data'; import 'dart:typed_data';
import 'package:collection/collection.dart'; import 'package:collection/collection.dart';
import 'package:html/parser.dart'; import 'package:html/parser.dart';
import 'package:http/http.dart' as http;
import 'package:mime/mime.dart'; import 'package:mime/mime.dart';
import 'package:matrix/matrix.dart'; import 'package:matrix/matrix.dart';
import 'package:matrix/src/utils/file_send_request_credentials.dart'; import 'package:matrix/src/utils/file_send_request_credentials.dart';
import 'package:matrix/src/utils/html_to_text.dart'; import 'package:matrix/src/utils/html_to_text.dart';
import 'package:matrix/src/utils/markdown.dart'; import 'package:matrix/src/utils/markdown.dart';
import 'package:matrix/src/utils/multipart_request_progress.dart';
abstract class RelationshipTypes { abstract class RelationshipTypes {
static const String reply = 'm.in_reply_to'; static const String reply = 'm.in_reply_to';
@ -747,6 +750,10 @@ class Event extends MatrixEvent {
bool getThumbnail = false, bool getThumbnail = false,
Future<Uint8List> Function(Uri)? downloadCallback, Future<Uint8List> Function(Uri)? downloadCallback,
bool fromLocalStoreOnly = false, bool fromLocalStoreOnly = false,
/// Callback which gets triggered on progress containing the amount of
/// downloaded bytes.
void Function(int)? onDownloadProgress,
}) async { }) async {
if (![EventTypes.Message, EventTypes.Sticker].contains(type)) { if (![EventTypes.Message, EventTypes.Sticker].contains(type)) {
throw ("This event has the type '$type' and so it can't contain an attachment."); throw ("This event has the type '$type' and so it can't contain an attachment.");
@ -781,11 +788,14 @@ class Event extends MatrixEvent {
final canDownloadFileFromServer = uint8list == null && !fromLocalStoreOnly; final canDownloadFileFromServer = uint8list == null && !fromLocalStoreOnly;
if (canDownloadFileFromServer) { if (canDownloadFileFromServer) {
final httpClient = room.client.httpClient; final httpClient = room.client.httpClient;
downloadCallback ??= (Uri url) async => (await httpClient.get( downloadCallback ??= (Uri url) async {
url, final request = http.Request('GET', url);
headers: {'authorization': 'Bearer ${room.client.accessToken}'}, request.headers['authorization'] = 'Bearer ${room.client.accessToken}';
))
.bodyBytes; final response = await httpClient.send(request);
return await response.stream.toBytesWithProgress(onDownloadProgress);
};
uint8list = uint8list =
await downloadCallback(await mxcUrl.getDownloadUri(room.client)); await downloadCallback(await mxcUrl.getDownloadUri(room.client));
storeable = storeable && uint8list.lengthInBytes < database.maxFileSize; storeable = storeable && uint8list.lengthInBytes < database.maxFileSize;

View File

@ -838,6 +838,11 @@ class Room {
Map<String, dynamic>? extraContent, Map<String, dynamic>? extraContent,
String? threadRootEventId, String? threadRootEventId,
String? threadLastEventId, String? threadLastEventId,
/// Callback which gets triggered on progress containing the amount of
/// uploaded bytes.
void Function(int)? onUploadProgress,
void Function(int)? onThumbnailUploadProgress,
}) async { }) async {
txid ??= client.generateUniqueTransactionId(); txid ??= client.generateUniqueTransactionId();
sendingFilePlaceholders[txid] = file; sendingFilePlaceholders[txid] = file;
@ -955,12 +960,14 @@ class Room {
uploadFile.bytes, uploadFile.bytes,
filename: uploadFile.name, filename: uploadFile.name,
contentType: uploadFile.mimeType, contentType: uploadFile.mimeType,
onProgress: onUploadProgress,
); );
thumbnailUploadResp = uploadThumbnail != null thumbnailUploadResp = uploadThumbnail != null
? await client.uploadContent( ? await client.uploadContent(
uploadThumbnail.bytes, uploadThumbnail.bytes,
filename: uploadThumbnail.name, filename: uploadThumbnail.name,
contentType: uploadThumbnail.mimeType, contentType: uploadThumbnail.mimeType,
onProgress: onThumbnailUploadProgress,
) )
: null; : null;
} on MatrixException catch (_) { } on MatrixException catch (_) {
@ -2104,10 +2111,17 @@ class Room {
/// Uploads a new avatar for this room. Returns the event ID of the new /// Uploads a new avatar for this room. Returns the event ID of the new
/// m.room.avatar event. Insert null to remove the current avatar. /// m.room.avatar event. Insert null to remove the current avatar.
Future<String> setAvatar(MatrixFile? file) async { Future<String> setAvatar(
MatrixFile? file, {
void Function(int)? onUploadProgress,
}) async {
final uploadResp = file == null final uploadResp = file == null
? null ? null
: await client.uploadContent(file.bytes, filename: file.name); : await client.uploadContent(
file.bytes,
filename: file.name,
onProgress: onUploadProgress,
);
return await client.setRoomStateWithKey( return await client.setRoomStateWithKey(
id, id,
EventTypes.RoomAvatar, EventTypes.RoomAvatar,

View File

@ -0,0 +1,57 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:http/http.dart' as http;
class MultipartRequest extends http.MultipartRequest {
MultipartRequest(
super.method,
super.url, {
this.onProgress,
});
final void Function(int bytes)? onProgress;
@override
http.ByteStream finalize() {
final byteStream = super.finalize();
if (onProgress == null) return byteStream;
final total = contentLength;
int bytes = 0;
final t = StreamTransformer.fromHandlers(
handleData: (List<int> data, EventSink<List<int>> sink) {
bytes += data.length;
onProgress?.call(bytes);
if (total >= bytes) {
sink.add(data);
}
},
);
final stream = byteStream.transform(t);
return http.ByteStream(stream);
}
}
extension ToBytesWithProgress on http.ByteStream {
/// Collects the data of this stream in a [Uint8List].
Future<Uint8List> toBytesWithProgress(void Function(int)? onProgress) {
var length = 0;
final completer = Completer<Uint8List>();
final sink = ByteConversionSink.withCallback(
(bytes) => completer.complete(Uint8List.fromList(bytes)),
);
listen(
(bytes) {
sink.add(bytes);
onProgress?.call(length += bytes.length);
},
onError: completer.completeError,
onDone: sink.close,
cancelOnError: true,
);
return completer.future;
}
}

View File

@ -1488,13 +1488,18 @@ void main() {
}); });
test('upload', () async { test('upload', () async {
final client = await getClient(); final client = await getClient();
final response = final onProgressMap = <int>[];
await client.uploadContent(Uint8List(0), filename: 'file.jpeg'); final response = await client.uploadContent(
Uint8List(0),
filename: 'file.jpeg',
onProgress: onProgressMap.add,
);
expect(response.toString(), 'mxc://example.com/AQwafuaFswefuhsfAFAgsw'); expect(response.toString(), 'mxc://example.com/AQwafuaFswefuhsfAFAgsw');
expect( expect(
await client.database.getFile(response) != null, await client.database.getFile(response) != null,
client.database.supportsFileStoring, client.database.supportsFileStoring,
); );
expect(onProgressMap, [74, 183, 183, 185, 261]);
await client.dispose(closeDatabase: true); await client.dispose(closeDatabase: true);
}); });

View File

@ -2485,6 +2485,30 @@ void main() async {
await room.client.dispose(closeDatabase: true); await room.client.dispose(closeDatabase: true);
}, },
); );
test('downloadAndDecryptAttachment from server', () async {
final client = await getClient();
final event = Event(
room: client.rooms.first,
eventId: 'test',
originServerTs: DateTime.now(),
senderId: client.userID!,
content: {
'body': 'ascii.txt',
'filename': 'ascii.txt',
'info': {'mimetype': 'application/msword', 'size': 6},
'msgtype': 'm.file',
'url': 'mxc://example.org/abcd1234ascii',
},
type: EventTypes.Message,
);
final progressList = <int>[];
await event.downloadAndDecryptAttachment(
onDownloadProgress: progressList.add,
);
await client.dispose();
expect(progressList, [112]);
});
test('downloadAndDecryptAttachment store', tags: 'olm', () async { test('downloadAndDecryptAttachment store', tags: 'olm', () async {
final FILE_BUFF = Uint8List.fromList([0]); final FILE_BUFF = Uint8List.fromList([0]);
var serverHits = 0; var serverHits = 0;