From 4d7f7b80da8449248ccc8c66853008a08c3b0cdc Mon Sep 17 00:00:00 2001 From: izzy Date: Fri, 28 Nov 2025 17:44:37 +0000 Subject: [PATCH] feat: refresh missing & checksum --- e2e/src/utils.ts | 57 ++++++++- server/src/enum.ts | 4 +- .../src/repositories/asset-job.repository.ts | 9 +- server/src/services/integrity.service.ts | 118 ++++++++++++++++-- server/src/types.ts | 10 +- 5 files changed, 185 insertions(+), 13 deletions(-) diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index 15bb112cd8..247df93955 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -6,6 +6,7 @@ import { CheckExistingAssetsDto, CreateAlbumDto, CreateLibraryDto, + JobCreateDto, MaintenanceAction, MetadataSearchDto, Permission, @@ -21,6 +22,7 @@ import { checkExistingAssets, createAlbum, createApiKey, + createJob, createLibrary, createPartner, createPerson, @@ -52,9 +54,12 @@ import { import { BrowserContext } from '@playwright/test'; import { exec, spawn } from 'node:child_process'; import { createHash } from 'node:crypto'; -import { existsSync, mkdirSync, renameSync, rmSync, writeFileSync } from 'node:fs'; +import { createWriteStream, existsSync, mkdirSync, renameSync, rmSync, writeFileSync } from 'node:fs'; +import { mkdtemp } from 'node:fs/promises'; import { tmpdir } from 'node:os'; -import { dirname, resolve } from 'node:path'; +import { dirname, join, resolve } from 'node:path'; +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; import { setTimeout as setAsyncTimeout } from 'node:timers/promises'; import { promisify } from 'node:util'; import pg from 'pg'; @@ -171,6 +176,7 @@ export const utils = { 'user', 'system_metadata', 'tag', + 'integrity_report', ]; const sql: string[] = []; @@ -481,6 +487,9 @@ export const utils = { tagAssets: (accessToken: string, tagId: string, assetIds: string[]) => tagAssets({ id: tagId, bulkIdsDto: { ids: assetIds } }, { headers: asBearerAuth(accessToken) }), + createJob: async (accessToken: string, jobCreateDto: JobCreateDto) => + createJob({ jobCreateDto }, { headers: asBearerAuth(accessToken) }), + queueCommand: async (accessToken: string, name: QueueName, queueCommandDto: QueueCommandDto) => runQueueCommandLegacy({ name, queueCommandDto }, { headers: asBearerAuth(accessToken) }), @@ -559,6 +568,50 @@ export const utils = { mkdirSync(`${testAssetDir}/temp`, { recursive: true }); }, + putFile(source: string, dest: string) { + return executeCommand('docker', ['cp', source, `immich-e2e-server:${dest}`]).promise; + }, + + async putTextFile(contents: string, dest: string) { + const dir = await mkdtemp(join(tmpdir(), 'test-')); + const fn = join(dir, 'file'); + await pipeline(Readable.from(contents), createWriteStream(fn)); + return executeCommand('docker', ['cp', fn, `immich-e2e-server:${dest}`]).promise; + }, + + async move(source: string, dest: string) { + return executeCommand('docker', ['exec', 'immich-e2e-server', 'mv', source, dest]).promise; + }, + + async copyFolder(source: string, dest: string) { + return executeCommand('docker', ['exec', 'immich-e2e-server', 'cp', '-r', source, dest]).promise; + }, + + async deleteFile(path: string) { + return executeCommand('docker', ['exec', 'immich-e2e-server', 'rm', path]).promise; + }, + + async deleteFolder(path: string) { + return executeCommand('docker', ['exec', 'immich-e2e-server', 'rm', '-r', path]).promise; + }, + + async truncateFolder(path: string) { + return executeCommand('docker', [ + 'exec', + 'immich-e2e-server', + 'find', + path, + '-type', + 'f', + '-exec', + 'truncate', + '-s', + '1', + '{}', + '\;', + ]).promise; + }, + resetAdminConfig: async (accessToken: string) => { const defaultConfig = await getConfigDefaults({ headers: asBearerAuth(accessToken) }); await updateConfig({ systemConfigDto: defaultConfig }, { headers: asBearerAuth(accessToken) }); diff --git a/server/src/enum.ts b/server/src/enum.ts index 2b51b294ff..35f724f985 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -654,10 +654,12 @@ export enum JobName { // Integrity IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll', IntegrityOrphanedFiles = 'IntegrityOrphanedFiles', - IntegrityOrphanedCheckReports = 'IntegrityOrphanedCheckReports', + IntegrityOrphanedFilesRefresh = 'IntegrityOrphanedRefresh', IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll', IntegrityMissingFiles = 'IntegrityMissingFiles', + IntegrityMissingFilesRefresh = 'IntegrityMissingFilesRefresh', IntegrityChecksumFiles = 'IntegrityChecksumFiles', + IntegrityChecksumFilesRefresh = 'IntegrityChecksumFilesRefresh', } export enum QueueCommand { diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 8d5292ca0f..7c277e085d 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -321,7 +321,14 @@ export class AssetJobRepository { @GenerateSql({ params: [DummyValue.STRING], stream: true }) streamIntegrityReports(type: IntegrityReportType) { - return this.db.selectFrom('integrity_report').select(['id as reportId', 'path']).where('type', '=', type).stream(); + return this.db + .selectFrom('integrity_report') + .select(['integrity_report.id as reportId', 'integrity_report.path']) + .where('integrity_report.type', '=', type) + .$if(type === IntegrityReportType.ChecksumFail, (eb) => + eb.leftJoin('asset', 'integrity_report.path', 'asset.originalPath').select('asset.checksum'), + ) + .stream(); } @GenerateSql({ params: [], stream: true }) diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index 06639c9395..7625b5935a 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -19,7 +19,12 @@ import { } from 'src/enum'; import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { IIntegrityJob, IIntegrityOrphanedFilesJob, IIntegrityPathWithReportJob } from 'src/types'; +import { + IIntegrityJob, + IIntegrityOrphanedFilesJob, + IIntegrityPathWithChecksumJob, + IIntegrityPathWithReportJob, +} from 'src/types'; import { handlePromiseError } from 'src/utils/misc'; async function* chunk(generator: AsyncIterableIterator, n: number) { @@ -138,7 +143,7 @@ export class IntegrityService extends BaseService { let total = 0; for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { await this.jobRepository.queue({ - name: JobName.IntegrityOrphanedCheckReports, + name: JobName.IntegrityOrphanedFilesRefresh, data: { items: batchReports, }, @@ -230,8 +235,8 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } - @OnJob({ name: JobName.IntegrityOrphanedCheckReports, queue: QueueName.BackgroundTask }) - async handleOrphanedCheckReports({ items: paths }: IIntegrityPathWithReportJob): Promise { + @OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.BackgroundTask }) + async handleOrphanedRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise { this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`); const results = await Promise.all( @@ -255,7 +260,23 @@ export class IntegrityService extends BaseService { @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask }) async handleMissingFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise { if (refreshOnly) { - // TODO + this.logger.log(`Checking for out of date missing file reports...`); + + const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.MissingFile); + + let total = 0; + for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { + await this.jobRepository.queue({ + name: JobName.IntegrityMissingFilesRefresh, + data: { + items: batchReports, + }, + }); + + total += batchReports.length; + this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`); + } + this.logger.log('Refresh complete.'); return JobStatus.Success; } @@ -314,10 +335,48 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } + @OnJob({ name: JobName.IntegrityMissingFilesRefresh, queue: QueueName.BackgroundTask }) + async handleMissingRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise { + this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`); + + const results = await Promise.all( + paths.map(({ reportId, path }) => + stat(path) + .then(() => reportId) + .catch(() => void 0), + ), + ); + + const reportIds = results.filter(Boolean) as string[]; + + if (reportIds.length > 0) { + await this.integrityReportRepository.deleteByIds(reportIds); + } + + this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`); + return JobStatus.Success; + } + @OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.BackgroundTask }) async handleChecksumFiles({ refreshOnly }: IIntegrityJob = {}): Promise { if (refreshOnly) { - // TODO + this.logger.log(`Checking for out of date checksum file reports...`); + + const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.ChecksumFail); + + let total = 0; + for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { + await this.jobRepository.queue({ + name: JobName.IntegrityChecksumFilesRefresh, + data: { + items: batchReports, + }, + }); + + total += batchReports.length; + this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`); + } + this.logger.log('Refresh complete.'); return JobStatus.Success; } @@ -358,6 +417,8 @@ export class IntegrityService extends BaseService { startMarker = undefined; for await (const { originalPath, checksum, createdAt, reportId } of assets) { + processed++; + try { const hash = createHash('sha1'); @@ -394,7 +455,6 @@ export class IntegrityService extends BaseService { }); } - processed++; if (processed % 100 === 0) { printStats(); } @@ -421,4 +481,48 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } + + @OnJob({ name: JobName.IntegrityChecksumFilesRefresh, queue: QueueName.BackgroundTask }) + async handleChecksumRefresh({ items: paths }: IIntegrityPathWithChecksumJob): Promise { + this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`); + + const results = await Promise.all( + paths.map(async ({ reportId, path, checksum }) => { + console.info('chekc', reportId, path, checksum); + if (!checksum) return reportId; + + try { + const hash = createHash('sha1'); + + await pipeline([ + createReadStream(path), + new Writable({ + write(chunk, _encoding, callback) { + hash.update(chunk); + callback(); + }, + }), + ]); + + console.info('compare', checksum, hash.digest()); + if (checksum.equals(hash.digest())) { + return reportId; + } + } catch (error) { + if ((error as { code?: string }).code === 'ENOENT') { + return reportId; + } + } + }), + ); + + const reportIds = results.filter(Boolean) as string[]; + + if (reportIds.length > 0) { + await this.integrityReportRepository.deleteByIds(reportIds); + } + + this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`); + return JobStatus.Success; + } } diff --git a/server/src/types.ts b/server/src/types.ts index 5110b83836..779b14bba4 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -295,6 +295,10 @@ export interface IIntegrityPathWithReportJob { items: { path: string; reportId: string | null }[]; } +export interface IIntegrityPathWithChecksumJob { + items: { path: string; reportId: string | null; checksum?: Buffer | null }[]; +} + export interface JobCounts { active: number; completed: number; @@ -409,10 +413,12 @@ export type JobItem = // Integrity | { name: JobName.IntegrityOrphanedFilesQueueAll; data?: IIntegrityJob } | { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob } - | { name: JobName.IntegrityOrphanedCheckReports; data: IIntegrityPathWithReportJob } + | { name: JobName.IntegrityOrphanedFilesRefresh; data: IIntegrityPathWithReportJob } | { name: JobName.IntegrityMissingFilesQueueAll; data?: IIntegrityJob } | { name: JobName.IntegrityMissingFiles; data: IIntegrityPathWithReportJob } - | { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob }; + | { name: JobName.IntegrityMissingFilesRefresh; data: IIntegrityPathWithReportJob } + | { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob } + | { name: JobName.IntegrityChecksumFilesRefresh; data?: IIntegrityPathWithChecksumJob }; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];