Compare commits

...

4 Commits

Author SHA1 Message Date
timonrieger 918c78c25b collapse in AlbumUpdate 2026-06-15 13:59:32 +02:00
timonrieger b7bf647dc4 patch medium test 2026-06-15 13:59:32 +02:00
timonrieger 66c06a866d fix medium tests 2026-06-15 13:59:32 +02:00
timonrieger 4403519579 feat: add album asset event handling 2026-06-15 13:59:32 +02:00
11 changed files with 64 additions and 35 deletions
@@ -159,8 +159,8 @@ class RemoteAlbumService {
return updatedAlbum;
}
FutureOr<(DateTime, DateTime)> getDateRange(String albumId) {
return _repository.getDateRange(albumId);
Stream<(DateTime, DateTime)> watchDateRange(String albumId) {
return _repository.watchDateRange(albumId);
}
Future<List<UserDto>> getSharedUsers(String albumId) {
@@ -217,7 +217,7 @@ class DriftRemoteAlbumRepository extends DriftDatabaseRepository {
});
}
FutureOr<(DateTime, DateTime)> getDateRange(String albumId) {
Stream<(DateTime, DateTime)> watchDateRange(String albumId) {
final query = _db.remoteAlbumAssetEntity.selectOnly()
..where(_db.remoteAlbumAssetEntity.albumId.equals(albumId))
..addColumns([_db.remoteAssetEntity.createdAt.min(), _db.remoteAssetEntity.createdAt.max()])
@@ -229,7 +229,7 @@ class DriftRemoteAlbumRepository extends DriftDatabaseRepository {
final minDate = row.read(_db.remoteAssetEntity.createdAt.min());
final maxDate = row.read(_db.remoteAssetEntity.createdAt.max());
return (minDate ?? DateTime.now(), maxDate ?? DateTime.now());
}).getSingle();
}).watchSingle();
}
Future<List<UserDto>> getSharedUsers(String albumId) async {
@@ -313,9 +313,9 @@ class RemoteAlbumNotifier extends Notifier<RemoteAlbumState> {
}
}
final remoteAlbumDateRangeProvider = FutureProvider.family<(DateTime, DateTime), String>((ref, albumId) async {
final remoteAlbumDateRangeProvider = StreamProvider.autoDispose.family<(DateTime, DateTime), String>((ref, albumId) {
final service = ref.watch(remoteAlbumServiceProvider);
return service.getDateRange(albumId);
return service.watchDateRange(albumId);
});
final remoteAlbumSharedUsersProvider = FutureProvider.autoDispose.family<List<UserDto>, String>((ref, albumId) async {
@@ -103,6 +103,7 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
socket.on('AssetUploadReadyV2', _handleSyncAssetUploadReadyV2);
socket.on('AssetEditReadyV1', _handleSyncAssetEditReadyV1);
socket.on('AssetEditReadyV2', _handleSyncAssetEditReadyV2);
socket.on('on_album_update', _handleAlbumUpdate);
socket.on('on_config_update', _handleOnConfigUpdate);
socket.on('on_new_release', _handleReleaseUpdates);
} catch (e) {
@@ -184,6 +185,10 @@ class WebsocketNotifier extends StateNotifier<WebsocketState> {
unawaited(_ref.read(backgroundSyncProvider).syncWebsocketEditV1(data));
}
void _handleAlbumUpdate(dynamic _) {
unawaited(_ref.read(backgroundSyncProvider).syncRemote());
}
void _handleSyncAssetEditReadyV2(dynamic data) {
unawaited(_ref.read(backgroundSyncProvider).syncWebsocketEditV2(data));
}
+1 -1
View File
@@ -38,7 +38,7 @@ type EventMap = {
ConfigValidate: [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
// album events
AlbumUpdate: [{ id: string; recipientId: string }];
AlbumUpdate: [{ id: string; userIds: string[]; recipientIds: string[] }];
AlbumInvite: [{ id: string; userId: string; senderName: string }];
// asset events
@@ -37,6 +37,7 @@ export interface ClientEventMap {
on_asset_hidden: [string];
on_asset_restore: [string[]];
on_asset_stack_update: string[];
on_album_update: [string];
on_person_thumbnail: [string];
on_server_version: [ServerVersionResponseDto];
on_config_update: [];
+6 -3
View File
@@ -807,7 +807,8 @@ describe(AlbumService.name, () => {
expect(mocks.album.addAssetIds).toHaveBeenCalledWith(album.id, [asset1.id, asset2.id, asset3.id]);
expect(mocks.event.emit).toHaveBeenCalledWith('AlbumUpdate', {
id: album.id,
recipientId: owner.id,
userIds: album.albumUsers.map(({ user }) => user.id),
recipientIds: [owner.id],
});
});
@@ -1057,11 +1058,13 @@ describe(AlbumService.name, () => {
]);
expect(mocks.event.emit).toHaveBeenCalledWith('AlbumUpdate', {
id: album1.id,
recipientId: owner1.id,
userIds: album1.albumUsers.map(({ user }) => user.id),
recipientIds: [owner1.id],
});
expect(mocks.event.emit).toHaveBeenCalledWith('AlbumUpdate', {
id: album2.id,
recipientId: owner2.id,
userIds: album2.albumUsers.map(({ user }) => user.id),
recipientIds: [owner2.id],
});
});
+18 -13
View File
@@ -190,11 +190,9 @@ export class AlbumService extends BaseService {
auth.user.id,
);
const allUsersExceptUs = album.albumUsers.map(({ user }) => user.id).filter((userId) => userId !== auth.user.id);
for (const recipientId of allUsersExceptUs) {
await this.eventRepository.emit('AlbumUpdate', { id, recipientId });
}
const userIds = album.albumUsers.map(({ user }) => user.id);
const recipientIds = userIds.filter((userId) => userId !== auth.user.id);
await this.eventRepository.emit('AlbumUpdate', { id, userIds, recipientIds });
}
return results;
@@ -223,7 +221,7 @@ export class AlbumService extends BaseService {
}
const albumAssetValues: { albumId: string; assetId: string }[] = [];
const events: { id: string; recipients: string[] }[] = [];
const events: { id: string; userIds: string[]; recipientIds: string[] }[] = [];
for (const albumId of allowedAlbumIds) {
const existingAssetIds = await this.albumRepository.getAssetIds(albumId, [...allowedAssetIds]);
const notPresentAssetIds = [...allowedAssetIds].filter((id) => !existingAssetIds.has(id));
@@ -246,15 +244,14 @@ export class AlbumService extends BaseService {
},
auth.user.id,
);
const allUsersExceptUs = album.albumUsers.map(({ user }) => user.id).filter((userId) => userId !== auth.user.id);
events.push({ id: albumId, recipients: allUsersExceptUs });
const userIds = album.albumUsers.map(({ user }) => user.id);
const recipientIds = userIds.filter((userId) => userId !== auth.user.id);
events.push({ id: albumId, userIds, recipientIds });
}
await this.albumRepository.addAssetIdsToAlbums(albumAssetValues);
for (const event of events) {
for (const recipientId of event.recipients) {
await this.eventRepository.emit('AlbumUpdate', { id: event.id, recipientId });
}
await this.eventRepository.emit('AlbumUpdate', event);
}
return results;
@@ -271,8 +268,16 @@ export class AlbumService extends BaseService {
);
const removedIds = results.filter(({ success }) => success).map(({ id }) => id);
if (removedIds.length > 0 && album.albumThumbnailAssetId && removedIds.includes(album.albumThumbnailAssetId)) {
await this.albumRepository.updateThumbnails();
if (removedIds.length > 0) {
if (album.albumThumbnailAssetId && removedIds.includes(album.albumThumbnailAssetId)) {
await this.albumRepository.updateThumbnails();
}
await this.eventRepository.emit('AlbumUpdate', {
id,
userIds: album.albumUsers.map(({ user }) => user.id),
recipientIds: [],
});
}
return results;
@@ -2,7 +2,6 @@ import { defaults, SystemConfig } from 'src/config';
import { SystemConfigDto } from 'src/dtos/system-config.dto';
import { AssetFileType, JobName, JobStatus, UserMetadataKey } from 'src/enum';
import { NotificationService } from 'src/services/notification.service';
import { INotifyAlbumUpdateJob } from 'src/types';
import { AlbumFactory } from 'test/factories/album.factory';
import { AssetFileFactory } from 'test/factories/asset-file.factory';
import { AssetFactory } from 'test/factories/asset.factory';
@@ -157,13 +156,21 @@ describe(NotificationService.name, () => {
});
describe('onAlbumUpdateEvent', () => {
it('should queue notify album update event', async () => {
await sut.onAlbumUpdate({ id: 'album', recipientId: '42' });
expect(mocks.job.queue).toHaveBeenCalledWith({
it('should send a websocket event to every user and queue notify jobs for recipients', async () => {
await sut.onAlbumUpdate({ id: 'album', userIds: ['1', '42'], recipientIds: ['42'] });
expect(mocks.websocket.clientSend).toHaveBeenCalledWith('on_album_update', '1', 'album');
expect(mocks.websocket.clientSend).toHaveBeenCalledWith('on_album_update', '42', 'album');
expect(mocks.job.queue).toHaveBeenCalledExactlyOnceWith({
name: JobName.NotifyAlbumUpdate,
data: { id: 'album', recipientId: '42', delay: 300_000 },
});
});
it('should not queue email jobs when there are no recipients', async () => {
await sut.onAlbumUpdate({ id: 'album', userIds: ['1'], recipientIds: [] });
expect(mocks.websocket.clientSend).toHaveBeenCalledWith('on_album_update', '1', 'album');
expect(mocks.job.queue).not.toHaveBeenCalled();
});
});
describe('onAlbumInviteEvent', () => {
@@ -522,7 +529,7 @@ describe(NotificationService.name, () => {
});
it('should add new recipients for new images if job is already queued', async () => {
await sut.onAlbumUpdate({ id: '1', recipientId: '2' } as INotifyAlbumUpdateJob);
await sut.onAlbumUpdate({ id: '1', userIds: ['2'], recipientIds: ['2'] });
expect(mocks.job.removeJob).toHaveBeenCalledWith(JobName.NotifyAlbumUpdate, '1/2');
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.NotifyAlbumUpdate,
+12 -6
View File
@@ -217,12 +217,18 @@ export class NotificationService extends BaseService {
}
@OnEvent({ name: 'AlbumUpdate' })
async onAlbumUpdate({ id, recipientId }: ArgOf<'AlbumUpdate'>) {
await this.jobRepository.removeJob(JobName.NotifyAlbumUpdate, `${id}/${recipientId}`);
await this.jobRepository.queue({
name: JobName.NotifyAlbumUpdate,
data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs },
});
async onAlbumUpdate({ id, userIds, recipientIds }: ArgOf<'AlbumUpdate'>) {
for (const userId of userIds) {
this.websocketRepository.clientSend('on_album_update', userId, id);
}
for (const recipientId of recipientIds) {
await this.jobRepository.removeJob(JobName.NotifyAlbumUpdate, `${id}/${recipientId}`);
await this.jobRepository.queue({
name: JobName.NotifyAlbumUpdate,
data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs },
});
}
}
@OnEvent({ name: 'AlbumInvite' })
@@ -9,6 +9,7 @@ import { AssetRepository } from 'src/repositories/asset.repository';
import { ConfigRepository } from 'src/repositories/config.repository';
import { CryptoRepository } from 'src/repositories/crypto.repository';
import { DatabaseRepository } from 'src/repositories/database.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { PluginRepository } from 'src/repositories/plugin.repository';
import { StorageRepository } from 'src/repositories/storage.repository';
@@ -39,7 +40,7 @@ class WorkflowTestContext extends MediumTestContext<WorkflowExecutionService> {
UserRepository,
WorkflowRepository,
],
mock: [ConfigRepository],
mock: [ConfigRepository, EventRepository],
});
}
@@ -52,6 +53,7 @@ class WorkflowTestContext extends MediumTestContext<WorkflowExecutionService> {
mockData.resourcePaths.corePlugin = '../packages/plugin-core';
mockData.plugins.external.allow = false;
this.getMock(ConfigRepository).getEnv.mockReturnValue(mockData);
this.getMock(EventRepository).emit.mockResolvedValue();
this.get(LoggingRepository).setLogLevel(LogLevel.Verbose);
await this.sut.onPluginSync();