mirror of
https://github.com/immich-app/immich.git
synced 2026-01-25 10:54:37 -08:00
Compare commits
7 Commits
feat/dynam
...
feat/isola
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8797dedb1 | ||
|
|
3cd2d7f657 | ||
|
|
3c1a5c744b | ||
|
|
f26a5da87e | ||
|
|
d2e7bc3cfd | ||
|
|
74d463c19c | ||
|
|
39b2af1940 |
@@ -24,12 +24,11 @@ import 'package:immich_mobile/providers/user.provider.dart';
|
|||||||
import 'package:immich_mobile/repositories/file_media.repository.dart';
|
import 'package:immich_mobile/repositories/file_media.repository.dart';
|
||||||
import 'package:immich_mobile/services/app_settings.service.dart';
|
import 'package:immich_mobile/services/app_settings.service.dart';
|
||||||
import 'package:immich_mobile/services/auth.service.dart';
|
import 'package:immich_mobile/services/auth.service.dart';
|
||||||
import 'package:immich_mobile/services/localization.service.dart';
|
|
||||||
import 'package:immich_mobile/services/foreground_upload.service.dart';
|
import 'package:immich_mobile/services/foreground_upload.service.dart';
|
||||||
|
import 'package:immich_mobile/services/localization.service.dart';
|
||||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
import 'package:immich_mobile/wm_executor.dart';
|
|
||||||
import 'package:isar/isar.dart';
|
import 'package:isar/isar.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
@@ -93,7 +92,6 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||||||
await Future.wait(
|
await Future.wait(
|
||||||
[
|
[
|
||||||
loadTranslations(),
|
loadTranslations(),
|
||||||
workerManagerPatch.init(dynamicSpawning: true),
|
|
||||||
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
|
_ref?.read(authServiceProvider).setOpenApiServiceEndpoint(),
|
||||||
// Initialize the file downloader
|
// Initialize the file downloader
|
||||||
FileDownloader().configure(
|
FileDownloader().configure(
|
||||||
@@ -192,25 +190,21 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi {
|
|||||||
final backgroundSyncManager = _ref?.read(backgroundSyncProvider);
|
final backgroundSyncManager = _ref?.read(backgroundSyncProvider);
|
||||||
final nativeSyncApi = _ref?.read(nativeSyncApiProvider);
|
final nativeSyncApi = _ref?.read(nativeSyncApiProvider);
|
||||||
|
|
||||||
await _drift.close();
|
|
||||||
await _driftLogger.close();
|
|
||||||
|
|
||||||
_ref?.dispose();
|
_ref?.dispose();
|
||||||
_ref = null;
|
_ref = null;
|
||||||
|
|
||||||
|
await _drift.close();
|
||||||
|
await _driftLogger.close();
|
||||||
|
|
||||||
_cancellationToken.cancel();
|
_cancellationToken.cancel();
|
||||||
_logger.info("Cleaning up background worker");
|
_logger.info("Cleaning up background worker");
|
||||||
|
|
||||||
final cleanupFutures = [
|
final cleanupFutures = [
|
||||||
nativeSyncApi?.cancelHashing(),
|
nativeSyncApi?.cancelHashing(),
|
||||||
workerManagerPatch.dispose().catchError((_) async {
|
|
||||||
// Discard any errors on the dispose call
|
|
||||||
return;
|
|
||||||
}),
|
|
||||||
LogService.I.dispose(),
|
LogService.I.dispose(),
|
||||||
Store.dispose(),
|
Store.dispose(),
|
||||||
|
|
||||||
backgroundSyncManager?.cancel(),
|
backgroundSyncManager?.cancel(),
|
||||||
|
backgroundSyncManager?.cancelLocal(),
|
||||||
];
|
];
|
||||||
|
|
||||||
if (_isar.isOpen) {
|
if (_isar.isOpen) {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import 'package:immich_mobile/domain/utils/migrate_cloud_ids.dart' as m;
|
|||||||
import 'package:immich_mobile/domain/utils/sync_linked_album.dart';
|
import 'package:immich_mobile/domain/utils/sync_linked_album.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
||||||
import 'package:immich_mobile/utils/isolate.dart';
|
import 'package:immich_mobile/utils/isolate.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
|
||||||
|
|
||||||
typedef SyncCallback = void Function();
|
typedef SyncCallback = void Function();
|
||||||
typedef SyncCallbackWithResult<T> = void Function(T result);
|
typedef SyncCallbackWithResult<T> = void Function(T result);
|
||||||
@@ -27,12 +26,12 @@ class BackgroundSyncManager {
|
|||||||
final SyncCallback? onCloudIdSyncComplete;
|
final SyncCallback? onCloudIdSyncComplete;
|
||||||
final SyncErrorCallback? onCloudIdSyncError;
|
final SyncErrorCallback? onCloudIdSyncError;
|
||||||
|
|
||||||
Cancelable<bool?>? _syncTask;
|
CancellableTask<bool>? _syncTask;
|
||||||
Cancelable<void>? _syncWebsocketTask;
|
CancellableTask<void>? _syncWebsocketTask;
|
||||||
Cancelable<void>? _cloudIdSyncTask;
|
CancellableTask<void>? _cloudIdSyncTask;
|
||||||
Cancelable<void>? _deviceAlbumSyncTask;
|
CancellableTask<void>? _deviceAlbumSyncTask;
|
||||||
Cancelable<void>? _linkedAlbumSyncTask;
|
CancellableTask<void>? _linkedAlbumSyncTask;
|
||||||
Cancelable<void>? _hashTask;
|
CancellableTask<void>? _hashTask;
|
||||||
|
|
||||||
BackgroundSyncManager({
|
BackgroundSyncManager({
|
||||||
this.onRemoteSyncStart,
|
this.onRemoteSyncStart,
|
||||||
@@ -50,59 +49,42 @@ class BackgroundSyncManager {
|
|||||||
});
|
});
|
||||||
|
|
||||||
Future<void> cancel() async {
|
Future<void> cancel() async {
|
||||||
final futures = <Future>[];
|
|
||||||
|
|
||||||
if (_syncTask != null) {
|
|
||||||
futures.add(_syncTask!.future);
|
|
||||||
}
|
|
||||||
_syncTask?.cancel();
|
_syncTask?.cancel();
|
||||||
_syncTask = null;
|
|
||||||
|
|
||||||
if (_syncWebsocketTask != null) {
|
|
||||||
futures.add(_syncWebsocketTask!.future);
|
|
||||||
}
|
|
||||||
_syncWebsocketTask?.cancel();
|
_syncWebsocketTask?.cancel();
|
||||||
_syncWebsocketTask = null;
|
|
||||||
|
|
||||||
if (_cloudIdSyncTask != null) {
|
|
||||||
futures.add(_cloudIdSyncTask!.future);
|
|
||||||
}
|
|
||||||
_cloudIdSyncTask?.cancel();
|
_cloudIdSyncTask?.cancel();
|
||||||
_cloudIdSyncTask = null;
|
|
||||||
|
|
||||||
if (_linkedAlbumSyncTask != null) {
|
|
||||||
futures.add(_linkedAlbumSyncTask!.future);
|
|
||||||
}
|
|
||||||
_linkedAlbumSyncTask?.cancel();
|
_linkedAlbumSyncTask?.cancel();
|
||||||
_linkedAlbumSyncTask = null;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Future.wait(futures);
|
await Future.wait(
|
||||||
} on CanceledError {
|
[
|
||||||
// Ignore cancellation errors
|
_syncTask?.future,
|
||||||
|
_syncWebsocketTask?.future,
|
||||||
|
_cloudIdSyncTask?.future,
|
||||||
|
_linkedAlbumSyncTask?.future,
|
||||||
|
].nonNulls,
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
// Ignore cancellation errors and cleanup timeouts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_syncTask = null;
|
||||||
|
_syncWebsocketTask = null;
|
||||||
|
_cloudIdSyncTask = null;
|
||||||
|
_linkedAlbumSyncTask = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> cancelLocal() async {
|
Future<void> cancelLocal() async {
|
||||||
final futures = <Future>[];
|
|
||||||
|
|
||||||
if (_hashTask != null) {
|
|
||||||
futures.add(_hashTask!.future);
|
|
||||||
}
|
|
||||||
_hashTask?.cancel();
|
_hashTask?.cancel();
|
||||||
_hashTask = null;
|
|
||||||
|
|
||||||
if (_deviceAlbumSyncTask != null) {
|
|
||||||
futures.add(_deviceAlbumSyncTask!.future);
|
|
||||||
}
|
|
||||||
_deviceAlbumSyncTask?.cancel();
|
_deviceAlbumSyncTask?.cancel();
|
||||||
_deviceAlbumSyncTask = null;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Future.wait(futures);
|
await Future.wait([_hashTask?.future, _deviceAlbumSyncTask?.future].nonNulls);
|
||||||
} on CanceledError {
|
} catch (e) {
|
||||||
// Ignore cancellation errors
|
// Ignore cancellation errors and cleanup timeouts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_hashTask = null;
|
||||||
|
_deviceAlbumSyncTask = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// No need to cancel the task, as it can also be run when the user logs out
|
// No need to cancel the task, as it can also be run when the user logs out
|
||||||
@@ -133,7 +115,8 @@ class BackgroundSyncManager {
|
|||||||
.catchError((error) {
|
.catchError((error) {
|
||||||
onLocalSyncError?.call(error.toString());
|
onLocalSyncError?.call(error.toString());
|
||||||
_deviceAlbumSyncTask = null;
|
_deviceAlbumSyncTask = null;
|
||||||
});
|
})
|
||||||
|
.future;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> hashAssets() {
|
Future<void> hashAssets() {
|
||||||
@@ -156,7 +139,8 @@ class BackgroundSyncManager {
|
|||||||
.catchError((error) {
|
.catchError((error) {
|
||||||
onHashingError?.call(error.toString());
|
onHashingError?.call(error.toString());
|
||||||
_hashTask = null;
|
_hashTask = null;
|
||||||
});
|
})
|
||||||
|
.future;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<bool> syncRemote() {
|
Future<bool> syncRemote() {
|
||||||
@@ -170,7 +154,7 @@ class BackgroundSyncManager {
|
|||||||
computation: (ref) => ref.read(syncStreamServiceProvider).sync(),
|
computation: (ref) => ref.read(syncStreamServiceProvider).sync(),
|
||||||
debugLabel: 'remote-sync',
|
debugLabel: 'remote-sync',
|
||||||
);
|
);
|
||||||
return _syncTask!
|
return _syncTask!.future
|
||||||
.then((result) {
|
.then((result) {
|
||||||
final success = result ?? false;
|
final success = result ?? false;
|
||||||
onRemoteSyncComplete?.call(success);
|
onRemoteSyncComplete?.call(success);
|
||||||
@@ -193,7 +177,7 @@ class BackgroundSyncManager {
|
|||||||
_syncWebsocketTask = _handleWsAssetUploadReadyV1Batch(batchData);
|
_syncWebsocketTask = _handleWsAssetUploadReadyV1Batch(batchData);
|
||||||
return _syncWebsocketTask!.whenComplete(() {
|
return _syncWebsocketTask!.whenComplete(() {
|
||||||
_syncWebsocketTask = null;
|
_syncWebsocketTask = null;
|
||||||
});
|
}).future;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> syncWebsocketEditBatch(List<dynamic> batchData) {
|
Future<void> syncWebsocketEditBatch(List<dynamic> batchData) {
|
||||||
@@ -203,7 +187,7 @@ class BackgroundSyncManager {
|
|||||||
_syncWebsocketTask = _handleWsAssetEditReadyV1Batch(batchData);
|
_syncWebsocketTask = _handleWsAssetEditReadyV1Batch(batchData);
|
||||||
return _syncWebsocketTask!.whenComplete(() {
|
return _syncWebsocketTask!.whenComplete(() {
|
||||||
_syncWebsocketTask = null;
|
_syncWebsocketTask = null;
|
||||||
});
|
}).future;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> syncLinkedAlbum() {
|
Future<void> syncLinkedAlbum() {
|
||||||
@@ -214,7 +198,7 @@ class BackgroundSyncManager {
|
|||||||
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated, debugLabel: 'linked-album-sync');
|
_linkedAlbumSyncTask = runInIsolateGentle(computation: syncLinkedAlbumsIsolated, debugLabel: 'linked-album-sync');
|
||||||
return _linkedAlbumSyncTask!.whenComplete(() {
|
return _linkedAlbumSyncTask!.whenComplete(() {
|
||||||
_linkedAlbumSyncTask = null;
|
_linkedAlbumSyncTask = null;
|
||||||
});
|
}).future;
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> syncCloudIds() {
|
Future<void> syncCloudIds() {
|
||||||
@@ -224,7 +208,7 @@ class BackgroundSyncManager {
|
|||||||
|
|
||||||
onCloudIdSyncStart?.call();
|
onCloudIdSyncStart?.call();
|
||||||
|
|
||||||
_cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds);
|
_cloudIdSyncTask = runInIsolateGentle(computation: m.syncCloudIds, debugLabel: 'cloud-id-sync');
|
||||||
return _cloudIdSyncTask!
|
return _cloudIdSyncTask!
|
||||||
.whenComplete(() {
|
.whenComplete(() {
|
||||||
onCloudIdSyncComplete?.call();
|
onCloudIdSyncComplete?.call();
|
||||||
@@ -233,16 +217,17 @@ class BackgroundSyncManager {
|
|||||||
.catchError((error) {
|
.catchError((error) {
|
||||||
onCloudIdSyncError?.call(error.toString());
|
onCloudIdSyncError?.call(error.toString());
|
||||||
_cloudIdSyncTask = null;
|
_cloudIdSyncTask = null;
|
||||||
});
|
})
|
||||||
|
.future;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Cancelable<void> _handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
|
CancellableTask<void> _handleWsAssetUploadReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
|
||||||
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetUploadReadyV1Batch(batchData),
|
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetUploadReadyV1Batch(batchData),
|
||||||
debugLabel: 'websocket-batch',
|
debugLabel: 'websocket-batch',
|
||||||
);
|
);
|
||||||
|
|
||||||
Cancelable<void> _handleWsAssetEditReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
|
CancellableTask<void> _handleWsAssetEditReadyV1Batch(List<dynamic> batchData) => runInIsolateGentle(
|
||||||
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetEditReadyV1Batch(batchData),
|
computation: (ref) => ref.read(syncStreamServiceProvider).handleWsAssetEditReadyV1Batch(batchData),
|
||||||
debugLabel: 'websocket-edit',
|
debugLabel: 'websocket-edit',
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:io';
|
import 'dart:io';
|
||||||
import 'dart:math';
|
|
||||||
|
|
||||||
import 'package:auto_route/auto_route.dart';
|
import 'package:auto_route/auto_route.dart';
|
||||||
import 'package:background_downloader/background_downloader.dart';
|
import 'package:background_downloader/background_downloader.dart';
|
||||||
@@ -41,7 +40,6 @@ import 'package:immich_mobile/utils/debug_print.dart';
|
|||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
import 'package:immich_mobile/utils/licenses.dart';
|
import 'package:immich_mobile/utils/licenses.dart';
|
||||||
import 'package:immich_mobile/utils/migration.dart';
|
import 'package:immich_mobile/utils/migration.dart';
|
||||||
import 'package:immich_mobile/wm_executor.dart';
|
|
||||||
import 'package:immich_ui/immich_ui.dart';
|
import 'package:immich_ui/immich_ui.dart';
|
||||||
import 'package:intl/date_symbol_data_local.dart';
|
import 'package:intl/date_symbol_data_local.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
@@ -53,8 +51,6 @@ void main() async {
|
|||||||
final (isar, drift, logDb) = await Bootstrap.initDB();
|
final (isar, drift, logDb) = await Bootstrap.initDB();
|
||||||
await Bootstrap.initDomain(isar, drift, logDb);
|
await Bootstrap.initDomain(isar, drift, logDb);
|
||||||
await initApp();
|
await initApp();
|
||||||
// Warm-up isolate pool for worker manager
|
|
||||||
await workerManagerPatch.init(dynamicSpawning: true, isolatesCount: max(Platform.numberOfProcessors - 1, 5));
|
|
||||||
await migrateDatabaseIfNeeded(isar, drift);
|
await migrateDatabaseIfNeeded(isar, drift);
|
||||||
HttpSSLOptions.apply();
|
HttpSSLOptions.apply();
|
||||||
|
|
||||||
|
|||||||
@@ -160,7 +160,7 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
|
|||||||
_resumeBackup();
|
_resumeBackup();
|
||||||
}),
|
}),
|
||||||
_resumeBackup(),
|
_resumeBackup(),
|
||||||
backgroundManager.syncCloudIds(),
|
_safeRun(backgroundManager.syncCloudIds(), "syncCloudIds"),
|
||||||
]);
|
]);
|
||||||
} else {
|
} else {
|
||||||
await _safeRun(backgroundManager.hashAssets(), "hashAssets");
|
await _safeRun(backgroundManager.hashAssets(), "hashAssets");
|
||||||
@@ -218,7 +218,14 @@ class AppLifeCycleNotifier extends StateNotifier<AppLifeCycleEnum> {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if (Store.isBetaTimelineEnabled) {
|
if (Store.isBetaTimelineEnabled) {
|
||||||
unawaited(_ref.read(backgroundWorkerLockServiceProvider).unlock());
|
unawaited(
|
||||||
|
Future.wait([
|
||||||
|
_ref.read(backgroundWorkerLockServiceProvider).unlock(),
|
||||||
|
_ref.read(nativeSyncApiProvider).cancelHashing(),
|
||||||
|
_ref.read(backgroundSyncProvider).cancel(),
|
||||||
|
_ref.read(backgroundSyncProvider).cancelLocal(),
|
||||||
|
]),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
await _performPause();
|
await _performPause();
|
||||||
} catch (e, stackTrace) {
|
} catch (e, stackTrace) {
|
||||||
|
|||||||
@@ -21,7 +21,13 @@ final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
|
|||||||
backupProvider.updateError(isSuccess == true ? BackupError.none : BackupError.syncFailed);
|
backupProvider.updateError(isSuccess == true ? BackupError.none : BackupError.syncFailed);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
onRemoteSyncError: syncStatusNotifier.errorRemoteSync,
|
onRemoteSyncError: (error) {
|
||||||
|
syncStatusNotifier.errorRemoteSync(error);
|
||||||
|
final backupProvider = ref.read(driftBackupProvider.notifier);
|
||||||
|
if (backupProvider.mounted) {
|
||||||
|
backupProvider.updateError(BackupError.syncFailed);
|
||||||
|
}
|
||||||
|
},
|
||||||
onLocalSyncStart: syncStatusNotifier.startLocalSync,
|
onLocalSyncStart: syncStatusNotifier.startLocalSync,
|
||||||
onLocalSyncComplete: syncStatusNotifier.completeLocalSync,
|
onLocalSyncComplete: syncStatusNotifier.completeLocalSync,
|
||||||
onLocalSyncError: syncStatusNotifier.errorLocalSync,
|
onLocalSyncError: syncStatusNotifier.errorLocalSync,
|
||||||
|
|||||||
@@ -1,95 +1,307 @@
|
|||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
|
import 'dart:isolate';
|
||||||
import 'dart:ui';
|
import 'dart:ui';
|
||||||
|
|
||||||
import 'package:flutter/services.dart';
|
import 'package:flutter/services.dart';
|
||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/domain/services/log.service.dart';
|
import 'package:immich_mobile/domain/services/log.service.dart';
|
||||||
import 'package:immich_mobile/entities/store.entity.dart';
|
import 'package:immich_mobile/entities/store.entity.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
|
||||||
import 'package:immich_mobile/providers/db.provider.dart';
|
import 'package:immich_mobile/providers/db.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart';
|
||||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
import 'package:immich_mobile/utils/debug_print.dart';
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
import 'package:immich_mobile/utils/http_ssl_options.dart';
|
||||||
import 'package:immich_mobile/wm_executor.dart';
|
import 'package:isar/isar.dart';
|
||||||
import 'package:logging/logging.dart';
|
import 'package:logging/logging.dart';
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
|
||||||
|
|
||||||
class InvalidIsolateUsageException implements Exception {
|
class CancellableTask<T> {
|
||||||
const InvalidIsolateUsageException();
|
final Future<T?> future;
|
||||||
|
final void Function() cancel;
|
||||||
|
|
||||||
@override
|
const CancellableTask({required this.future, required this.cancel});
|
||||||
String toString() => "IsolateHelper should only be used from the root isolate";
|
|
||||||
}
|
|
||||||
|
|
||||||
// !! Should be used only from the root isolate
|
CancellableTask<T> whenComplete(void Function() action) {
|
||||||
Cancelable<T?> runInIsolateGentle<T>({
|
return CancellableTask(future: future.whenComplete(action), cancel: cancel);
|
||||||
required Future<T> Function(ProviderContainer ref) computation,
|
|
||||||
String? debugLabel,
|
|
||||||
}) {
|
|
||||||
final token = RootIsolateToken.instance;
|
|
||||||
if (token == null) {
|
|
||||||
throw const InvalidIsolateUsageException();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return workerManagerPatch.executeGentle((cancelledChecker) async {
|
CancellableTask<T> catchError(Function onError) {
|
||||||
T? result;
|
return CancellableTask(future: future.catchError(onError), cancel: cancel);
|
||||||
await runZonedGuarded(
|
}
|
||||||
() async {
|
|
||||||
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
|
||||||
DartPluginRegistrant.ensureInitialized();
|
|
||||||
|
|
||||||
final (isar, drift, logDb) = await Bootstrap.initDB();
|
CancellableTask<R> then<R>(FutureOr<R> Function(T?) onValue) {
|
||||||
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false);
|
return CancellableTask(future: future.then(onValue), cancel: cancel);
|
||||||
final ref = ProviderContainer(
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sealed class _IsolateMessage {
|
||||||
|
const _IsolateMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
class _InitMessage extends _IsolateMessage {
|
||||||
|
final SendPort sendPort;
|
||||||
|
const _InitMessage(this.sendPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
class _CancelMessage extends _IsolateMessage {
|
||||||
|
const _CancelMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
class _ResultMessage extends _IsolateMessage {
|
||||||
|
final dynamic data;
|
||||||
|
const _ResultMessage(this.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
class _ErrorMessage extends _IsolateMessage {
|
||||||
|
final Object? error;
|
||||||
|
final StackTrace? stackTrace;
|
||||||
|
const _ErrorMessage(this.error, [this.stackTrace]);
|
||||||
|
}
|
||||||
|
|
||||||
|
class _DoneMessage extends _IsolateMessage {
|
||||||
|
const _DoneMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
class _IsolateTaskConfig<T> {
|
||||||
|
final Future<T> Function(ProviderContainer ref) computation;
|
||||||
|
final SendPort mainSendPort;
|
||||||
|
final RootIsolateToken rootToken;
|
||||||
|
final String debugLabel;
|
||||||
|
|
||||||
|
const _IsolateTaskConfig({
|
||||||
|
required this.computation,
|
||||||
|
required this.mainSendPort,
|
||||||
|
required this.rootToken,
|
||||||
|
required this.debugLabel,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
class _IsolateTaskRunner<T> {
|
||||||
|
final Completer<T?> _completer = Completer<T?>();
|
||||||
|
final ReceivePort _receivePort = ReceivePort();
|
||||||
|
final String debugLabel;
|
||||||
|
|
||||||
|
Isolate? _isolate;
|
||||||
|
SendPort? _isolateSendPort;
|
||||||
|
bool _isCancelled = false;
|
||||||
|
bool _isCleanedUp = false;
|
||||||
|
Timer? _cleanupTimeoutTimer;
|
||||||
|
|
||||||
|
_IsolateTaskRunner({required this.debugLabel});
|
||||||
|
|
||||||
|
Future<void> start(Future<T> Function(ProviderContainer ref) computation) async {
|
||||||
|
final token = RootIsolateToken.instance;
|
||||||
|
if (token == null) {
|
||||||
|
_completer.completeError(Exception("RootIsolateToken is not available. Isolate cannot be started."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_receivePort.listen(_handleMessage);
|
||||||
|
|
||||||
|
final config = _IsolateTaskConfig<T>(
|
||||||
|
computation: computation,
|
||||||
|
mainSendPort: _receivePort.sendPort,
|
||||||
|
rootToken: token,
|
||||||
|
debugLabel: debugLabel,
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
_isolate = await Isolate.spawn(_isolateEntryPoint<T>, config, debugName: debugLabel);
|
||||||
|
} catch (error, stack) {
|
||||||
|
_completer.completeError(error, stack);
|
||||||
|
_cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancel() {
|
||||||
|
if (_isCancelled || _isCleanedUp) return;
|
||||||
|
|
||||||
|
_isCancelled = true;
|
||||||
|
dPrint(() => "[$debugLabel] Cancellation requested");
|
||||||
|
|
||||||
|
_isolateSendPort?.send(const _CancelMessage());
|
||||||
|
_cleanupTimeoutTimer = Timer(const Duration(seconds: 4), () {
|
||||||
|
if (!_isCleanedUp) {
|
||||||
|
dPrint(() => "[$debugLabel] Cleanup timeout - force killing isolate");
|
||||||
|
_isolate?.kill(priority: Isolate.immediate);
|
||||||
|
if (!_completer.isCompleted) {
|
||||||
|
_completer.completeError(Exception("Isolate cleanup timed out for task: $debugLabel"));
|
||||||
|
}
|
||||||
|
_cleanup();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void _handleMessage(dynamic message) {
|
||||||
|
if (message is! _IsolateMessage) return;
|
||||||
|
|
||||||
|
switch (message) {
|
||||||
|
case _InitMessage(:var sendPort):
|
||||||
|
_isolateSendPort = sendPort;
|
||||||
|
dPrint(() => "[$debugLabel] Isolate initialized");
|
||||||
|
break;
|
||||||
|
|
||||||
|
case _ResultMessage(:var data):
|
||||||
|
_cleanup();
|
||||||
|
if (!_completer.isCompleted) {
|
||||||
|
_completer.complete(data as T?);
|
||||||
|
dPrint(() => "[$debugLabel] Isolate task completed with result - $data");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case _ErrorMessage(:var error, :var stackTrace):
|
||||||
|
_cleanup();
|
||||||
|
if (!_completer.isCompleted) {
|
||||||
|
dPrint(() => "[$debugLabel] Isolate task completed with error - $error");
|
||||||
|
_completer.completeError(error ?? Exception("Unknown error in isolate"), stackTrace ?? StackTrace.current);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case _DoneMessage():
|
||||||
|
dPrint(() => "[$debugLabel] Isolate cleanup completed");
|
||||||
|
_cleanup();
|
||||||
|
break;
|
||||||
|
|
||||||
|
case _CancelMessage():
|
||||||
|
// Not expected to receive cancel from isolate
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _cleanup() {
|
||||||
|
if (_isCleanedUp) return;
|
||||||
|
_isCleanedUp = true;
|
||||||
|
|
||||||
|
_cleanupTimeoutTimer?.cancel();
|
||||||
|
_receivePort.close();
|
||||||
|
_isolate?.kill(priority: Isolate.immediate);
|
||||||
|
_isolate = null;
|
||||||
|
_isolateSendPort = null;
|
||||||
|
|
||||||
|
dPrint(() => "[$debugLabel] Isolate cleaned up");
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<T?> get future => _completer.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _cleanupResources<T>(ProviderContainer? ref, Isar isar, Drift drift, DriftLogger logDb) async {
|
||||||
|
try {
|
||||||
|
final cleanupFutures = <Future>[
|
||||||
|
Store.dispose(),
|
||||||
|
LogService.I.dispose(),
|
||||||
|
logDb.close(),
|
||||||
|
drift.close(),
|
||||||
|
if (isar.isOpen) isar.close().catchError((_) => false),
|
||||||
|
];
|
||||||
|
|
||||||
|
ref?.dispose();
|
||||||
|
|
||||||
|
await Future.wait(cleanupFutures).timeout(
|
||||||
|
const Duration(seconds: 2),
|
||||||
|
onTimeout: () {
|
||||||
|
dPrint(() => "Cleanup timeout - some resources may not be closed");
|
||||||
|
return [];
|
||||||
|
},
|
||||||
|
);
|
||||||
|
} catch (error, stack) {
|
||||||
|
dPrint(() => "Error during isolate cleanup: $error with stack: $stack");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _isolateEntryPoint<T>(_IsolateTaskConfig<T> config) async {
|
||||||
|
final receivePort = ReceivePort();
|
||||||
|
config.mainSendPort.send(_InitMessage(receivePort.sendPort));
|
||||||
|
|
||||||
|
bool isCancelled = false;
|
||||||
|
ProviderContainer? ref;
|
||||||
|
final Isar isar;
|
||||||
|
final Drift drift;
|
||||||
|
final DriftLogger logDb;
|
||||||
|
|
||||||
|
try {
|
||||||
|
BackgroundIsolateBinaryMessenger.ensureInitialized(config.rootToken);
|
||||||
|
DartPluginRegistrant.ensureInitialized();
|
||||||
|
final (bootIsar, bootDrift, bootLogDb) = await Bootstrap.initDB();
|
||||||
|
await Bootstrap.initDomain(bootIsar, bootDrift, bootLogDb, shouldBufferLogs: false, listenStoreUpdates: false);
|
||||||
|
isar = bootIsar;
|
||||||
|
drift = bootDrift;
|
||||||
|
logDb = bootLogDb;
|
||||||
|
} catch (error, stack) {
|
||||||
|
dPrint(() => "[$config.debugLabel] Error during isolate bootstrap: $error");
|
||||||
|
config.mainSendPort.send(_ErrorMessage(error, stack));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final subscription = receivePort.listen((message) async {
|
||||||
|
if (message is _CancelMessage) {
|
||||||
|
isCancelled = true;
|
||||||
|
try {
|
||||||
|
receivePort.close();
|
||||||
|
await _cleanupResources(ref, isar, drift, logDb);
|
||||||
|
} catch (error, stack) {
|
||||||
|
dPrint(() => "Error during isolate cancellation cleanup: $error with stack: $stack");
|
||||||
|
} finally {
|
||||||
|
config.mainSendPort.send(const _ErrorMessage("Isolate task cancelled"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final log = Logger("IsolateWorker[${config.debugLabel}]");
|
||||||
|
await runZonedGuarded(
|
||||||
|
() async {
|
||||||
|
try {
|
||||||
|
ref = ProviderContainer(
|
||||||
overrides: [
|
overrides: [
|
||||||
// TODO: Remove once isar is removed
|
|
||||||
dbProvider.overrideWithValue(isar),
|
dbProvider.overrideWithValue(isar),
|
||||||
isarProvider.overrideWithValue(isar),
|
isarProvider.overrideWithValue(isar),
|
||||||
cancellationProvider.overrideWithValue(cancelledChecker),
|
cancellationProvider.overrideWithValue(() => isCancelled),
|
||||||
driftProvider.overrideWith(driftOverride(drift)),
|
driftProvider.overrideWith(driftOverride(drift)),
|
||||||
],
|
],
|
||||||
);
|
);
|
||||||
|
|
||||||
Logger log = Logger("IsolateLogger");
|
HttpSSLOptions.apply(applyNative: false);
|
||||||
|
final result = await config.computation(ref!);
|
||||||
|
|
||||||
try {
|
if (!isCancelled) {
|
||||||
HttpSSLOptions.apply(applyNative: false);
|
config.mainSendPort.send(_ResultMessage(result));
|
||||||
result = await computation(ref);
|
} else {
|
||||||
} on CanceledError {
|
log.fine("Task completed but was cancelled - not sending result");
|
||||||
log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}");
|
|
||||||
} catch (error, stack) {
|
|
||||||
log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack);
|
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
ref.dispose();
|
|
||||||
|
|
||||||
await Store.dispose();
|
|
||||||
await LogService.I.dispose();
|
|
||||||
await logDb.close();
|
|
||||||
await drift.close();
|
|
||||||
|
|
||||||
// Close Isar safely
|
|
||||||
try {
|
|
||||||
if (isar.isOpen) {
|
|
||||||
await isar.close();
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
dPrint(() => "Error closing Isar: $e");
|
|
||||||
}
|
|
||||||
} catch (error, stack) {
|
|
||||||
dPrint(() => "Error closing resources in isolate: $error, $stack");
|
|
||||||
} finally {
|
|
||||||
ref.dispose();
|
|
||||||
// Delay to ensure all resources are released
|
|
||||||
await Future.delayed(const Duration(seconds: 2));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
} catch (error, stack) {
|
||||||
(error, stack) {
|
log.severe("Error in isolate execution", error, stack);
|
||||||
dPrint(() => "Error in isolate $debugLabel zone: $error, $stack");
|
config.mainSendPort.send(_ErrorMessage(error, stack));
|
||||||
},
|
} finally {
|
||||||
);
|
try {
|
||||||
return result;
|
receivePort.close();
|
||||||
});
|
unawaited(subscription.cancel());
|
||||||
|
await _cleanupResources(ref, isar, drift, logDb);
|
||||||
|
} catch (error, stack) {
|
||||||
|
dPrint(() => "Error during isolate cleanup: $error with stack: $stack");
|
||||||
|
} finally {
|
||||||
|
unawaited(subscription.cancel());
|
||||||
|
config.mainSendPort.send(const _DoneMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(error, stack) async {
|
||||||
|
dPrint(() => "Uncaught error in isolate zone: $error, $stack");
|
||||||
|
receivePort.close();
|
||||||
|
unawaited(subscription.cancel());
|
||||||
|
await _cleanupResources(ref, isar, drift, logDb);
|
||||||
|
config.mainSendPort.send(_ErrorMessage(error, stack));
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
CancellableTask<T> runInIsolateGentle<T>({
|
||||||
|
required Future<T> Function(ProviderContainer ref) computation,
|
||||||
|
String? debugLabel,
|
||||||
|
}) {
|
||||||
|
final runner = _IsolateTaskRunner<T>(
|
||||||
|
debugLabel: debugLabel ?? 'isolate-task-${DateTime.now().millisecondsSinceEpoch}',
|
||||||
|
)..start(computation);
|
||||||
|
|
||||||
|
return CancellableTask<T>(future: runner.future, cancel: runner.cancel);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,251 +0,0 @@
|
|||||||
// part of 'package:worker_manager/worker_manager.dart';
|
|
||||||
// ignore_for_file: implementation_imports, avoid_print
|
|
||||||
|
|
||||||
import 'dart:async';
|
|
||||||
import 'dart:math';
|
|
||||||
|
|
||||||
import 'package:collection/collection.dart';
|
|
||||||
import 'package:flutter/foundation.dart';
|
|
||||||
import 'package:worker_manager/src/number_of_processors/processors_io.dart';
|
|
||||||
import 'package:worker_manager/src/worker/worker.dart';
|
|
||||||
import 'package:worker_manager/worker_manager.dart';
|
|
||||||
|
|
||||||
final workerManagerPatch = _Executor();
|
|
||||||
|
|
||||||
// [-2^54; 2^53] is compatible with dart2js, see core.int doc
|
|
||||||
const _minId = -9007199254740992;
|
|
||||||
const _maxId = 9007199254740992;
|
|
||||||
|
|
||||||
class Mixinable<T> {
|
|
||||||
late final itSelf = this as T;
|
|
||||||
}
|
|
||||||
|
|
||||||
mixin _ExecutorLogger on Mixinable<_Executor> {
|
|
||||||
var log = false;
|
|
||||||
|
|
||||||
@mustCallSuper
|
|
||||||
void init() {
|
|
||||||
logMessage("${itSelf._isolatesCount} workers have been spawned and initialized");
|
|
||||||
}
|
|
||||||
|
|
||||||
void logTaskAdded<R>(String uid) {
|
|
||||||
logMessage("added task with number $uid");
|
|
||||||
}
|
|
||||||
|
|
||||||
@mustCallSuper
|
|
||||||
void dispose() {
|
|
||||||
logMessage("worker_manager have been disposed");
|
|
||||||
}
|
|
||||||
|
|
||||||
@mustCallSuper
|
|
||||||
void _cancel(Task task) {
|
|
||||||
logMessage("Task ${task.id} have been canceled");
|
|
||||||
}
|
|
||||||
|
|
||||||
void logMessage(String message) {
|
|
||||||
if (log) print(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class _Executor extends Mixinable<_Executor> with _ExecutorLogger {
|
|
||||||
final _queue = PriorityQueue<Task>();
|
|
||||||
final _pool = <Worker>[];
|
|
||||||
var _nextTaskId = _minId;
|
|
||||||
var _dynamicSpawning = false;
|
|
||||||
var _isolatesCount = numberOfProcessors;
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<void> init({int? isolatesCount, bool? dynamicSpawning}) async {
|
|
||||||
if (_pool.isNotEmpty) {
|
|
||||||
print("worker_manager already warmed up, init is ignored. Dispose before init");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (isolatesCount != null) {
|
|
||||||
if (isolatesCount < 0) {
|
|
||||||
throw Exception("isolatesCount must be greater than 0");
|
|
||||||
}
|
|
||||||
|
|
||||||
_isolatesCount = isolatesCount;
|
|
||||||
}
|
|
||||||
_dynamicSpawning = dynamicSpawning ?? false;
|
|
||||||
await _ensureWorkersInitialized();
|
|
||||||
super.init();
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
Future<void> dispose() async {
|
|
||||||
_queue.clear();
|
|
||||||
for (final worker in _pool) {
|
|
||||||
worker.kill();
|
|
||||||
}
|
|
||||||
_pool.clear();
|
|
||||||
super.dispose();
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> execute<R>(Execute<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
|
||||||
return _createCancelable<R>(execution: execution, priority: priority);
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeNow<R>(ExecuteGentle<R> execution) {
|
|
||||||
final task = TaskGentle<R>(
|
|
||||||
id: "",
|
|
||||||
workPriority: WorkPriority.immediately,
|
|
||||||
execution: execution,
|
|
||||||
completer: Completer<R>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
Future<void> run() async {
|
|
||||||
try {
|
|
||||||
final result = await execution(() => task.canceled);
|
|
||||||
task.complete(result, null, null);
|
|
||||||
} catch (error, st) {
|
|
||||||
task.complete(null, error, st);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
run();
|
|
||||||
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeWithPort<R, T>(
|
|
||||||
ExecuteWithPort<R> execution, {
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
required void Function(T value) onMessage,
|
|
||||||
}) {
|
|
||||||
return _createCancelable<R>(
|
|
||||||
execution: execution,
|
|
||||||
priority: priority,
|
|
||||||
onMessage: (message) => onMessage(message as T),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeGentle<R>(ExecuteGentle<R> execution, {WorkPriority priority = WorkPriority.immediately}) {
|
|
||||||
return _createCancelable<R>(execution: execution, priority: priority);
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> executeGentleWithPort<R, T>(
|
|
||||||
ExecuteGentleWithPort<R> execution, {
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
required void Function(T value) onMessage,
|
|
||||||
}) {
|
|
||||||
return _createCancelable<R>(
|
|
||||||
execution: execution,
|
|
||||||
priority: priority,
|
|
||||||
onMessage: (message) => onMessage(message as T),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
void _createWorkers() {
|
|
||||||
for (var i = 0; i < _isolatesCount; i++) {
|
|
||||||
_pool.add(Worker());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<void> _initializeWorkers() async {
|
|
||||||
await Future.wait(_pool.map((e) => e.initialize()));
|
|
||||||
}
|
|
||||||
|
|
||||||
Cancelable<R> _createCancelable<R>({
|
|
||||||
required Function execution,
|
|
||||||
WorkPriority priority = WorkPriority.immediately,
|
|
||||||
void Function(Object value)? onMessage,
|
|
||||||
}) {
|
|
||||||
if (_nextTaskId + 1 == _maxId) {
|
|
||||||
_nextTaskId = _minId;
|
|
||||||
}
|
|
||||||
final id = _nextTaskId.toString();
|
|
||||||
_nextTaskId++;
|
|
||||||
late final Task<R> task;
|
|
||||||
final completer = Completer<R>();
|
|
||||||
if (execution is Execute<R>) {
|
|
||||||
task = TaskRegular<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
|
||||||
} else if (execution is ExecuteWithPort<R>) {
|
|
||||||
task = TaskWithPort<R>(
|
|
||||||
id: id,
|
|
||||||
workPriority: priority,
|
|
||||||
execution: execution,
|
|
||||||
completer: completer,
|
|
||||||
onMessage: onMessage!,
|
|
||||||
);
|
|
||||||
} else if (execution is ExecuteGentle<R>) {
|
|
||||||
task = TaskGentle<R>(id: id, workPriority: priority, execution: execution, completer: completer);
|
|
||||||
} else if (execution is ExecuteGentleWithPort<R>) {
|
|
||||||
task = TaskGentleWithPort<R>(
|
|
||||||
id: id,
|
|
||||||
workPriority: priority,
|
|
||||||
execution: execution,
|
|
||||||
completer: completer,
|
|
||||||
onMessage: onMessage!,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
_queue.add(task);
|
|
||||||
_schedule();
|
|
||||||
logTaskAdded(task.id);
|
|
||||||
return Cancelable(completer: task.completer, onCancel: () => _cancel(task));
|
|
||||||
}
|
|
||||||
|
|
||||||
Future<void> _ensureWorkersInitialized() async {
|
|
||||||
if (_pool.isEmpty) {
|
|
||||||
_createWorkers();
|
|
||||||
if (!_dynamicSpawning) {
|
|
||||||
await _initializeWorkers();
|
|
||||||
final poolSize = _pool.length;
|
|
||||||
final queueSize = _queue.length;
|
|
||||||
for (int i = 0; i <= min(poolSize, queueSize); i++) {
|
|
||||||
_schedule();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (_pool.every((worker) => worker.taskId != null)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (_dynamicSpawning) {
|
|
||||||
final freeWorker = _pool.firstWhereOrNull(
|
|
||||||
(worker) => worker.taskId == null && !worker.initialized && !worker.initializing,
|
|
||||||
);
|
|
||||||
await freeWorker?.initialize();
|
|
||||||
_schedule();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void _schedule() {
|
|
||||||
final availableWorker = _pool.firstWhereOrNull((worker) => worker.taskId == null && worker.initialized);
|
|
||||||
if (availableWorker == null) {
|
|
||||||
_ensureWorkersInitialized();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (_queue.isEmpty) return;
|
|
||||||
final task = _queue.removeFirst();
|
|
||||||
|
|
||||||
availableWorker
|
|
||||||
.work(task)
|
|
||||||
.then(
|
|
||||||
(value) {
|
|
||||||
//could be completed already by cancel and it is normal.
|
|
||||||
//Assuming that worker finished with error and cleaned gracefully
|
|
||||||
task.complete(value, null, null);
|
|
||||||
},
|
|
||||||
onError: (error, st) {
|
|
||||||
task.complete(null, error, st);
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.whenComplete(() {
|
|
||||||
if (_dynamicSpawning && _queue.isEmpty) availableWorker.kill();
|
|
||||||
_schedule();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@override
|
|
||||||
void _cancel(Task task) {
|
|
||||||
task.cancel();
|
|
||||||
_queue.remove(task);
|
|
||||||
final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id);
|
|
||||||
if (task is Gentle) {
|
|
||||||
targetWorker?.cancelGentle();
|
|
||||||
} else {
|
|
||||||
targetWorker?.kill();
|
|
||||||
if (!_dynamicSpawning) targetWorker?.initialize();
|
|
||||||
}
|
|
||||||
super._cancel(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -2154,14 +2154,6 @@ packages:
|
|||||||
url: "https://pub.dev"
|
url: "https://pub.dev"
|
||||||
source: hosted
|
source: hosted
|
||||||
version: "0.0.3"
|
version: "0.0.3"
|
||||||
worker_manager:
|
|
||||||
dependency: "direct main"
|
|
||||||
description:
|
|
||||||
name: worker_manager
|
|
||||||
sha256: "1bce9f894a0c187856f5fc0e150e7fe1facce326f048ca6172947754dac3d4f3"
|
|
||||||
url: "https://pub.dev"
|
|
||||||
source: hosted
|
|
||||||
version: "7.2.7"
|
|
||||||
xdg_directories:
|
xdg_directories:
|
||||||
dependency: transitive
|
dependency: transitive
|
||||||
description:
|
description:
|
||||||
|
|||||||
@@ -85,7 +85,6 @@ dependencies:
|
|||||||
url_launcher: ^6.3.2
|
url_launcher: ^6.3.2
|
||||||
uuid: ^4.5.1
|
uuid: ^4.5.1
|
||||||
wakelock_plus: ^1.3.0
|
wakelock_plus: ^1.3.0
|
||||||
worker_manager: ^7.2.7
|
|
||||||
|
|
||||||
dev_dependencies:
|
dev_dependencies:
|
||||||
auto_route_generator: ^9.0.0
|
auto_route_generator: ^9.0.0
|
||||||
|
|||||||
Reference in New Issue
Block a user