Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.FutureUtils;

Expand Down Expand Up @@ -127,7 +128,8 @@ public NativeS3FileSystem(
boolean useAsyncOperations,
int readBufferSize,
Duration fsCloseTimeout) {
this.clientProvider = clientProvider;
this.clientProvider =
Preconditions.checkNotNull(clientProvider, "clientProvider must not be null");
this.uri = uri;
this.bucketName = uri.getHost();
this.entropyInjectionKey = entropyInjectionKey;
Expand Down Expand Up @@ -550,9 +552,8 @@ public CompletableFuture<Void> closeAsync() {
"Native S3 FileSystem closed for bucket: {}",
bucketName))
.thenCompose(
ignored -> {
if (clientProvider != null) {
return clientProvider
ignored ->
clientProvider
.closeAsync()
.whenComplete(
(result, error) -> {
Expand All @@ -564,10 +565,7 @@ public CompletableFuture<Void> closeAsync() {
LOG.debug(
"S3 client provider closed");
}
});
}
return CompletableFuture.completedFuture(null);
})
}))
.orTimeout(fsCloseTimeout.toSeconds(), TimeUnit.SECONDS)
.whenComplete(
(result, error) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
Expand Down Expand Up @@ -292,7 +294,7 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");

private Configuration flinkConfig;
@Nullable private Configuration flinkConfig;

@Override
public String getScheme() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
package org.apache.flink.fs.s3native;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.util.Preconditions;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import javax.annotation.Nullable;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -63,21 +62,21 @@ class NativeS3OutputStream extends FSDataOutputStream {
public NativeS3OutputStream(
S3Client s3Client, String bucketName, String key, String localTmpDir)
throws IOException {
this(s3Client, bucketName, key, localTmpDir, null);
this(s3Client, bucketName, key, localTmpDir, S3EncryptionConfig.none());
}

public NativeS3OutputStream(
S3Client s3Client,
String bucketName,
String key,
String localTmpDir,
@Nullable S3EncryptionConfig encryptionConfig)
S3EncryptionConfig encryptionConfig)
throws IOException {
this.s3Client = s3Client;
this.bucketName = bucketName;
this.key = key;
this.encryptionConfig =
encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");

File tmpDir = new File(localTmpDir);
if (!tmpDir.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,6 +39,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand All @@ -54,7 +57,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -91,7 +93,7 @@ class S3ClientProvider implements AutoCloseableAsync {
private S3ClientProvider(
S3Client s3Client,
S3TransferManager transferManager,
@Nullable S3EncryptionConfig encryptionConfig,
S3EncryptionConfig encryptionConfig,
AwsCredentialsProvider credentialsProvider,
@Nullable StsClient stsClient,
Duration clientCloseTimeout,
Expand All @@ -103,22 +105,24 @@ private S3ClientProvider(
boolean checksumValidation,
int maxConnections,
int maxRetries) {
this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be null");
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client must not be null");
this.transferManager =
Objects.requireNonNull(transferManager, "transferManager must not be null");
Preconditions.checkNotNull(transferManager, "transferManager must not be null");
this.encryptionConfig =
encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");
this.credentialsProvider =
Objects.requireNonNull(credentialsProvider, "credentialsProvider must not be null");
Preconditions.checkNotNull(
credentialsProvider, "credentialsProvider must not be null");
this.stsClient = stsClient;
this.clientCloseTimeout =
Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout must not be null");
Preconditions.checkNotNull(
clientCloseTimeout, "clientCloseTimeout must not be null");
this.connectionTimeout =
Objects.requireNonNull(connectionTimeout, "connectionTimeout must not be null");
Preconditions.checkNotNull(connectionTimeout, "connectionTimeout must not be null");
this.socketTimeout =
Objects.requireNonNull(socketTimeout, "socketTimeout must not be null");
Preconditions.checkNotNull(socketTimeout, "socketTimeout must not be null");
this.connectionMaxIdleTime =
Objects.requireNonNull(
Preconditions.checkNotNull(
connectionMaxIdleTime, "connectionMaxIdleTime must not be null");
this.pathStyleAccess = pathStyleAccess;
this.chunkedEncoding = chunkedEncoding;
Expand Down Expand Up @@ -199,19 +203,15 @@ public CompletableFuture<Void> closeAsync() {
}
return CompletableFuture.runAsync(
() -> {
if (transferManager != null) {
try {
transferManager.close();
} catch (Exception e) {
LOG.warn("Error closing S3 TransferManager", e);
}
try {
transferManager.close();
} catch (Exception e) {
LOG.warn("Error closing S3 TransferManager", e);
}
if (s3Client != null) {
try {
s3Client.close();
} catch (Exception e) {
LOG.warn("Error closing S3 sync client", e);
}
try {
s3Client.close();
} catch (Exception e) {
LOG.warn("Error closing S3 sync client", e);
}
if (getCredentialsProvider() instanceof SdkAutoCloseable) {
try {
Expand Down Expand Up @@ -268,7 +268,7 @@ public static class Builder {
private int assumeRoleSessionDurationSeconds = 3600;

// Encryption configuration
private S3EncryptionConfig encryptionConfig;
private S3EncryptionConfig encryptionConfig = S3EncryptionConfig.none();

// Custom credentials provider class names (comma-separated)
@Nullable private String credentialsProviderClasses;
Expand Down Expand Up @@ -348,10 +348,8 @@ public Builder assumeRoleExternalId(@Nullable String assumeRoleExternalId) {
return this;
}

public Builder assumeRoleSessionName(@Nullable String assumeRoleSessionName) {
if (assumeRoleSessionName != null) {
this.assumeRoleSessionName = assumeRoleSessionName;
}
public Builder assumeRoleSessionName(String assumeRoleSessionName) {
this.assumeRoleSessionName = assumeRoleSessionName;
return this;
}

Expand All @@ -360,8 +358,10 @@ public Builder assumeRoleSessionDurationSeconds(int assumeRoleSessionDurationSec
return this;
}

public Builder encryptionConfig(@Nullable S3EncryptionConfig encryptionConfig) {
this.encryptionConfig = encryptionConfig;
public Builder encryptionConfig(S3EncryptionConfig encryptionConfig) {
this.encryptionConfig =
Preconditions.checkNotNull(
encryptionConfig, "encryptionConfig must not be null");
return this;
}

Expand All @@ -386,7 +386,7 @@ public S3ClientProvider build() {
AwsCredentialsProvider credentialsProvider;

AwsCredentialsProvider baseProvider = buildBaseCredentialsProvider();
if (assumeRoleArn != null && !assumeRoleArn.isEmpty()) {
if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleArn)) {
stsClient = buildStsClient(baseProvider, awsRegion);
credentialsProvider = buildAssumeRoleProvider(stsClient);
} else {
Expand Down Expand Up @@ -424,24 +424,24 @@ public S3ClientProvider build() {
clientBuilder.endpointOverride(endpointUri);
}
S3Client s3Client = clientBuilder.build();

S3AsyncClientBuilder asyncClientBuilder =
S3AsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(awsRegion)
.serviceConfiguration(s3Config)
.httpClientBuilder(
NettyNioAsyncHttpClient.builder()
.maxConcurrency(maxConnections)
.connectionTimeout(connectionTimeout)
.readTimeout(socketTimeout)
.connectionAcquisitionTimeout(connectionTimeout))
.overrideConfiguration(overrideConfig);
if (endpointUri != null) {
asyncClientBuilder.endpointOverride(endpointUri);
}
S3TransferManager transferManager =
S3TransferManager.builder()
.s3Client(
S3AsyncClient.builder()
.credentialsProvider(credentialsProvider)
.region(awsRegion)
.serviceConfiguration(s3Config)
.httpClientBuilder(
NettyNioAsyncHttpClient.builder()
.maxConcurrency(maxConnections)
.connectionTimeout(connectionTimeout)
.readTimeout(socketTimeout)
.connectionAcquisitionTimeout(
connectionTimeout))
.overrideConfiguration(overrideConfig)
.endpointOverride(endpointUri)
.build())
.build();
S3TransferManager.builder().s3Client(asyncClientBuilder.build()).build();

return new S3ClientProvider(
s3Client,
Expand All @@ -463,8 +463,7 @@ public S3ClientProvider build() {
private AwsCredentialsProvider buildBaseCredentialsProvider() {
List<AwsCredentialsProvider> chain = new ArrayList<>();

if (credentialsProviderClasses != null
&& !credentialsProviderClasses.trim().isEmpty()) {
if (!StringUtils.isNullOrWhitespaceOnly(credentialsProviderClasses)) {
for (String name : credentialsProviderClasses.split(",")) {
String trimmed = name.trim();
if (!trimmed.isEmpty()) {
Expand Down Expand Up @@ -547,7 +546,7 @@ private AwsCredentialsProvider buildAssumeRoleProvider(StsClient stsClient) {
.roleSessionName(assumeRoleSessionName)
.durationSeconds(assumeRoleSessionDurationSeconds);

if (assumeRoleExternalId != null && !assumeRoleExternalId.isEmpty()) {
if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleExternalId)) {
requestBuilder.externalId(assumeRoleExternalId);
}

Expand All @@ -558,7 +557,7 @@ private AwsCredentialsProvider buildAssumeRoleProvider(StsClient stsClient) {
}

private Region resolveRegion(@Nullable String explicitRegion) {
if (explicitRegion != null && !explicitRegion.isEmpty()) {
if (!StringUtils.isNullOrWhitespaceOnly(explicitRegion)) {
LOG.info("Using configured AWS region: {}", explicitRegion);
return Region.of(explicitRegion);
}
Expand Down