feat: warn if microservices worker is missing (#28869)

* feat: warn if microservices worker is missing

* fix: ci
This commit is contained in:
bo0tzz
2026-06-10 15:31:32 +02:00
committed by GitHub
parent b9b1cc2f65
commit d3680871ef
3 changed files with 48 additions and 2 deletions
+39 -2
View File
@@ -5,7 +5,7 @@ import { JobsOptions, Queue, Worker } from 'bullmq';
import { setTimeout } from 'node:timers/promises';
import { JobConfig } from 'src/decorators';
import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto';
import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum';
import { ImmichWorker, JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum';
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -19,10 +19,14 @@ type JobMapItem = {
label: string;
};
const WORKER_WATCH_INTERVAL_MS = 30_000;
@Injectable()
export class JobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};
private workerWatcher?: ReturnType<typeof setInterval>;
private microservicesPresent = true;
constructor(
private moduleRef: ModuleRef,
@@ -90,11 +94,44 @@ export class JobRepository {
this.workers[queueName] = new Worker(
queueName,
(job) => this.eventRepository.emit('JobRun', queueName, job as JobItem),
{ ...bull.config, concurrency: 1 },
{ ...bull.config, concurrency: 1, name: ImmichWorker.Microservices },
);
}
}
watchWorkers() {
this.workerWatcher ??= setInterval(() => void this.checkWorkers(), WORKER_WATCH_INTERVAL_MS);
}
teardown() {
if (this.workerWatcher) {
clearInterval(this.workerWatcher);
this.workerWatcher = undefined;
}
}
private async checkWorkers() {
let present: boolean;
try {
const suffix = `:w:${ImmichWorker.Microservices}`;
const workers = await this.getQueue(QueueName.BackgroundTask).getWorkers();
present = workers.some((worker) => worker.rawname?.endsWith(suffix));
} catch {
return;
}
if (this.microservicesPresent !== present) {
if (present) {
this.logger.log('Microservices worker connected.');
} else {
this.logger.warn(
'No microservices worker is connected. Background jobs will not be processed until one is running.',
);
}
}
this.microservicesPresent = present;
}
async run({ name, data }: JobItem) {
const item = this.handlers[name as JobName];
if (!item) {
+7
View File
@@ -80,9 +80,16 @@ export class QueueService extends BaseService {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.Microservices) {
this.jobRepository.startWorkers();
} else if (this.worker === ImmichWorker.Api) {
this.jobRepository.watchWorkers();
}
}
@OnEvent({ name: 'AppShutdown' })
onShutdown() {
this.jobRepository.teardown();
}
private updateConcurrency(config: SystemConfig) {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
@@ -6,6 +6,8 @@ export const newJobRepositoryMock = (): Mocked<RepositoryInterface<JobRepository
return {
setup: vitest.fn(),
startWorkers: vitest.fn(),
watchWorkers: vitest.fn(),
teardown: vitest.fn(),
run: vitest.fn(),
setConcurrency: vitest.fn(),
empty: vitest.fn(),