Compare commits

..

7 Commits

Author SHA1 Message Date
shenlong-tanwen
c8797dedb1 fix: crash on upload 2026-01-20 23:11:05 +05:30
shenlong-tanwen
3cd2d7f657 fix: remote sync errors in backup page 2026-01-20 21:31:16 +05:30
shenlong-tanwen
3c1a5c744b fix: isolate cleanup 2026-01-20 21:07:07 +05:30
Alex
f26a5da87e fix: websocket edit task 2026-01-20 09:31:21 -06:00
Alex
d2e7bc3cfd Merge branch 'main' into feat/isolate 2026-01-20 09:26:09 -06:00
shenlong-tanwen
74d463c19c graceful exit 2026-01-19 19:53:21 +05:30
shenlong-tanwen
39b2af1940 fix: isolate management 2026-01-19 18:44:10 +05:30
21 changed files with 396 additions and 577 deletions

View File

@@ -24,12 +24,11 @@ import 'package:immich_mobile/providers/user.provider.dart';
import 'package:immich_mobile/repositories/file_media.repository.dart';
import 'package:immich_mobile/services/app_settings.service.dart';
import 'package:immich_mobile/services/auth.service.dart';
import 'package:immich_mobile/services/localization.service.dart';
import 'package:immich_mobile/services/foreground_upload.service.dart';
import 'package:immich_mobile/services/localization.service.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
@@ -93,7 +92,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
await Future.wait(
[
loadTranslations(),
workerManagerPatch.init(dynamicSpawning: true),
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
// Initialize the file downloader
FileDownloader().configure(
@@ -192,25 +190,21 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
final backgroundSyncManager = _ref?.read(backgroundSyncProvider);
final nativeSyncApi = _ref?.read(nativeSyncApiProvider);
await _drift.close();
await _driftLogger.close();
_ref?.dispose();
_ref = null;
await _drift.close();
await _driftLogger.close();
_cancellationToken.cancel();
_logger.info("Cleaning up background worker");
final cleanupFutures = [
nativeSyncApi?.cancelHashing(),
workerManagerPatch.dispose().catchError((_) async {
// Discard any errors on the dispose call
return;
}),
LogService.I.dispose(),
Store.dispose(),
backgroundSyncManager?.cancel(),
backgroundSyncManager?.cancelLocal(),
];
if (_isar.isOpen) {

View File

@@ -4,7 +4,6 @@ import 'package:immich_mobile/domain/utils/migrate_cloud_ids.dart' as m;
import 'package:immich_mobile/domain/utils/sync_linked_album.dart';
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
import 'package:immich_mobile/utils/isolate.dart';
import 'package:worker_manager/worker_manager.dart';
typedef SyncCallback = void Function();
typedef SyncCallbackWithResult<T> = void Function(T result);
@@ -27,12 +26,12 @@ class BackgroundSyncManager {
final SyncCallback? onCloudIdSyncComplete;
final SyncErrorCallback? onCloudIdSyncError;
Cancelable<bool?>? _syncTask;
Cancelable<void>? _syncWebsocketTask;
Cancelable<void>? _cloudIdSyncTask;
Cancelable<void>? _deviceAlbumSyncTask;
Cancelable<void>? _linkedAlbumSyncTask;
Cancelable<void>? _hashTask;
CancellableTask<bool>? _syncTask;
CancellableTask<void>? _syncWebsocketTask;
CancellableTask<void>? _cloudIdSyncTask;
CancellableTask<void>? _deviceAlbumSyncTask;
CancellableTask<void>? _linkedAlbumSyncTask;
CancellableTask<void>? _hashTask;
BackgroundSyncManager({
this.onRemoteSyncStart,
@@ -50,59 +49,42 @@ class BackgroundSyncManager {
});
Future<void> cancel() async {
final futures = <Future>[];
if (_syncTask != null) {
futures.add(_syncTask!.future);
}
_syncTask?.cancel();
_syncTask = null;
if (_syncWebsocketTask != null) {
futures.add(_syncWebsocketTask!.future);
}
_syncWebsocketTask?.cancel();
_syncWebsocketTask = null;
if (_cloudIdSyncTask != null) {
futures.add(_cloudIdSyncTask!.future);
}
_cloudIdSyncTask?.cancel();
_cloudIdSyncTask = null;
if (_linkedAlbumSyncTask != null) {
futures.add(_linkedAlbumSyncTask!.future);
}
_linkedAlbumSyncTask?.cancel();
_linkedAlbumSyncTask = null;
try {
await Future.wait(futures);
} on CanceledError {
// Ignore cancellation errors
await Future.wait(
[
_syncTask?.future,
_syncWebsocketTask?.future,
_cloudIdSyncTask?.future,
_linkedAlbumSyncTask?.future,
].nonNulls,
);
} catch (e) {
// Ignore cancellation errors and cleanup timeouts
}
_syncTask = null;
_syncWebsocketTask = null;
_cloudIdSyncTask = null;
_linkedAlbumSyncTask = null;
}
Future<void> cancelLocal() async {
final futures = <Future>[];
if (_hashTask != null) {
futures.add(_hashTask!.future);
}
_hashTask?.cancel();
_hashTask = null;
if (_deviceAlbumSyncTask != null) {
futures.add(_deviceAlbumSyncTask!.future);
}
_deviceAlbumSyncTask?.cancel();
_deviceAlbumSyncTask = null;
try {
await Future.wait(futures);
} on CanceledError {
// Ignore cancellation errors
await Future.wait([_hashTask?.future, _deviceAlbumSyncTask?.future].nonNulls);
} catch (e) {
// Ignore cancellation errors and cleanup timeouts
}
_hashTask = null;
_deviceAlbumSyncTask = null;
}
// No need to cancel the task, as it can also be run when the user logs out
@@ -133,7 +115,8 @@ class BackgroundSyncManager {
.catchError((error) {
onLocalSyncError?.call(error.toString());
_deviceAlbumSyncTask = null;
});
})
.future;
}
Future<void> hashAssets() {
@@ -156,7 +139,8 @@ class BackgroundSyncManager {
.catchError((error) {
onHashingError?.call(error.toString());
_hashTask = null;
});
})
.future;
}
Future<bool> syncRemote() {
@@ -170,7 +154,7 @@ class BackgroundSyncManager {
computation: (ref) => ref.read(syncStreamServiceProvider).sync(),
debugLabel: 'remote-sync',
);
return _syncTask!
return _syncTask!.future
.then((result) {
final success = result ?? false;
onRemoteSyncComplete?.call(success);
@@ -193,7 +177,7 @@ class BackgroundSyncManager {
_syncWebsocketTask = _handleWsAssetUploadReadyV1Batch(batchData);
return _syncWebsocketTask!.whenComplete(() {
_syncWebsocketTask = null;
});
}).future;
}
Future<void> syncWebsocketEditBatch(List<dynamic> batchData) {
@@ -203,7 +187,7 @@ class BackgroundSyncManager {
_syncWebsocketTask = _handleWsAssetEditReadyV1Batch(batchData);
return _syncWebsocketTask!.whenComplete(() {
_syncWebsocketTask = null;
});
}).future;
}
Future<void> syncLinkedAlbum() {
@@ -214,7 +198,7 @@ class BackgroundSyncManager {
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated, debugLabel: 'linked-album-sync');
return _linkedAlbumSyncTask!.whenComplete(() {
_linkedAlbumSyncTask = null;
});
}).future;
}
Future<void> syncCloudIds() {
@@ -224,7 +208,7 @@ class BackgroundSyncManager {
onCloudIdSyncStart?.call();
_cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds);
_cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds, debugLabel: 'cloud-id-sync');
return _cloudIdSyncTask!
.whenComplete(() {
onCloudIdSyncComplete?.call();
@@ -233,16 +217,17 @@ class BackgroundSyncManager {
.catchError((error) {
onCloudIdSyncError?.call(error.toString());
_cloudIdSyncTask = null;
});
})
.future;
}
}
Cancelable<void> _handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
CancellableTask<void> _handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetUploadReadyV1Batch(batchData),
debugLabel: 'websocket-batch',
);
Cancelable<void> _handleWsAssetEditReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
CancellableTask<void> _handleWsAssetEditReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetEditReadyV1Batch(batchData),
debugLabel: 'websocket-edit',
);

