support conventional uploads

This commit is contained in:
mertalev
2025-10-12 19:18:52 -04:00
parent 3c72409712
commit 818bd51036
6 changed files with 190 additions and 52 deletions

View File

@@ -99,6 +99,9 @@ describe('/upload', () => {
visibility: 'timeline',
}),
);
const downloaded = await utils.downloadAsset(admin.accessToken, body.id);
expect(downloaded.size).toBe(content.byteLength);
expect(content.compare(await downloaded.bytes())).toBe(0);
});
it('should create a complete upload with Upload-Incomplete: ?0 if version is 3', async () => {
@@ -136,6 +139,99 @@ describe('/upload', () => {
);
});
it('should support conventional upload', async () => {
const content = randomBytes(1024);
const checksum = createHash('sha1').update(content).digest('base64');
const { status, headers, body } = await request(app)
.post('/upload')
.set('Authorization', `Bearer ${user.accessToken}`)
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', `sha=:${checksum}:`)
.set('Content-Type', 'image/jpeg')
.set('Upload-Length', '1024')
.send(content);
expect(status).toBe(200);
expect(headers['upload-complete']).toBeUndefined();
expect(headers['upload-incomplete']).toBeUndefined();
expect(headers['location']).toBeUndefined();
expect(body).toEqual(expect.objectContaining({ id: expect.any(String) }));
const asset = await utils.getAssetInfo(user.accessToken, body.id);
expect(asset).toEqual(
expect.objectContaining({
id: body.id,
checksum,
ownerId: user.userId,
exifInfo: expect.objectContaining({ fileSizeInByte: content.byteLength }),
originalFileName: 'test-image.jpg',
deviceAssetId: 'rufh',
deviceId: 'test',
isFavorite: true,
visibility: 'timeline',
}),
);
});
it('overwrite partial duplicate if conventional upload', { timeout: 1000 }, async () => {
const content = randomBytes(10240);
const checksum = createHash('sha1').update(content).digest('base64');
// simulate interrupted upload by starting a request and not completing it
const req = httpRequest({
hostname: 'localhost',
port: 2285,
path: '/upload',
method: 'POST',
headers: {
Authorization: `Bearer ${user.accessToken}`,
'X-Immich-Asset-Data': assetData,
'Repr-Digest': `sha=:${checksum}:`,
'Upload-Length': '1024',
'Content-Length': '1024',
'Content-Type': 'image/jpeg',
},
});
req.write(content.subarray(0, 512));
await setTimeout(50);
const { status, headers, body } = await request(app)
.post('/upload')
.set('Authorization', `Bearer ${user.accessToken}`)
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', `sha=:${checksum}:`)
.set('Content-Type', 'image/jpeg')
.set('Upload-Length', '10240')
.send(content);
expect(status).toBe(200);
expect(headers['upload-complete']).toBeUndefined();
expect(headers['upload-incomplete']).toBeUndefined();
expect(headers['location']).toBeUndefined();
expect(body).toEqual(expect.objectContaining({ id: expect.any(String) }));
const asset = await utils.getAssetInfo(user.accessToken, body.id);
expect(asset).toEqual(
expect.objectContaining({
id: body.id,
checksum,
ownerId: user.userId,
exifInfo: expect.objectContaining({ fileSizeInByte: content.byteLength }),
originalFileName: 'test-image.jpg',
deviceAssetId: 'rufh',
deviceId: 'test',
isFavorite: true,
visibility: 'timeline',
}),
);
const downloaded = await utils.downloadAsset(user.accessToken, body.id);
expect(downloaded.size).toBe(content.byteLength);
expect(content.compare(await downloaded.bytes())).toBe(0);
});
it('should reject when Upload-Complete: ?1 with mismatching Content-Length and Upload-Length', async () => {
const content = randomBytes(1000);
@@ -789,6 +885,12 @@ describe('/upload', () => {
.send(content.subarray(2000));
expect(secondResponse.status).toBe(200);
expect(secondResponse.headers['upload-complete']).toBe('?1');
expect(secondResponse.body).toEqual(expect.objectContaining({ id: expect.any(String) }));
const downloaded = await utils.downloadAsset(user.accessToken, secondResponse.body.id);
expect(downloaded.size).toBe(content.byteLength);
expect(content.compare(await downloaded.bytes())).toBe(0);
});
});

View File

@@ -561,6 +561,16 @@ export const utils = {
await utils.waitForQueueFinish(accessToken, 'sidecar');
await utils.waitForQueueFinish(accessToken, 'metadataExtraction');
},
downloadAsset: async (accessToken: string, id: string) => {
const downloadedRes = await fetch(`${baseUrl}/api/assets/${id}/original`, {
headers: asBearerAuth(accessToken),
});
if (!downloadedRes.ok) {
throw new Error(`Failed to download asset ${id}: ${downloadedRes.status} ${await downloadedRes.text()}`);
}
return await downloadedRes.blob();
},
};
utils.initSdk();

