From 963862b1b9967c589f7474037a2abbbe6e05b495 Mon Sep 17 00:00:00 2001 From: Mert <101130780+mertalev@users.noreply.github.com> Date: Wed, 3 Jun 2026 08:16:19 -0400 Subject: [PATCH] fix(mobile): proper background task cleanup (#28694) * event-based cancellation wire hash cancellation await cleanup remove forced kill add regression tests abort sync requests fix cleanup ordering in teardown exit isolate test background sync test sigabrt crash cleanup * abort local sync --- .../app/alextran/immich/sync/Messages.g.kt | 100 +++++++---- .../alextran/immich/sync/MessagesImpl26.kt | 12 +- .../alextran/immich/sync/MessagesImpl30.kt | 15 +- .../alextran/immich/sync/MessagesImplBase.kt | 39 ++++- .../background_sync_teardown_test.dart | 154 +++++++++++++++++ .../test_utils/fake_immich_server.dart | 115 ++++++++++++ .../Runner/Background/BackgroundWorker.swift | 10 +- mobile/ios/Runner/Sync/Messages.g.swift | 100 ++++++----- mobile/ios/Runner/Sync/MessagesImpl.swift | 147 +++++++++++----- .../services/background_worker.service.dart | 20 +-- mobile/lib/domain/services/hash.service.dart | 14 +- .../domain/services/local_sync.service.dart | 37 +++- mobile/lib/domain/services/log.service.dart | 12 +- mobile/lib/domain/services/store.service.dart | 6 + .../services/sync_linked_album.service.dart | 16 +- .../domain/services/sync_stream.service.dart | 23 ++- mobile/lib/domain/utils/background_sync.dart | 56 ++---- .../lib/domain/utils/migrate_cloud_ids.dart | 38 +++- .../repositories/sync_api.repository.dart | 3 +- mobile/lib/platform/native_sync_api.g.dart | 14 ++ .../infrastructure/cancel.provider.dart | 7 +- .../infrastructure/sync.provider.dart | 4 +- .../drift_album_api_repository.dart | 10 +- mobile/lib/utils/isolate.dart | 66 +++---- mobile/lib/utils/isolate_worker.dart | 163 ++++++++++++++++++ mobile/lib/wm_executor.dart | 140 ++++----------- mobile/pigeon/native_sync_api.dart | 11 +- .../services/sync_stream_service_test.dart | 32 ++-- .../test/unit/utils/isolate_worker_test.dart | 23 +++ 29 files changed, 990 insertions(+), 397 deletions(-) create mode 100644 mobile/integration_test/background_sync_teardown_test.dart create mode 100644 mobile/integration_test/test_utils/fake_immich_server.dart create mode 100644 mobile/lib/utils/isolate_worker.dart create mode 100644 mobile/test/unit/utils/isolate_worker_test.dart diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/Messages.g.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/Messages.g.kt index 345302026d..02f1cb237d 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/Messages.g.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/Messages.g.kt @@ -542,16 +542,17 @@ private open class MessagesPigeonCodec : StandardMessageCodec() { /** Generated interface from Pigeon that represents a handler of messages from Flutter. */ interface NativeSyncApi { - fun shouldFullSync(): Boolean - fun getMediaChanges(): SyncDelta + fun shouldFullSync(callback: (Result) -> Unit) + fun getMediaChanges(callback: (Result) -> Unit) fun checkpointSync() fun clearSyncCheckpoint() - fun getAssetIdsForAlbum(albumId: String): List - fun getAlbums(): List + fun getAssetIdsForAlbum(albumId: String, callback: (Result>) -> Unit) + fun getAlbums(callback: (Result>) -> Unit) fun getAssetsCountSince(albumId: String, timestamp: Long): Long - fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List + fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result>) -> Unit) fun hashAssets(assetIds: List, allowNetworkAccess: Boolean, callback: (Result>) -> Unit) fun cancelHashing() + fun cancelSync() fun getTrashedAssets(): Map> fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result) -> Unit) fun getCloudIdForAssetIds(assetIds: List): List @@ -570,27 +571,33 @@ interface NativeSyncApi { val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync$separatedMessageChannelSuffix", codec) if (api != null) { channel.setMessageHandler { _, reply -> - val wrapped: List = try { - listOf(api.shouldFullSync()) - } catch (exception: Throwable) { - MessagesPigeonUtils.wrapError(exception) + api.shouldFullSync{ result: Result -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(MessagesPigeonUtils.wrapError(error)) + } else { + val data = result.getOrNull() + reply.reply(MessagesPigeonUtils.wrapResult(data)) + } } - reply.reply(wrapped) } } else { channel.setMessageHandler(null) } } run { - val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec, taskQueue) + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges$separatedMessageChannelSuffix", codec) if (api != null) { channel.setMessageHandler { _, reply -> - val wrapped: List = try { - listOf(api.getMediaChanges()) - } catch (exception: Throwable) { - MessagesPigeonUtils.wrapError(exception) + api.getMediaChanges{ result: Result -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(MessagesPigeonUtils.wrapError(error)) + } else { + val data = result.getOrNull() + reply.reply(MessagesPigeonUtils.wrapResult(data)) + } } - reply.reply(wrapped) } } else { channel.setMessageHandler(null) @@ -629,32 +636,38 @@ interface NativeSyncApi { } } run { - val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec, taskQueue) + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum$separatedMessageChannelSuffix", codec) if (api != null) { channel.setMessageHandler { message, reply -> val args = message as List val albumIdArg = args[0] as String - val wrapped: List = try { - listOf(api.getAssetIdsForAlbum(albumIdArg)) - } catch (exception: Throwable) { - MessagesPigeonUtils.wrapError(exception) + api.getAssetIdsForAlbum(albumIdArg) { result: Result> -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(MessagesPigeonUtils.wrapError(error)) + } else { + val data = result.getOrNull() + reply.reply(MessagesPigeonUtils.wrapResult(data)) + } } - reply.reply(wrapped) } } else { channel.setMessageHandler(null) } } run { - val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec, taskQueue) + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums$separatedMessageChannelSuffix", codec) if (api != null) { channel.setMessageHandler { _, reply -> - val wrapped: List = try { - listOf(api.getAlbums()) - } catch (exception: Throwable) { - MessagesPigeonUtils.wrapError(exception) + api.getAlbums{ result: Result> -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(MessagesPigeonUtils.wrapError(error)) + } else { + val data = result.getOrNull() + reply.reply(MessagesPigeonUtils.wrapResult(data)) + } } - reply.reply(wrapped) } } else { channel.setMessageHandler(null) @@ -679,18 +692,21 @@ interface NativeSyncApi { } } run { - val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec, taskQueue) + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum$separatedMessageChannelSuffix", codec) if (api != null) { channel.setMessageHandler { message, reply -> val args = message as List val albumIdArg = args[0] as String val updatedTimeCondArg = args[1] as Long? - val wrapped: List = try { - listOf(api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg)) - } catch (exception: Throwable) { - MessagesPigeonUtils.wrapError(exception) + api.getAssetsForAlbum(albumIdArg, updatedTimeCondArg) { result: Result> -> + val error = result.exceptionOrNull() + if (error != null) { + reply.reply(MessagesPigeonUtils.wrapError(error)) + } else { + val data = result.getOrNull() + reply.reply(MessagesPigeonUtils.wrapResult(data)) + } } - reply.reply(wrapped) } } else { channel.setMessageHandler(null) @@ -733,6 +749,22 @@ interface NativeSyncApi { channel.setMessageHandler(null) } } + run { + val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$separatedMessageChannelSuffix", codec) + if (api != null) { + channel.setMessageHandler { _, reply -> + val wrapped: List = try { + api.cancelSync() + listOf(null) + } catch (exception: Throwable) { + MessagesPigeonUtils.wrapError(exception) + } + reply.reply(wrapped) + } + } else { + channel.setMessageHandler(null) + } + } run { val channel = BasicMessageChannel(binaryMessenger, "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$separatedMessageChannelSuffix", codec, taskQueue) if (api != null) { diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl26.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl26.kt index 6d2c35d78f..180e23286c 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl26.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl26.kt @@ -4,7 +4,11 @@ import android.content.Context class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), NativeSyncApi { - override fun shouldFullSync(): Boolean { + override fun shouldFullSync(callback: (Result) -> Unit) { + runSync(callback) { shouldFullSync() } + } + + private fun shouldFullSync(): Boolean { return true } @@ -18,7 +22,11 @@ class NativeSyncApiImpl26(context: Context) : NativeSyncApiImplBase(context), Na // No-op for Android 10 and below } - override fun getMediaChanges(): SyncDelta { + override fun getMediaChanges(callback: (Result) -> Unit) { + runSync(callback) { getMediaChanges() } + } + + private fun getMediaChanges(): SyncDelta { throw IllegalStateException("Method not supported on this Android version.") } diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl30.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl30.kt index ca54c9f823..4785b751c0 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl30.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImpl30.kt @@ -7,6 +7,8 @@ import android.os.Bundle import android.provider.MediaStore import androidx.annotation.RequiresApi import androidx.annotation.RequiresExtension +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive import kotlinx.serialization.json.Json @RequiresApi(Build.VERSION_CODES.Q) @@ -35,7 +37,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na } } - override fun shouldFullSync(): Boolean = + override fun shouldFullSync(callback: (Result) -> Unit) { + runSync(callback) { shouldFullSync() } + } + + private fun shouldFullSync(): Boolean = MediaStore.getVersion(ctx) != prefs.getString(SHARED_PREF_MEDIA_STORE_VERSION_KEY, null) override fun checkpointSync() { @@ -49,7 +55,11 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na } } - override fun getMediaChanges(): SyncDelta { + override fun getMediaChanges(callback: (Result) -> Unit) { + runSync(callback) { getMediaChanges() } + } + + private suspend fun getMediaChanges(): SyncDelta { val genMap = getSavedGenerationMap() val currentVolumes = MediaStore.getExternalVolumeNames(ctx) val changed = mutableListOf() @@ -58,6 +68,7 @@ class NativeSyncApiImpl30(context: Context) : NativeSyncApiImplBase(context), Na var hasChanges = genMap.keys != currentVolumes for (volume in currentVolumes) { + currentCoroutineContext().ensureActive() val currentGen = MediaStore.getGeneration(ctx, volume) val storedGen = genMap[volume] ?: 0 if (currentGen <= storedGen) { diff --git a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImplBase.kt b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImplBase.kt index 1f5ff2529e..18b771a613 100644 --- a/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImplBase.kt +++ b/mobile/android/app/src/main/kotlin/app/alextran/immich/sync/MessagesImplBase.kt @@ -45,12 +45,14 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa private val ctx: Context = context.applicationContext private var hashTask: Job? = null + private var syncJob: Job? = null private val mediaTrashDelegate = MediaTrashDelegate(ctx) companion object { private const val MAX_CONCURRENT_HASH_OPERATIONS = 16 private val hashSemaphore = Semaphore(MAX_CONCURRENT_HASH_OPERATIONS) private const val HASHING_CANCELLED_CODE = "HASH_CANCELLED" + private const val SYNC_CANCELLED_CODE = "SYNC_CANCELLED" // MediaStore.Files.FileColumns.SPECIAL_FORMAT — S Extensions 21+ // https://developer.android.com/reference/android/provider/MediaStore.Files.FileColumns#SPECIAL_FORMAT @@ -295,7 +297,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa return PlatformAssetPlaybackStyle.IMAGE } - fun getAlbums(): List { + fun getAlbums(callback: (Result>) -> Unit) { + runSync(callback) { getAlbums() } + } + + private suspend fun getAlbums(): List { val albums = mutableListOf() val albumsCount = mutableMapOf() @@ -322,6 +328,7 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa cursor.getColumnIndexOrThrow(MediaStore.Files.FileColumns.DATE_MODIFIED) while (cursor.moveToNext()) { + currentCoroutineContext().ensureActive() val id = cursor.getString(bucketIdColumn) val count = albumsCount.getOrDefault(id, 0) @@ -342,7 +349,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa .sortedBy { it.id } } - fun getAssetIdsForAlbum(albumId: String): List { + fun getAssetIdsForAlbum(albumId: String, callback: (Result>) -> Unit) { + runSync(callback) { getAssetIdsForAlbum(albumId) } + } + + private fun getAssetIdsForAlbum(albumId: String): List { val projection = arrayOf(MediaStore.MediaColumns._ID) return getCursor( @@ -366,7 +377,11 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa )?.use { cursor -> cursor.count.toLong() } ?: 0L - fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List { + fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?, callback: (Result>) -> Unit) { + runSync(callback) { getAssetsForAlbum(albumId, updatedTimeCond) } + } + + private fun getAssetsForAlbum(albumId: String, updatedTimeCond: Long?): List { var selection = "$BUCKET_SELECTION AND $MEDIA_SELECTION" val selectionArgs = mutableListOf(albumId, *MEDIA_SELECTION_ARGS) @@ -451,6 +466,24 @@ open class NativeSyncApiImplBase(context: Context) : ImmichPlugin(), ActivityAwa hashTask = null } + fun cancelSync() { + syncJob?.cancel() + syncJob = null + } + + protected fun runSync(callback: (Result) -> Unit, work: suspend () -> T) { + syncJob?.cancel() + syncJob = CoroutineScope(Dispatchers.IO).launch { + try { + completeWhenActive(callback, Result.success(work())) + } catch (e: CancellationException) { + completeWhenActive(callback, Result.failure(FlutterError(SYNC_CANCELLED_CODE, "Sync cancelled", null))) + } catch (e: Exception) { + completeWhenActive(callback, Result.failure(e)) + } + } + } + fun restoreFromTrashById(mediaId: String, type: Long, callback: (Result) -> Unit) { mediaTrashDelegate.restoreFromTrashById(mediaId, type) { completeWhenActive(callback, it) } } diff --git a/mobile/integration_test/background_sync_teardown_test.dart b/mobile/integration_test/background_sync_teardown_test.dart new file mode 100644 index 0000000000..0f125b7fcc --- /dev/null +++ b/mobile/integration_test/background_sync_teardown_test.dart @@ -0,0 +1,154 @@ +import 'dart:async'; + +import 'package:drift/drift.dart' show Value; +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/domain/models/store.model.dart'; +import 'package:immich_mobile/domain/utils/background_sync.dart'; +import 'package:immich_mobile/entities/store.entity.dart'; +import 'package:immich_mobile/infrastructure/entities/user.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:immich_mobile/main.dart' as app; +import 'package:immich_mobile/services/api.service.dart'; +import 'package:immich_mobile/utils/bootstrap.dart'; +import 'package:immich_mobile/wm_executor.dart'; +import 'package:integration_test/integration_test.dart'; +import 'package:openapi/api.dart'; + +import 'test_utils/fake_immich_server.dart'; + +void main() { + final binding = IntegrationTestWidgetsFlutterBinding.ensureInitialized(); + // These tests do real I/O without pumping a widget tree, so disable the fake async clock + binding.framePolicy = LiveTestWidgetsFlutterBindingFramePolicy.fullyLive; + + late Drift drift; + late FakeImmichServer server; + + setUpAll(() async { + await app.initApp(); + (drift, _) = await Bootstrap.initDomain(); + }); + + setUp(() async { + await workerManagerPatch.init(dynamicSpawning: true); + server = await FakeImmichServer.start(); + await ApiService().resolveAndSetEndpoint(server.endpoint); + await drift.delete(drift.userEntity).go(); + await Store.delete(StoreKey.syncMigrationStatus); + }); + + tearDown(() async { + await workerManagerPatch.dispose(); + await server.close(); + await Store.delete(StoreKey.serverEndpoint); + await Store.delete(StoreKey.syncMigrationStatus); + }); + + void sendUser(SyncStream stream, String id, String name) { + stream.send( + type: SyncEntityType.userV1.value, + data: SyncUserV1( + id: id, + name: name, + email: '$id@test.com', + hasProfileImage: false, + deletedAt: null, + profileChangedAt: DateTime.utc(2025), + ).toJson(), + ack: id, + ); + } + + Future dbReadable() async { + try { + await drift.customSelect('SELECT 1').get().timeout(const Duration(seconds: 5)); + return true; + } catch (_) { + return false; + } + } + + Future userCount() async => (await drift.select(drift.userEntity).get()).length; + + // Starts a remote sync and resolves once its /sync/stream request is open. + Future<(Future, SyncStream)> startSync() async { + final sync = BackgroundSyncManager().syncRemote(); + final stream = await server.streamOpened.timeout( + const Duration(seconds: 30), + onTimeout: () => fail('sync isolate never opened /sync/stream'), + ); + return (sync, stream); + } + + testWidgets('a full sync ingests streamed events into the shared DB', (tester) async { + expect(await userCount(), 0); + + final (sync, stream) = await startSync(); + + sendUser(stream, 'u1', 'Alice'); + sendUser(stream, 'u2', 'Bob'); + await stream.close(); + + final result = await sync.timeout( + const Duration(seconds: 30), + onTimeout: () => fail('sync did not complete after the stream ended'), + ); + expect(result, isTrue); + expect(await userCount(), 2); + expect(server.ackRequests, greaterThan(0)); + }); + + testWidgets('disposing the pool during an in-flight sync drains promptly', (tester) async { + final (sync, _) = await startSync(); + + final sw = Stopwatch()..start(); + await workerManagerPatch.dispose().timeout( + const Duration(seconds: 15), + onTimeout: () => fail('dispose() hung — worker did not drain and exit'), + ); + expect(sw.elapsed, lessThan(const Duration(seconds: 10)), reason: 'abort-driven, not socket-timeout bound'); + + expect(await sync.timeout(const Duration(seconds: 5), onTimeout: () => false), isFalse); + }); + + testWidgets('tearing down a worker blocked mid-write leaves the DB usable', (tester) async { + final (sync, stream) = await startSync(); + + // Hold an exclusive write transaction so the worker's write is blocked. The lock is taken only + // after the stream opens to avoid blocking the worker's own startup DB reads. + final releaseTxn = Completer(); + final txnHeld = Completer(); + final txn = drift.transaction(() async { + await drift.into(drift.userEntity).insert( + UserEntityCompanion.insert( + id: 'holder', + name: 'holder', + email: 'holder@test.com', + hasProfileImage: const Value(false), + profileChangedAt: Value(DateTime.utc(2025)), + ), + ); + txnHeld.complete(); + await releaseTxn.future; + }); + await txnHeld.future; + + sendUser(stream, 'u1', 'Alice'); + await stream.close(); + + // dispose() can only finish once the worker unwinds, which is blocked on the + // lock — so start it, release the lock, then await completion. + final disposed = workerManagerPatch.dispose(); + releaseTxn.complete(); + await txn; + await disposed.timeout( + const Duration(seconds: 15), + onTimeout: () => fail('dispose() hung after releasing the write lock'), + ); + await sync.timeout(const Duration(seconds: 5), onTimeout: () => false); + + expect(await dbReadable(), isTrue); + final users = await drift.select(drift.userEntity).get(); + expect(users.map((u) => u.id), contains('holder')); + }); +} diff --git a/mobile/integration_test/test_utils/fake_immich_server.dart b/mobile/integration_test/test_utils/fake_immich_server.dart new file mode 100644 index 0000000000..c434f83bc5 --- /dev/null +++ b/mobile/integration_test/test_utils/fake_immich_server.dart @@ -0,0 +1,115 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +/// A dummy localhost server that implements only the endpoints that remote-sync touches. +class FakeImmichServer { + FakeImmichServer._(this._server, this.version); + + final HttpServer _server; + final (int, int, int) version; + + final Completer _streamOpened = Completer(); + + int ackRequests = 0; + + String get endpoint => 'http://${_server.address.host}:${_server.port}/api'; + + /// Resolves when the sync isolate opens `POST /sync/stream`. + Future get streamOpened => _streamOpened.future; + + static Future start({(int, int, int) version = (3, 0, 0)}) async { + final server = await HttpServer.bind(InternetAddress.loopbackIPv4, 0); + final fake = FakeImmichServer._(server, version); + fake._listen(); + return fake; + } + + void _listen() { + // A connection torn down mid-write during teardown is expected + _server.listen((request) => unawaited(_route(request).catchError((_) {}))); + } + + Future _route(HttpRequest request) async { + final method = request.method; + final path = request.uri.path; + + if (method == 'GET' && path == '/api/server/ping') { + return _respondJson(request, {'res': 'pong'}); + } + if (method == 'GET' && path == '/api/server/version') { + final (major, minor, patch) = version; + return _respondJson(request, {'major': major, 'minor': minor, 'patch': patch}); + } + if (path == '/api/sync/ack') { + if (method != 'DELETE') { + ackRequests++; + } + return _respondEmpty(request); + } + if (method == 'POST' && path == '/api/sync/stream') { + return _openSyncStream(request); + } + return _respondEmpty(request, status: HttpStatus.notFound); + } + + Future _openSyncStream(HttpRequest request) async { + await request.drain(); + request.response + ..statusCode = HttpStatus.ok + ..headers.contentType = ContentType('application', 'jsonlines+json') + ..contentLength = -1 // chunked: stays open to stream incrementally + ..bufferOutput = false; + // Flush headers so the client's send() resolves and enters its read loop. + await request.response.flush(); + if (!_streamOpened.isCompleted) { + _streamOpened.complete(SyncStream._(request.response)); + } + } + + Future _respondJson(HttpRequest request, Object body) async { + await request.drain(); + request.response + ..statusCode = HttpStatus.ok + ..headers.contentType = ContentType.json + ..write(jsonEncode(body)); + await request.response.close(); + } + + Future _respondEmpty(HttpRequest request, {int status = HttpStatus.ok}) async { + await request.drain(); + request.response.statusCode = status; + await request.response.close(); + } + + Future close() async { + if (_streamOpened.isCompleted) { + await (await _streamOpened.future).close(); + } + await _server.close(force: true); + } +} + +/// Handle to the open `/sync/stream` response: push jsonlines events, then end. +class SyncStream { + SyncStream._(this._response); + + final HttpResponse _response; + bool _closed = false; + + /// [data] should be a Sync*V1 DTO's `toJson()` so the parser's `fromJson` round-trips it. + void send({required String type, required Object data, required String ack}) { + if (_closed) { + return; + } + _response.write('${jsonEncode({'type': type, 'data': data, 'ack': ack})}\n'); + } + + Future close() async { + if (_closed) { + return; + } + _closed = true; + await _response.close(); + } +} diff --git a/mobile/ios/Runner/Background/BackgroundWorker.swift b/mobile/ios/Runner/Background/BackgroundWorker.swift index c5b5e1778a..ad583065f0 100644 --- a/mobile/ios/Runner/Background/BackgroundWorker.swift +++ b/mobile/ios/Runner/Background/BackgroundWorker.swift @@ -121,8 +121,8 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { /** * Cancels the currently running background task, either due to timeout or external request. - * Sends a cancel signal to the Flutter side and sets up a fallback timer to ensure - * the completion handler is eventually called even if Flutter doesn't respond. + * Only tears down the engine after Dart confirms it's drained. If Dart overruns iOS's grace window, + * the expiration handler still calls setTaskCompleted and iOS suspends us. */ func close() { if isComplete { @@ -132,12 +132,6 @@ class BackgroundWorker: BackgroundWorkerBgHostApi { flutterApi?.cancel { result in self.complete(success: false) } - - // Fallback safety mechanism: ensure completion is called within 2 seconds - // This prevents the background task from hanging indefinitely if Flutter doesn't respond - Timer.scheduledTimer(withTimeInterval: 2, repeats: false) { _ in - self.complete(success: false) - } } diff --git a/mobile/ios/Runner/Sync/Messages.g.swift b/mobile/ios/Runner/Sync/Messages.g.swift index d18a153bb7..a752785c5b 100644 --- a/mobile/ios/Runner/Sync/Messages.g.swift +++ b/mobile/ios/Runner/Sync/Messages.g.swift @@ -526,16 +526,17 @@ class MessagesPigeonCodec: FlutterStandardMessageCodec, @unchecked Sendable { /// Generated protocol from Pigeon that represents a handler of messages from Flutter. protocol NativeSyncApi { - func shouldFullSync() throws -> Bool - func getMediaChanges() throws -> SyncDelta + func shouldFullSync(completion: @escaping (Result) -> Void) + func getMediaChanges(completion: @escaping (Result) -> Void) func checkpointSync() throws func clearSyncCheckpoint() throws - func getAssetIdsForAlbum(albumId: String) throws -> [String] - func getAlbums() throws -> [PlatformAlbum] + func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void) + func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void) func getAssetsCountSince(albumId: String, timestamp: Int64) throws -> Int64 - func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] + func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void) func hashAssets(assetIds: [String], allowNetworkAccess: Bool, completion: @escaping (Result<[HashResult], Error>) -> Void) func cancelHashing() throws + func cancelSync() throws func getTrashedAssets() throws -> [String: [PlatformAsset]] func restoreFromTrashById(mediaId: String, type: Int64, completion: @escaping (Result) -> Void) func getCloudIdForAssetIds(assetIds: [String]) throws -> [CloudIdResult] @@ -555,26 +556,28 @@ class NativeSyncApiSetup { let shouldFullSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.shouldFullSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) if let api = api { shouldFullSyncChannel.setMessageHandler { _, reply in - do { - let result = try api.shouldFullSync() - reply(wrapResult(result)) - } catch { - reply(wrapError(error)) + api.shouldFullSync { result in + switch result { + case .success(let res): + reply(wrapResult(res)) + case .failure(let error): + reply(wrapError(error)) + } } } } else { shouldFullSyncChannel.setMessageHandler(nil) } - let getMediaChangesChannel = taskQueue == nil - ? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) - : FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue) + let getMediaChangesChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getMediaChanges\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) if let api = api { getMediaChangesChannel.setMessageHandler { _, reply in - do { - let result = try api.getMediaChanges() - reply(wrapResult(result)) - } catch { - reply(wrapError(error)) + api.getMediaChanges { result in + switch result { + case .success(let res): + reply(wrapResult(res)) + case .failure(let error): + reply(wrapError(error)) + } } } } else { @@ -606,33 +609,33 @@ class NativeSyncApiSetup { } else { clearSyncCheckpointChannel.setMessageHandler(nil) } - let getAssetIdsForAlbumChannel = taskQueue == nil - ? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) - : FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue) + let getAssetIdsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetIdsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) if let api = api { getAssetIdsForAlbumChannel.setMessageHandler { message, reply in let args = message as! [Any?] let albumIdArg = args[0] as! String - do { - let result = try api.getAssetIdsForAlbum(albumId: albumIdArg) - reply(wrapResult(result)) - } catch { - reply(wrapError(error)) + api.getAssetIdsForAlbum(albumId: albumIdArg) { result in + switch result { + case .success(let res): + reply(wrapResult(res)) + case .failure(let error): + reply(wrapError(error)) + } } } } else { getAssetIdsForAlbumChannel.setMessageHandler(nil) } - let getAlbumsChannel = taskQueue == nil - ? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) - : FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue) + let getAlbumsChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAlbums\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) if let api = api { getAlbumsChannel.setMessageHandler { _, reply in - do { - let result = try api.getAlbums() - reply(wrapResult(result)) - } catch { - reply(wrapError(error)) + api.getAlbums { result in + switch result { + case .success(let res): + reply(wrapResult(res)) + case .failure(let error): + reply(wrapError(error)) + } } } } else { @@ -656,19 +659,19 @@ class NativeSyncApiSetup { } else { getAssetsCountSinceChannel.setMessageHandler(nil) } - let getAssetsForAlbumChannel = taskQueue == nil - ? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) - : FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue) + let getAssetsForAlbumChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getAssetsForAlbum\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) if let api = api { getAssetsForAlbumChannel.setMessageHandler { message, reply in let args = message as! [Any?] let albumIdArg = args[0] as! String let updatedTimeCondArg: Int64? = nilOrValue(args[1]) - do { - let result = try api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg) - reply(wrapResult(result)) - } catch { - reply(wrapError(error)) + api.getAssetsForAlbum(albumId: albumIdArg, updatedTimeCond: updatedTimeCondArg) { result in + switch result { + case .success(let res): + reply(wrapResult(res)) + case .failure(let error): + reply(wrapError(error)) + } } } } else { @@ -707,6 +710,19 @@ class NativeSyncApiSetup { } else { cancelHashingChannel.setMessageHandler(nil) } + let cancelSyncChannel = FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) + if let api = api { + cancelSyncChannel.setMessageHandler { _, reply in + do { + try api.cancelSync() + reply(wrapResult(nil)) + } catch { + reply(wrapError(error)) + } + } + } else { + cancelSyncChannel.setMessageHandler(nil) + } let getTrashedAssetsChannel = taskQueue == nil ? FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec) : FlutterBasicMessageChannel(name: "dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets\(channelSuffix)", binaryMessenger: binaryMessenger, codec: codec, taskQueue: taskQueue) diff --git a/mobile/ios/Runner/Sync/MessagesImpl.swift b/mobile/ios/Runner/Sync/MessagesImpl.swift index e6903defeb..ddfd023690 100644 --- a/mobile/ios/Runner/Sync/MessagesImpl.swift +++ b/mobile/ios/Runner/Sync/MessagesImpl.swift @@ -39,6 +39,9 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { private static let hashCancelledCode = "HASH_CANCELLED" private static let hashCancelled = Result<[HashResult], Error>.failure(PigeonError(code: hashCancelledCode, message: "Hashing cancelled", details: nil)) + private var syncTask: Task? + private static let syncCancelledCode = "SYNC_CANCELLED" + private static let syncCancelled = PigeonError(code: syncCancelledCode, message: "Sync cancelled", details: nil) init(with defaults: UserDefaults = .standard) { self.defaults = defaults @@ -71,7 +74,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { saveChangeToken(token: PHPhotoLibrary.shared().currentChangeToken) } - func shouldFullSync() -> Bool { + func shouldFullSync(completion: @escaping (Result) -> Void) { + runSync(completion) { $0.shouldFullSync() } + } + + private func shouldFullSync() -> Bool { guard #available(iOS 16, *), PHPhotoLibrary.authorizationStatus(for: .readWrite) == .authorized, let storedToken = getChangeToken() else { @@ -87,12 +94,17 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { return false } - func getAlbums() throws -> [PlatformAlbum] { + func getAlbums(completion: @escaping (Result<[PlatformAlbum], Error>) -> Void) { + runSync(completion) { try $0.getAlbums() } + } + + private func getAlbums() throws -> [PlatformAlbum] { var albums: [PlatformAlbum] = [] - albumTypes.forEach { type in + for type in albumTypes { let collections = PHAssetCollection.fetchAssetCollections(with: type, subtype: .any, options: nil) for i in 0.. SyncDelta { + func getMediaChanges(completion: @escaping (Result) -> Void) { + runSync(completion) { try $0.getMediaChanges() } + } + + private func getMediaChanges() throws -> SyncDelta { guard #available(iOS 16, *) else { throw PigeonError(code: "UNSUPPORTED_OS", message: "This feature requires iOS 16 or later.", details: nil) } @@ -146,51 +162,49 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { return SyncDelta(hasChanges: false, updates: [], deletes: [], assetAlbums: [:]) } - do { - let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken) + let changes = try PHPhotoLibrary.shared().fetchPersistentChanges(since: storedToken) + + var updatedAssets: Set = [] + var deletedAssets: Set = [] + + for change in changes { + try Task.checkCancellation() + guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue } - var updatedAssets: Set = [] - var deletedAssets: Set = [] + let updated = details.updatedLocalIdentifiers.union(details.insertedLocalIdentifiers) + deletedAssets.formUnion(details.deletedLocalIdentifiers) - for change in changes { - guard let details = try? change.changeDetails(for: PHObjectType.asset) else { continue } + if (updated.isEmpty) { continue } + + let options = PHFetchOptions() + options.includeHiddenAssets = false + let result = PHAsset.fetchAssets(withLocalIdentifiers: Array(updated), options: options) + for i in 0..) -> [String: [String]] { guard !assets.isEmpty else { return [:] @@ -213,7 +227,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { return albumAssets } - func getAssetIdsForAlbum(albumId: String) throws -> [String] { + func getAssetIdsForAlbum(albumId: String, completion: @escaping (Result<[String], Error>) -> Void) { + runSync(completion) { try $0.getAssetIdsForAlbum(albumId: albumId) } + } + + private func getAssetIdsForAlbum(albumId: String) throws -> [String] { let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil) guard let album = collections.firstObject else { return [] @@ -223,9 +241,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { let options = PHFetchOptions() options.includeHiddenAssets = false let assets = getAssetsFromAlbum(in: album, options: options) - assets.enumerateObjects { (asset, _, _) in + assets.enumerateObjects { (asset, _, stop) in + if Task.isCancelled { + stop.pointee = true + return + } ids.append(asset.localIdentifier) } + try Task.checkCancellation() return ids } @@ -243,7 +266,11 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { return Int64(assets.count) } - func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] { + func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?, completion: @escaping (Result<[PlatformAsset], Error>) -> Void) { + runSync(completion) { try $0.getAssetsForAlbum(albumId: albumId, updatedTimeCond: updatedTimeCond) } + } + + private func getAssetsForAlbum(albumId: String, updatedTimeCond: Int64?) throws -> [PlatformAsset] { let collections = PHAssetCollection.fetchAssetCollections(withLocalIdentifiers: [albumId], options: nil) guard let album = collections.firstObject else { return [] @@ -262,9 +289,14 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { } var assets: [PlatformAsset] = [] - result.enumerateObjects { (asset, _, _) in + result.enumerateObjects { (asset, _, stop) in + if Task.isCancelled { + stop.pointee = true + return + } assets.append(asset.toPlatformAsset()) } + try Task.checkCancellation() return assets } @@ -324,6 +356,31 @@ class NativeSyncApiImpl: ImmichPlugin, NativeSyncApi, FlutterPlugin { hashTask = nil } + func cancelSync() { + syncTask?.cancel() + syncTask = nil + } + + private func runSync( + _ completion: @escaping (Result) -> Void, + _ work: @escaping (NativeSyncApiImpl) throws -> T + ) { + syncTask?.cancel() + syncTask = Task { [weak self] in + guard let self else { return nil } + let result: Result + do { + result = .success(try work(self)) + } catch is CancellationError { + result = .failure(Self.syncCancelled) + } catch { + result = .failure(error) + } + self.completeWhenActive(for: completion, with: result) + return nil + } + } + private func hashAsset(_ asset: PHAsset, allowNetworkAccess: Bool) async -> HashResult? { class RequestRef { var id: PHAssetResourceDataRequestID? diff --git a/mobile/lib/domain/services/background_worker.service.dart b/mobile/lib/domain/services/background_worker.service.dart index d28f7ff14b..eadcf7c9db 100644 --- a/mobile/lib/domain/services/background_worker.service.dart +++ b/mobile/lib/domain/services/background_worker.service.dart @@ -188,20 +188,14 @@ class BackgroundWorkerBgService extends BackgroundWorkerFlutterApi { if (!_cancellationToken.isCompleted) { _cancellationToken.complete(); } - final cleanupFutures = [ - nativeSyncApi?.cancelHashing(), - workerManagerPatch.dispose().catchError((_) async { - // Discard any errors on the dispose call - return; - }), - LogService.I.dispose(), - Store.dispose(), - backgroundSyncManager?.cancel(), - _drift.optimize(allTables: true), - ]; - - await Future.wait(cleanupFutures.nonNulls); + // Workers share one sqlite connection, so DB teardown must wait until every worker has stopped using it. + await Future.wait([ + if (backgroundSyncManager != null) backgroundSyncManager.cancel(), + if (nativeSyncApi != null) nativeSyncApi.cancelHashing(), + ]); + await workerManagerPatch.dispose().catchError((_) async {}); + await Future.wait([LogService.I.dispose(), Store.dispose(), _drift.optimize(allTables: true)]); await _drift.close(); await _driftLogger.close(); diff --git a/mobile/lib/domain/services/hash.service.dart b/mobile/lib/domain/services/hash.service.dart index e2938a79ad..e4c332b283 100644 --- a/mobile/lib/domain/services/hash.service.dart +++ b/mobile/lib/domain/services/hash.service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:flutter/services.dart'; import 'package:immich_mobile/constants/constants.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; @@ -17,7 +19,7 @@ class HashService { final DriftLocalAssetRepository _localAssetRepository; final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository; final NativeSyncApi _nativeSyncApi; - final bool Function()? _cancelChecker; + final Completer? _cancellation; final _log = Logger('HashService'); HashService({ @@ -25,11 +27,15 @@ class HashService { required this._localAssetRepository, required this._trashedLocalAssetRepository, required this._nativeSyncApi, - this._cancelChecker, + this._cancellation, int? batchSize, - }) : _batchSize = batchSize ?? kBatchHashFileLimit; + }) : _batchSize = batchSize ?? kBatchHashFileLimit { + // Stop the in-flight native hash call promptly on cancellation; the loops + // below also observe [isCancelled] to bail between batches. + _cancellation?.future.then((_) => _nativeSyncApi.cancelHashing().onError(_log.warning)); + } - bool get isCancelled => _cancelChecker?.call() ?? false; + bool get isCancelled => _cancellation?.isCompleted ?? false; Future hashAssets() async { _log.info("Starting hashing of assets"); diff --git a/mobile/lib/domain/services/local_sync.service.dart b/mobile/lib/domain/services/local_sync.service.dart index 77ded0ba4d..feb104f90d 100644 --- a/mobile/lib/domain/services/local_sync.service.dart +++ b/mobile/lib/domain/services/local_sync.service.dart @@ -2,6 +2,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:flutter/foundation.dart'; +import 'package:flutter/services.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; import 'package:immich_mobile/domain/models/asset/base_asset.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; @@ -17,6 +18,8 @@ import 'package:immich_mobile/utils/datetime_helpers.dart'; import 'package:immich_mobile/utils/diff.dart'; import 'package:logging/logging.dart'; +const String _kSyncCancelledCode = "SYNC_CANCELLED"; + class LocalSyncService { final DriftLocalAlbumRepository _localAlbumRepository; // ignore: unused_field @@ -25,6 +28,7 @@ class LocalSyncService { final DriftTrashedLocalAssetRepository _trashedLocalAssetRepository; final AssetMediaRepository _assetMediaRepository; final IPermissionRepository _permissionRepository; + final Completer? _cancellation; final Logger _log = Logger("DeviceSyncService"); LocalSyncService({ @@ -34,7 +38,12 @@ class LocalSyncService { required this._trashedLocalAssetRepository, required this._assetMediaRepository, required this._permissionRepository, - }); + this._cancellation, + }) { + _cancellation?.future.then((_) => _nativeSyncApi.cancelSync().onError(_log.warning)); + } + + bool get _isCancelled => _cancellation?.isCompleted ?? false; Future sync({bool full = false}) async { final Stopwatch stopwatch = Stopwatch()..start(); @@ -81,6 +90,10 @@ class LocalSyncService { // detect album deletions from the native side if (CurrentPlatform.isAndroid) { for (final album in dbAlbums) { + if (_isCancelled) { + _log.warning("Local sync cancelled. Stopped processing albums."); + return; + } final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id); await _localAlbumRepository.syncDeletes(album.id, deviceIds); } @@ -91,6 +104,10 @@ class LocalSyncService { // does not include changes for cloud albums. final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums(); for (final album in cloudAlbums) { + if (_isCancelled) { + _log.warning("Local sync cancelled. Stopped processing cloud albums."); + return; + } final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id); if (dbAlbum == null) { _log.warning("Cloud album ${album.name} not found in local database. Skipping sync."); @@ -102,6 +119,12 @@ class LocalSyncService { await _mapIosCloudIds(newAssets); } await _nativeSyncApi.checkpointSync(); + } on PlatformException catch (e, s) { + if (e.code == _kSyncCancelledCode) { + _log.warning("Local sync cancelled"); + } else { + _log.severe("Error performing device sync", e, s); + } } catch (e, s) { _log.severe("Error performing device sync", e, s); } finally { @@ -129,12 +152,21 @@ class LocalSyncService { await _nativeSyncApi.checkpointSync(); stopwatch.stop(); _log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms"); + } on PlatformException catch (e, s) { + if (e.code == _kSyncCancelledCode) { + _log.warning("Full device sync cancelled"); + } else { + _log.severe("Error performing full device sync", e, s); + } } catch (e, s) { _log.severe("Error performing full device sync", e, s); } } Future addAlbum(LocalAlbum album) async { + if (_isCancelled) { + return; + } try { _log.fine("Adding device album ${album.name}"); @@ -162,6 +194,9 @@ class LocalSyncService { // The deviceAlbum is ignored since we are going to refresh it anyways FutureOr updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum) async { + if (_isCancelled) { + return false; + } try { _log.fine("Syncing device album ${dbAlbum.name}"); diff --git a/mobile/lib/domain/services/log.service.dart b/mobile/lib/domain/services/log.service.dart index 216f030b12..b612b3ce91 100644 --- a/mobile/lib/domain/services/log.service.dart +++ b/mobile/lib/domain/services/log.service.dart @@ -112,10 +112,16 @@ class LogService { return _flushBuffer(); } - Future dispose() { + Future dispose() async { _flushTimer?.cancel(); - _logSubscription.cancel(); - return _flushBuffer(); + _flushTimer = null; + await _logSubscription.cancel(); + await _flushBuffer(); + // Allow a subsequent init() (e.g. when a worker isolate is reused) to + // create a fresh instance instead of returning this disposed one. + if (identical(_instance, this)) { + _instance = null; + } } Future _flushBuffer() async { diff --git a/mobile/lib/domain/services/store.service.dart b/mobile/lib/domain/services/store.service.dart index 16ed64e6d3..758622a43b 100644 --- a/mobile/lib/domain/services/store.service.dart +++ b/mobile/lib/domain/services/store.service.dart @@ -54,7 +54,13 @@ class StoreService { /// Disposes the store and cancels the subscription. To reuse the store call init() again Future dispose() async { await _storeUpdateSubscription?.cancel(); + _storeUpdateSubscription = null; _cache.clear(); + // Allow a subsequent init() (e.g. when a worker isolate is reused) to + // create a fresh instance instead of returning this disposed one. + if (identical(_instance, this)) { + _instance = null; + } } /// Returns the cached value for [key], or `null` diff --git a/mobile/lib/domain/services/sync_linked_album.service.dart b/mobile/lib/domain/services/sync_linked_album.service.dart index 3bc76083b8..ddcd6721d7 100644 --- a/mobile/lib/domain/services/sync_linked_album.service.dart +++ b/mobile/lib/domain/services/sync_linked_album.service.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/models/album/local_album.model.dart'; import 'package:immich_mobile/domain/models/store.model.dart'; @@ -5,6 +7,7 @@ import 'package:immich_mobile/domain/services/store.service.dart'; import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/remote_album.repository.dart'; import 'package:immich_mobile/providers/infrastructure/album.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/store.provider.dart'; import 'package:immich_mobile/repositories/drift_album_api_repository.dart'; import 'package:immich_mobile/utils/debug_print.dart'; @@ -16,6 +19,7 @@ final syncLinkedAlbumServiceProvider = Provider( ref.watch(remoteAlbumRepository), ref.watch(driftAlbumApiRepositoryProvider), ref.watch(storeServiceProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -24,13 +28,15 @@ class SyncLinkedAlbumService { final DriftRemoteAlbumRepository _remoteAlbumRepository; final DriftAlbumApiRepository _albumApiRepository; final StoreService _storeService; + final Completer? _cancellation; SyncLinkedAlbumService( this._localAlbumRepository, this._remoteAlbumRepository, this._albumApiRepository, - this._storeService, - ); + this._storeService, { + this._cancellation, + }); final _log = Logger("SyncLinkedAlbumService"); @@ -55,7 +61,11 @@ class SyncLinkedAlbumService { final assetIds = await _remoteAlbumRepository.getLinkedAssetIds(userId, localAlbum.id, linkedRemoteAlbumId); _log.fine("Syncing ${assetIds.length} assets to remote album: ${remoteAlbum.name}"); if (assetIds.isNotEmpty) { - final album = await _albumApiRepository.addAssets(remoteAlbum.id, assetIds); + final album = await _albumApiRepository.addAssets( + remoteAlbum.id, + assetIds, + abortTrigger: _cancellation?.future, + ); await _remoteAlbumRepository.addAssets(remoteAlbum.id, album.added); } }), diff --git a/mobile/lib/domain/services/sync_stream.service.dart b/mobile/lib/domain/services/sync_stream.service.dart index 200dca2418..08109b25d3 100644 --- a/mobile/lib/domain/services/sync_stream.service.dart +++ b/mobile/lib/domain/services/sync_stream.service.dart @@ -38,7 +38,7 @@ class SyncStreamService { final IPermissionRepository _permissionRepository; final SyncMigrationRepository _syncMigrationRepository; final ApiService _api; - final bool Function()? _cancelChecker; + final Completer? _cancellation; SyncStreamService({ required this._syncApiRepository, @@ -49,10 +49,10 @@ class SyncStreamService { required this._permissionRepository, required this._syncMigrationRepository, required this._api, - this._cancelChecker, + this._cancellation, }); - bool get isCancelled => _cancelChecker?.call() ?? false; + bool get isCancelled => _cancellation?.isCompleted ?? false; Future sync() async { _logger.info("Remote sync request for user"); @@ -80,10 +80,15 @@ class SyncStreamService { _handleEvents, serverVersion: serverSemVer, onReset: () => shouldReset = true, + abortSignal: _cancellation?.future, ); if (shouldReset) { _logger.info("Resetting sync state as requested by server"); - await _syncApiRepository.streamChanges(_handleEvents, serverVersion: serverSemVer); + await _syncApiRepository.streamChanges( + _handleEvents, + serverVersion: serverSemVer, + abortSignal: _cancellation?.future, + ); } previousLength = migrations.length; @@ -318,7 +323,7 @@ class SyncStreamService { } Future handleWsAssetUploadReadyV1Batch(List batchData) async { - if (batchData.isEmpty) { + if (batchData.isEmpty || isCancelled) { return; } @@ -361,7 +366,7 @@ class SyncStreamService { } Future handleWsAssetUploadReadyV2Batch(List batchData) async { - if (batchData.isEmpty) { + if (batchData.isEmpty || isCancelled) { return; } @@ -404,6 +409,9 @@ class SyncStreamService { } Future handleWsAssetEditReadyV1(dynamic data) async { + if (isCancelled) { + return; + } _logger.info('Processing AssetEditReadyV1 event'); try { @@ -444,6 +452,9 @@ class SyncStreamService { } Future handleWsAssetEditReadyV2(dynamic data) async { + if (isCancelled) { + return; + } _logger.info('Processing AssetEditReadyV2 event'); try { diff --git a/mobile/lib/domain/utils/background_sync.dart b/mobile/lib/domain/utils/background_sync.dart index 030e77cd54..82f397d9b6 100644 --- a/mobile/lib/domain/utils/background_sync.dart +++ b/mobile/lib/domain/utils/background_sync.dart @@ -50,53 +50,27 @@ class BackgroundSyncManager { }); Future cancel() async { - final futures = []; - - if (_syncTask != null) { - futures.add(_syncTask!.future); + final tasks = [ + _syncTask, + _syncWebsocketTask, + _cloudIdSyncTask, + _linkedAlbumSyncTask, + _deviceAlbumSyncTask, + _hashTask, + ]; + final futures = [ + for (final task in tasks) + if (task != null) task.future, + ]; + for (final task in tasks) { + task?.cancel(); } - _syncTask?.cancel(); _syncTask = null; - - if (_syncWebsocketTask != null) { - futures.add(_syncWebsocketTask!.future); - } - _syncWebsocketTask?.cancel(); _syncWebsocketTask = null; - - if (_cloudIdSyncTask != null) { - futures.add(_cloudIdSyncTask!.future); - } - _cloudIdSyncTask?.cancel(); _cloudIdSyncTask = null; - - if (_linkedAlbumSyncTask != null) { - futures.add(_linkedAlbumSyncTask!.future); - } - _linkedAlbumSyncTask?.cancel(); _linkedAlbumSyncTask = null; - - try { - await Future.wait(futures); - } on CanceledError { - // Ignore cancellation errors - } - } - - Future cancelLocal() async { - final futures = []; - - if (_hashTask != null) { - futures.add(_hashTask!.future); - } - _hashTask?.cancel(); - _hashTask = null; - - if (_deviceAlbumSyncTask != null) { - futures.add(_deviceAlbumSyncTask!.future); - } - _deviceAlbumSyncTask?.cancel(); _deviceAlbumSyncTask = null; + _hashTask = null; try { await Future.wait(futures); diff --git a/mobile/lib/domain/utils/migrate_cloud_ids.dart b/mobile/lib/domain/utils/migrate_cloud_ids.dart index 32188b4838..efef6e8327 100644 --- a/mobile/lib/domain/utils/migrate_cloud_ids.dart +++ b/mobile/lib/domain/utils/migrate_cloud_ids.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:drift/drift.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/constants/constants.dart'; @@ -9,6 +11,7 @@ import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart'; import 'package:immich_mobile/platform/native_sync_api.g.dart'; import 'package:immich_mobile/providers/api.provider.dart'; +import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/infrastructure/sync.provider.dart'; import 'package:immich_mobile/providers/server_info.provider.dart'; @@ -51,9 +54,10 @@ Future syncCloudIds(ProviderContainer ref) async { } final assetApi = ref.read(apiServiceProvider).assetsApi; + final cancellation = ref.read(cancellationProvider); // Process cloud IDs in paginated batches - await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger); + await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger, cancellation); } Future _processCloudIdMappingsInBatches( @@ -62,12 +66,17 @@ Future _processCloudIdMappingsInBatches( AssetsApi assetsApi, bool canBulkUpdate, Logger logger, + Completer cancellation, ) async { const pageSize = 20000; String? lastLocalId; final seenRemoteAssetIds = {}; while (true) { + if (cancellation.isCompleted) { + logger.warning('Cloud ID migration cancelled. Stopping batch processing.'); + break; + } final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId); if (mappings.isEmpty) { break; @@ -98,9 +107,9 @@ Future _processCloudIdMappingsInBatches( if (items.isNotEmpty) { if (canBulkUpdate) { - await _bulkUpdateCloudIds(assetsApi, items); + await _bulkUpdateCloudIds(assetsApi, items, cancellation.future); } else { - await _sequentialUpdateCloudIds(assetsApi, items); + await _sequentialUpdateCloudIds(assetsApi, items, cancellation); } } @@ -111,20 +120,35 @@ Future _processCloudIdMappingsInBatches( } } -Future _sequentialUpdateCloudIds(AssetsApi assetsApi, List items) async { +Future _sequentialUpdateCloudIds( + AssetsApi assetsApi, + List items, + Completer cancellation, +) async { for (final item in items) { + if (cancellation.isCompleted) { + break; + } final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value); try { - await assetsApi.updateAssetMetadata(item.assetId, AssetMetadataUpsertDto(items: [upsertItem])); + await assetsApi.updateAssetMetadata( + item.assetId, + AssetMetadataUpsertDto(items: [upsertItem]), + abortTrigger: cancellation.future, + ); } catch (error, stack) { Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack); } } } -Future _bulkUpdateCloudIds(AssetsApi assetsApi, List items) async { +Future _bulkUpdateCloudIds( + AssetsApi assetsApi, + List items, + Future abortTrigger, +) async { try { - await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items)); + await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items), abortTrigger: abortTrigger); } catch (error, stack) { Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack); } diff --git a/mobile/lib/infrastructure/repositories/sync_api.repository.dart b/mobile/lib/infrastructure/repositories/sync_api.repository.dart index 8b8475d31f..65214f3846 100644 --- a/mobile/lib/infrastructure/repositories/sync_api.repository.dart +++ b/mobile/lib/infrastructure/repositories/sync_api.repository.dart @@ -29,6 +29,7 @@ class SyncApiRepository { Function()? onReset, int batchSize = kSyncEventBatchSize, http.Client? httpClient, + Future? abortSignal, }) async { final stopwatch = Stopwatch()..start(); final client = httpClient ?? NetworkRepository.client; @@ -36,7 +37,7 @@ class SyncApiRepository { final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'}; - final request = http.Request('POST', Uri.parse(endpoint)); + final request = http.AbortableRequest('POST', Uri.parse(endpoint), abortTrigger: abortSignal); request.headers.addAll(headers); request.body = jsonEncode( SyncStreamDto( diff --git a/mobile/lib/platform/native_sync_api.g.dart b/mobile/lib/platform/native_sync_api.g.dart index ff6ca7bf9d..bd979af87b 100644 --- a/mobile/lib/platform/native_sync_api.g.dart +++ b/mobile/lib/platform/native_sync_api.g.dart @@ -635,6 +635,20 @@ class NativeSyncApi { _extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true); } + Future cancelSync() async { + final pigeonVar_channelName = + 'dev.flutter.pigeon.immich_mobile.NativeSyncApi.cancelSync$pigeonVar_messageChannelSuffix'; + final pigeonVar_channel = BasicMessageChannel( + pigeonVar_channelName, + pigeonChannelCodec, + binaryMessenger: pigeonVar_binaryMessenger, + ); + final Future pigeonVar_sendFuture = pigeonVar_channel.send(null); + final pigeonVar_replyList = await pigeonVar_sendFuture as List?; + + _extractReplyValueOrThrow(pigeonVar_replyList, pigeonVar_channelName, isNullValid: true); + } + Future>> getTrashedAssets() async { final pigeonVar_channelName = 'dev.flutter.pigeon.immich_mobile.NativeSyncApi.getTrashedAssets$pigeonVar_messageChannelSuffix'; diff --git a/mobile/lib/providers/infrastructure/cancel.provider.dart b/mobile/lib/providers/infrastructure/cancel.provider.dart index 6851861e1a..9d4a6790f2 100644 --- a/mobile/lib/providers/infrastructure/cancel.provider.dart +++ b/mobile/lib/providers/infrastructure/cancel.provider.dart @@ -1,8 +1,9 @@ +import 'dart:async'; + import 'package:hooks_riverpod/hooks_riverpod.dart'; -/// Provider holding a boolean function that returns true when cancellation is requested. -/// A computation running in the isolate uses the function to implement cooperative cancellation. -final cancellationProvider = Provider( +/// Holds the isolate's cancellation signal. +final cancellationProvider = Provider>( // This will be overridden in the isolate's container. // Throwing ensures it's not used without an override. (ref) => throw UnimplementedError( diff --git a/mobile/lib/providers/infrastructure/sync.provider.dart b/mobile/lib/providers/infrastructure/sync.provider.dart index 75c8e09326..700b51f12d 100644 --- a/mobile/lib/providers/infrastructure/sync.provider.dart +++ b/mobile/lib/providers/infrastructure/sync.provider.dart @@ -26,7 +26,7 @@ final syncStreamServiceProvider = Provider( permissionRepository: ref.watch(permissionRepositoryProvider), syncMigrationRepository: ref.watch(syncMigrationRepositoryProvider), api: ref.watch(apiServiceProvider), - cancelChecker: ref.watch(cancellationProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -42,6 +42,7 @@ final localSyncServiceProvider = Provider( assetMediaRepository: ref.watch(assetMediaRepositoryProvider), permissionRepository: ref.watch(permissionRepositoryProvider), nativeSyncApi: ref.watch(nativeSyncApiProvider), + cancellation: ref.watch(cancellationProvider), ), ); @@ -51,5 +52,6 @@ final hashServiceProvider = Provider( localAssetRepository: ref.watch(localAssetRepository), nativeSyncApi: ref.watch(nativeSyncApiProvider), trashedLocalAssetRepository: ref.watch(trashedLocalAssetRepository), + cancellation: ref.watch(cancellationProvider), ), ); diff --git a/mobile/lib/repositories/drift_album_api_repository.dart b/mobile/lib/repositories/drift_album_api_repository.dart index 445f5763a2..ee57352fb1 100644 --- a/mobile/lib/repositories/drift_album_api_repository.dart +++ b/mobile/lib/repositories/drift_album_api_repository.dart @@ -47,8 +47,14 @@ class DriftAlbumApiRepository extends ApiRepository { return (removed: removed, failed: failed); } - Future<({List added, List failed})> addAssets(String albumId, Iterable assetIds) async { - final response = await checkNull(_api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()))); + Future<({List added, List failed})> addAssets( + String albumId, + Iterable assetIds, { + Future? abortTrigger, + }) async { + final response = await checkNull( + _api.addAssetsToAlbum(albumId, BulkIdsDto(ids: assetIds.toList()), abortTrigger: abortTrigger), + ); final List added = [], failed = []; for (final dto in response) { if (dto.success) { diff --git a/mobile/lib/utils/isolate.dart b/mobile/lib/utils/isolate.dart index 20b56d4875..ab3b19b78f 100644 --- a/mobile/lib/utils/isolate.dart +++ b/mobile/lib/utils/isolate.dart @@ -8,10 +8,9 @@ import 'package:immich_mobile/entities/store.entity.dart'; import 'package:immich_mobile/providers/infrastructure/cancel.provider.dart'; import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/utils/bootstrap.dart'; -import 'package:immich_mobile/utils/debug_print.dart'; import 'package:immich_mobile/wm_executor.dart'; import 'package:logging/logging.dart'; -import 'package:worker_manager/worker_manager.dart'; +import 'package:worker_manager/worker_manager.dart' show Cancelable; class InvalidIsolateUsageException implements Exception { const InvalidIsolateUsageException(); @@ -30,50 +29,27 @@ Cancelable runInIsolateGentle({ throw const InvalidIsolateUsageException(); } - return workerManagerPatch.executeGentle((cancelledChecker) async { - T? result; - await runZonedGuarded( - () async { - BackgroundIsolateBinaryMessenger.ensureInitialized(token); - DartPluginRegistrant.ensureInitialized(); + return workerManagerPatch.executeGentle((onCancel) async { + BackgroundIsolateBinaryMessenger.ensureInitialized(token); + DartPluginRegistrant.ensureInitialized(); - final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false); - final ref = ProviderContainer( - overrides: [ - cancellationProvider.overrideWithValue(cancelledChecker), - driftProvider.overrideWith(driftOverride(drift)), - ], - ); - - Logger log = Logger("IsolateLogger"); - - try { - result = await computation(ref); - } on CanceledError { - log.warning("Computation cancelled ${debugLabel == null ? '' : ' for $debugLabel'}"); - } catch (error, stack) { - log.severe("Error in runInIsolateGentle ${debugLabel == null ? '' : ' for $debugLabel'}", error, stack); - } finally { - try { - ref.dispose(); - - await Store.dispose(); - await LogService.I.dispose(); - await logDb.close(); - await drift.close(); - } catch (error, stack) { - dPrint(() => "Error closing resources in isolate: $error, $stack"); - } finally { - ref.dispose(); - // Delay to ensure all resources are released - await Future.delayed(const Duration(seconds: 2)); - } - } - }, - (error, stack) { - dPrint(() => "Error in isolate $debugLabel zone: $error, $stack"); - }, + final log = Logger("IsolateLogger"); + final (drift, logDb) = await Bootstrap.initDomain(shouldBufferLogs: false, listenStoreUpdates: false); + final ref = ProviderContainer( + overrides: [cancellationProvider.overrideWithValue(onCancel), driftProvider.overrideWith(driftOverride(drift))], ); - return result; + + try { + return await computation(ref); + } catch (error, stack) { + log.severe("Error in runInIsolateGentle${debugLabel == null ? '' : ' for $debugLabel'}", error, stack); + return null; + } finally { + ref.dispose(); + await Store.dispose(); + await LogService.I.dispose(); + await logDb.close(); + await drift.close(); + } }); } diff --git a/mobile/lib/utils/isolate_worker.dart b/mobile/lib/utils/isolate_worker.dart new file mode 100644 index 0000000000..60048c2c81 --- /dev/null +++ b/mobile/lib/utils/isolate_worker.dart @@ -0,0 +1,163 @@ +// Forked from worker_manager's `WorkerImpl` (src/worker/worker_io.dart): a +// `CancelRequest` completes the computation's [Completer] (so it can await +// cancellation and unwind) instead of flipping a polled flag, and [shutdown] +// lets the isolate drain and exit on its own rather than force-killing it. Only +// the gentle-with-cancellation path immich uses is kept. +// +// ignore_for_file: implementation_imports + +import 'dart:async'; +import 'dart:isolate'; + +import 'package:worker_manager/src/scheduling/task.dart'; +import 'package:worker_manager/src/worker/cancel_request.dart'; +import 'package:worker_manager/src/worker/result.dart'; + +/// A worker computation that receives a [Completer] which completes on +/// cancellation: await its future to react promptly, or read `isCompleted`. +typedef GentleExecution = FutureOr Function(Completer onCancel); + +class _Shutdown { + const _Shutdown(); +} + +class IsolateWorker { + IsolateWorker(); + + Isolate? _isolate; + RawReceivePort? _receivePort; + SendPort? _sendPort; + Completer? _sendPortReceived; + Completer? _result; + + String? taskId; + + bool get initialized => _sendPortReceived?.isCompleted ?? false; + + bool get initializing { + final sendPortReceived = _sendPortReceived; + return sendPortReceived != null && !sendPortReceived.isCompleted; + } + + Future initialize() async { + final sendPortReceived = _sendPortReceived = Completer(); + final receivePort = _receivePort = RawReceivePort(); + receivePort.handler = (Object message) { + if (message is SendPort) { + _sendPort = message; + sendPortReceived.complete(); + } else if (message is ResultSuccess) { + _result?.complete(message.value); + _afterTask(); + } else if (message is ResultError) { + _result?.completeError(message.error, message.stackTrace); + _afterTask(); + } + }; + _isolate = await Isolate.spawn(_isolateEntry, receivePort.sendPort, errorsAreFatal: false); + await sendPortReceived.future; + } + + Future work(Task task) async { + taskId = task.id; + final result = _result = Completer(); + _sendPort!.send(task.execution); + return await (result.future as Future); + } + + /// Cancels the current task without retiring the worker. + void cancelGentle() => _sendPort?.send(CancelRequest()); + + /// Cancels any in-flight task and awaits the isolate exiting on its own — no + /// force-kill, so `finally` blocks and native cleanup always run. + /// + /// Detaches the slot up front so a concurrent [initialize] can revive it + /// without colliding (revival installs fresh ports while this drains the ones + /// it captured locally). A revived worker is always idle, so the still-live + /// receive-port handler can't misroute a result. + Future shutdown() async { + final sendPortReceived = _sendPortReceived; + if (sendPortReceived != null && !sendPortReceived.isCompleted) { + await sendPortReceived.future; + } + + final isolate = _isolate; + final receivePort = _receivePort; + final sendPort = _sendPort; + if (isolate == null || receivePort == null || sendPort == null) { + return; + } + _isolate = null; + _sendPort = null; + _sendPortReceived = null; + // Not _result: an in-flight task still delivers it before exiting; nulling + // here would drop that and hang work()'s caller. + + final exited = Completer(); + final exitPort = RawReceivePort(); + exitPort.handler = (_) { + if (!exited.isCompleted) { + exited.complete(); + } + exitPort.close(); + }; + isolate.addOnExitListener(exitPort.sendPort); + sendPort.send(const _Shutdown()); + await exited.future; + receivePort.close(); + } + + void _afterTask() { + taskId = null; + _result = null; + } + + static void _isolateEntry(SendPort sendPort) { + final receivePort = RawReceivePort(); + sendPort.send(receivePort.sendPort); + // One task at a time, so a single completer suffices; null between tasks. + Completer? onCancel; + void cancel() { + if (onCancel?.isCompleted == false) { + onCancel!.complete(); + } + } + + var shuttingDown = false; + var running = false; + receivePort.handler = (message) async { + if (message is _Shutdown) { + shuttingDown = true; + cancel(); + if (!running) { + Isolate.exit(); + } + return; + } + if (message is CancelRequest) { + cancel(); + return; + } + final execution = message as GentleExecution; + onCancel = Completer(); + running = true; + Result result; + try { + result = ResultSuccess(await execution(onCancel!)); + } catch (error, stackTrace) { + result = ResultError(error, stackTrace); + } finally { + onCancel = null; + running = false; + } + if (shuttingDown) { + // An isolate that has used platform channels can't exit on its own (Flutter's BackgroundIsolateBinaryMessenger + // opens an undisposable port), so closing our ports isn't enough. Isolate.exit delivers the result as its final + // message and terminates. It's abrupt (skips pending finally/microtasks) but safe here: the computation and its + // `finally` are already done and there's no await before this, so nothing pending is skipped. + Isolate.exit(sendPort, result); + } + sendPort.send(result); + }; + } +} diff --git a/mobile/lib/wm_executor.dart b/mobile/lib/wm_executor.dart index 2eb31fe300..e873c5f76d 100644 --- a/mobile/lib/wm_executor.dart +++ b/mobile/lib/wm_executor.dart @@ -6,8 +6,8 @@ import 'dart:math'; import 'package:collection/collection.dart'; import 'package:flutter/foundation.dart'; +import 'package:immich_mobile/utils/isolate_worker.dart'; import 'package:worker_manager/src/number_of_processors/processors_io.dart'; -import 'package:worker_manager/src/worker/worker.dart'; import 'package:worker_manager/worker_manager.dart'; final workerManagerPatch = _Executor(); @@ -16,6 +16,13 @@ final workerManagerPatch = _Executor(); const _minId = -9007199254740992; const _maxId = 9007199254740992; +class _GentleTask extends Task implements Gentle { + @override + final GentleExecution execution; + + _GentleTask({required super.id, required super.completer, required super.workPriority, required this.execution}); +} + class Mixinable { late final itSelf = this as T; } @@ -51,13 +58,13 @@ mixin _ExecutorLogger on Mixinable<_Executor> { class _Executor extends Mixinable<_Executor> with _ExecutorLogger { final _queue = PriorityQueue(); - final _pool = []; + final _pool = []; var _nextTaskId = _minId; var _dynamicSpawning = false; var _isolatesCount = numberOfProcessors; @visibleForTesting - UnmodifiableListView get pool => UnmodifiableListView(_pool); + UnmodifiableListView get pool => UnmodifiableListView(_pool); @override Future init({int? isolatesCount, bool? dynamicSpawning}) async { @@ -80,117 +87,37 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { @override Future dispose() async { _queue.clear(); - for (final worker in _pool) { - if (worker.initialized || worker.initializing) { - worker.kill(); - } - } + final shutdown = _pool.map((worker) => worker.shutdown()).toList(growable: false); _pool.clear(); + await Future.wait(shutdown); super.dispose(); } - Cancelable execute(Execute execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeNow(ExecuteGentle execution) { - final task = TaskGentle( - id: "", - workPriority: WorkPriority.immediately, - execution: execution, - completer: Completer(), - ); - - Future run() async { - try { - final result = await execution(() => task.canceled); - task.complete(result, null, null); - } catch (error, st) { - task.complete(null, error, st); - } - } - - run(); - return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); - } - - Cancelable executeWithPort( - ExecuteWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - Cancelable executeGentle(ExecuteGentle execution, {WorkPriority priority = WorkPriority.immediately}) { - return _createCancelable(execution: execution, priority: priority); - } - - Cancelable executeGentleWithPort( - ExecuteGentleWithPort execution, { - WorkPriority priority = WorkPriority.immediately, - required void Function(T value) onMessage, - }) { - return _createCancelable( - execution: execution, - priority: priority, - onMessage: (message) => onMessage(message as T), - ); - } - - void _createWorkers() { - for (var i = 0; i < _isolatesCount; i++) { - _pool.add(Worker()); - } - } - - Future _initializeWorkers() async { - await Future.wait(_pool.map((e) => e.initialize())); - } - - Cancelable _createCancelable({ - required Function execution, - WorkPriority priority = WorkPriority.immediately, - void Function(Object value)? onMessage, - }) { + /// Runs [execution] on a worker isolate; its [Completer] completes when the + /// returned [Cancelable] is cancelled. + Cancelable executeGentle(GentleExecution execution, {WorkPriority priority = WorkPriority.immediately}) { if (_nextTaskId + 1 == _maxId) { _nextTaskId = _minId; } final id = _nextTaskId.toString(); _nextTaskId++; - late final Task task; - final completer = Completer(); - if (execution is ExecuteWithPort) { - task = TaskWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } else if (execution is ExecuteGentle) { - task = TaskGentle(id: id, workPriority: priority, execution: execution, completer: completer); - } else if (execution is ExecuteGentleWithPort) { - task = TaskGentleWithPort( - id: id, - workPriority: priority, - execution: execution, - completer: completer, - onMessage: onMessage!, - ); - } else if (execution is Execute) { - task = TaskRegular(id: id, workPriority: priority, execution: execution, completer: completer); - } + final task = _GentleTask(id: id, workPriority: priority, execution: execution, completer: Completer()); _queue.add(task); _schedule(); logTaskAdded(task.id); return Cancelable(completer: task.completer, onCancel: () => _cancel(task)); } + void _createWorkers() { + for (var i = 0; i < _isolatesCount; i++) { + _pool.add(IsolateWorker()); + } + } + + Future _initializeWorkers() async { + await Future.wait(_pool.map((e) => e.initialize())); + } + Future _ensureWorkersInitialized() async { if (_pool.isEmpty) { _createWorkers(); @@ -240,7 +167,9 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { ) .whenComplete(() { if (_dynamicSpawning && _queue.isEmpty) { - availableWorker.kill(); + // Retire the idle worker; shutdown() nulls its fields so the husk + // stays pooled and is revived by initialize() if work arrives. + unawaited(availableWorker.shutdown()); } _schedule(); }); @@ -250,15 +179,8 @@ class _Executor extends Mixinable<_Executor> with _ExecutorLogger { void _cancel(Task task) { task.cancel(); _queue.remove(task); - final targetWorker = _pool.firstWhereOrNull((worker) => worker.taskId == task.id); - if (task is Gentle) { - targetWorker?.cancelGentle(); - } else { - targetWorker?.kill(); - if (!_dynamicSpawning) { - targetWorker?.initialize(); - } - } + // All tasks are gentle: signal cancellation; the worker unwinds on its own. + _pool.firstWhereOrNull((worker) => worker.taskId == task.id)?.cancelGentle(); super._cancel(task); } } diff --git a/mobile/pigeon/native_sync_api.dart b/mobile/pigeon/native_sync_api.dart index 9775973694..433b154cd1 100644 --- a/mobile/pigeon/native_sync_api.dart +++ b/mobile/pigeon/native_sync_api.dart @@ -105,25 +105,26 @@ class CloudIdResult { @HostApi() abstract class NativeSyncApi { + @async bool shouldFullSync(); - @TaskQueue(type: TaskQueueType.serialBackgroundThread) + @async SyncDelta getMediaChanges(); void checkpointSync(); void clearSyncCheckpoint(); - @TaskQueue(type: TaskQueueType.serialBackgroundThread) + @async List getAssetIdsForAlbum(String albumId); - @TaskQueue(type: TaskQueueType.serialBackgroundThread) + @async List getAlbums(); @TaskQueue(type: TaskQueueType.serialBackgroundThread) int getAssetsCountSince(String albumId, int timestamp); - @TaskQueue(type: TaskQueueType.serialBackgroundThread) + @async List getAssetsForAlbum(String albumId, {int? updatedTimeCond}); @async @@ -132,6 +133,8 @@ abstract class NativeSyncApi { void cancelHashing(); + void cancelSync(); + @TaskQueue(type: TaskQueueType.serialBackgroundThread) Map> getTrashedAssets(); diff --git a/mobile/test/domain/services/sync_stream_service_test.dart b/mobile/test/domain/services/sync_stream_service_test.dart index 80272d9310..e033229408 100644 --- a/mobile/test/domain/services/sync_stream_service_test.dart +++ b/mobile/test/domain/services/sync_stream_service_test.dart @@ -36,13 +36,6 @@ class _AbortCallbackWrapper { class _MockAbortCallbackWrapper extends Mock implements _AbortCallbackWrapper {} -class _CancellationWrapper { - const _CancellationWrapper(); - - bool call() => false; -} - -class _MockCancellationWrapper extends Mock implements _CancellationWrapper {} void main() { late SyncStreamService sut; @@ -94,9 +87,13 @@ void main() { when(() => mockAbortCallbackWrapper()).thenReturn(false); - when(() => mockSyncApiRepo.streamChanges(any(), serverVersion: any(named: 'serverVersion'))).thenAnswer(( - invocation, - ) async { + when( + () => mockSyncApiRepo.streamChanges( + any(), + serverVersion: any(named: 'serverVersion'), + abortSignal: any(named: 'abortSignal'), + ), + ).thenAnswer((invocation) async { handleEventsCallback = invocation.positionalArguments.first; }); @@ -105,6 +102,7 @@ void main() { any(), onReset: any(named: 'onReset'), serverVersion: any(named: 'serverVersion'), + abortSignal: any(named: 'abortSignal'), ), ).thenAnswer((invocation) async { handleEventsCallback = invocation.positionalArguments.first; @@ -233,8 +231,7 @@ void main() { }); test("aborts and stops processing if cancelled during iteration", () async { - final cancellationChecker = _MockCancellationWrapper(); - when(() => cancellationChecker()).thenReturn(false); + final cancellation = Completer(); sut = SyncStreamService( syncApiRepository: mockSyncApiRepo, @@ -243,7 +240,7 @@ void main() { trashedLocalAssetRepository: mockTrashedLocalAssetRepo, assetMediaRepository: mockAssetMediaRepo, permissionRepository: mockPermissionRepo, - cancelChecker: cancellationChecker.call, + cancellation: cancellation, api: mockApi, syncMigrationRepository: mockSyncMigrationRepo, ); @@ -252,7 +249,7 @@ void main() { final events = [SyncStreamStub.userDeleteV1, SyncStreamStub.userV1Admin, SyncStreamStub.partnerDeleteV1]; when(() => mockSyncStreamRepo.deleteUsersV1(any())).thenAnswer((_) async { - when(() => cancellationChecker()).thenReturn(true); + cancellation.complete(); }); await handleEventsCallback(events, mockAbortCallbackWrapper.call, mockResetCallbackWrapper.call); @@ -267,8 +264,7 @@ void main() { }); test("aborts and stops processing if cancelled before processing batch", () async { - final cancellationChecker = _MockCancellationWrapper(); - when(() => cancellationChecker()).thenReturn(false); + final cancellation = Completer(); final processingCompleter = Completer(); bool handler1Started = false; @@ -284,7 +280,7 @@ void main() { trashedLocalAssetRepository: mockTrashedLocalAssetRepo, assetMediaRepository: mockAssetMediaRepo, permissionRepository: mockPermissionRepo, - cancelChecker: cancellationChecker.call, + cancellation: cancellation, api: mockApi, syncMigrationRepository: mockSyncMigrationRepo, ); @@ -303,7 +299,7 @@ void main() { expect(handler1Started, isTrue); // Signal cancellation while handler 1 is waiting - when(() => cancellationChecker()).thenReturn(true); + cancellation.complete(); await pumpEventQueue(); processingCompleter.complete(); diff --git a/mobile/test/unit/utils/isolate_worker_test.dart b/mobile/test/unit/utils/isolate_worker_test.dart new file mode 100644 index 0000000000..5b79395f52 --- /dev/null +++ b/mobile/test/unit/utils/isolate_worker_test.dart @@ -0,0 +1,23 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/wm_executor.dart'; + +void main() { + tearDown(workerManagerPatch.dispose); + + test('dispose() drains a cancelled task and delivers its result', () async { + await workerManagerPatch.init(isolatesCount: 1, dynamicSpawning: false); + + final task = workerManagerPatch.executeGentle((onCancel) async { + await onCancel.future; + return 'drained'; + }); + + await workerManagerPatch.dispose(); + + expect( + await task.timeout(const Duration(seconds: 5)), + 'drained', + reason: 'the worker must finish and return its result, not be killed mid-task', + ); + }); +}