fix: cross isolate drift watchers (#28862)

Co-authored-by: shenlong-tanwen <139912620+shalong-tanwen@users.noreply.github.com>
This commit is contained in:
shenlong
2026-06-05 22:45:41 +05:30
committed by GitHub
parent b6938614b2
commit 60683bd91e
6 changed files with 137 additions and 16 deletions
@@ -2,6 +2,8 @@ import 'dart:async';
import 'dart:io';
import 'package:drift/drift.dart';
// ignore: implementation_imports, invalid_use_of_internal_member
import 'package:drift/src/runtime/executor/stream_queries.dart' show StreamQueryStore;
import 'package:drift_sqlite_async/drift_sqlite_async.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/infrastructure/entities/asset_edit.entity.dart';
@@ -36,6 +38,7 @@ import 'package:logging/logging.dart';
import 'package:path/path.dart' as p;
import 'package:path_provider/path_provider.dart';
import 'package:sqlite3/sqlite3.dart';
import 'package:sqlite3_connection_pool/sqlite3_connection_pool.dart';
import 'package:sqlite_async/native.dart';
import 'package:sqlite_async/sqlite_async.dart';
@@ -68,9 +71,19 @@ import 'package:sqlite_async/sqlite_async.dart';
include: {'package:immich_mobile/infrastructure/entities/merged_asset.drift'},
)
class Drift extends $Drift {
Drift(super.executor);
final SqliteConnectionPool? _updatePool;
Drift.sqlite(SqliteConnection db) : super(SqliteAsyncDriftConnection(db));
Drift(super.executor) : _updatePool = null;
Drift.sqlite(SqliteConnection db, SqliteConnectionPool updatePool)
: _updatePool = updatePool,
super(DatabaseConnection(SqliteAsyncQueryExecutor(db), streamQueries: _DriftPoolStreamQueries(updatePool)));
@override
Future<void> close() async {
await super.close();
_updatePool?.close();
}
Future<void> reset() async {
// https://github.com/simolus3/drift/commit/bd80a46264b6dd833ef4fd87fffc03f5a832ab41#diff-3f879e03b4a35779344ef16170b9353608dd9c42385f5402ec6035aac4dd8a04R76-R94
@@ -325,11 +338,51 @@ class DriftDatabaseRepository {
Future<T> transaction<T>(Future<T> Function() callback) => _db.transaction(callback);
}
// ignore: invalid_use_of_internal_member
final class _DriftPoolStreamQueries extends StreamQueryStore {
_DriftPoolStreamQueries(this._pool);
final SqliteConnectionPool _pool;
@override
void handleTableUpdates(Set<TableUpdate> updates) {
if (updates.isEmpty) {
return;
}
_pool.dispatchUpdateNotification([for (final update in updates) update.table]);
}
@override
Stream<Set<TableUpdate>> updatesForSync(TableUpdateQuery query) {
return _pool.updatedTables
.map((tables) => {for (final table in tables) TableUpdate(table)})
.where((updates) => updates.any(query.matches));
}
}
Future<SqliteConnection> openSqliteConnection({required String name}) async {
return _openImmichDatabase(await _databaseFile(name));
}
Future<(SqliteConnection, SqliteConnectionPool)> openSqliteConnectionWithUpdatePool({required String name}) async {
final file = await _databaseFile(name);
final db = _openImmichDatabase(file);
await db.initialize();
final updatePool = SqliteConnectionPool.open(
name: file.path,
openConnections: () => throw StateError('Pool for "$name" should already be open via sqlite_async'),
);
return (db, updatePool);
}
Future<File> _databaseFile(String name) async {
final dbFolder = await getApplicationDocumentsDirectory();
final file = File(p.join(dbFolder.path, '$name.sqlite'));
return File(p.join(dbFolder.path, '$name.sqlite'));
}
SqliteDatabase _openImmichDatabase(File file) {
return SqliteDatabase.withFactory(
_ImmichSqliteOpenFactory(
ImmichSqliteOpenFactory(
path: file.path,
sqliteOptions: const SqliteOptions(
journalMode: SqliteJournalMode.wal, // PRAGMA journal_mode (writer only)
@@ -340,8 +393,9 @@ Future<SqliteConnection> openSqliteConnection({required String name}) async {
);
}
final class _ImmichSqliteOpenFactory extends NativeSqliteOpenFactory {
_ImmichSqliteOpenFactory({required super.path, super.sqliteOptions});
@visibleForTesting
final class ImmichSqliteOpenFactory extends NativeSqliteOpenFactory {
ImmichSqliteOpenFactory({required super.path, super.sqliteOptions});
@override
List<String> pragmaStatements(SqliteOpenOptions options) {
@@ -1,13 +1,10 @@
import 'package:drift/drift.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/utils/background_sync.dart';
import 'package:immich_mobile/providers/backup/drift_backup.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/providers/sync_status.provider.dart';
final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
final syncStatusNotifier = ref.read(syncStatusProvider.notifier);
final db = ref.read(driftProvider);
final manager = BackgroundSyncManager(
onRemoteSyncStart: () {
@@ -23,14 +20,10 @@ final backgroundSyncProvider = Provider<BackgroundSyncManager>((ref) {
if (backupProvider.mounted) {
backupProvider.updateError(isSuccess == true ? BackupError.none : BackupError.syncFailed);
}
db.notifyUpdates({TableUpdate.onTable(db.remoteAssetEntity, kind: UpdateKind.update)});
},
onRemoteSyncError: syncStatusNotifier.errorRemoteSync,
onLocalSyncStart: syncStatusNotifier.startLocalSync,
onLocalSyncComplete: () {
syncStatusNotifier.completeLocalSync();
db.notifyUpdates({TableUpdate.onTable(db.localAssetEntity, kind: UpdateKind.update)});
},
onLocalSyncComplete: syncStatusNotifier.completeLocalSync,
onLocalSyncError: syncStatusNotifier.errorLocalSync,
onHashingStart: syncStatusNotifier.startHashJob,
onHashingComplete: syncStatusNotifier.completeHashJob,
+2 -1
View File
@@ -44,7 +44,8 @@ void configureFileDownloaderNotifications() {
abstract final class Bootstrap {
static Future<(Drift, DriftLogger)> initDomain({bool listenStoreUpdates = true, bool shouldBufferLogs = true}) async {
await configureSqliteCache();
final drift = Drift.sqlite(await openSqliteConnection(name: 'immich'));
final (db, updatePool) = await openSqliteConnectionWithUpdatePool(name: 'immich');
final drift = Drift.sqlite(db, updatePool);
final logDb = DriftLogger.sqlite(await openSqliteConnection(name: 'immich_logs'));
final DriftStoreRepository storeRepo = DriftStoreRepository(drift);
+1 -1
View File
@@ -1636,7 +1636,7 @@ packages:
source: hosted
version: "3.3.2"
sqlite3_connection_pool:
dependency: transitive
dependency: "direct main"
description:
name: sqlite3_connection_pool
sha256: "9d2b3b398b03c96743fd071521fc665be73c33c9cd5c56d87196baff8d8b4398"
+1
View File
@@ -68,6 +68,7 @@ dependencies:
stream_transform: ^2.1.1
sqlite3: ^3.3.2
sqlite_async: 0.14.2
sqlite3_connection_pool: ^0.2.6
thumbhash: 0.1.0+1
timezone: ^0.9.4
url_launcher: ^6.3.2
@@ -0,0 +1,72 @@
import 'dart:io';
import 'package:drift/drift.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:immich_mobile/infrastructure/entities/store.entity.drift.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:sqlite3_connection_pool/sqlite3_connection_pool.dart';
import 'package:sqlite_async/sqlite_async.dart';
void main() {
late Directory dir;
late String path;
Future<(Drift, SqliteDatabase, SqliteConnectionPool)> openDb() async {
final sqliteDb = SqliteDatabase.withFactory(ImmichSqliteOpenFactory(path: path));
await sqliteDb.initialize();
final pool = SqliteConnectionPool.open(
name: path,
openConnections: () => throw StateError('pool should already be open'),
);
return (Drift.sqlite(sqliteDb, pool), sqliteDb, pool);
}
setUp(() async {
dir = await Directory.systemTemp.createTemp('drift_pool_stream');
path = '${dir.path}/immich.sqlite';
});
tearDown(() async {
await dir.delete(recursive: true);
});
test('watch() in main isolate sees a write from a background isolate', () async {
final (db, dbConnection, _) = await openDb();
final initialRows = await db.select(db.storeEntity).get();
expect(initialRows, isEmpty);
addTearDown(() async {
await db.close();
await dbConnection.close();
});
final rowCounts = db.select(db.storeEntity).watch().map((rows) => rows.length);
final emissionFuture = expectLater(rowCounts, emitsThrough(1));
await compute(_writerTask, path);
await emissionFuture;
});
}
Future<void> _writerTask(String path) async {
final (db, dbConnection, sqlitePool) = await _openDb(path);
try {
await db.into(db.storeEntity).insert(const StoreEntityCompanion(id: Value(1), intValue: Value(42)));
} finally {
await db.close();
await dbConnection.close();
sqlitePool.close();
}
}
Future<(Drift, SqliteDatabase, SqliteConnectionPool)> _openDb(String path) async {
final sqliteDb = SqliteDatabase.withFactory(ImmichSqliteOpenFactory(path: path));
await sqliteDb.initialize();
final pool = SqliteConnectionPool.open(
name: path,
openConnections: () => throw StateError('pool should already be open'),
);
return (Drift.sqlite(sqliteDb, pool), sqliteDb, pool);
}