View File

@@ -1,6 +1,5 @@
import 'dart:async';
import 'dart:io';
import 'dart:math';
import 'package:auto_route/auto_route.dart';
import 'package:background_downloader/background_downloader.dart';
@@ -41,7 +40,6 @@ import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/utils/licenses.dart';
import 'package:immich_mobile/utils/migration.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:immich_ui/immich_ui.dart';
import 'package:intl/date_symbol_data_local.dart';
import 'package:logging/logging.dart';
@@ -53,8 +51,6 @@ void main() async {
final (isar, drift, logDb) = await Bootstrap.initDB();
await Bootstrap.initDomain(isar, drift, logDb);
await initApp();
// Warm-up isolate pool for worker manager
await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5));
await migrateDatabaseIfNeeded(isar, drift);
HttpSSLOptions.apply();

View File

@@ -6,7 +6,6 @@ import 'package:immich_mobile/domain/models/setting.model.dart';
import 'package:immich_mobile/extensions/build_context_extensions.dart';
import 'package:immich_mobile/extensions/duration_extensions.dart';
import 'package:immich_mobile/extensions/theme_extensions.dart';
import 'package:immich_mobile/presentation/widgets/asset_viewer/asset_viewer.state.dart';
import 'package:immich_mobile/presentation/widgets/images/thumbnail.widget.dart';
import 'package:immich_mobile/presentation/widgets/timeline/constants.dart';
import 'package:immich_mobile/providers/backup/asset_upload_progress.provider.dart';
@@ -48,7 +47,6 @@ class _ThumbnailTileState extends ConsumerState<ThumbnailTile> {
Widget build(BuildContext context) {
final asset = widget.asset;
final heroIndex = widget.heroOffset ?? TabsRouterScope.of(context)?.controller.activeIndex ?? 0;
final isCurrentAsset = ref.watch(assetViewerProvider.select((current) => current.currentAsset == asset));
final assetContainerColor = context.isDarkTheme
? context.primaryColor.darken(amount: 0.4)
@@ -61,10 +59,6 @@ class _ThumbnailTileState extends ConsumerState<ThumbnailTile> {
final bool storageIndicator =
ref.watch(settingsProvider.select((s) => s.get(Setting.showStorageIndicator))) && widget.showStorageIndicator;
if (!isCurrentAsset) {
_hideIndicators = false;
}
if (isSelected) {
_showSelectionContainer = true;
}
@@ -102,11 +96,7 @@ class _ThumbnailTileState extends ConsumerState<ThumbnailTile> {
children: [
Positioned.fill(
child: Hero(
// This key resets the hero animation when the asset is changed in the asset viewer.
// It doesn't seem like the best solution, and only works to reset the hero, not prime the hero of the new active asset for animation,
// but other solutions have failed thus far.
key: ValueKey(isCurrentAsset),
tag: '${asset?.heroTag}_$heroIndex',
tag: '${asset?.heroTag ?? ''}_$heroIndex',
child: Thumbnail.fromAsset(asset: asset, size: widget.size),
// Placeholderbuilder used to hide indicators on first hero animation, since flightShuttleBuilder isn't called until both source and destination hero exist in widget tree.
placeholderBuilder: (context, heroSize, child) {

View File

@@ -160,7 +160,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
_resumeBackup();
}),
_resumeBackup(),
backgroundManager.syncCloudIds(),
_safeRun(backgroundManager.syncCloudIds(), "syncCloudIds"),
]);
} else {
await _safeRun(backgroundManager.hashAssets(), "hashAssets");
@@ -218,7 +218,14 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
try {
if (Store.isBetaTimelineEnabled) {
unawaited(_ref.read(backgroundWorkerLockServiceProvider).unlock());
unawaited(
Future.wait([
_ref.read(backgroundWorkerLockServiceProvider).unlock(),
_ref.read(nativeSyncApiProvider).cancelHashing(),
_ref.read(backgroundSyncProvider).cancel(),
_ref.read(backgroundSyncProvider).cancelLocal(),
]),
);
}
await _performPause();
} catch (e, stackTrace) {

View File

@@ -21,7 +21,13 @@ final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
backupProvider.updateError(isSuccess == true ? BackupError.none : BackupError.syncFailed);
}
},
onRemoteSyncError: syncStatusNotifier.errorRemoteSync,
onRemoteSyncError: (error) {
syncStatusNotifier.errorRemoteSync(error);
final backupProvider = ref.read(driftBackupProvider.notifier);
if (backupProvider.mounted) {
backupProvider.updateError(BackupError.syncFailed);
}
},
onLocalSyncStart: syncStatusNotifier.startLocalSync,
onLocalSyncComplete: syncStatusNotifier.completeLocalSync,
onLocalSyncError: syncStatusNotifier.errorLocalSync,

View File

@@ -1,95 +1,307 @@
import 'dart:async';
import 'dart:isolate';
import 'dart:ui';
import 'package:flutter/services.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/services/log.service.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:immich_mobile/utils/http_ssl_options.dart';
import 'package:immich_mobile/wm_executor.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
import 'package:worker_manager/worker_manager.dart';
class InvalidIsolateUsageException implements Exception {
const InvalidIsolateUsageException();
class CancellableTask<T> {
final Future<T?> future;
final void Function() cancel;
@override
String toString() => "IsolateHelper should only be used from the root isolate";
}
const CancellableTask({required this.future, required this.cancel});
// !! Should be used only from the root isolate
Cancelable<T?> runInIsolateGentle<T>({
required Future<T> Function(ProviderContainer ref) computation,
String? debugLabel,
}) {
final token = RootIsolateToken.instance;
if (token == null) {
throw const InvalidIsolateUsageException();
CancellableTask<T> whenComplete(void Function() action) {
return CancellableTask(future: future.whenComplete(action), cancel: cancel);
}
return workerManagerPatch.executeGentle((cancelledChecker) async {
T? result;
await runZonedGuarded(
() async {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
CancellableTask<T> catchError(Function onError) {
return CancellableTask(future: future.catchError(onError), cancel: cancel);
}
final (isar, drift, logDb) = await Bootstrap.initDB();
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false);
final ref = ProviderContainer(
CancellableTask<R> then<R>(FutureOr<R> Function(T?) onValue) {
return CancellableTask(future: future.then(onValue), cancel: cancel);
}
}
sealed class _IsolateMessage {
const _IsolateMessage();
}
class _InitMessage extends _IsolateMessage {
final SendPort sendPort;
const _InitMessage(this.sendPort);
}
class _CancelMessage extends _IsolateMessage {
const _CancelMessage();
}
class _ResultMessage extends _IsolateMessage {
final dynamic data;
const _ResultMessage(this.data);
}
class _ErrorMessage extends _IsolateMessage {
final Object? error;
final StackTrace? stackTrace;
const _ErrorMessage(this.error, [this.stackTrace]);
}
class _DoneMessage extends _IsolateMessage {
const _DoneMessage();
}
class _IsolateTaskConfig<T> {
final Future<T> Function(ProviderContainer ref) computation;
final SendPort mainSendPort;
final RootIsolateToken rootToken;
final String debugLabel;
const _IsolateTaskConfig({
required this.computation,
required this.mainSendPort,
required this.rootToken,
required this.debugLabel,
});
}
class _IsolateTaskRunner<T> {
final Completer<T?> _completer = Completer<T?>();
final ReceivePort _receivePort = ReceivePort();
final String debugLabel;
Isolate? _isolate;
SendPort? _isolateSendPort;
bool _isCancelled = false;
bool _isCleanedUp = false;
Timer? _cleanupTimeoutTimer;
_IsolateTaskRunner({required this.debugLabel});
Future<void> start(Future<T> Function(ProviderContainer ref) computation) async {
final token = RootIsolateToken.instance;
if (token == null) {
_completer.completeError(Exception("RootIsolateToken is not available. Isolate cannot be started."));
return;
}
_receivePort.listen(_handleMessage);
final config = _IsolateTaskConfig<T>(
computation: computation,
mainSendPort: _receivePort.sendPort,
rootToken: token,
debugLabel: debugLabel,
);
try {
_isolate = await Isolate.spawn(_isolateEntryPoint<T>, config, debugName: debugLabel);
} catch (error, stack) {
_completer.completeError(error, stack);
_cleanup();
}
}
void cancel() {
if (_isCancelled || _isCleanedUp) return;
_isCancelled = true;
dPrint(() => "[$debugLabel] Cancellation requested");
_isolateSendPort?.send(const _CancelMessage());
_cleanupTimeoutTimer = Timer(const Duration(seconds: 4), () {
if (!_isCleanedUp) {
dPrint(() => "[$debugLabel] Cleanup timeout - force killing isolate");
_isolate?.kill(priority: Isolate.immediate);
if (!_completer.isCompleted) {
_completer.completeError(Exception("Isolate cleanup timed out for task: $debugLabel"));
}
_cleanup();
}
});
}
void _handleMessage(dynamic message) {
if (message is! _IsolateMessage) return;
switch (message) {
case _InitMessage(:var sendPort):
_isolateSendPort = sendPort;
dPrint(() => "[$debugLabel] Isolate initialized");
break;
case _ResultMessage(:var data):
_cleanup();
if (!_completer.isCompleted) {
_completer.complete(data as T?);
dPrint(() => "[$debugLabel] Isolate task completed with result - $data");
}
break;
case _ErrorMessage(:var error, :var stackTrace):
_cleanup();
if (!_completer.isCompleted) {
dPrint(() => "[$debugLabel] Isolate task completed with error - $error");
_completer.completeError(error ?? Exception("Unknown error in isolate"), stackTrace ?? StackTrace.current);
}
break;
case _DoneMessage():
dPrint(() => "[$debugLabel] Isolate cleanup completed");
_cleanup();
break;
case _CancelMessage():
// Not expected to receive cancel from isolate
break;
}
}
void _cleanup() {
if (_isCleanedUp) return;
_isCleanedUp = true;
_cleanupTimeoutTimer?.cancel();
_receivePort.close();
_isolate?.kill(priority: Isolate.immediate);
_isolate = null;
_isolateSendPort = null;
dPrint(() => "[$debugLabel] Isolate cleaned up");
}
Future<T?> get future => _completer.future;
}
Future<void> _cleanupResources<T>(ProviderContainer? ref, Isar isar, Drift drift, DriftLogger logDb) async {
try {
final cleanupFutures = <Future>[
Store.dispose(),
LogService.I.dispose(),
logDb.close(),
drift.close(),
if (isar.isOpen) isar.close().catchError((_) => false),
];
ref?.dispose();
await Future.wait(cleanupFutures).timeout(
const Duration(seconds: 2),
onTimeout: () {
dPrint(() => "Cleanup timeout - some resources may not be closed");
return [];
},
);
} catch (error, stack) {
dPrint(() => "Error during isolate cleanup: $error with stack: $stack");
}
}
Future<void> _isolateEntryPoint<T>(_IsolateTaskConfig<T> config) async {
final receivePort = ReceivePort();
config.mainSendPort.send(_InitMessage(receivePort.sendPort));
bool isCancelled = false;
ProviderContainer? ref;
final Isar isar;
final Drift drift;
final DriftLogger logDb;
try {
BackgroundIsolateBinaryMessenger.ensureInitialized(config.rootToken);
DartPluginRegistrant.ensureInitialized();
final (bootIsar, bootDrift, bootLogDb) = await Bootstrap.initDB();
await Bootstrap.initDomain(bootIsar, bootDrift, bootLogDb, shouldBufferLogs: false, listenStoreUpdates: false);
isar = bootIsar;
drift = bootDrift;
logDb = bootLogDb;
} catch (error, stack) {
dPrint(() => "[$config.debugLabel] Error during isolate bootstrap: $error");
config.mainSendPort.send(_ErrorMessage(error, stack));
return;
}
final subscription = receivePort.listen((message) async {
if (message is _CancelMessage) {
isCancelled = true;
try {
receivePort.close();
await _cleanupResources(ref, isar, drift, logDb);
} catch (error, stack) {
dPrint(() => "Error during isolate cancellation cleanup: $error with stack: $stack");
} finally {
config.mainSendPort.send(const _ErrorMessage("Isolate task cancelled"));
}
}
});
final log = Logger("IsolateWorker[${config.debugLabel}]");
await runZonedGuarded(
() async {
try {
ref = ProviderContainer(
overrides: [
// TODO: Remove once isar is removed
dbProvider.overrideWithValue(isar),
isarProvider.overrideWithValue(isar),
cancellationProvider.overrideWithValue(cancelledChecker),
cancellationProvider.overrideWithValue(() => isCancelled),
driftProvider.overrideWith(driftOverride(drift)),
],
);
Logger log = Logger("IsolateLogger");
HttpSSLOptions.apply(applyNative: false);
final result = await config.computation(ref!);
try {
HttpSSLOptions.apply(applyNative: false);
result = await computation(ref);
} on CanceledError {
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
} catch (error, stack) {
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
} finally {
try {
ref.dispose();
await Store.dispose();
await LogService.I.dispose();
await logDb.close();
await drift.close();
// Close Isar safely
try {
if (isar.isOpen) {
await isar.close();
}
} catch (e) {
dPrint(() => "Error closing Isar: $e");
}
} catch (error, stack) {
dPrint(() => "Error closing resources in isolate: $error, $stack");
} finally {
ref.dispose();
// Delay to ensure all resources are released
await Future.delayed(const Duration(seconds: 2));
}
if (!isCancelled) {
config.mainSendPort.send(_ResultMessage(result));
} else {
log.fine("Task completed but was cancelled - not sending result");
}
},
(error, stack) {
dPrint(() => "Error in isolate $debugLabel zone: $error, $stack");
},
);
return result;
});
} catch (error, stack) {
log.severe("Error in isolate execution", error, stack);
config.mainSendPort.send(_ErrorMessage(error, stack));
} finally {
try {
receivePort.close();
unawaited(subscription.cancel());
await _cleanupResources(ref, isar, drift, logDb);
} catch (error, stack) {
dPrint(() => "Error during isolate cleanup: $error with stack: $stack");
} finally {
unawaited(subscription.cancel());
config.mainSendPort.send(const _DoneMessage());
}
}
},
(error, stack) async {
dPrint(() => "Uncaught error in isolate zone: $error, $stack");
receivePort.close();
unawaited(subscription.cancel());
await _cleanupResources(ref, isar, drift, logDb);
config.mainSendPort.send(_ErrorMessage(error, stack));
},
);
}
CancellableTask<T> runInIsolateGentle<T>({
required Future<T> Function(ProviderContainer ref) computation,
String? debugLabel,
}) {
final runner = _IsolateTaskRunner<T>(
debugLabel: debugLabel ?? 'isolate-task-${DateTime.now().millisecondsSinceEpoch}',
)..start(computation);
return CancellableTask<T>(future: runner.future, cancel: runner.cancel);
}

View File

@@ -1,251 +0,0 @@
// part of 'package:worker_manager/worker_manager.dart';
// ignore_for_file: implementation_imports, avoid_print
import 'dart:async';
import 'dart:math';
import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
import 'package:worker_manager/src/worker/worker.dart';
import 'package:worker_manager/worker_manager.dart';
final workerManagerPatch = _Executor();
// [-2^54; 2^53] is compatible with dart2js, see core.int doc
const _minId = -9007199254740992;
const _maxId = 9007199254740992;
class Mixinable<T> {
late final itSelf = this as T;
}
mixin _ExecutorLogger on Mixinable<_Executor> {
var log = false;
@mustCallSuper
void init() {
logMessage("${itSelf._isolatesCount} workers have been spawned and initialized");
}
void logTaskAdded<R>(String uid) {
logMessage("added task with number $uid");
}
@mustCallSuper
void dispose() {
logMessage("worker_manager have been disposed");
}
@mustCallSuper
void _cancel(Task task) {
logMessage("Task ${task.id} have been canceled");
}
void logMessage(String message) {
if (log) print(message);
}
}
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
final _queue = PriorityQueue<Task>();
final _pool = <Worker>[];
var _nextTaskId = _minId;
var _dynamicSpawning = false;
var _isolatesCount = numberOfProcessors;
@override
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
if (_pool.isNotEmpty) {
print("worker_manager already warmed up, init is ignored. Dispose before init");
return;
}
if (isolatesCount != null) {
if (isolatesCount < 0) {
throw Exception("isolatesCount must be greater than 0");
}
_isolatesCount = isolatesCount;
}
_dynamicSpawning = dynamicSpawning ?? false;
await _ensureWorkersInitialized();
super.init();
}
@override
Future<void> dispose() async {
_queue.clear();
for (final worker in _pool) {
worker.kill();
}
_pool.clear();
super.dispose();
}
Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}
Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
final task = TaskGentle<R>(
id: "",
workPriority: WorkPriority.immediately,
execution: execution,
completer: Completer<R>(),
);
Future<void> run() async {
try {
final result = await execution(() => task.canceled);
task.complete(result, null, null);
} catch (error, st) {
task.complete(null, error, st);
}
}
run();
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}
Cancelable<R> executeWithPort<R, T>(
ExecuteWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}
Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
return _createCancelable<R>(execution: execution, priority: priority);
}
Cancelable<R> executeGentleWithPort<R, T>(
ExecuteGentleWithPort<R> execution, {
WorkPriority priority = WorkPriority.immediately,
required void Function(T value) onMessage,
}) {
return _createCancelable<R>(
execution: execution,
priority: priority,
onMessage: (message) => onMessage(message as T),
);
}
void _createWorkers() {
for (var i = 0; i < _isolatesCount; i++) {
_pool.add(Worker());
}
}
Future<void> _initializeWorkers() async {
await Future.wait(_pool.map((e) => e.initialize()));
}
Cancelable<R> _createCancelable<R>({
required Function execution,
WorkPriority priority = WorkPriority.immediately,
void Function(Object value)? onMessage,
}) {
if (_nextTaskId + 1 == _maxId) {
_nextTaskId = _minId;
}
final id = _nextTaskId.toString();
_nextTaskId++;
late final Task<R> task;
final completer = Completer<R>();
if (execution is Execute<R>) {
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
} else if (execution is ExecuteWithPort<R>) {
task = TaskWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
} else if (execution is ExecuteGentle<R>) {
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
} else if (execution is ExecuteGentleWithPort<R>) {
task = TaskGentleWithPort<R>(
id: id,
workPriority: priority,
execution: execution,
completer: completer,
onMessage: onMessage!,
);
}
_queue.add(task);
_schedule();
logTaskAdded(task.id);
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
}
Future<void> _ensureWorkersInitialized() async {
if (_pool.isEmpty) {
_createWorkers();
if (!_dynamicSpawning) {
await _initializeWorkers();
final poolSize = _pool.length;
final queueSize = _queue.length;
for (int i = 0; i <= min(poolSize, queueSize); i++) {
_schedule();
}
}
}
if (_pool.every((worker) => worker.taskId != null)) {
return;
}
if (_dynamicSpawning) {
final freeWorker = _pool.firstWhereOrNull(
(worker) => worker.taskId == null && !worker.initialized && !worker.initializing,
);
await freeWorker?.initialize();
_schedule();
}
}
void _schedule() {
final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized);
if (availableWorker == null) {
_ensureWorkersInitialized();
return;
}
if (_queue.isEmpty) return;
final task = _queue.removeFirst();
availableWorker
.work(task)
.then(
(value) {
//could be completed already by cancel and it is normal.
//Assuming that worker finished with error and cleaned gracefully
task.complete(value, null, null);
},
onError: (error, st) {
task.complete(null, error, st);
},
)
.whenComplete(() {
if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill();
_schedule();
});
}
@override
void _cancel(Task task) {
task.cancel();
_queue.remove(task);
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
if (task is Gentle) {
targetWorker?.cancelGentle();
} else {
targetWorker?.kill();
if (!_dynamicSpawning) targetWorker?.initialize();
}
super._cancel(task);
}
}

