From 60683bd91e51b41137f37b8a9d1710827106db78 Mon Sep 17 00:00:00 2001 From: shenlong <139912620+shenlong-tanwen@users.noreply.github.com> Date: Fri, 5 Jun 2026 22:45:41 +0530 Subject: [PATCH] fix: cross isolate drift watchers (#28862) Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com> --- .../repositories/db.repository.dart | 66 +++++++++++++++-- .../providers/background_sync.provider.dart | 9 +-- mobile/lib/utils/bootstrap.dart | 3 +- mobile/pubspec.lock | 2 +- mobile/pubspec.yaml | 1 + .../test/infrastructure/db_watchers_test.dart | 72 +++++++++++++++++++ 6 files changed, 137 insertions(+), 16 deletions(-) create mode 100644 mobile/test/infrastructure/db_watchers_test.dart diff --git a/mobile/lib/infrastructure/repositories/db.repository.dart b/mobile/lib/infrastructure/repositories/db.repository.dart index b6089b3cb2..5571426665 100644 --- a/mobile/lib/infrastructure/repositories/db.repository.dart +++ b/mobile/lib/infrastructure/repositories/db.repository.dart @@ -2,6 +2,8 @@ import 'dart:async'; import 'dart:io'; import 'package:drift/drift.dart'; +// ignore: implementation_imports, invalid_use_of_internal_member +import 'package:drift/src/runtime/executor/stream_queries.dart' show StreamQueryStore; import 'package:drift_sqlite_async/drift_sqlite_async.dart'; import 'package:flutter/foundation.dart'; import 'package:immich_mobile/infrastructure/entities/asset_edit.entity.dart'; @@ -36,6 +38,7 @@ import 'package:logging/logging.dart'; import 'package:path/path.dart' as p; import 'package:path_provider/path_provider.dart'; import 'package:sqlite3/sqlite3.dart'; +import 'package:sqlite3_connection_pool/sqlite3_connection_pool.dart'; import 'package:sqlite_async/native.dart'; import 'package:sqlite_async/sqlite_async.dart'; @@ -68,9 +71,19 @@ import 'package:sqlite_async/sqlite_async.dart'; include: {'package:immich_mobile/infrastructure/entities/merged_asset.drift'}, ) class Drift extends $Drift { - Drift(super.executor); + final SqliteConnectionPool? _updatePool; - Drift.sqlite(SqliteConnection db) : super(SqliteAsyncDriftConnection(db)); + Drift(super.executor) : _updatePool = null; + + Drift.sqlite(SqliteConnection db, SqliteConnectionPool updatePool) + : _updatePool = updatePool, + super(DatabaseConnection(SqliteAsyncQueryExecutor(db), streamQueries: _DriftPoolStreamQueries(updatePool))); + + @override + Future close() async { + await super.close(); + _updatePool?.close(); + } Future reset() async { // https://github.com/simolus3/drift/commit/bd80a46264b6dd833ef4fd87fffc03f5a832ab41#diff-3f879e03b4a35779344ef16170b9353608dd9c42385f5402ec6035aac4dd8a04R76-R94 @@ -325,11 +338,51 @@ class DriftDatabaseRepository { Future transaction(Future Function() callback) => _db.transaction(callback); } +// ignore: invalid_use_of_internal_member +final class _DriftPoolStreamQueries extends StreamQueryStore { + _DriftPoolStreamQueries(this._pool); + + final SqliteConnectionPool _pool; + + @override + void handleTableUpdates(Set updates) { + if (updates.isEmpty) { + return; + } + _pool.dispatchUpdateNotification([for (final update in updates) update.table]); + } + + @override + Stream> updatesForSync(TableUpdateQuery query) { + return _pool.updatedTables + .map((tables) => {for (final table in tables) TableUpdate(table)}) + .where((updates) => updates.any(query.matches)); + } +} + Future openSqliteConnection({required String name}) async { + return _openImmichDatabase(await _databaseFile(name)); +} + +Future<(SqliteConnection, SqliteConnectionPool)> openSqliteConnectionWithUpdatePool({required String name}) async { + final file = await _databaseFile(name); + final db = _openImmichDatabase(file); + await db.initialize(); + final updatePool = SqliteConnectionPool.open( + name: file.path, + openConnections: () => throw StateError('Pool for "$name" should already be open via sqlite_async'), + ); + return (db, updatePool); +} + +Future _databaseFile(String name) async { final dbFolder = await getApplicationDocumentsDirectory(); - final file = File(p.join(dbFolder.path, '$name.sqlite')); + return File(p.join(dbFolder.path, '$name.sqlite')); +} + +SqliteDatabase _openImmichDatabase(File file) { return SqliteDatabase.withFactory( - _ImmichSqliteOpenFactory( + ImmichSqliteOpenFactory( path: file.path, sqliteOptions: const SqliteOptions( journalMode: SqliteJournalMode.wal, // PRAGMA journal_mode (writer only) @@ -340,8 +393,9 @@ Future openSqliteConnection({required String name}) async { ); } -final class _ImmichSqliteOpenFactory extends NativeSqliteOpenFactory { - _ImmichSqliteOpenFactory({required super.path, super.sqliteOptions}); +@visibleForTesting +final class ImmichSqliteOpenFactory extends NativeSqliteOpenFactory { + ImmichSqliteOpenFactory({required super.path, super.sqliteOptions}); @override List pragmaStatements(SqliteOpenOptions options) { diff --git a/mobile/lib/providers/background_sync.provider.dart b/mobile/lib/providers/background_sync.provider.dart index ce8991f45b..37b3145eb4 100644 --- a/mobile/lib/providers/background_sync.provider.dart +++ b/mobile/lib/providers/background_sync.provider.dart @@ -1,13 +1,10 @@ -import 'package:drift/drift.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:immich_mobile/domain/utils/background_sync.dart'; import 'package:immich_mobile/providers/backup/drift_backup.provider.dart'; -import 'package:immich_mobile/providers/infrastructure/db.provider.dart'; import 'package:immich_mobile/providers/sync_status.provider.dart'; final backgroundSyncProvider = Provider((ref) { final syncStatusNotifier = ref.read(syncStatusProvider.notifier); - final db = ref.read(driftProvider); final manager = BackgroundSyncManager( onRemoteSyncStart: () { @@ -23,14 +20,10 @@ final backgroundSyncProvider = Provider((ref) { if (backupProvider.mounted) { backupProvider.updateError(isSuccess == true ? BackupError.none : BackupError.syncFailed); } - db.notifyUpdates({TableUpdate.onTable(db.remoteAssetEntity, kind: UpdateKind.update)}); }, onRemoteSyncError: syncStatusNotifier.errorRemoteSync, onLocalSyncStart: syncStatusNotifier.startLocalSync, - onLocalSyncComplete: () { - syncStatusNotifier.completeLocalSync(); - db.notifyUpdates({TableUpdate.onTable(db.localAssetEntity, kind: UpdateKind.update)}); - }, + onLocalSyncComplete: syncStatusNotifier.completeLocalSync, onLocalSyncError: syncStatusNotifier.errorLocalSync, onHashingStart: syncStatusNotifier.startHashJob, onHashingComplete: syncStatusNotifier.completeHashJob, diff --git a/mobile/lib/utils/bootstrap.dart b/mobile/lib/utils/bootstrap.dart index 1c27d7ea93..119df5b804 100644 --- a/mobile/lib/utils/bootstrap.dart +++ b/mobile/lib/utils/bootstrap.dart @@ -44,7 +44,8 @@ void configureFileDownloaderNotifications() { abstract final class Bootstrap { static Future<(Drift, DriftLogger)> initDomain({bool listenStoreUpdates = true, bool shouldBufferLogs = true}) async { await configureSqliteCache(); - final drift = Drift.sqlite(await openSqliteConnection(name: 'immich')); + final (db, updatePool) = await openSqliteConnectionWithUpdatePool(name: 'immich'); + final drift = Drift.sqlite(db, updatePool); final logDb = DriftLogger.sqlite(await openSqliteConnection(name: 'immich_logs')); final DriftStoreRepository storeRepo = DriftStoreRepository(drift); diff --git a/mobile/pubspec.lock b/mobile/pubspec.lock index 9323cedf11..63341a6c2a 100644 --- a/mobile/pubspec.lock +++ b/mobile/pubspec.lock @@ -1636,7 +1636,7 @@ packages: source: hosted version: "3.3.2" sqlite3_connection_pool: - dependency: transitive + dependency: "direct main" description: name: sqlite3_connection_pool sha256: "9d2b3b398b03c96743fd071521fc665be73c33c9cd5c56d87196baff8d8b4398" diff --git a/mobile/pubspec.yaml b/mobile/pubspec.yaml index 67b9f2a17e..b78112d533 100644 --- a/mobile/pubspec.yaml +++ b/mobile/pubspec.yaml @@ -68,6 +68,7 @@ dependencies: stream_transform: ^2.1.1 sqlite3: ^3.3.2 sqlite_async: 0.14.2 + sqlite3_connection_pool: ^0.2.6 thumbhash: 0.1.0+1 timezone: ^0.9.4 url_launcher: ^6.3.2 diff --git a/mobile/test/infrastructure/db_watchers_test.dart b/mobile/test/infrastructure/db_watchers_test.dart new file mode 100644 index 0000000000..2975df226c --- /dev/null +++ b/mobile/test/infrastructure/db_watchers_test.dart @@ -0,0 +1,72 @@ +import 'dart:io'; + +import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:immich_mobile/infrastructure/entities/store.entity.drift.dart'; +import 'package:immich_mobile/infrastructure/repositories/db.repository.dart'; +import 'package:sqlite3_connection_pool/sqlite3_connection_pool.dart'; +import 'package:sqlite_async/sqlite_async.dart'; + +void main() { + late Directory dir; + late String path; + + Future<(Drift, SqliteDatabase, SqliteConnectionPool)> openDb() async { + final sqliteDb = SqliteDatabase.withFactory(ImmichSqliteOpenFactory(path: path)); + await sqliteDb.initialize(); + final pool = SqliteConnectionPool.open( + name: path, + openConnections: () => throw StateError('pool should already be open'), + ); + return (Drift.sqlite(sqliteDb, pool), sqliteDb, pool); + } + + setUp(() async { + dir = await Directory.systemTemp.createTemp('drift_pool_stream'); + path = '${dir.path}/immich.sqlite'; + }); + + tearDown(() async { + await dir.delete(recursive: true); + }); + + test('watch() in main isolate sees a write from a background isolate', () async { + final (db, dbConnection, _) = await openDb(); + final initialRows = await db.select(db.storeEntity).get(); + expect(initialRows, isEmpty); + + addTearDown(() async { + await db.close(); + await dbConnection.close(); + }); + + final rowCounts = db.select(db.storeEntity).watch().map((rows) => rows.length); + final emissionFuture = expectLater(rowCounts, emitsThrough(1)); + + await compute(_writerTask, path); + await emissionFuture; + }); +} + +Future _writerTask(String path) async { + final (db, dbConnection, sqlitePool) = await _openDb(path); + + try { + await db.into(db.storeEntity).insert(const StoreEntityCompanion(id: Value(1), intValue: Value(42))); + } finally { + await db.close(); + await dbConnection.close(); + sqlitePool.close(); + } +} + +Future<(Drift, SqliteDatabase, SqliteConnectionPool)> _openDb(String path) async { + final sqliteDb = SqliteDatabase.withFactory(ImmichSqliteOpenFactory(path: path)); + await sqliteDb.initialize(); + final pool = SqliteConnectionPool.open( + name: path, + openConnections: () => throw StateError('pool should already be open'), + ); + return (Drift.sqlite(sqliteDb, pool), sqliteDb, pool); +}