pr review

This commit is contained in:
shenlong-tanwen
2026-01-26 09:13:47 +05:30
parent 1a0cd87fed
commit f4dfbfbb3b
2 changed files with 32 additions and 42 deletions

View File

@@ -64,76 +64,63 @@ Future<void> _processCloudIdMappingsInBatches(
Logger logger,
) async {
const pageSize = 20000;
int offset = 0;
String? lastLocalId;
final seenRemoteAssetIds = <String>{};
while (true) {
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, offset);
final mappings = await _fetchCloudIdMappings(drift, userId, pageSize, lastLocalId);
if (mappings.isEmpty) {
break;
}
final uniqueMappings = <_CloudIdMapping>[];
final items = <AssetMetadataBulkUpsertItemDto>[];
for (final mapping in mappings) {
if (seenRemoteAssetIds.add(mapping.remoteAssetId)) {
uniqueMappings.add(mapping);
items.add(
AssetMetadataBulkUpsertItemDto(
assetId: mapping.remoteAssetId,
key: kMobileMetadataKey,
value: RemoteAssetMobileAppMetadata(
cloudId: mapping.localAsset.cloudId,
createdAt: mapping.localAsset.createdAt.toIso8601String(),
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
latitude: mapping.localAsset.latitude?.toString(),
longitude: mapping.localAsset.longitude?.toString(),
),
),
);
} else {
logger.fine('Duplicate remote asset ID found: ${mapping.remoteAssetId}. Skipping duplicate entry.');
}
}
if (uniqueMappings.isNotEmpty) {
if (items.isNotEmpty) {
if (canBulkUpdate) {
await _bulkUpdateCloudIds(assetsApi, uniqueMappings);
await _bulkUpdateCloudIds(assetsApi, items);
} else {
await _sequentialUpdateCloudIds(assetsApi, uniqueMappings);
await _sequentialUpdateCloudIds(assetsApi, items);
}
}
offset += pageSize;
lastLocalId = mappings.last.localAsset.id;
if (mappings.length < pageSize) {
break;
}
}
}
Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async {
for (final mapping in mappings) {
final item = AssetMetadataUpsertItemDto(
key: kMobileMetadataKey,
value: RemoteAssetMobileAppMetadata(
cloudId: mapping.localAsset.cloudId,
createdAt: mapping.localAsset.createdAt.toIso8601String(),
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
latitude: mapping.localAsset.latitude?.toString(),
longitude: mapping.localAsset.longitude?.toString(),
),
);
Future<void> _sequentialUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
for (final item in items) {
final upsertItem = AssetMetadataUpsertItemDto(key: item.key, value: item.value);
try {
await assetsApi.updateAssetMetadata(mapping.remoteAssetId, AssetMetadataUpsertDto(items: [item]));
await assetsApi.updateAssetMetadata(item.assetId, AssetMetadataUpsertDto(items: [upsertItem]));
} catch (error, stack) {
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${mapping.remoteAssetId}', error, stack);
Logger('migrateCloudIds').warning('Failed to update metadata for asset ${item.assetId}', error, stack);
}
}
}
Future<void> _bulkUpdateCloudIds(AssetsApi assetsApi, List<_CloudIdMapping> mappings) async {
final items = <AssetMetadataBulkUpsertItemDto>[];
for (final mapping in mappings) {
items.add(
AssetMetadataBulkUpsertItemDto(
assetId: mapping.remoteAssetId,
key: kMobileMetadataKey,
value: RemoteAssetMobileAppMetadata(
cloudId: mapping.localAsset.cloudId,
createdAt: mapping.localAsset.createdAt.toIso8601String(),
adjustmentTime: mapping.localAsset.adjustmentTime?.toIso8601String(),
latitude: mapping.localAsset.latitude?.toString(),
longitude: mapping.localAsset.longitude?.toString(),
),
),
);
}
Future<void> _bulkUpdateCloudIds(AssetsApi assetsApi, List<AssetMetadataBulkUpsertItemDto> items) async {
try {
await assetsApi.updateBulkAssetMetadata(AssetMetadataBulkUpsertDto(items: items));
} catch (error, stack) {
@@ -163,7 +150,7 @@ Future<void> _populateCloudIds(Drift drift) async {
typedef _CloudIdMapping = ({String remoteAssetId, LocalAsset localAsset});
Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId, int limit, int offset) async {
Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId, int limit, String? lastLocalId) async {
final query =
drift.localAssetEntity.select().join([
innerJoin(
@@ -189,7 +176,11 @@ Future<List<_CloudIdMapping>> _fetchCloudIdMappings(Drift drift, String userId,
drift.remoteAssetCloudIdEntity.createdAt.isNotExp(drift.localAssetEntity.createdAt)),
)
..orderBy([OrderingTerm.asc(drift.localAssetEntity.id)])
..limit(limit, offset: offset);
..limit(limit);
if (lastLocalId != null) {
query.where(drift.localAssetEntity.id.isBiggerThanValue(lastLocalId));
}
return query.map((row) {
return (

View File

@@ -213,7 +213,6 @@ class DriftLocalAssetRepository extends DriftDatabaseRepository {
INNER JOIN remote_asset_entity
ON remote_asset_cloud_id_entity.asset_id = remote_asset_entity.id
WHERE local_asset_entity.i_cloud_id = remote_asset_cloud_id_entity.cloud_id
AND local_asset_entity.i_cloud_id IS NOT NULL
AND local_asset_entity.checksum IS NULL
AND remote_asset_cloud_id_entity.adjustment_time IS local_asset_entity.adjustment_time
AND remote_asset_cloud_id_entity.latitude IS local_asset_entity.latitude