View File

@@ -2154,14 +2154,6 @@ packages:
url: "https://pub.dev"
source: hosted
version: "0.0.3"
worker_manager:
dependency: "direct main"
description:
name: worker_manager
sha256: "1bce9f894a0c187856f5fc0e150e7fe1facce326f048ca6172947754dac3d4f3"
url: "https://pub.dev"
source: hosted
version: "7.2.7"
xdg_directories:
dependency: transitive
description:

View File

@@ -85,7 +85,6 @@ dependencies:
url_launcher: ^6.3.2
uuid: ^4.5.1
wakelock_plus: ^1.3.0
worker_manager: ^7.2.7
dev_dependencies:
auto_route_generator: ^9.0.0

View File

@@ -1,6 +1,6 @@
{
"name": "immich-core",
"version": "2.4.1",
"version": "2.0.1",
"title": "Immich Core",
"description": "Core workflow capabilities for Immich",
"author": "Immich Team",
@@ -12,7 +12,9 @@
"methodName": "filterFileName",
"title": "Filter by filename",
"description": "Filter assets by filename pattern using text matching or regular expressions",
"supportedContexts": ["asset"],
"supportedContexts": [
"asset"
],
"schema": {
"type": "object",
"properties": {
@@ -24,7 +26,11 @@
"matchType": {
"type": "string",
"title": "Match type",
"enum": ["contains", "regex", "exact"],
"enum": [
"contains",
"regex",
"exact"
],
"default": "contains",
"description": "Type of pattern matching to perform"
},
@@ -34,14 +40,18 @@
"description": "Whether matching should be case-sensitive"
}
},
"required": ["pattern"]
"required": [
"pattern"
]
}
},
{
"methodName": "filterFileType",
"title": "Filter by file type",
"description": "Filter assets by file type",
"supportedContexts": ["asset"],
"supportedContexts": [
"asset"
],
"schema": {
"type": "object",
"properties": {
@@ -50,19 +60,26 @@
"title": "File types",
"items": {
"type": "string",
"enum": ["image", "video"]
"enum": [
"image",
"video"
]
},
"description": "Allowed file types"
}
},
"required": ["fileTypes"]
"required": [
"fileTypes"
]
}
},
{
"methodName": "filterPerson",
"title": "Filter by person",
"description": "Filter assets by detected people in the photo",
"supportedContexts": ["person"],
"description": "Filter by detected person",
"supportedContexts": [
"person"
],
"schema": {
"type": "object",
"properties": {
@@ -75,14 +92,15 @@
"description": "List of person to match",
"subType": "people-picker"
},
"matchMode": {
"type": "string",
"title": "Match mode",
"enum": ["any", "all", "exact"],
"default": "any"
"matchAny": {
"type": "boolean",
"default": true,
"description": "Match any name (true) or require all names (false)"
}
},
"required": ["personIds"]
"required": [
"personIds"
]
}
}
],
@@ -91,14 +109,18 @@
"methodName": "actionArchive",
"title": "Archive",
"description": "Move the asset to archive",
"supportedContexts": ["asset"],
"supportedContexts": [
"asset"
],
"schema": {}
},
{
"methodName": "actionFavorite",
"title": "Favorite",
"description": "Mark the asset as favorite or unfavorite",
"supportedContexts": ["asset"],
"supportedContexts": [
"asset"
],
"schema": {
"type": "object",
"properties": {
@@ -114,7 +136,10 @@
"methodName": "actionAddToAlbum",
"title": "Add to Album",
"description": "Add the item to a specified album",
"supportedContexts": ["asset", "person"],
"supportedContexts": [
"asset",
"person"
],
"schema": {
"type": "object",
"properties": {
@@ -125,7 +150,9 @@
"subType": "album-picker"
}
},
"required": ["albumId"]
"required": [
"albumId"
]
}
}
]

