From fb66c5caa91385342c268f019b824412e4aebd21 Mon Sep 17 00:00:00 2001 From: Alex Tran Date: Mon, 12 Jan 2026 20:26:55 -0600 Subject: [PATCH] refactor --- .../services/background_worker.service.dart | 2 +- .../pages/editing/drift_edit.page.dart | 2 +- .../share_intent_upload.provider.dart | 2 +- .../backup/drift_backup.provider.dart | 12 +- .../infrastructure/action.provider.dart | 24 +- mobile/lib/services/upload.service.dart | 365 ++++++++---------- 6 files changed, 181 insertions(+), 226 deletions(-) diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index 079ed17f21..134aa9f7ef 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -249,7 +249,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { final networkCapabilities = await _ref?.read(connectivityApiProvider).getCapabilities() ?? []; return _ref ?.read(uploadServiceProvider) - .startUploadWithHttp(currentUser.id, networkCapabilities.isUnmetered, _cancellationToken); + .uploadBackupCandidates(currentUser.id, networkCapabilities.isUnmetered, _cancellationToken); }, (error, stack) { dPrint(() => "Error in backup zone $error, $stack"); diff --git a/mobile/lib/presentation/pages/editing/drift_edit.page.dart b/mobile/lib/presentation/pages/editing/drift_edit.page.dart index 5fd97c3dc4..1247e392c8 100644 --- a/mobile/lib/presentation/pages/editing/drift_edit.page.dart +++ b/mobile/lib/presentation/pages/editing/drift_edit.page.dart @@ -79,7 +79,7 @@ class DriftEditImagePage extends ConsumerWidget { return; } - await ref.read(uploadServiceProvider).manualBackup([localAsset], CancellationToken()); + await ref.read(uploadServiceProvider).uploadLocalAssets([localAsset], CancellationToken()); } catch (e) { ImmichToast.show( durationInSecond: 6, diff --git a/mobile/lib/providers/asset_viewer/share_intent_upload.provider.dart b/mobile/lib/providers/asset_viewer/share_intent_upload.provider.dart index 02fddfdfa9..a024024281 100644 --- a/mobile/lib/providers/asset_viewer/share_intent_upload.provider.dart +++ b/mobile/lib/providers/asset_viewer/share_intent_upload.provider.dart @@ -64,7 +64,7 @@ class ShareIntentUploadStateNotifier extends StateNotifier 0 ? bytes / totalBytes : 0.0; diff --git a/mobile/lib/providers/backup/drift_backup.provider.dart b/mobile/lib/providers/backup/drift_backup.provider.dart index 3b2dd4f4d9..bbb158fe39 100644 --- a/mobile/lib/providers/backup/drift_backup.provider.dart +++ b/mobile/lib/providers/backup/drift_backup.provider.dart @@ -403,14 +403,16 @@ class DriftBackupNotifier extends StateNotifier { final hasWifi = networkCapabilities.isUnmetered; _logger.info('Network capabilities: $networkCapabilities, hasWifi/isUnmetered: $hasWifi'); - return _uploadService.startUploadWithHttp( + return _uploadService.uploadBackupCandidates( userId, hasWifi, cancelToken, - onProgress: _handleForegroundBackupProgress, - onSuccess: _handleForegroundBackupSuccess, - onError: _handleForegroundBackupError, - onICloudProgress: _handleICloudProgress, + callbacks: UploadCallbacks( + onProgress: _handleForegroundBackupProgress, + onSuccess: _handleForegroundBackupSuccess, + onError: _handleForegroundBackupError, + onICloudProgress: _handleICloudProgress, + ), ); } diff --git a/mobile/lib/providers/infrastructure/action.provider.dart b/mobile/lib/providers/infrastructure/action.provider.dart index 5c7427a277..3ec7fc33a8 100644 --- a/mobile/lib/providers/infrastructure/action.provider.dart +++ b/mobile/lib/providers/infrastructure/action.provider.dart @@ -426,19 +426,21 @@ class ActionNotifier extends Notifier { } try { - await _uploadService.manualBackup( + await _uploadService.uploadLocalAssets( assetsToUpload, cancelToken, - onProgress: (localAssetId, filename, bytes, totalBytes) { - final progress = totalBytes > 0 ? bytes / totalBytes : 0.0; - progressNotifier.setProgress(localAssetId, progress); - }, - onSuccess: (localAssetId, remoteAssetId) { - progressNotifier.remove(localAssetId); - }, - onError: (localAssetId, errorMessage) { - progressNotifier.setError(localAssetId); - }, + callbacks: UploadCallbacks( + onProgress: (localAssetId, filename, bytes, totalBytes) { + final progress = totalBytes > 0 ? bytes / totalBytes : 0.0; + progressNotifier.setProgress(localAssetId, progress); + }, + onSuccess: (localAssetId, remoteAssetId) { + progressNotifier.remove(localAssetId); + }, + onError: (localAssetId, errorMessage) { + progressNotifier.setError(localAssetId); + }, + ), ); return ActionResult(count: assetsToUpload.length, success: true); } catch (error, stack) { diff --git a/mobile/lib/services/upload.service.dart b/mobile/lib/services/upload.service.dart index c922695272..dc88f5b113 100644 --- a/mobile/lib/services/upload.service.dart +++ b/mobile/lib/services/upload.service.dart @@ -26,6 +26,15 @@ import 'package:logging/logging.dart'; import 'package:path/path.dart' as p; import 'package:photo_manager/photo_manager.dart' show PMProgressHandler; +class UploadCallbacks { + final void Function(String id, String filename, int bytes, int totalBytes)? onProgress; + final void Function(String localId, String remoteId)? onSuccess; + final void Function(String id, String errorMessage)? onError; + final void Function(String id, double progress)? onICloudProgress; + + const UploadCallbacks({this.onProgress, this.onSuccess, this.onError, this.onICloudProgress}); +} + final uploadServiceProvider = Provider((ref) { final service = UploadService( ref.watch(uploadRepositoryProvider), @@ -99,67 +108,6 @@ class UploadService { return _backupRepository.getAllCounts(userId); } - Future manualBackup( - List localAssets, - CancellationToken cancelToken, { - void Function(String localAssetId, String filename, int bytes, int totalBytes)? onProgress, - void Function(String localAssetId, String remoteAssetId)? onSuccess, - void Function(String localAssetId, String errorMessage)? onError, - void Function(String localAssetId, double progress)? onICloudProgress, - }) async { - if (localAssets.isEmpty) { - return; - } - - const concurrentUploads = 3; - final httpClients = List.generate(concurrentUploads, (_) => Client()); - - await _storageRepository.clearCache(); - - shouldAbortQueuingTasks = false; - - try { - int currentIndex = 0; - - Future worker(Client httpClient) async { - while (true) { - if (shouldAbortQueuingTasks || cancelToken.isCancelled) { - break; - } - - final index = currentIndex; - if (index >= localAssets.length) { - break; - } - currentIndex++; - - final asset = localAssets[index]; - - await _uploadSingleAsset( - asset, - httpClient, - cancelToken, - onProgress: onProgress, - onSuccess: onSuccess, - onError: onError, - onICloudProgress: onICloudProgress, - ); - } - } - - final workerFutures = >[]; - for (int i = 0; i < concurrentUploads; i++) { - workerFutures.add(worker(httpClients[i])); - } - - await Future.wait(workerFutures); - } finally { - for (final client in httpClients) { - client.close(); - } - } - } - /// Find backup candidates /// Build the upload tasks /// Enqueue the tasks @@ -188,84 +136,106 @@ class UploadService { } } - Future startUploadWithHttp( + /// Upload backup candidates from database (auto/background backup) + Future uploadBackupCandidates( String userId, bool hasWifi, CancellationToken cancelToken, { - void Function(String localAssetId, String filename, int bytes, int totalBytes)? onProgress, - void Function(String localAssetId, String remoteAssetId)? onSuccess, - void Function(String localAssetId, String errorMessage)? onError, - void Function(String localAssetId, double progress)? onICloudProgress, + UploadCallbacks callbacks = const UploadCallbacks(), }) async { - const concurrentUploads = 3; - final httpClients = List.generate(concurrentUploads, (_) => Client()); - - await _storageRepository.clearCache(); - - shouldAbortQueuingTasks = false; - final candidates = await _backupRepository.getCandidates(userId); if (candidates.isEmpty) { return; } - try { - int currentIndex = 0; + await _executeWithWorkerPool( + items: candidates, + cancelToken: cancelToken, + shouldSkip: (asset) { + final requireWifi = _shouldRequireWiFi(asset); + return requireWifi && !hasWifi; + }, + processItem: (asset, httpClient) => _uploadSingleAsset(asset, httpClient, cancelToken, callbacks: callbacks), + ); + } - Future worker(Client httpClient) async { - while (true) { - if (shouldAbortQueuingTasks || cancelToken.isCancelled) { - break; - } - - final index = currentIndex; - if (index >= candidates.length) { - break; - } - currentIndex++; - - final asset = candidates[index]; - - final requireWifi = _shouldRequireWiFi(asset); - if (requireWifi && !hasWifi) { - continue; - } - - await _uploadSingleAsset( - asset, - httpClient, - cancelToken, - onProgress: onProgress, - onSuccess: onSuccess, - onError: onError, - onICloudProgress: onICloudProgress, - ); - } - } - - // Start all workers in parallel - each worker continuously pulls from the queue - final workerFutures = >[]; - - for (int i = 0; i < concurrentUploads; i++) { - workerFutures.add(worker(httpClients[i])); - } - - await Future.wait(workerFutures); - } finally { - for (final client in httpClients) { - client.close(); - } + /// Upload local assets from user selection (manual backup) + Future uploadLocalAssets( + List localAssets, + CancellationToken cancelToken, { + UploadCallbacks callbacks = const UploadCallbacks(), + }) async { + if (localAssets.isEmpty) { + return; } + + await _executeWithWorkerPool( + items: localAssets, + cancelToken: cancelToken, + processItem: (asset, httpClient) => _uploadSingleAsset(asset, httpClient, cancelToken, callbacks: callbacks), + ); + } + + /// Upload external files (e.g., from share intent) + Future uploadExternalFiles( + List files, { + CancellationToken? cancelToken, + void Function(String fileId, int bytes, int totalBytes)? onProgress, + void Function(String fileId)? onSuccess, + void Function(String fileId, String errorMessage)? onError, + }) async { + if (files.isEmpty) { + return; + } + + final effectiveCancelToken = cancelToken ?? CancellationToken(); + + await _executeWithWorkerPool( + items: files, + cancelToken: effectiveCancelToken, + processItem: (file, httpClient) async { + final fileId = p.hash(file.path).toString(); + + final result = await _uploadSingleFile( + file, + deviceAssetId: fileId, + httpClient: httpClient, + cancelToken: effectiveCancelToken, + onProgress: (bytes, totalBytes) => onProgress?.call(fileId, bytes, totalBytes), + ); + + if (result.isSuccess) { + onSuccess?.call(fileId); + } else if (!result.isCancelled && result.errorMessage != null) { + onError?.call(fileId, result.errorMessage!); + } + }, + ); + } + + /// Cancel all ongoing uploads and reset the upload queue + /// + /// Return the number of left over tasks in the queue + Future cancelBackup() async { + shouldAbortQueuingTasks = true; + + await _storageRepository.clearCache(); + await _uploadRepository.reset(kBackupGroup); + await _uploadRepository.deleteDatabaseRecords(kBackupGroup); + + final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup); + return activeTasks.length; + } + + Future resumeBackup() { + return _uploadRepository.start(); } Future _uploadSingleAsset( LocalAsset asset, Client httpClient, CancellationToken cancelToken, { - required void Function(String id, String filename, int bytes, int totalBytes)? onProgress, - required void Function(String localAssetId, String remoteAssetId)? onSuccess, - required void Function(String localAssetId, String errorMessage)? onError, - required void Function(String localAssetId, double progress)? onICloudProgress, + required UploadCallbacks callbacks, }) async { File? file; File? livePhotoFile; @@ -287,7 +257,7 @@ class UploadService { progressHandler = PMProgressHandler(); progressSubscription = progressHandler.stream.listen((event) { - onICloudProgress?.call(asset.localId!, event.progress); + callbacks.onICloudProgress?.call(asset.localId!, event.progress); }); try { @@ -347,7 +317,8 @@ class UploadService { fields: fields, httpClient: httpClient, cancelToken: cancelToken, - onProgress: (bytes, totalBytes) => onProgress?.call(asset.localId!, livePhotoTitle, bytes, totalBytes), + onProgress: (bytes, totalBytes) => + callbacks.onProgress?.call(asset.localId!, livePhotoTitle, bytes, totalBytes), logContext: 'livePhotoVideo[${asset.localId}]', ); @@ -367,12 +338,13 @@ class UploadService { fields: fields, httpClient: httpClient, cancelToken: cancelToken, - onProgress: (bytes, totalBytes) => onProgress?.call(asset.localId!, originalFileName, bytes, totalBytes), + onProgress: (bytes, totalBytes) => + callbacks.onProgress?.call(asset.localId!, originalFileName, bytes, totalBytes), logContext: 'asset[${asset.localId}]', ); if (result.isSuccess && result.remoteAssetId != null) { - onSuccess?.call(asset.localId!, result.remoteAssetId!); + callbacks.onSuccess?.call(asset.localId!, result.remoteAssetId!); } else if (result.isCancelled) { _logger.warning(() => "Backup was cancelled by the user"); shouldAbortQueuingTasks = true; @@ -382,7 +354,7 @@ class UploadService { "Error(${result.statusCode}) uploading ${asset.localId} | $originalFileName | Created on ${asset.createdAt} | ${result.errorMessage}", ); - onError?.call(asset.localId!, result.errorMessage!); + callbacks.onError?.call(asset.localId!, result.errorMessage!); if (result.errorMessage == "Quota has been exceeded!") { shouldAbortQueuingTasks = true; @@ -390,7 +362,7 @@ class UploadService { } } catch (error, stackTrace) { _logger.severe(() => "Error backup asset: ${error.toString()}", stackTrace); - onError?.call(asset.localId!, error.toString()); + callbacks.onError?.call(asset.localId!, error.toString()); } finally { if (Platform.isIOS) { try { @@ -403,86 +375,8 @@ class UploadService { } } - /// Cancel all ongoing uploads and reset the upload queue - /// - /// Return the number of left over tasks in the queue - Future cancelBackup() async { - shouldAbortQueuingTasks = true; - - await _storageRepository.clearCache(); - await _uploadRepository.reset(kBackupGroup); - await _uploadRepository.deleteDatabaseRecords(kBackupGroup); - - final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup); - return activeTasks.length; - } - - Future resumeBackup() { - return _uploadRepository.start(); - } - - /// Upload multiple files using foreground HTTP with concurrent workers - /// This is used for share intent uploads - Future uploadFilesWithHttp( - List files, { - CancellationToken? cancelToken, - void Function(String fileId, int bytes, int totalBytes)? onProgress, - void Function(String fileId)? onSuccess, - void Function(String fileId, String errorMessage)? onError, - }) async { - if (files.isEmpty) { - return; - } - - const concurrentUploads = 3; - final httpClients = List.generate(concurrentUploads, (_) => Client()); - final effectiveCancelToken = cancelToken ?? CancellationToken(); - - try { - int currentIndex = 0; - - Future worker(Client httpClient) async { - while (true) { - if (effectiveCancelToken.isCancelled) break; - - final index = currentIndex; - if (index >= files.length) break; - currentIndex++; - - final file = files[index]; - final fileId = p.hash(file.path).toString(); - - final result = await _uploadSingleFileWithHttp( - file, - deviceAssetId: fileId, - httpClient: httpClient, - cancelToken: effectiveCancelToken, - onProgress: (bytes, totalBytes) => onProgress?.call(fileId, bytes, totalBytes), - ); - - if (result.isSuccess) { - onSuccess?.call(fileId); - } else if (!result.isCancelled && result.errorMessage != null) { - onError?.call(fileId, result.errorMessage!); - } - } - } - - final workerFutures = >[]; - for (int i = 0; i < concurrentUploads; i++) { - workerFutures.add(worker(httpClients[i])); - } - - await Future.wait(workerFutures); - } finally { - for (final client in httpClients) { - client.close(); - } - } - } - /// Upload a single file using foreground HTTP upload - Future _uploadSingleFileWithHttp( + Future _uploadSingleFile( File file, { required String deviceAssetId, required Client httpClient, @@ -576,6 +470,63 @@ class UploadService { } } + /// Generic worker pool for concurrent uploads + /// + /// [items] - List of items to process + /// [cancelToken] - Token to cancel the operation + /// [processItem] - Function to process each item with an HTTP client + /// [shouldSkip] - Optional function to skip items (e.g., WiFi requirement check) + /// [concurrentWorkers] - Number of concurrent workers (default: 3) + Future _executeWithWorkerPool({ + required List items, + required CancellationToken cancelToken, + required Future Function(T item, Client httpClient) processItem, + bool Function(T item)? shouldSkip, + int concurrentWorkers = 3, + }) async { + final httpClients = List.generate(concurrentWorkers, (_) => Client()); + + await _storageRepository.clearCache(); + shouldAbortQueuingTasks = false; + + try { + int currentIndex = 0; + + Future worker(Client httpClient) async { + while (true) { + if (shouldAbortQueuingTasks || cancelToken.isCancelled) { + break; + } + + final index = currentIndex; + if (index >= items.length) { + break; + } + currentIndex++; + + final item = items[index]; + + if (shouldSkip?.call(item) ?? false) { + continue; + } + + await processItem(item, httpClient); + } + } + + final workerFutures = >[]; + for (int i = 0; i < concurrentWorkers; i++) { + workerFutures.add(worker(httpClients[i])); + } + + await Future.wait(workerFutures); + } finally { + for (final client in httpClients) { + client.close(); + } + } + } + @visibleForTesting Future getUploadTask(LocalAsset asset, {String group = kBackupGroup, int? priority}) async { final entity = await _storageRepository.getAssetEntityForAsset(asset);