Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,15 +371,11 @@ public FileSystem create(URI fsUri) throws IOException {
maxConcurrentUploads);

boolean useAsyncOperations = config.get(USE_ASYNC_OPERATIONS);

// Validate and clamp read buffer size to sensible range [64KB, 4MB]
// We clip rather than throw to provide flexibility while preventing extreme values
int configuredReadBufferSize = config.get(READ_BUFFER_SIZE);
int readBufferSize =
Math.max(64 * 1024, Math.min(configuredReadBufferSize, 4 * 1024 * 1024));
int readBufferSize = Math.max(256 * 1024, configuredReadBufferSize);
if (readBufferSize != configuredReadBufferSize) {
LOG.warn(
"{} value {} was outside valid range [64KB, 4MB]. Using {} instead.",
"{} value {} was below 64KB. Using {} instead.",
READ_BUFFER_SIZE.key(),
configuredReadBufferSize,
readBufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,20 @@
import java.util.concurrent.locks.ReentrantLock;

/**
* S3 input stream with configurable read-ahead buffer, range-based requests for seek operations,
* automatic stream reopening on errors, and lazy initialization to minimize memory footprint.
* S3 input stream with configurable read-ahead buffer, lazy seek, and automatic stream reopening.
*
* <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe concurrent access and
* resource cleanup.
* <p>{@link #seek(long)} only records the desired position without performing any I/O. All HTTP
* work is deferred to the next {@link #read()} call via {@link #lazySeek()}, so multiple seeks
* between reads coalesce. When the seek is forward and within {@code readBufferSize}, bytes are
* skipped in-buffer instead of reopening the HTTP connection.
*/
class NativeS3InputStream extends FSDataInputStream {

private static final Logger LOG = LoggerFactory.getLogger(NativeS3InputStream.class);

/** Default read-ahead buffer size: 256KB. */
/** Default read-ahead buffer size. */
private static final int DEFAULT_READ_BUFFER_SIZE = 256 * 1024;

/** Maximum buffer size for very large sequential reads. */
private static final int MAX_READ_BUFFER_SIZE = 4 * 1024 * 1024; // 4MB

private final ReentrantLock lock = new ReentrantLock();

private final S3Client s3Client;
Expand All @@ -65,8 +63,19 @@ class NativeS3InputStream extends FSDataInputStream {
@GuardedBy("lock")
private BufferedInputStream bufferedStream;

/**
* The position the caller expects to read from next. Updated by {@link #seek(long)}, {@link
* #skip(long)}, and after every successful {@link #read()}.
*/
@GuardedBy("lock")
private long nextReadPos;

/**
* The actual byte offset of the underlying stream cursor, reconciled lazily via {@link
* #lazySeek()}.
*/
@GuardedBy("lock")
private long position;
private long streamPos;

@GuardedBy("lock")
private volatile boolean closed;
Expand All @@ -86,8 +95,9 @@ public NativeS3InputStream(
this.bucketName = bucketName;
this.key = key;
this.contentLength = contentLength;
this.readBufferSize = Math.min(readBufferSize, MAX_READ_BUFFER_SIZE);
this.position = 0;
this.readBufferSize = readBufferSize;
Comment thread
Samrat002 marked this conversation as resolved.
this.nextReadPos = 0;
this.streamPos = 0;
this.closed = false;

LOG.debug(
Expand All @@ -98,39 +108,63 @@ public NativeS3InputStream(
this.readBufferSize / 1024);
}

/** Reconciles {@link #nextReadPos} and {@link #streamPos} before reading bytes. */
@GuardedBy("lock")
private void lazyInitialize() throws IOException {
assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock to be held";
if (currentStream == null && !closed) {
openStreamAtCurrentPosition();
}
}
private void lazySeek() throws IOException {
assert lock.isHeldByCurrentThread() : "lazySeek() requires lock to be held";
long targetPos = nextReadPos;

/** At EOF, release instead of reopening: {@code bytes=contentLength-} returns S3 416. */
@GuardedBy("lock")
private void repositionOpenStream() throws IOException {
assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires lock to be held";
if (currentStream == null) {
streamPos = targetPos;
return;
}
if (position >= contentLength) {

if (targetPos == streamPos) {
return;
}

long diff = targetPos - streamPos;
streamPos = targetPos;

if (targetPos >= contentLength) {
releaseStreams();
} else {
return;
}

// BufferedInputStream does not expose how many bytes are in its local array, so
// readBufferSize is used as the skip threshold: at most readBufferSize bytes may be
// consumed from the live HTTP connection before a range request is preferred instead.
if (diff > 0 && diff <= (long) readBufferSize) {
Comment thread
Samrat002 marked this conversation as resolved.
skipBytesInBuffer(diff);
return;
}

openStreamAtCurrentPosition();
}

@GuardedBy("lock")
private void ensureStreamOpen() throws IOException {
assert lock.isHeldByCurrentThread() : "ensureStreamOpen() requires lock to be held";
if (currentStream == null && !closed) {
openStreamAtCurrentPosition();
}
}

/**
* Opens (or reopens) the S3 stream at the current position.
*
* <p>This method:
*
* <ul>
* <li>Closes any existing stream
* <li>Opens a new stream starting at {@link #position}
* <li>Uses HTTP range requests for non-zero positions
* </ul>
*/
@GuardedBy("lock")
private void skipBytesInBuffer(long n) throws IOException {
assert lock.isHeldByCurrentThread() : "skipBytesInBuffer() requires lock to be held";
long remaining = n;
while (remaining > 0) {
long skipped = bufferedStream.skip(remaining);
if (skipped <= 0) {
openStreamAtCurrentPosition();
Comment thread
Samrat002 marked this conversation as resolved.
return;
}
remaining -= skipped;
}
}

/** Opens (or reopens) the S3 stream at {@link #streamPos}. */
private void openStreamAtCurrentPosition() throws IOException {
lock.lock();
try {
Expand All @@ -140,11 +174,11 @@ private void openStreamAtCurrentPosition() throws IOException {
GetObjectRequest.Builder requestBuilder =
GetObjectRequest.builder().bucket(bucketName).key(key);

if (position > 0) {
requestBuilder.range(String.format("bytes=%d-", position));
if (streamPos > 0) {
requestBuilder.range(String.format("bytes=%d-", streamPos));
LOG.debug(
"Opening S3 stream with range: bytes={}-{}",
position,
streamPos,
contentLength - 1);
} else {
LOG.debug("Opening S3 stream for full object: {} bytes", contentLength);
Expand All @@ -160,12 +194,7 @@ private void openStreamAtCurrentPosition() throws IOException {
}
}

/**
* Aborts the in-flight HTTP connection so that subsequent {@code close()} calls on the stream
* do not drain remaining bytes over the network.
*
* @see ResponseInputStream#abort()
*/
/** Aborts the in-flight HTTP connection to avoid draining remaining bytes on close. */
@GuardedBy("lock")
private void abortCurrentStream() {
assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires lock to be held";
Expand All @@ -179,11 +208,10 @@ private void abortCurrentStream() {
}

/**
* Aborts and closes both streams, nulling the references. The abort is called first to prevent
* {@link ResponseInputStream#close()} from draining remaining bytes over the network.
* Aborts and closes both streams, nulling the references.
*
* @return the first {@link IOException} encountered (with subsequent ones added as suppressed),
* or {@code null} if cleanup succeeded without errors
* or {@code null} if cleanup succeeded
*/
@GuardedBy("lock")
private IOException releaseStreams() {
Expand Down Expand Up @@ -236,10 +264,7 @@ public void seek(long desired) throws IOException {
+ contentLength);
}

if (desired != position) {
position = desired;
repositionOpenStream();
}
nextReadPos = desired;
} finally {
lock.unlock();
}
Expand All @@ -249,7 +274,7 @@ public void seek(long desired) throws IOException {
public long getPos() throws IOException {
lock();
try {
return position;
return nextReadPos;
} finally {
lock.unlock();
}
Expand All @@ -262,13 +287,15 @@ public int read() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
if (position >= contentLength) {
if (nextReadPos >= contentLength) {
return -1;
}
lazyInitialize();
lazySeek();
ensureStreamOpen();
int data = bufferedStream.read();
if (data != -1) {
position++;
nextReadPos++;
streamPos++;
}
return data;
} finally {
Expand All @@ -295,15 +322,17 @@ public int read(byte[] b, int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
if (position >= contentLength) {
if (nextReadPos >= contentLength) {
return -1;
}
lazyInitialize();
long remaining = contentLength - position;
lazySeek();
ensureStreamOpen();
long remaining = contentLength - nextReadPos;
int toRead = (int) Math.min(len, remaining);
int bytesRead = bufferedStream.read(b, off, toRead);
if (bytesRead > 0) {
position += bytesRead;
nextReadPos += bytesRead;
streamPos += bytesRead;
}
return bytesRead;
} finally {
Expand Down Expand Up @@ -336,7 +365,7 @@ public void close() throws IOException {
"Closed S3 input stream - bucket: {}, key: {}, final position: {}/{}",
bucketName,
key,
position,
nextReadPos,
contentLength);
if (exception != null) {
throw exception;
Expand All @@ -346,24 +375,14 @@ public void close() throws IOException {
}
}

/**
* Returns an estimate of the number of bytes that can be read without blocking.
*
* <p>This implementation returns the remaining bytes in the object based on content length and
* current position. Note that actual reads may still block due to network I/O, but this
* indicates how much data is logically available.
*
* @return the number of remaining bytes (capped at Integer.MAX_VALUE)
* @throws IOException if the stream has been closed
*/
@Override
public int available() throws IOException {
lock();
try {
if (closed) {
throw new IOException("Stream is closed");
}
long remaining = contentLength - position;
long remaining = contentLength - nextReadPos;
return (int) Math.min(remaining, Integer.MAX_VALUE);
} finally {
lock.unlock();
Expand All @@ -380,12 +399,9 @@ public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long newPos = Math.min(position + n, contentLength);
long skipped = newPos - position;
if (newPos != position) {
position = newPos;
repositionOpenStream();
}
long newPos = Math.min(nextReadPos + n, contentLength);
long skipped = newPos - nextReadPos;
nextReadPos = newPos;
return skipped;
} finally {
lock.unlock();
Expand Down
Loading