Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

private volatile boolean closed;
private volatile boolean closing;
// how much of data is actually written yet to underlying stream
Expand Down Expand Up @@ -130,7 +122,6 @@ private ECKeyOutputStream(Builder builder) {
return flushStripeFromQueue();
});
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}

@Override
Expand Down Expand Up @@ -489,12 +480,6 @@ public void close() throws IOException {
Preconditions.checkArgument(writeOffset == offset,
"Expected writeOffset= " + writeOffset
+ " Expected offset=" + offset);
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset, String.format(
"Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,6 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput

private long clientID;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;

private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();

@Override
Expand Down Expand Up @@ -130,7 +122,6 @@ public KeyDataStreamOutput() {

this.writeOffset = 0;
this.clientID = 0L;
this.atomicKeyCreation = false;
}

@SuppressWarnings({"parameternumber", "squid:S00107"})
Expand All @@ -141,8 +132,7 @@ public KeyDataStreamOutput(
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationConfig replicationConfig,
String uploadID, int partNumber, boolean isMultipart,
boolean unsafeByteBufferConversion,
boolean atomicKeyCreation
boolean unsafeByteBufferConversion
) {
super(HddsClientUtils.getRetryPolicyByException(
config.getMaxRetryCount(), config.getRetryInterval()));
Expand All @@ -163,7 +153,6 @@ public KeyDataStreamOutput(
// encrypted bucket.
this.writeOffset = 0;
this.clientID = handler.getId();
this.atomicKeyCreation = atomicKeyCreation;
}

/**
Expand Down Expand Up @@ -458,12 +447,6 @@ public void close() throws IOException {
if (!isException()) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
Preconditions.checkArgument(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down Expand Up @@ -503,7 +486,6 @@ public static class Builder {
private boolean unsafeByteBufferConversion;
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private boolean atomicKeyCreation = false;

public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
Expand Down Expand Up @@ -555,11 +537,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public KeyDataStreamOutput build() {
return new KeyDataStreamOutput(
clientConfig,
Expand All @@ -572,8 +549,7 @@ public KeyDataStreamOutput build() {
multipartUploadID,
multipartNumber,
isMultipartKey,
unsafeByteBufferConversion,
atomicKeyCreation);
unsafeByteBufferConversion);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,6 @@ public class KeyOutputStream extends OutputStream
private long clientID;
private StreamBufferArgs streamBufferArgs;

/**
* Indicates if an atomic write is required. When set to true,
* the amount of data written must match the declared size during the commit.
* A mismatch will prevent the commit from succeeding.
* This is essential for operations like S3 put to ensure atomicity.
*/
private boolean atomicKeyCreation;
private ContainerClientMetrics clientMetrics;
private OzoneManagerVersion ozoneManagerVersion;
private final Lock writeLock = new ReentrantLock();
Expand Down Expand Up @@ -187,7 +180,6 @@ public KeyOutputStream(Builder b) {
this.isException = false;
this.writeOffset = 0;
this.clientID = b.getOpenHandler().getId();
this.atomicKeyCreation = b.getAtomicKeyCreation();
this.streamBufferArgs = b.getStreamBufferArgs();
this.clientMetrics = b.getClientMetrics();
this.ozoneManagerVersion = b.ozoneManagerVersion;
Expand Down Expand Up @@ -657,12 +649,6 @@ private void closeInternal() throws IOException {
if (!isException) {
Preconditions.checkArgument(writeOffset == offset);
}
if (atomicKeyCreation) {
long expectedSize = blockOutputStreamEntryPool.getDataSize();
Preconditions.checkState(expectedSize == offset,
String.format("Expected: %d and actual %d write sizes do not match",
expectedSize, offset));
}
for (CheckedRunnable<IOException> preCommit : preCommits) {
preCommit.run();
}
Expand Down Expand Up @@ -703,7 +689,6 @@ public static class Builder {
private OzoneClientConfig clientConfig;
private ReplicationConfig replicationConfig;
private ContainerClientMetrics clientMetrics;
private boolean atomicKeyCreation = false;
private StreamBufferArgs streamBufferArgs;
private Supplier<ExecutorService> executorServiceSupplier;
private OzoneManagerVersion ozoneManagerVersion;
Expand Down Expand Up @@ -802,11 +787,6 @@ public Builder setReplicationConfig(ReplicationConfig replConfig) {
return this;
}

public Builder setAtomicKeyCreation(boolean atomicKey) {
this.atomicKeyCreation = atomicKey;
return this;
}

public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
this.clientMetrics = clientMetrics;
return this;
Expand All @@ -816,10 +796,6 @@ public ContainerClientMetrics getClientMetrics() {
return clientMetrics;
}

public boolean getAtomicKeyCreation() {
return atomicKeyCreation;
}

public Builder setExecutorServiceSupplier(Supplier<ExecutorService> executorServiceSupplier) {
this.executorServiceSupplier = executorServiceSupplier;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.ratis.util.function.CheckedRunnable;

/**
* OzoneOutputStream is used to write data into Ozone.
Expand Down Expand Up @@ -128,9 +130,9 @@ public void hsync() throws IOException {
}

public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
KeyOutputStream keyOutputStream = getKeyOutputStream();
if (keyOutputStream != null) {
return keyOutputStream.getCommitUploadPartInfo();
KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
if (keyCommitOutput != null) {
return keyCommitOutput.getCommitUploadPartInfo();
}
// Otherwise return null.
return null;
Expand All @@ -139,12 +141,23 @@ public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
public OutputStream getOutputStream() {
return outputStream;
}

public KeyOutputStream getKeyOutputStream() {
OutputStream base = unwrap(outputStream);
return base instanceof KeyOutputStream ? (KeyOutputStream) base : null;
}

public void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
KeyCommitOutput keyCommitOutput = getKeyCommitOutput();
if (keyCommitOutput != null) {
keyCommitOutput.setPreCommits(preCommits);
return;
}
throw new IllegalStateException(
"Output stream is not backed by KeyCommitOutput: " +
outputStream.getClass());
}

@Override
public Map<String, String> getMetadata() {
OutputStream base = unwrap(outputStream);
Expand All @@ -155,6 +168,11 @@ public Map<String, String> getMetadata() {
"OutputStream is not KeyMetadataAware: " + base.getClass());
}

private KeyCommitOutput getKeyCommitOutput() {
OutputStream base = unwrap(outputStream);
return base instanceof KeyCommitOutput ? (KeyCommitOutput) base : null;
}

private static OutputStream unwrap(OutputStream out) {
if (out instanceof CryptoOutputStream) {
return ((CryptoOutputStream) out).getWrappedStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,8 +1224,6 @@ OzoneKey headObject(String volumeName, String bucketName,
*/
void setThreadLocalS3Auth(S3Auth s3Auth);

void setIsS3Request(boolean isS3Request);

/**
* Gets the S3 Authentication information that is attached to the thread.
* @return S3 Authentication information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.crypto.Cipher;
Expand Down Expand Up @@ -223,7 +222,6 @@ public class RpcClient implements ClientProtocol {
private final MemoizedSupplier<ExecutorService> ecReconstructExecutor;
private final ContainerClientMetrics clientMetrics;
private final MemoizedSupplier<ExecutorService> writeExecutor;
private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
private volatile OzoneFsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
private final long serverDefaultsValidityPeriod;
Expand Down Expand Up @@ -1473,13 +1471,6 @@ private OmKeyArgs.Builder createWriteKeyArgsBuilder(String volumeName,
private OzoneOutputStream openOutputStream(OmKeyArgs keyArgs, long size)
throws IOException {
OpenKeySession openKey = ozoneManagerClient.openKey(keyArgs);
// For bucket with layout OBJECT_STORE, when create an empty file (size=0),
// OM will set DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
// which will cause S3G's atomic write length check to fail,
// so reset size to 0 here.
if (isS3GRequest.get() && size == 0) {
openKey.getKeyInfo().setDataSize(0);
}
Comment thread
ivandika3 marked this conversation as resolved.
return createOutputStream(openKey);
}

Expand Down Expand Up @@ -2588,15 +2579,12 @@ private OzoneDataStreamOutput createDataStreamOutput(OpenKeySession openKey)
}

private KeyDataStreamOutput.Builder newKeyOutputStreamBuilder() {
// Amazon S3 never adds partial objects, So for S3 requests we need to
// set atomicKeyCreation to true
// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
return new KeyDataStreamOutput.Builder()
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get());
.setConfig(clientConfig);
}

private OzoneOutputStream createOutputStream(OpenKeySession openKey)
Expand Down Expand Up @@ -2670,7 +2658,6 @@ private KeyOutputStream.Builder createKeyOutputStream(
.setOmClient(ozoneManagerClient)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
.setConfig(clientConfig)
.setAtomicKeyCreation(isS3GRequest.get())
.setClientMetrics(clientMetrics)
.setExecutorServiceSupplier(writeExecutor)
.setStreamBufferArgs(streamBufferArgs)
Expand Down Expand Up @@ -2774,11 +2761,6 @@ public void setThreadLocalS3Auth(
this.s3gUgi = UserGroupInformation.createRemoteUser(getThreadLocalS3Auth().getUserPrincipal());
}

@Override
public void setIsS3Request(boolean s3Request) {
this.isS3GRequest.set(s3Request);
}

@Override
public S3Auth getThreadLocalS3Auth() {
return ozoneManagerClient.getThreadLocalS3Auth();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,40 +220,6 @@ public void testPutKeyWithECReplicationConfig() throws IOException {
}
}

/**
* This test validates that for S3G,
* the key upload process needs to be atomic.
* It simulates two mismatch scenarios where the actual write data size does
* not match the expected size.
*/
@Test
public void testPutKeySizeMismatch() throws IOException {
String value = new String(new byte[1024], UTF_8);
OzoneBucket bucket = getOzoneBucket();
String keyName = UUID.randomUUID().toString();
try {
// Simulating first mismatch: Write less data than expected
client.getProxy().setIsS3Request(true);
OzoneOutputStream out1 = bucket.createKey(keyName,
value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
new HashMap<>());
out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
assertThrows(IllegalStateException.class, out1::close,
"Expected IllegalArgumentException due to size mismatch.");

// Simulating second mismatch: Write more data than expected
OzoneOutputStream out2 = bucket.createKey(keyName,
value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
new HashMap<>());
value += "1";
out2.write(value.getBytes(UTF_8));
assertThrows(IllegalStateException.class, out2::close,
"Expected IllegalArgumentException due to size mismatch.");
} finally {
client.getProxy().setIsS3Request(false);
}
}

private OzoneBucket getOzoneBucket() throws IOException {
String volumeName = UUID.randomUUID().toString();
String bucketName = UUID.randomUUID().toString();
Expand Down
Loading