From d3680871ef5d4d83a85dad8fff00e7f4db1b30f0 Mon Sep 17 00:00:00 2001 From: bo0tzz Date: Wed, 10 Jun 2026 15:31:32 +0200 Subject: [PATCH] feat: warn if microservices worker is missing (#28869) * feat: warn if microservices worker is missing * fix: ci --- server/src/repositories/job.repository.ts | 41 ++++++++++++++++++- server/src/services/queue.service.ts | 7 ++++ .../test/repositories/job.repository.mock.ts | 2 + 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index 5bb5276db7..14a242e63b 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -5,7 +5,7 @@ import { JobsOptions, Queue, Worker } from 'bullmq'; import { setTimeout } from 'node:timers/promises'; import { JobConfig } from 'src/decorators'; import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto'; -import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum'; +import { ImmichWorker, JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum'; import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; @@ -19,10 +19,14 @@ type JobMapItem = { label: string; }; +const WORKER_WATCH_INTERVAL_MS = 30_000; + @Injectable() export class JobRepository { private workers: Partial> = {}; private handlers: Partial> = {}; + private workerWatcher?: ReturnType; + private microservicesPresent = true; constructor( private moduleRef: ModuleRef, @@ -90,11 +94,44 @@ export class JobRepository { this.workers[queueName] = new Worker( queueName, (job) => this.eventRepository.emit('JobRun', queueName, job as JobItem), - { ...bull.config, concurrency: 1 }, + { ...bull.config, concurrency: 1, name: ImmichWorker.Microservices }, ); } } + watchWorkers() { + this.workerWatcher ??= setInterval(() => void this.checkWorkers(), WORKER_WATCH_INTERVAL_MS); + } + + teardown() { + if (this.workerWatcher) { + clearInterval(this.workerWatcher); + this.workerWatcher = undefined; + } + } + + private async checkWorkers() { + let present: boolean; + try { + const suffix = `:w:${ImmichWorker.Microservices}`; + const workers = await this.getQueue(QueueName.BackgroundTask).getWorkers(); + present = workers.some((worker) => worker.rawname?.endsWith(suffix)); + } catch { + return; + } + + if (this.microservicesPresent !== present) { + if (present) { + this.logger.log('Microservices worker connected.'); + } else { + this.logger.warn( + 'No microservices worker is connected. Background jobs will not be processed until one is running.', + ); + } + } + this.microservicesPresent = present; + } + async run({ name, data }: JobItem) { const item = this.handlers[name as JobName]; if (!item) { diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index d11c9180b2..bf744aa01d 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -80,9 +80,16 @@ export class QueueService extends BaseService { this.jobRepository.setup(this.services); if (this.worker === ImmichWorker.Microservices) { this.jobRepository.startWorkers(); + } else if (this.worker === ImmichWorker.Api) { + this.jobRepository.watchWorkers(); } } + @OnEvent({ name: 'AppShutdown' }) + onShutdown() { + this.jobRepository.teardown(); + } + private updateConcurrency(config: SystemConfig) { this.logger.debug(`Updating queue concurrency settings`); for (const queueName of Object.values(QueueName)) { diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index 4fc5460c8a..e9fb895b09 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -6,6 +6,8 @@ export const newJobRepositoryMock = (): Mocked