diff --git a/mobile/lib/domain/utils/migrate_cloud_ids.dart b/mobile/lib/domain/utils/migrate_cloud_ids.dart index 6ff78823c2..904ea2b795 100644 --- a/mobile/lib/domain/utils/migrate_cloud_ids.dart +++ b/mobile/lib/domain/utils/migrate_cloud_ids.dart @@ -50,24 +50,51 @@ Future syncCloudIds(ProviderContainer ref) async { return; } - final mappingsToUpdate = await _fetchCloudIdMappings(db, currentUser.id); - // Deduplicate mappings as a single remote asset ID can match multiple local assets - final seenRemoteAssetIds = {}; - final uniqueMapping = mappingsToUpdate.where((mapping) { - if (!seenRemoteAssetIds.add(mapping.remoteAssetId)) { - logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.'); - return false; - } - return true; - }).toList(); - final assetApi = ref.read(apiServiceProvider).assetsApi; - if (canBulkUpdateMetadata) { - await _bulkUpdateCloudIds(assetApi, uniqueMapping); - return; + // Process cloud IDs in paginated batches + await _processCloudIdMappingsInBatches(db, currentUser.id, assetApi, canBulkUpdateMetadata, logger); +} + +Future _processCloudIdMappingsInBatches( + Drift drift, + String userId, + AssetsApi assetsApi, + bool canBulkUpdate, + Logger logger, +) async { + const pageSize = 20000; + int offset = 0; + final seenRemoteAssetIds = {}; + + while (true) { + final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, offset); + if (mappings.isEmpty) { + break; + } + + final uniqueMappings = <_CloudIdMapping>[]; + for (final mapping in mappings) { + if (seenRemoteAssetIds.add(mapping.remoteAssetId)) { + uniqueMappings.add(mapping); + } else { + logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.'); + } + } + + if (uniqueMappings.isNotEmpty) { + if (canBulkUpdate) { + await _bulkUpdateCloudIds(assetsApi, uniqueMappings); + } else { + await _sequentialUpdateCloudIds(assetsApi, uniqueMappings); + } + } + + offset += pageSize; + if (mappings.length < pageSize) { + break; + } } - await _sequentialUpdateCloudIds(assetApi, uniqueMapping); } Future _sequentialUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async { @@ -91,31 +118,26 @@ Future _sequentialUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping } Future _bulkUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async { - const batchSize = 10000; - for (int i = 0; i < mappings.length; i += batchSize) { - final endIndex = (i + batchSize > mappings.length) ? mappings.length : i + batchSize; - final batch = mappings.sublist(i, endIndex); - final items = []; - for (final mapping in batch) { - items.add( - AssetMetadataBulkUpsertItemDto( - assetId: mapping.remoteAssetId, - key: kMobileMetadataKey, - value: RemoteAssetMobileAppMetadata( - cloudId: mapping.localAsset.cloudId, - createdAt: mapping.localAsset.createdAt.toIso8601String(), - adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(), - latitude: mapping.localAsset.latitude?.toString(), - longitude: mapping.localAsset.longitude?.toString(), - ), + final items = []; + for (final mapping in mappings) { + items.add( + AssetMetadataBulkUpsertItemDto( + assetId: mapping.remoteAssetId, + key: kMobileMetadataKey, + value: RemoteAssetMobileAppMetadata( + cloudId: mapping.localAsset.cloudId, + createdAt: mapping.localAsset.createdAt.toIso8601String(), + adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(), + latitude: mapping.localAsset.latitude?.toString(), + longitude: mapping.localAsset.longitude?.toString(), ), - ); - } - try { - await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items)); - } catch (error, stack) { - Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack); - } + ), + ); + } + try { + await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items)); + } catch (error, stack) { + Logger('migrateCloudIds').warning('Failed to bulk update metadata', error, stack); } } @@ -141,31 +163,34 @@ Future _populateCloudIds(Drift drift) async { typedef _CloudIdMapping = ({String remoteAssetId, LocalAsset localAsset}); -Future> _fetchCloudIdMappings(Drift drift, String userId) async { +Future> _fetchCloudIdMappings(Drift drift, String userId, int limit, int offset) async { final query = - drift.remoteAssetEntity.select().join([ - leftOuterJoin( - drift.localAssetEntity, - drift.localAssetEntity.checksum.equalsExp(drift.remoteAssetEntity.checksum), - ), - leftOuterJoin( - drift.remoteAssetCloudIdEntity, - drift.remoteAssetEntity.id.equalsExp(drift.remoteAssetCloudIdEntity.assetId), - useColumns: false, - ), - ])..where( - // Only select assets that have a local cloud ID but either no remote cloud ID or a mismatched eTag - drift.localAssetEntity.id.isNotNull() & - drift.localAssetEntity.iCloudId.isNotNull() & - drift.remoteAssetEntity.ownerId.equals(userId) & - // Skip locked assets as we cannot update them without unlocking first - drift.remoteAssetEntity.visibility.isNotValue(AssetVisibility.locked.index) & - (drift.remoteAssetCloudIdEntity.cloudId.isNull() | - drift.remoteAssetCloudIdEntity.adjustmentTime.isNotExp(drift.localAssetEntity.adjustmentTime) | - drift.remoteAssetCloudIdEntity.latitude.isNotExp(drift.localAssetEntity.latitude) | - drift.remoteAssetCloudIdEntity.longitude.isNotExp(drift.localAssetEntity.longitude) | - drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)), - ); + drift.localAssetEntity.select().join([ + innerJoin( + drift.remoteAssetEntity, + drift.localAssetEntity.checksum.equalsExp(drift.remoteAssetEntity.checksum), + ), + leftOuterJoin( + drift.remoteAssetCloudIdEntity, + drift.remoteAssetEntity.id.equalsExp(drift.remoteAssetCloudIdEntity.assetId), + useColumns: false, + ), + ]) + ..where( + // Only select assets that have a local cloud ID but either no remote cloud ID or a mismatched eTag + drift.localAssetEntity.iCloudId.isNotNull() & + drift.remoteAssetEntity.ownerId.equals(userId) & + // Skip locked assets as we cannot update them without unlocking first + drift.remoteAssetEntity.visibility.isNotValue(AssetVisibility.locked.index) & + (drift.remoteAssetCloudIdEntity.cloudId.isNull() | + drift.remoteAssetCloudIdEntity.adjustmentTime.isNotExp(drift.localAssetEntity.adjustmentTime) | + drift.remoteAssetCloudIdEntity.latitude.isNotExp(drift.localAssetEntity.latitude) | + drift.remoteAssetCloudIdEntity.longitude.isNotExp(drift.localAssetEntity.longitude) | + drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)), + ) + ..orderBy([OrderingTerm.asc(drift.localAssetEntity.id)]) + ..limit(limit, offset: offset); + return query.map((row) { return ( remoteAssetId: row.read(drift.remoteAssetEntity.id)!,