View File

@@ -1,6 +1,5 @@
declare module 'main' {
export function filterFileName(): I32;
export function filterPerson(): I32;
export function actionAddToAlbum(): I32;
export function actionArchive(): I32;
}
@@ -9,6 +8,5 @@ declare module 'extism:host' {
interface user {
updateAsset(ptr: PTR): I32;
addAssetToAlbum(ptr: PTR): I32;
getFacesForAsset(ptr: PTR): PTR;
}
}

View File

@@ -1,4 +1,4 @@
const { updateAsset, addAssetToAlbum, getFacesForAsset } = Host.getFunctions();
const { updateAsset, addAssetToAlbum } = Host.getFunctions();
function parseInput() {
return JSON.parse(Host.inputString());
@@ -9,64 +9,6 @@ function returnOutput(output: any) {
return 0;
}
/**
* Filter by person - checks if the recognized person matches the configured person IDs.
*
* For PersonRecognized trigger:
* - data.personId contains the ID of the person that was just recognized
* - Checks if personId is in the configured list
*
* matchMode options:
* - 'any': passes if the triggering person is in the list
* - 'all': passes if all configured persons are present in the asset
* - 'exact': passes if the asset contains exactly the configured persons
*/
export function filterPerson() {
const input = parseInput();
const { authToken, data, config } = input;
const { personIds, matchMode = 'any' } = config;
if (!personIds || personIds.length === 0) {
return returnOutput({ passed: true });
}
const triggerPersonId = data.personId;
if (matchMode === 'any') {
const passed = triggerPersonId && personIds.includes(triggerPersonId);
return returnOutput({ passed });
}
const payload = Memory.fromJsonObject({
authToken,
assetId: data.asset.id,
});
const resultPtr = getFacesForAsset(payload.offset);
payload.free();
const faces = JSON.parse(Memory.find(resultPtr).readJsonObject());
const assetPersonIds: string[] = faces
.filter((face: { personId: string | null }) => face.personId !== null)
.map((face: { personId: string }) => face.personId);
let passed = false;
if (matchMode === 'all') {
passed = personIds.every((id: string) => assetPersonIds.includes(id));
} else if (matchMode === 'exact') {
const uniquePersonIds = new Set(personIds);
const uniqueAssetPersonIds = new Set(assetPersonIds);
passed =
uniquePersonIds.size === uniqueAssetPersonIds.size &&
personIds.every((id: string) => uniqueAssetPersonIds.has(id));
}
return returnOutput({ passed });
}
export function filterFileName() {
const input = parseInput();
const { data, config } = input;
@@ -102,7 +44,7 @@ export function actionAddToAlbum() {
authToken,
assetId: data.asset.id,
albumId: albumId,
}),
})
);
addAssetToAlbum(ptr.offset);
@@ -119,7 +61,7 @@ export function actionArchive() {
authToken,
id: data.asset.id,
visibility: 'archive',
}),
})
);
updateAsset(ptr.offset);