View File

@@ -59,10 +59,11 @@ describe(AssetUploadController.name, () => {
expect(ctx.authenticate).toHaveBeenCalled();
});
it('should require Upload-Draft-Interop-Version header', async () => {
it('should require at least version 3 of Upload-Draft-Interop-Version header if provided', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Upload-Draft-Interop-Version', '2')
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
@@ -71,7 +72,7 @@ describe(AssetUploadController.name, () => {
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining(['version must be an integer number', 'version must not be less than 3']),
message: expect.arrayContaining(['version must not be less than 3']),
}),
);
});
@@ -102,17 +103,15 @@ describe(AssetUploadController.name, () => {
expect(body).toEqual(expect.objectContaining({ message: 'Missing repr-digest header' }));
});
it('should require Upload-Complete header', async () => {
it('should allow conventional upload without Upload-Complete header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' }));
expect(status).toBe(201);
});
it('should require Upload-Length header for incomplete upload', async () => {
@@ -255,7 +254,7 @@ describe(AssetUploadController.name, () => {
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' }));
expect(body).toEqual(expect.objectContaining({ message: 'upload-complete must be a structured boolean value' }));
});
it('should validate Upload-Length is a positive integer', async () => {
@@ -323,10 +322,11 @@ describe(AssetUploadController.name, () => {
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '8')
.set('Upload-Offset', '0')
.set('Content-Type', 'application/partial-upload')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' }));
expect(body).toEqual(expect.objectContaining({ message: ['uploadComplete must be a boolean value'] }));
});
it('should validate UUID parameter', async () => {

View File

@@ -1,7 +1,7 @@
import { BadRequestException } from '@nestjs/common';
import { ApiProperty } from '@nestjs/swagger';
import { Expose, plainToInstance, Transform, Type } from 'class-transformer';
import { Equals, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator';
import { Equals, IsBoolean, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator';
import { ImmichHeader } from 'src/enum';
import { Optional, ValidateBoolean, ValidateDate } from 'src/validation';
import { parseDictionary } from 'structured-headers';
@@ -50,27 +50,22 @@ export class UploadAssetDataDto {
iCloudId!: string;
}
class BaseRufhHeadersDto {
@Expose({ name: Header.InteropVersion })
@Min(3)
@IsInt()
@Type(() => Number)
version!: number;
}
export class BaseUploadHeadersDto extends BaseRufhHeadersDto {
export class BaseUploadHeadersDto {
@Expose({ name: Header.ContentLength })
@Min(0)
@IsInt()
@Type(() => Number)
contentLength!: number;
@Expose()
@Transform(({ obj }) => isUploadComplete(obj))
uploadComplete!: boolean;
}
export class StartUploadDto extends BaseUploadHeadersDto {
@Expose({ name: Header.InteropVersion })
@Optional()
@Min(3)
@IsInt()
@Type(() => Number)
version?: number;
@Expose({ name: ImmichHeader.AssetData })
@ValidateNested()
@Transform(({ value }) => {
@@ -121,15 +116,25 @@ export class StartUploadDto extends BaseUploadHeadersDto {
}
const contentLength = obj[Header.ContentLength];
if (contentLength && isUploadComplete(obj)) {
if (contentLength && isUploadComplete(obj) !== false) {
return Number(contentLength);
}
throw new BadRequestException(`Missing ${Header.UploadLength} header`);
})
uploadLength!: number;
@Expose()
@Transform(({ obj }) => isUploadComplete(obj))
uploadComplete?: boolean;
}
export class ResumeUploadDto extends BaseUploadHeadersDto {
@Expose({ name: Header.InteropVersion })
@Min(3)
@IsInt()
@Type(() => Number)
version!: number;
@Expose({ name: Header.ContentType })
@ValidateIf((o) => o.version && o.version >= 6)
@Equals('application/partial-upload')
@@ -147,9 +152,20 @@ export class ResumeUploadDto extends BaseUploadHeadersDto {
@IsInt()
@Type(() => Number)
uploadOffset!: number;
@Expose()
@IsBoolean()
@Transform(({ obj }) => isUploadComplete(obj))
uploadComplete!: boolean;
}
export class GetUploadStatusDto extends BaseRufhHeadersDto {}
export class GetUploadStatusDto {
@Expose({ name: Header.InteropVersion })
@Min(3)
@IsInt()
@Type(() => Number)
version!: number;
}
export class UploadOkDto {
@ApiProperty()
@@ -159,12 +175,14 @@ export class UploadOkDto {
const STRUCTURED_TRUE = '?1';
const STRUCTURED_FALSE = '?0';
function isUploadComplete(obj: any): boolean {
function isUploadComplete(obj: any) {
const uploadComplete = obj[Header.UploadComplete];
if (uploadComplete === STRUCTURED_TRUE) {
return true;
} else if (uploadComplete === STRUCTURED_FALSE) {
return false;
} else if (uploadComplete !== undefined) {
throw new BadRequestException('upload-complete must be a structured boolean value');
}
const uploadIncomplete = obj[Header.UploadIncomplete];
@@ -172,6 +190,7 @@ function isUploadComplete(obj: any): boolean {
return false;
} else if (uploadIncomplete === STRUCTURED_FALSE) {
return true;
} else if (uploadComplete !== undefined) {
throw new BadRequestException('upload-incomplete must be a structured boolean value');
}
throw new BadRequestException(`Expected valid ${Header.UploadComplete} header`);
}

View File

@@ -64,8 +64,6 @@ describe(AssetUploadService.name, () => {
1024,
undefined,
);
expect(mocks.storage.mkdir).toHaveBeenCalledWith(expect.stringContaining(authStub.user1.user.id));
});
it('should determine asset type from filename extension', async () => {

View File

@@ -2,8 +2,8 @@ import { BadRequestException, Injectable, InternalServerErrorException } from '@
import { Response } from 'express';
import { DateTime } from 'luxon';
import { createHash } from 'node:crypto';
import { extname, join } from 'node:path';
import { Readable } from 'node:stream';
import { dirname, extname, join } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { SystemConfig } from 'src/config';
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
@@ -55,6 +55,8 @@ export class AssetUploadService extends BaseService {
async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { uploadComplete, assetData, uploadLength, contentLength, version } = dto;
const isComplete = uploadComplete !== false;
const isResumable = version && version >= 3 && uploadComplete !== undefined;
const { backup } = await this.getConfig({ withCache: true });
const asset = await this.onStart(auth, dto);
@@ -64,35 +66,49 @@ export class AssetUploadService extends BaseService {
}
const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) {
if (isResumable) {
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
// this is a 5xx to indicate the client should do offset retrieval and resume
res.status(500).send('Incomplete asset already exists');
return;
}
// this is a 5xx to indicate the client should do offset retrieval and resume
res.status(500).send('Incomplete asset already exists');
return;
}
if (uploadComplete && uploadLength !== contentLength) {
if (isComplete && uploadLength !== contentLength) {
return this.sendInconsistentLength(res);
}
const location = `/api/upload/${asset.id}`;
if (version <= MAX_RUFH_INTEROP_VERSION) {
if (isResumable) {
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
}
this.addRequest(asset.id, req);
await this.databaseRepository.withUuidLock(asset.id, async () => {
// conventional upload, check status again with lock acquired before overwriting
if (asset.isDuplicate) {
const existingAsset = await this.assetRepository.getCompletionMetadata(asset.id, auth.user.id);
if (existingAsset?.status !== AssetStatus.Partial) {
return this.sendAlreadyCompleted(res);
}
}
await this.storageRepository.mkdir(dirname(asset.path));
let checksumBuffer: Buffer | undefined;
const writeStream = this.pipe(req, asset.path, contentLength);
if (uploadComplete) {
const writeStream = asset.isDuplicate
? this.storageRepository.createWriteStream(asset.path)
: this.storageRepository.createOrAppendWriteStream(asset.path);
this.pipe(req, writeStream, contentLength);
if (isComplete) {
const hash = createHash('sha1');
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, dto.version, uploadComplete);
if (!uploadComplete) {
if (isResumable) {
this.setCompleteHeader(res, version, uploadComplete);
}
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
return;
}
@@ -142,7 +158,8 @@ export class AssetUploadService extends BaseService {
return;
}
const writeStream = this.pipe(req, path, contentLength);
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
this.pipe(req, writeStream, contentLength);
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, version, uploadComplete);
if (!uploadComplete) {
@@ -300,7 +317,6 @@ export class AssetUploadService extends BaseService {
return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true };
}
await this.storageRepository.mkdir(folder);
return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false };
}
@@ -342,8 +358,7 @@ export class AssetUploadService extends BaseService {
}
}
private pipe(req: Readable, path: string, size: number) {
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
private pipe(req: Readable, writeStream: Writable, size: number) {
let receivedLength = 0;
req.on('data', (data: Buffer) => {
receivedLength += data.length;
@@ -359,8 +374,6 @@ export class AssetUploadService extends BaseService {
}
writeStream.end();
});
return writeStream;
}
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number, limits: string): void {
@@ -427,12 +440,8 @@ export class AssetUploadService extends BaseService {
}
}
private setCompleteHeader(res: Response, interopVersion: number | null, isComplete: boolean): void {
if (!interopVersion) {
return;
}
if (interopVersion > 3) {
private setCompleteHeader(res: Response, interopVersion: number | undefined, isComplete: boolean): void {
if (interopVersion === undefined || interopVersion > 3) {
res.setHeader('Upload-Complete', isComplete ? '?1' : '?0');
} else {
res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');