feat: persist part metadata during multipart uploads and reuse when completing #3041
```diff
@@ -193,10 +193,24 @@ open class MultipartStore(
     partNumber: Int,
     path: Path,
     encryptionHeaders: Map<String, String>,
+    checksumAlgorithm: ChecksumAlgorithm? = null,
+    checksum: String? = null,
   ): String {
     val file = inputPathToFile(path, getPartPath(bucket, uploadId, partNumber))

-    return DigestUtil.hexDigest(encryptionHeaders[AwsHttpHeaders.X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID], file)
+    val etag = DigestUtil.hexDigest(encryptionHeaders[AwsHttpHeaders.X_AMZ_SERVER_SIDE_ENCRYPTION_AWS_KMS_KEY_ID], file)
+    writePartMetadata(
+      bucket,
+      uploadId,
+      PartMetadata(
+        partNumber = partNumber,
+        etag = etag,
+        checksumAlgorithm = checksumAlgorithm,
+        checksum = checksum,
+        size = file.length(),
+        lastModified = file.lastModified(),
+      ),
+    )
+    return etag
   }

   fun completeMultipartUpload(
```
```diff
@@ -225,7 +239,8 @@ open class MultipartStore(
         input.transferTo(os)
       }
     }
-    val checksumFor = validateChecksums(uploadInfo, tempFile, parts, partsPaths, checksum, checksumType, checksumAlgorithm)
+    val checksumFor =
+      validateChecksums(uploadInfo, tempFile, parts, partsPaths, checksum, checksumType, checksumAlgorithm, bucket, uploadId)
     val etag = DigestUtil.hexDigestMultipart(partsPaths)
     val s3ObjectMetadata =
       objectStore.storeS3ObjectMetadata(
```
```diff
@@ -280,10 +295,15 @@ open class MultipartStore(
           val name = it.fileName.toString()
           val prefix = name.substringBefore('.')
           val partNumber = prefix.toInt()
-          val file = it.toFile()
-          val partMd5 = DigestUtil.hexDigest(file)
-          val lastModified = Date(file.lastModified())
-          Part(partNumber, partMd5, lastModified, file.length())
+          val metadata = readPartMetadata(bucket, uploadId, partNumber)
+          if (metadata != null) {
+            partFromMetadata(metadata)
+          } else {
+            val file = it.toFile()
+            val partMd5 = DigestUtil.hexDigest(file)
+            val lastModified = Date(file.lastModified())
+            Part(partNumber, partMd5, lastModified, file.length())
+          }
         }.sortedBy { it.partNumber }
         .toList()
     } catch (e: IOException) {
```
```diff
@@ -304,13 +324,21 @@ open class MultipartStore(
   ): String {
     verifyMultipartUploadPreparation(destinationBucket, destinationId, uploadId)

-    return copyPartToFile(
-      bucket,
-      id,
-      copyRange,
-      createPartFile(destinationBucket, destinationId, uploadId, partNumber),
-      versionId,
+    val partFile = createPartFile(destinationBucket, destinationId, uploadId, partNumber)
+    val etag = copyPartToFile(bucket, id, copyRange, partFile, versionId)
+    writePartMetadata(
+      destinationBucket,
+      uploadId,
+      PartMetadata(
+        partNumber = partNumber,
+        etag = etag,
+        checksumAlgorithm = null,
+        checksum = null,
+        size = partFile.length(),
+        lastModified = partFile.lastModified(),
+      ),
     )
+    return etag
   }

   private fun copyPartToFile(
```
```diff
@@ -396,6 +424,78 @@ open class MultipartStore(
     uploadId: UUID,
   ): Path = getPartsFolder(bucket, uploadId).resolve(MULTIPART_UPLOAD_META_FILE)

+  private fun getPartMetadataPath(
+    bucket: BucketMetadata,
+    uploadId: UUID,
+    partNumber: Int,
+  ): Path = getPartsFolder(bucket, uploadId).resolve(partNumber.toString() + PART_METADATA_SUFFIX)
+
+  private fun writePartMetadata(
+    bucket: BucketMetadata,
+    uploadId: UUID,
+    partMetadata: PartMetadata,
+  ) {
+    try {
+      val metaFile = getPartMetadataPath(bucket, uploadId, partMetadata.partNumber).toFile()
+      objectMapper.writeValue(metaFile, partMetadata)
+    } catch (e: IOException) {
+      throw IllegalStateException(
+        "Could not write part metadata file. uploadId=$uploadId, partNumber=${partMetadata.partNumber}",
+        e,
+      )
+    }
+  }
+
+  private fun readPartMetadata(
+    bucket: BucketMetadata,
+    uploadId: UUID,
+    partNumber: Int,
+  ): PartMetadata? {
+    val metaPath = getPartMetadataPath(bucket, uploadId, partNumber)
+    if (!metaPath.exists()) return null
+    return try {
+      objectMapper.readValue(metaPath.toFile(), PartMetadata::class.java)
+    } catch (e: IOException) {
+      LOG.warn("Could not read part metadata file. uploadId={}, partNumber={}", uploadId, partNumber, e)
+      null
+    }
+  }
+
+  private fun partFromMetadata(metadata: PartMetadata): Part {
+    val checksum = metadata.checksum
+    return when (metadata.checksumAlgorithm) {
+      ChecksumAlgorithm.CRC32 -> {
+        Part(metadata.partNumber, metadata.etag, Date(metadata.lastModified), metadata.size, checksumCRC32 = checksum)
+      }
+
+      ChecksumAlgorithm.CRC32C -> {
+        Part(metadata.partNumber, metadata.etag, Date(metadata.lastModified), metadata.size, checksumCRC32C = checksum)
+      }
+
+      ChecksumAlgorithm.CRC64NVME -> {
+        Part(
+          metadata.partNumber,
+          metadata.etag,
+          Date(metadata.lastModified),
+          metadata.size,
+          checksumCRC64NVME = checksum,
+        )
+      }
+
+      ChecksumAlgorithm.SHA1 -> {
+        Part(metadata.partNumber, metadata.etag, Date(metadata.lastModified), metadata.size, checksumSHA1 = checksum)
+      }
+
+      ChecksumAlgorithm.SHA256 -> {
+        Part(metadata.partNumber, metadata.etag, Date(metadata.lastModified), metadata.size, checksumSHA256 = checksum)
+      }
+
+      null -> {
+        Part(metadata.partNumber, metadata.etag, Date(metadata.lastModified), metadata.size)
+      }
+    }
+  }
+
   private fun getPartsFolder(
     bucket: BucketMetadata,
     uploadId: UUID,
```
```diff
@@ -463,6 +563,8 @@ open class MultipartStore(
     checksum: String?,
     checksumType: ChecksumType?,
     checksumAlgorithm: ChecksumAlgorithm?,
+    bucket: BucketMetadata,
+    uploadId: UUID,
   ): String? {
     val checksumToValidate = checksum ?: uploadInfo.checksum
     val checksumAlgorithmToValidate = checksumAlgorithm ?: uploadInfo.checksumAlgorithm
```
```diff
@@ -471,7 +573,7 @@ open class MultipartStore(
     }
     val checksumFor =
       if (uploadInfo.checksumType == ChecksumType.COMPOSITE) {
-        checksumFor(partsPaths, uploadInfo)
+        checksumForComposite(partsPaths, uploadInfo, bucket, uploadId)
       } else {
         checksumFor(tempFile, uploadInfo)
       }
```
```diff
@@ -493,12 +595,35 @@ open class MultipartStore(
     return checksumFor
   }

-  private fun checksumFor(
+  /**
+   * Computes the COMPOSITE checksum. Uses persisted per-part checksums when all parts have them
+   * stored (avoids re-reading the binary part files); falls back to computing from files otherwise.
+   */
+  private fun checksumForComposite(
     paths: List<Path>,
     uploadInfo: MultipartUploadInfo,
+    bucket: BucketMetadata,
+    uploadId: UUID,
   ): String? =
     uploadInfo.checksumAlgorithm?.let { algo ->
-      DigestUtil.checksumMultipart(paths, algo.toChecksumAlgorithm())
+      val partNumbers =
+        paths.map {
+          it.fileName
+            .toString()
+            .substringBefore('.')
+            .toInt()
+        }
+      val storedChecksums =
+        partNumbers
+          .map { readPartMetadata(bucket, uploadId, it) }
+          .takeIf { metadataList ->
+            metadataList.all { it?.checksum != null && it.checksumAlgorithm == uploadInfo.checksumAlgorithm }
+          }?.mapNotNull { it?.checksum }
+      if (storedChecksums != null) {
+        DigestUtil.checksumMultipartFromStoredChecksums(storedChecksums, algo.toChecksumAlgorithm())
+      } else {
+        DigestUtil.checksumMultipart(paths, algo.toChecksumAlgorithm())
+      }
     }

   private fun checksumFor(
```
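As an illustration of the composite ("checksum of checksums") scheme that `checksumForComposite` relies on, here is a minimal, self-contained Kotlin sketch hard-coded to CRC32 with the JDK's `java.util.zip.CRC32`. The PR's real code delegates to the AWS SDK's `SdkChecksum` so every supported algorithm works; the helper name `compositeCrc32` below is hypothetical and not part of this change:

```kotlin
import java.nio.ByteBuffer
import java.util.Base64
import java.util.zip.CRC32

// Hypothetical sketch: composite checksum over base64-encoded per-part CRC32s.
fun compositeCrc32(partChecksumsBase64: List<String>): String {
  val crc = CRC32()
  // Decode each persisted per-part checksum and feed the raw bytes into the
  // top-level checksum -- equivalent to checksumming the concatenation.
  for (encoded in partChecksumsBase64) {
    val bytes = Base64.getDecoder().decode(encoded)
    crc.update(bytes, 0, bytes.size)
  }
  // CRC32 yields 4 bytes; base64-encode them and append "-<partCount>",
  // matching S3's composite format "<base64Checksum>-<partCount>".
  val checksumBytes = ByteBuffer.allocate(4).putInt(crc.value.toInt()).array()
  return Base64.getEncoder().encodeToString(checksumBytes) + "-${partChecksumsBase64.size}"
}

fun main() {
  // Two invented per-part CRC32 values (4 bytes each), base64-encoded.
  val parts = listOf(
    Base64.getEncoder().encodeToString(byteArrayOf(0x01, 0x02, 0x03, 0x04)),
    Base64.getEncoder().encodeToString(byteArrayOf(0x05, 0x06, 0x07, 0x08)),
  )
  println(compositeCrc32(parts)) // prints a base64 CRC32 followed by "-2"
}
```

Because only the small decoded checksum bytes are digested, the fast path never touches the part binaries, which is the whole point of persisting per-part checksums at upload time.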
```diff
@@ -512,6 +637,7 @@ open class MultipartStore(
   companion object {
     private val LOG: Logger = LoggerFactory.getLogger(MultipartStore::class.java)
     private const val PART_SUFFIX = ".part"
+    private const val PART_METADATA_SUFFIX = ".partMetadata.json"
     private const val MULTIPART_UPLOAD_META_FILE = "multipartMetadata.json"
     const val MULTIPARTS_FOLDER: String = "multiparts"
```
```diff
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2017-2026 Adobe.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.adobe.testing.s3mock.store
+
+import com.adobe.testing.s3mock.dto.ChecksumAlgorithm
+
+/**
+ * Encapsulates the metadata of a single multipart upload part, persisted alongside the part's
+ * binary data so that ETag, checksum, size, and last-modified do not have to be recomputed from
+ * the binary file on subsequent reads.
+ */
+data class PartMetadata(
+  val partNumber: Int,
+  val etag: String?,
+  val checksumAlgorithm: ChecksumAlgorithm?,
+  val checksum: String?,
+  val size: Long,
+  val lastModified: Long,
+)
```
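To show how the sidecar file behaves, the sketch below round-trips a `PartMetadata` instance through Jackson, mirroring what `writePartMetadata` and `readPartMetadata` do with the store's `objectMapper`. It assumes `jackson-module-kotlin` is on the classpath (the store's actual mapper configuration is not shown in this diff), and all field values are invented:

```kotlin
import com.adobe.testing.s3mock.dto.ChecksumAlgorithm
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import java.io.File

fun main() {
  // jacksonObjectMapper() registers the Kotlin module so data classes
  // round-trip without default constructors (assumption about configuration).
  val mapper = jacksonObjectMapper()
  val original = PartMetadata(
    partNumber = 1,
    etag = "5a105e8b9d40e1329780d62ea2265d8a", // invented example ETag
    checksumAlgorithm = ChecksumAlgorithm.SHA256,
    checksum = "invented-base64-checksum==",   // invented example checksum
    size = 5_242_880L,                         // 5 MiB part
    lastModified = System.currentTimeMillis(),
  )
  // writePartMetadata: serialize next to the binary part, e.g. "1.partMetadata.json".
  val sidecar = File.createTempFile("part-1-", ".partMetadata.json")
  mapper.writeValue(sidecar, original)
  // readPartMetadata: deserialize and reuse instead of re-hashing the part file.
  val restored = mapper.readValue<PartMetadata>(sidecar)
  check(restored == original)
}
```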
```diff
@@ -172,6 +172,29 @@ object DigestUtil {
     algorithm: software.amazon.awssdk.checksums.spi.ChecksumAlgorithm,
   ): String = "${BinaryUtils.toBase64(checksum(paths, algorithm))}-${paths.size}"

+  /**
+   * Calculates the composite checksum from pre-computed per-part checksums encoded as base64
+   * strings. This avoids re-reading the part binary files when per-part checksums were already
+   * persisted during upload.
+   *
+   * Each base64-encoded part checksum is decoded to bytes; all decoded bytes are concatenated;
+   * then the final checksum is computed over that concatenation.
+   * [API](https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html)
+   */
+  @JvmStatic
+  fun checksumMultipartFromStoredChecksums(
+    partChecksums: List<String>,
+    algorithm: software.amazon.awssdk.checksums.spi.ChecksumAlgorithm,
+  ): String {
+    val sdkChecksum = SdkChecksum.forAlgorithm(algorithm)
+    val allChecksumBytes =
+      partChecksums
+        .flatMap { Base64.getDecoder().decode(it).toList() }
+        .toByteArray()
+    sdkChecksum.update(allChecksumBytes, 0, allChecksumBytes.size)
```
Review comment on lines +190 to +194, with a suggested change that streams each decoded checksum into the digest instead of materializing the concatenation, and fails loudly on malformed base64:

```diff
-    val allChecksumBytes =
-      partChecksums
-        .flatMap { Base64.getDecoder().decode(it).toList() }
-        .toByteArray()
-    sdkChecksum.update(allChecksumBytes, 0, allChecksumBytes.size)
+    val decoder = Base64.getDecoder()
+    try {
+      partChecksums.forEach { partChecksum ->
+        val decodedChecksum = decoder.decode(partChecksum)
+        sdkChecksum.update(decodedChecksum, 0, decodedChecksum.size)
+      }
+    } catch (e: IllegalArgumentException) {
+      throw IllegalStateException(
+        "Checksum could not be calculated because stored multipart part checksum metadata is malformed.",
+        e,
+      )
+    }
```
A second review comment: `completeMultipartUpload` deletes only the `.part` files (`partsPaths`) after completion. With the new `{partNumber}.partMetadata.json` sidecars being written, those metadata files will be left behind indefinitely, increasing disk usage for completed uploads. When cleaning up parts, also delete the corresponding `.partMetadata.json` files (or delete all part-related files in the upload folder) so completed uploads don't accumulate per-part metadata.
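One possible shape for that cleanup, sketched under the assumption that sidecars sit next to the `.part` files in the upload's parts folder, as this PR lays them out. The helper `deletePartsWithMetadata` is hypothetical and not part of the PR:

```kotlin
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path

// Suffix as defined in MultipartStore's companion object in this PR.
private const val PART_METADATA_SUFFIX = ".partMetadata.json"

// Hypothetical cleanup: remove each part's binary file and its metadata
// sidecar so completed uploads leave nothing behind.
fun deletePartsWithMetadata(partsPaths: List<Path>) {
  partsPaths.forEach { partPath ->
    // Parts are named "<partNumber>.part"; the sidecar is "<partNumber>.partMetadata.json".
    val partNumber = partPath.fileName.toString().substringBefore('.')
    val sidecar = partPath.resolveSibling(partNumber + PART_METADATA_SUFFIX)
    try {
      Files.deleteIfExists(partPath)
      Files.deleteIfExists(sidecar)
    } catch (e: IOException) {
      throw IllegalStateException("Could not delete part files for $partPath", e)
    }
  }
}
```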