View File

@@ -122,7 +122,6 @@ select
"asset_face"."id",
"asset_face"."personId",
"asset_face"."sourceType",
"asset_face"."assetId",
(
select
to_json(obj)

View File

@@ -318,7 +318,6 @@ export class AlbumRepository {
await db
.insertInto('album_asset')
.values(assetIds.map((assetId) => ({ albumId, assetId })))
.onConflict((oc) => oc.columns(['albumId', 'assetId']).doNothing())
.execute();
}
@@ -327,11 +326,7 @@ export class AlbumRepository {
if (values.length === 0) {
return;
}
await this.db
.insertInto('album_asset')
.values(values)
.onConflict((oc) => oc.columns(['albumId', 'assetId']).doNothing())
.execute();
await this.db.insertInto('album_asset').values(values).execute();
}
/**

View File

@@ -44,7 +44,6 @@ type EventMap = {
// asset events
AssetCreate: [{ asset: Asset }];
PersonRecognized: [{ assetId: string; ownerId: string; personId: string }];
AssetTag: [{ assetId: string }];
AssetUntag: [{ assetId: string }];
AssetHide: [{ assetId: string; userId: string }];

View File

@@ -248,7 +248,7 @@ export class PersonRepository {
getFaceForFacialRecognitionJob(id: string) {
return this.db
.selectFrom('asset_face')
.select(['asset_face.id', 'asset_face.personId', 'asset_face.sourceType', 'asset_face.assetId'])
.select(['asset_face.id', 'asset_face.personId', 'asset_face.sourceType'])
.select((eb) =>
jsonObjectFrom(
eb

View File

@@ -540,12 +540,6 @@ export class PersonService extends BaseService {
if (personId) {
this.logger.debug(`Assigning face ${id} to person ${personId}`);
await this.personRepository.reassignFaces({ faceIds: [id], newPersonId: personId });
await this.eventRepository.emit('PersonRecognized', {
assetId: face.assetId,
ownerId: face.asset.ownerId,
personId,
});
}
return JobStatus.Success;

View File

@@ -7,7 +7,6 @@ import { AlbumRepository } from 'src/repositories/album.repository';
import { AssetRepository } from 'src/repositories/asset.repository';
import { CryptoRepository } from 'src/repositories/crypto.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { PersonRepository } from 'src/repositories/person.repository';
import { AssetTable } from 'src/schema/tables/asset.table';
import { requireAccess } from 'src/utils/access';
@@ -21,7 +20,6 @@ export class PluginHostFunctions {
private albumRepository: AlbumRepository,
private accessRepository: AccessRepository,
private cryptoRepository: CryptoRepository,
private personRepository: PersonRepository,
private logger: LoggingRepository,
private pluginJwtSecret: string,
) {}
@@ -35,7 +33,6 @@ export class PluginHostFunctions {
'extism:host/user': {
updateAsset: (cp: CurrentPlugin, offs: bigint) => this.handleUpdateAsset(cp, offs),
addAssetToAlbum: (cp: CurrentPlugin, offs: bigint) => this.handleAddAssetToAlbum(cp, offs),
getFacesForAsset: (cp: CurrentPlugin, offs: bigint) => this.handleGetFacesForAsset(cp, offs),
},
};
}
@@ -120,28 +117,4 @@ export class PluginHostFunctions {
await this.albumRepository.addAssetIds(albumId, [assetId]);
return 0;
}
/**
* Host function wrapper for getFacesForAsset.
* Reads the input from the plugin, parses it, and returns faces data.
*/
private async handleGetFacesForAsset(cp: CurrentPlugin, offs: bigint) {
const input = JSON.parse(cp.read(offs)!.text());
const result = await this.getFacesForAsset(input);
return cp.store(JSON.stringify(result));
}
async getFacesForAsset(input: { authToken: string; assetId: string }) {
const { authToken, assetId } = input;
const auth = this.validateToken(authToken);
await requireAccess(this.accessRepository, {
auth: { user: { id: auth.userId } } as any,
permission: Permission.AssetRead,
ids: [assetId],
});
return this.personRepository.getFaces(assetId);
}
}

