This commit is contained in:
Alex Tran
2026-01-12 20:26:55 -06:00
parent d35e73939f
commit fb66c5caa9
6 changed files with 181 additions and 226 deletions

View File

@@ -249,7 +249,7 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
final networkCapabilities = await _ref?.read(connectivityApiProvider).getCapabilities() ?? [];
return _ref
?.read(uploadServiceProvider)
.startUploadWithHttp(currentUser.id, networkCapabilities.isUnmetered, _cancellationToken);
.uploadBackupCandidates(currentUser.id, networkCapabilities.isUnmetered, _cancellationToken);
},
(error, stack) {
dPrint(() => "Error in backup zone $error, $stack");

View File

@@ -79,7 +79,7 @@ class DriftEditImagePage extends ConsumerWidget {
return;
}
await ref.read(uploadServiceProvider).manualBackup([localAsset], CancellationToken());
await ref.read(uploadServiceProvider).uploadLocalAssets([localAsset], CancellationToken());
} catch (e) {
ImmichToast.show(
durationInSecond: 6,

View File

@@ -64,7 +64,7 @@ class ShareIntentUploadStateNotifier extends StateNotifier<List<ShareIntentAttac
_updateStatus(fileId, UploadStatus.running);
}
await _uploadService.uploadFilesWithHttp(
await _uploadService.uploadExternalFiles(
files,
onProgress: (fileId, bytes, totalBytes) {
final progress = totalBytes > 0 ? bytes / totalBytes : 0.0;

View File

@@ -403,14 +403,16 @@ class DriftBackupNotifier extends StateNotifier<DriftBackupState> {
final hasWifi = networkCapabilities.isUnmetered;
_logger.info('Network capabilities: $networkCapabilities, hasWifi/isUnmetered: $hasWifi');
return _uploadService.startUploadWithHttp(
return _uploadService.uploadBackupCandidates(
userId,
hasWifi,
cancelToken,
onProgress: _handleForegroundBackupProgress,
onSuccess: _handleForegroundBackupSuccess,
onError: _handleForegroundBackupError,
onICloudProgress: _handleICloudProgress,
callbacks: UploadCallbacks(
onProgress: _handleForegroundBackupProgress,
onSuccess: _handleForegroundBackupSuccess,
onError: _handleForegroundBackupError,
onICloudProgress: _handleICloudProgress,
),
);
}

View File

@@ -426,19 +426,21 @@ class ActionNotifier extends Notifier<void> {
}
try {
await _uploadService.manualBackup(
await _uploadService.uploadLocalAssets(
assetsToUpload,
cancelToken,
onProgress: (localAssetId, filename, bytes, totalBytes) {
final progress = totalBytes > 0 ? bytes / totalBytes : 0.0;
progressNotifier.setProgress(localAssetId, progress);
},
onSuccess: (localAssetId, remoteAssetId) {
progressNotifier.remove(localAssetId);
},
onError: (localAssetId, errorMessage) {
progressNotifier.setError(localAssetId);
},
callbacks: UploadCallbacks(
onProgress: (localAssetId, filename, bytes, totalBytes) {
final progress = totalBytes > 0 ? bytes / totalBytes : 0.0;
progressNotifier.setProgress(localAssetId, progress);
},
onSuccess: (localAssetId, remoteAssetId) {
progressNotifier.remove(localAssetId);
},
onError: (localAssetId, errorMessage) {
progressNotifier.setError(localAssetId);
},
),
);
return ActionResult(count: assetsToUpload.length, success: true);
} catch (error, stack) {

View File

@@ -26,6 +26,15 @@ import 'package:logging/logging.dart';
import 'package:path/path.dart' as p;
import 'package:photo_manager/photo_manager.dart' show PMProgressHandler;
class UploadCallbacks {
final void Function(String id, String filename, int bytes, int totalBytes)? onProgress;
final void Function(String localId, String remoteId)? onSuccess;
final void Function(String id, String errorMessage)? onError;
final void Function(String id, double progress)? onICloudProgress;
const UploadCallbacks({this.onProgress, this.onSuccess, this.onError, this.onICloudProgress});
}
final uploadServiceProvider = Provider((ref) {
final service = UploadService(
ref.watch(uploadRepositoryProvider),
@@ -99,67 +108,6 @@ class UploadService {
return _backupRepository.getAllCounts(userId);
}
Future<void> manualBackup(
List<LocalAsset> localAssets,
CancellationToken cancelToken, {
void Function(String localAssetId, String filename, int bytes, int totalBytes)? onProgress,
void Function(String localAssetId, String remoteAssetId)? onSuccess,
void Function(String localAssetId, String errorMessage)? onError,
void Function(String localAssetId, double progress)? onICloudProgress,
}) async {
if (localAssets.isEmpty) {
return;
}
const concurrentUploads = 3;
final httpClients = List.generate(concurrentUploads, (_) => Client());
await _storageRepository.clearCache();
shouldAbortQueuingTasks = false;
try {
int currentIndex = 0;
Future<void> worker(Client httpClient) async {
while (true) {
if (shouldAbortQueuingTasks || cancelToken.isCancelled) {
break;
}
final index = currentIndex;
if (index >= localAssets.length) {
break;
}
currentIndex++;
final asset = localAssets[index];
await _uploadSingleAsset(
asset,
httpClient,
cancelToken,
onProgress: onProgress,
onSuccess: onSuccess,
onError: onError,
onICloudProgress: onICloudProgress,
);
}
}
final workerFutures = <Future<void>>[];
for (int i = 0; i < concurrentUploads; i++) {
workerFutures.add(worker(httpClients[i]));
}
await Future.wait(workerFutures);
} finally {
for (final client in httpClients) {
client.close();
}
}
}
/// Find backup candidates
/// Build the upload tasks
/// Enqueue the tasks
@@ -188,84 +136,106 @@ class UploadService {
}
}
Future<void> startUploadWithHttp(
/// Upload backup candidates from database (auto/background backup)
Future<void> uploadBackupCandidates(
String userId,
bool hasWifi,
CancellationToken cancelToken, {
void Function(String localAssetId, String filename, int bytes, int totalBytes)? onProgress,
void Function(String localAssetId, String remoteAssetId)? onSuccess,
void Function(String localAssetId, String errorMessage)? onError,
void Function(String localAssetId, double progress)? onICloudProgress,
UploadCallbacks callbacks = const UploadCallbacks(),
}) async {
const concurrentUploads = 3;
final httpClients = List.generate(concurrentUploads, (_) => Client());
await _storageRepository.clearCache();
shouldAbortQueuingTasks = false;
final candidates = await _backupRepository.getCandidates(userId);
if (candidates.isEmpty) {
return;
}
try {
int currentIndex = 0;
await _executeWithWorkerPool<LocalAsset>(
items: candidates,
cancelToken: cancelToken,
shouldSkip: (asset) {
final requireWifi = _shouldRequireWiFi(asset);
return requireWifi && !hasWifi;
},
processItem: (asset, httpClient) => _uploadSingleAsset(asset, httpClient, cancelToken, callbacks: callbacks),
);
}
Future<void> worker(Client httpClient) async {
while (true) {
if (shouldAbortQueuingTasks || cancelToken.isCancelled) {
break;
}
final index = currentIndex;
if (index >= candidates.length) {
break;
}
currentIndex++;
final asset = candidates[index];
final requireWifi = _shouldRequireWiFi(asset);
if (requireWifi && !hasWifi) {
continue;
}
await _uploadSingleAsset(
asset,
httpClient,
cancelToken,
onProgress: onProgress,
onSuccess: onSuccess,
onError: onError,
onICloudProgress: onICloudProgress,
);
}
}
// Start all workers in parallel - each worker continuously pulls from the queue
final workerFutures = <Future<void>>[];
for (int i = 0; i < concurrentUploads; i++) {
workerFutures.add(worker(httpClients[i]));
}
await Future.wait(workerFutures);
} finally {
for (final client in httpClients) {
client.close();
}
/// Upload local assets from user selection (manual backup)
Future<void> uploadLocalAssets(
List<LocalAsset> localAssets,
CancellationToken cancelToken, {
UploadCallbacks callbacks = const UploadCallbacks(),
}) async {
if (localAssets.isEmpty) {
return;
}
await _executeWithWorkerPool<LocalAsset>(
items: localAssets,
cancelToken: cancelToken,
processItem: (asset, httpClient) => _uploadSingleAsset(asset, httpClient, cancelToken, callbacks: callbacks),
);
}
/// Upload external files (e.g., from share intent)
Future<void> uploadExternalFiles(
List<File> files, {
CancellationToken? cancelToken,
void Function(String fileId, int bytes, int totalBytes)? onProgress,
void Function(String fileId)? onSuccess,
void Function(String fileId, String errorMessage)? onError,
}) async {
if (files.isEmpty) {
return;
}
final effectiveCancelToken = cancelToken ?? CancellationToken();
await _executeWithWorkerPool<File>(
items: files,
cancelToken: effectiveCancelToken,
processItem: (file, httpClient) async {
final fileId = p.hash(file.path).toString();
final result = await _uploadSingleFile(
file,
deviceAssetId: fileId,
httpClient: httpClient,
cancelToken: effectiveCancelToken,
onProgress: (bytes, totalBytes) => onProgress?.call(fileId, bytes, totalBytes),
);
if (result.isSuccess) {
onSuccess?.call(fileId);
} else if (!result.isCancelled && result.errorMessage != null) {
onError?.call(fileId, result.errorMessage!);
}
},
);
}
/// Cancel all ongoing uploads and reset the upload queue
///
/// Return the number of left over tasks in the queue
Future<int> cancelBackup() async {
shouldAbortQueuingTasks = true;
await _storageRepository.clearCache();
await _uploadRepository.reset(kBackupGroup);
await _uploadRepository.deleteDatabaseRecords(kBackupGroup);
final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup);
return activeTasks.length;
}
Future<void> resumeBackup() {
return _uploadRepository.start();
}
Future<void> _uploadSingleAsset(
LocalAsset asset,
Client httpClient,
CancellationToken cancelToken, {
required void Function(String id, String filename, int bytes, int totalBytes)? onProgress,
required void Function(String localAssetId, String remoteAssetId)? onSuccess,
required void Function(String localAssetId, String errorMessage)? onError,
required void Function(String localAssetId, double progress)? onICloudProgress,
required UploadCallbacks callbacks,
}) async {
File? file;
File? livePhotoFile;
@@ -287,7 +257,7 @@ class UploadService {
progressHandler = PMProgressHandler();
progressSubscription = progressHandler.stream.listen((event) {
onICloudProgress?.call(asset.localId!, event.progress);
callbacks.onICloudProgress?.call(asset.localId!, event.progress);
});
try {
@@ -347,7 +317,8 @@ class UploadService {
fields: fields,
httpClient: httpClient,
cancelToken: cancelToken,
onProgress: (bytes, totalBytes) => onProgress?.call(asset.localId!, livePhotoTitle, bytes, totalBytes),
onProgress: (bytes, totalBytes) =>
callbacks.onProgress?.call(asset.localId!, livePhotoTitle, bytes, totalBytes),
logContext: 'livePhotoVideo[${asset.localId}]',
);
@@ -367,12 +338,13 @@ class UploadService {
fields: fields,
httpClient: httpClient,
cancelToken: cancelToken,
onProgress: (bytes, totalBytes) => onProgress?.call(asset.localId!, originalFileName, bytes, totalBytes),
onProgress: (bytes, totalBytes) =>
callbacks.onProgress?.call(asset.localId!, originalFileName, bytes, totalBytes),
logContext: 'asset[${asset.localId}]',
);
if (result.isSuccess && result.remoteAssetId != null) {
onSuccess?.call(asset.localId!, result.remoteAssetId!);
callbacks.onSuccess?.call(asset.localId!, result.remoteAssetId!);
} else if (result.isCancelled) {
_logger.warning(() => "Backup was cancelled by the user");
shouldAbortQueuingTasks = true;
@@ -382,7 +354,7 @@ class UploadService {
"Error(${result.statusCode}) uploading ${asset.localId} | $originalFileName | Created on ${asset.createdAt} | ${result.errorMessage}",
);
onError?.call(asset.localId!, result.errorMessage!);
callbacks.onError?.call(asset.localId!, result.errorMessage!);
if (result.errorMessage == "Quota has been exceeded!") {
shouldAbortQueuingTasks = true;
@@ -390,7 +362,7 @@ class UploadService {
}
} catch (error, stackTrace) {
_logger.severe(() => "Error backup asset: ${error.toString()}", stackTrace);
onError?.call(asset.localId!, error.toString());
callbacks.onError?.call(asset.localId!, error.toString());
} finally {
if (Platform.isIOS) {
try {
@@ -403,86 +375,8 @@ class UploadService {
}
}
/// Cancel all ongoing uploads and reset the upload queue
///
/// Return the number of left over tasks in the queue
Future<int> cancelBackup() async {
shouldAbortQueuingTasks = true;
await _storageRepository.clearCache();
await _uploadRepository.reset(kBackupGroup);
await _uploadRepository.deleteDatabaseRecords(kBackupGroup);
final activeTasks = await _uploadRepository.getActiveTasks(kBackupGroup);
return activeTasks.length;
}
Future<void> resumeBackup() {
return _uploadRepository.start();
}
/// Upload multiple files using foreground HTTP with concurrent workers
/// This is used for share intent uploads
Future<void> uploadFilesWithHttp(
List<File> files, {
CancellationToken? cancelToken,
void Function(String fileId, int bytes, int totalBytes)? onProgress,
void Function(String fileId)? onSuccess,
void Function(String fileId, String errorMessage)? onError,
}) async {
if (files.isEmpty) {
return;
}
const concurrentUploads = 3;
final httpClients = List.generate(concurrentUploads, (_) => Client());
final effectiveCancelToken = cancelToken ?? CancellationToken();
try {
int currentIndex = 0;
Future<void> worker(Client httpClient) async {
while (true) {
if (effectiveCancelToken.isCancelled) break;
final index = currentIndex;
if (index >= files.length) break;
currentIndex++;
final file = files[index];
final fileId = p.hash(file.path).toString();
final result = await _uploadSingleFileWithHttp(
file,
deviceAssetId: fileId,
httpClient: httpClient,
cancelToken: effectiveCancelToken,
onProgress: (bytes, totalBytes) => onProgress?.call(fileId, bytes, totalBytes),
);
if (result.isSuccess) {
onSuccess?.call(fileId);
} else if (!result.isCancelled && result.errorMessage != null) {
onError?.call(fileId, result.errorMessage!);
}
}
}
final workerFutures = <Future<void>>[];
for (int i = 0; i < concurrentUploads; i++) {
workerFutures.add(worker(httpClients[i]));
}
await Future.wait(workerFutures);
} finally {
for (final client in httpClients) {
client.close();
}
}
}
/// Upload a single file using foreground HTTP upload
Future<UploadResult> _uploadSingleFileWithHttp(
Future<UploadResult> _uploadSingleFile(
File file, {
required String deviceAssetId,
required Client httpClient,
@@ -576,6 +470,63 @@ class UploadService {
}
}
/// Generic worker pool for concurrent uploads
///
/// [items] - List of items to process
/// [cancelToken] - Token to cancel the operation
/// [processItem] - Function to process each item with an HTTP client
/// [shouldSkip] - Optional function to skip items (e.g., WiFi requirement check)
/// [concurrentWorkers] - Number of concurrent workers (default: 3)
Future<void> _executeWithWorkerPool<T>({
required List<T> items,
required CancellationToken cancelToken,
required Future<void> Function(T item, Client httpClient) processItem,
bool Function(T item)? shouldSkip,
int concurrentWorkers = 3,
}) async {
final httpClients = List.generate(concurrentWorkers, (_) => Client());
await _storageRepository.clearCache();
shouldAbortQueuingTasks = false;
try {
int currentIndex = 0;
Future<void> worker(Client httpClient) async {
while (true) {
if (shouldAbortQueuingTasks || cancelToken.isCancelled) {
break;
}
final index = currentIndex;
if (index >= items.length) {
break;
}
currentIndex++;
final item = items[index];
if (shouldSkip?.call(item) ?? false) {
continue;
}
await processItem(item, httpClient);
}
}
final workerFutures = <Future<void>>[];
for (int i = 0; i < concurrentWorkers; i++) {
workerFutures.add(worker(httpClients[i]));
}
await Future.wait(workerFutures);
} finally {
for (final client in httpClients) {
client.close();
}
}
}
@visibleForTesting
Future<UploadTask?> getUploadTask(LocalAsset asset, {String group = kBackupGroup, int? priority}) async {
final entity = await _storageRepository.getAssetEntityForAsset(asset);