paginate cloud id fetch in migrate cloud id

This commit is contained in:
shenlong-tanwen
2026-01-25 21:03:17 +05:30
parent d0bb2d7837
commit 1a0cd87fed

View File

@@ -50,24 +50,51 @@ Future<void> syncCloudIds(ProviderContainer ref) async {
return;
}
final mappingsToUpdate = await _fetchCloudIdMappings(db, currentUser.id);
// Deduplicate mappings as a single remote asset ID can match multiple local assets
final seenRemoteAssetIds = <String>{};
final uniqueMapping = mappingsToUpdate.where((mapping) {
if (!seenRemoteAssetIds.add(mapping.remoteAssetId)) {
logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.');
return false;
}
return true;
}).toList();
final assetApi = ref.read(apiServiceProvider).assetsApi;
if (canBulkUpdateMetadata) {
await _bulkUpdateCloudIds(assetApi, uniqueMapping);
return;
// Process cloud IDs in paginated batches
await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger);
}
Future<void> _processCloudIdMappingsInBatches(
Drift drift,
String userId,
AssetsApi assetsApi,
bool canBulkUpdate,
Logger logger,
) async {
const pageSize = 20000;
int offset = 0;
final seenRemoteAssetIds = <String>{};
while (true) {
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, offset);
if (mappings.isEmpty) {
break;
}
final uniqueMappings = <_CloudIdMapping>[];
for (final mapping in mappings) {
if (seenRemoteAssetIds.add(mapping.remoteAssetId)) {
uniqueMappings.add(mapping);
} else {
logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.');
}
}
if (uniqueMappings.isNotEmpty) {
if (canBulkUpdate) {
await _bulkUpdateCloudIds(assetsApi, uniqueMappings);
} else {
await _sequentialUpdateCloudIds(assetsApi, uniqueMappings);
}
}
offset += pageSize;
if (mappings.length < pageSize) {
break;
}
}
await _sequentialUpdateCloudIds(assetApi, uniqueMapping);
}
Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async {
@@ -91,31 +118,26 @@ Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping
}
Future<void> _bulkUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async {
const batchSize = 10000;
for (int i = 0; i < mappings.length; i += batchSize) {
final endIndex = (i + batchSize > mappings.length) ? mappings.length : i + batchSize;
final batch = mappings.sublist(i, endIndex);
final items = <AssetMetadataBulkUpsertItemDto>[];
for (final mapping in batch) {
items.add(
AssetMetadataBulkUpsertItemDto(
assetId: mapping.remoteAssetId,
key: kMobileMetadataKey,
value: RemoteAssetMobileAppMetadata(
cloudId: mapping.localAsset.cloudId,
createdAt: mapping.localAsset.createdAt.toIso8601String(),
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
latitude: mapping.localAsset.latitude?.toString(),
longitude: mapping.localAsset.longitude?.toString(),
),
final items = <AssetMetadataBulkUpsertItemDto>[];
for (final mapping in mappings) {
items.add(
AssetMetadataBulkUpsertItemDto(
assetId: mapping.remoteAssetId,
key: kMobileMetadataKey,
value: RemoteAssetMobileAppMetadata(
cloudId: mapping.localAsset.cloudId,
createdAt: mapping.localAsset.createdAt.toIso8601String(),
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
latitude: mapping.localAsset.latitude?.toString(),
longitude: mapping.localAsset.longitude?.toString(),
),
);
}
try {
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items));
} catch (error, stack) {
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
}
),
);
}
try {
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items));
} catch (error, stack) {
Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack);
}
}
@@ -141,31 +163,34 @@ Future<void> _populateCloudIds(Drift drift) async {
typedef _CloudIdMapping = ({String remoteAssetId, LocalAsset localAsset});
Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId) async {
Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId, int limit, int offset) async {
final query =
drift.remoteAssetEntity.select().join([
leftOuterJoin(
drift.localAssetEntity,
drift.localAssetEntity.checksum.equalsExp(drift.remoteAssetEntity.checksum),
),
leftOuterJoin(
drift.remoteAssetCloudIdEntity,
drift.remoteAssetEntity.id.equalsExp(drift.remoteAssetCloudIdEntity.assetId),
useColumns: false,
),
])..where(
// Only select assets that have a local cloud ID but either no remote cloud ID or a mismatched eTag
drift.localAssetEntity.id.isNotNull() &
drift.localAssetEntity.iCloudId.isNotNull() &
drift.remoteAssetEntity.ownerId.equals(userId) &
// Skip locked assets as we cannot update them without unlocking first
drift.remoteAssetEntity.visibility.isNotValue(AssetVisibility.locked.index) &
(drift.remoteAssetCloudIdEntity.cloudId.isNull() |
drift.remoteAssetCloudIdEntity.adjustmentTime.isNotExp(drift.localAssetEntity.adjustmentTime) |
drift.remoteAssetCloudIdEntity.latitude.isNotExp(drift.localAssetEntity.latitude) |
drift.remoteAssetCloudIdEntity.longitude.isNotExp(drift.localAssetEntity.longitude) |
drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)),
);
drift.localAssetEntity.select().join([
innerJoin(
drift.remoteAssetEntity,
drift.localAssetEntity.checksum.equalsExp(drift.remoteAssetEntity.checksum),
),
leftOuterJoin(
drift.remoteAssetCloudIdEntity,
drift.remoteAssetEntity.id.equalsExp(drift.remoteAssetCloudIdEntity.assetId),
useColumns: false,
),
])
..where(
// Only select assets that have a local cloud ID but either no remote cloud ID or a mismatched eTag
drift.localAssetEntity.iCloudId.isNotNull() &
drift.remoteAssetEntity.ownerId.equals(userId) &
// Skip locked assets as we cannot update them without unlocking first
drift.remoteAssetEntity.visibility.isNotValue(AssetVisibility.locked.index) &
(drift.remoteAssetCloudIdEntity.cloudId.isNull() |
drift.remoteAssetCloudIdEntity.adjustmentTime.isNotExp(drift.localAssetEntity.adjustmentTime) |
drift.remoteAssetCloudIdEntity.latitude.isNotExp(drift.localAssetEntity.latitude) |
drift.remoteAssetCloudIdEntity.longitude.isNotExp(drift.localAssetEntity.longitude) |
drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)),
)
..orderBy([OrderingTerm.asc(drift.localAssetEntity.id)])
..limit(limit, offset: offset);
return query.map((row) {
return (
remoteAssetId: row.read(drift.remoteAssetEntity.id)!,