View File

@@ -17,7 +17,6 @@ import { IWorkflowJob, JobItem, JobOf, WorkflowData } from 'src/types';
interface WorkflowContext {
authToken: string;
asset: Asset;
personId?: string;
}
interface PluginInput<T = unknown> {
@@ -25,7 +24,6 @@ interface PluginInput<T = unknown> {
config: T;
data: {
asset: Asset;
personId?: string;
};
}
@@ -46,7 +44,6 @@ export class PluginService extends BaseService {
this.albumRepository,
this.accessRepository,
this.cryptoRepository,
this.personRepository,
this.logger,
this.pluginJwtSecret,
);
@@ -120,9 +117,7 @@ export class PluginService extends BaseService {
private async loadPluginToDatabase(manifest: PluginManifestDto, basePath: string): Promise<void> {
const currentPlugin = await this.pluginRepository.getPluginByName(manifest.name);
const isDev = this.configRepository.isDev();
if (currentPlugin != null && currentPlugin.version === manifest.version && !isDev) {
if (currentPlugin != null && currentPlugin.version === manifest.version) {
this.logger.log(`Plugin ${manifest.name} is up to date (version ${manifest.version}). Skipping`);
return;
}
@@ -183,14 +178,6 @@ export class PluginService extends BaseService {
});
}
@OnEvent({ name: 'PersonRecognized' })
async handlePersonRecognized({ assetId, ownerId, personId }: ArgOf<'PersonRecognized'>) {
await this.handleTrigger(PluginTriggerType.PersonRecognized, {
ownerId,
event: { userId: ownerId, assetId, personId },
});
}
private async handleTrigger<T extends PluginTriggerType>(
triggerType: T,
params: { ownerId: string; event: WorkflowData[T] },
@@ -232,7 +219,7 @@ export class PluginService extends BaseService {
const authToken = this.cryptoRepository.signJwt({ userId: data.userId }, this.pluginJwtSecret);
const context: WorkflowContext = {
const context = {
authToken,
asset,
};
@@ -243,35 +230,13 @@ export class PluginService extends BaseService {
}
await this.executeActions(workflowActions, context);
this.logger.debug(`Workflow ${workflowId} executed successfully for AssetCreate`);
this.logger.debug(`Workflow ${workflowId} executed successfully`);
return JobStatus.Success;
}
case PluginTriggerType.PersonRecognized: {
const data = event as WorkflowData[PluginTriggerType.PersonRecognized];
const asset = await this.assetRepository.getById(data.assetId);
if (!asset) {
this.logger.error(`Asset ${data.assetId} not found for workflow ${workflowId}`);
return JobStatus.Failed;
}
const authToken = this.cryptoRepository.signJwt({ userId: data.userId }, this.pluginJwtSecret);
const context: WorkflowContext = {
authToken,
asset,
personId: data.personId,
};
const filtersPassed = await this.executeFilters(workflowFilters, context);
if (!filtersPassed) {
return JobStatus.Skipped;
}
await this.executeActions(workflowActions, context);
this.logger.debug(`Workflow ${workflowId} executed successfully for PersonRecognized`);
return JobStatus.Success;
this.logger.error('unimplemented');
return JobStatus.Skipped;
}
default: {
@@ -304,7 +269,6 @@ export class PluginService extends BaseService {
config: workflowFilter.filterConfig,
data: {
asset: context.asset,
personId: context.personId,
},
};
@@ -347,7 +311,6 @@ export class PluginService extends BaseService {
config: workflowAction.actionConfig,
data: {
asset: context.asset,
personId: context.personId,
},
};

View File

@@ -259,9 +259,8 @@ export interface WorkflowData {
asset: Asset;
};
[PluginTriggerType.PersonRecognized]: {
userId: string;
assetId: string;
personId: string;
assetId: string;
};
}