Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ Cloud storage URLs in virtual chunk references are automatically translated:
- `abfs://container@account.dfs.core.windows.net/path` →
`https://account.blob.core.windows.net/container/path`

S3 buckets whose name contains a dot use path-style addressing on the global
endpoint, which 301-redirects buckets outside `us-east-1` to their regional
endpoint. In Node and other non-browser runtimes this redirect is resolved
automatically. Browsers can't read the cross-origin redirect — S3 returns no
CORS headers on it — so for such buckets supply a [`fetchClient`](#virtual-chunk-authentication)
that routes to the regional endpoint (`https://s3.<region>.amazonaws.com/<bucket>/<key>`)
or a CORS-enabled proxy.

### Repository

For direct access to branches, tags, and checkouts.
Expand Down
60 changes: 49 additions & 11 deletions src/reader/range-coalescer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,45 @@ function expectedRangeLength(range: RangeQuery): number {
return "suffixLength" in range ? range.suffixLength : range.length;
}

/**
* Map an S3 path-style global-endpoint region redirect to the regional URL to
* retry. For buckets outside `us-east-1`, `s3.amazonaws.com` answers 301 with
* no `Location` header (so `fetch` can't follow it) and reports the region in
* `x-amz-bucket-region`; this rebuilds the URL against `s3.<region>.amazonaws.com`.
* Returns null when the response isn't that redirect.
*/
function regionalS3RedirectUrl(url: string, response: Response): string | null {
if (response.status !== 301 && response.status !== 307) return null;
const region = response.headers.get("x-amz-bucket-region");
if (!region) return null;
Comment on lines +66 to +69

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the helper more robust against custom or poorly-mocked fetchClient implementations, it is safer to use optional chaining when accessing response.headers.get.

Suggested change
function regionalS3RedirectUrl(url: string, response: Response): string | null {
if (response.status !== 301 && response.status !== 307) return null;
const region = response.headers.get("x-amz-bucket-region");
if (!region) return null;
function regionalS3RedirectUrl(url: string, response: Response): string | null {
if (response.status !== 301 && response.status !== 307) return null;
const region = response.headers?.get?.("x-amz-bucket-region");
if (!region) return null;

const globalPrefix = "https://s3.amazonaws.com/";
if (!url.startsWith(globalPrefix)) return null;
return `https://s3.${region}.amazonaws.com/${url.slice(globalPrefix.length)}`;
}

/**
* Build a minimal `AsyncReadable` that services every `getRange` by
* fetching the configured URL with the requested byte range. The zarr
* key is ignored — when wrapped by `withRangeCoalescing`, all requests
* converge on the same path and become eligible for range-merging.
*/
export function makeUrlStore(opts: MakeUrlStoreOptions): AsyncReadable {
const { url, fetchClient, conditionalHeaders } = opts;
const { fetchClient, conditionalHeaders } = opts;
// Hint shared across reads: once one read resolves an S3 regional redirect,
// later reads start from the regional host. Each read works off its own
// `requestUrl` copy, so this value is an optimization, not a source of truth.
let pinnedUrl = opts.url;

async function doFetch(init: RequestInit): Promise<Response> {
async function doFetch(target: string, init: RequestInit): Promise<Response> {
return fetchClient
? await fetchClient.fetch(url, init)
: await fetch(url, init);
? await fetchClient.fetch(target, init)
: await fetch(target, init);
}

return {
async get() {
throw new Error(
`Virtual chunk URL store for ${url} only supports ranged reads`,
`Virtual chunk URL store for ${pinnedUrl} only supports ranged reads`,
);
},
async getRange(_key, range, options) {
Expand All @@ -86,16 +106,34 @@ export function makeUrlStore(opts: MakeUrlStoreOptions): AsyncReadable {
? `bytes=-${range.suffixLength}`
: `bytes=${range.offset}-${range.offset + range.length - 1}`;

const response = await doFetch({ headers, signal: options?.signal });
// Copy the endpoint locally so a concurrent read updating the shared pin
// can't change which URL this request detects and retries against.
let requestUrl = pinnedUrl;
let response = await doFetch(requestUrl, {
headers,
signal: options?.signal,
});

// Resolve a global-endpoint region redirect and retry once on the
// regional host, re-pinning it for later reads.
const regionalUrl = regionalS3RedirectUrl(requestUrl, response);
if (regionalUrl) {
requestUrl = regionalUrl;
pinnedUrl = regionalUrl;
response = await doFetch(requestUrl, {
headers,
signal: options?.signal,
});
}
Comment on lines +119 to +129

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When a 301/307 redirect is received, the original response is discarded and overwritten with the regional fetch response. In some environments (like Node.js), discarding a response without consuming or canceling its body can lead to socket/resource leaks and prevent connection reuse (keep-alive).

To prevent this, we should cancel the response body of the redirect before performing the retry.

      const regionalUrl = regionalS3RedirectUrl(requestUrl, response);
      if (regionalUrl) {
        if (response.body && typeof response.body.cancel === "function") {
          response.body.cancel().catch(() => {});
        }
        requestUrl = regionalUrl;
        pinnedUrl = regionalUrl;
        response = await doFetch(requestUrl, {
          headers,
          signal: options?.signal,
        });
      }


if (response.status === 412) {
throw new Error(
`Virtual chunk at ${url} failed integrity check — data has been modified since snapshot was created`,
`Virtual chunk at ${requestUrl} failed integrity check — data has been modified since snapshot was created`,
);
}
if (response.status !== 200 && response.status !== 206) {
throw new Error(
`Failed to fetch virtual chunk from ${url}: ${response.status} ${response.statusText}`,
`Failed to fetch virtual chunk from ${requestUrl}: ${response.status} ${response.statusText}`,
);
}

Expand All @@ -108,7 +146,7 @@ export function makeUrlStore(opts: MakeUrlStoreOptions): AsyncReadable {
const expected = expectedRangeLength(range);
if (data.length === expected) return data;
throw new Error(
`Virtual range response size mismatch for ${url}: expected ${expected} bytes, got ${data.length}`,
`Virtual range response size mismatch for ${requestUrl}: expected ${expected} bytes, got ${data.length}`,
);
}

Expand All @@ -119,15 +157,15 @@ export function makeUrlStore(opts: MakeUrlStoreOptions): AsyncReadable {
const end = range.offset + range.length;
if (data.length >= end) return data.slice(range.offset, end);
throw new Error(
`Virtual range request not honored for ${url}: need at least ${end} bytes for fallback slicing, got ${data.length}`,
`Virtual range request not honored for ${requestUrl}: need at least ${end} bytes for fallback slicing, got ${data.length}`,
);
}
// Suffix-length on a 200 fallback: take the trailing suffixLength bytes.
if (data.length >= range.suffixLength) {
return data.slice(data.length - range.suffixLength);
}
throw new Error(
`Virtual suffix range request not honored for ${url}: need at least ${range.suffixLength} bytes for fallback slicing, got ${data.length}`,
`Virtual suffix range request not honored for ${requestUrl}: need at least ${range.suffixLength} bytes for fallback slicing, got ${data.length}`,
);
},
};
Expand Down
5 changes: 3 additions & 2 deletions src/reader/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,8 +886,9 @@ export function expandVccUrl(
* via the azureAccount parameter.
*
* Note: S3 URLs use virtual-hosted style for simple bucket names, but fall back to
* path-style for buckets containing dots (which break SSL certificate validation).
* For buckets in specific regions, S3 will redirect to the correct endpoint.
* path-style on the global endpoint for buckets containing dots (which break SSL
* certificate validation). For such buckets outside us-east-1 the global endpoint
* redirects to a regional one, which `makeUrlStore` resolves at fetch time.
*/
function translateToHttpUrl(url: string, azureAccount?: string): string {
// S3: s3://bucket/key → https://bucket.s3.amazonaws.com/key
Expand Down
94 changes: 93 additions & 1 deletion tests/reader/range-coalescer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ function toArrayBuffer(data: Uint8Array): ArrayBuffer {
) as ArrayBuffer;
}

function mockFetchResponse(status: number, data: Uint8Array): Response {
function mockFetchResponse(
status: number,
data: Uint8Array,
headers: Record<string, string> = {},
): Response {
return {
status,
statusText: status === 206 ? "Partial Content" : "OK",
headers: { get: (name: string) => headers[name.toLowerCase()] ?? null },
arrayBuffer: vi.fn().mockResolvedValue(toArrayBuffer(data)),
} as unknown as Response;
}
Expand Down Expand Up @@ -65,6 +70,93 @@ describe("range coalescer adapters", () => {
fetchSpy.mockRestore();
});

it("retries the regional endpoint when the S3 global endpoint 301-redirects", async () => {
const globalUrl =
"https://s3.amazonaws.com/us-west-2.opendata.source.coop/data.bin";
const regionalUrl =
"https://s3.us-west-2.amazonaws.com/us-west-2.opendata.source.coop/data.bin";
const body = new Uint8Array([1, 2, 3]);
const fetchSpy = vi
.spyOn(globalThis, "fetch")
.mockResolvedValueOnce(
mockFetchResponse(301, new Uint8Array(), {
"x-amz-bucket-region": "us-west-2",
}),
)
.mockResolvedValueOnce(mockFetchResponse(206, body));
const store = makeUrlStore({ url: globalUrl });

const result = await store.getRange("/", { offset: 0, length: 3 });

expect(result).toEqual(body);
expect(fetchSpy).toHaveBeenNthCalledWith(1, globalUrl, expect.anything());
expect(fetchSpy).toHaveBeenNthCalledWith(2, regionalUrl, expect.anything());

fetchSpy.mockRestore();
});

it("pins the regional endpoint for later reads after one redirect", async () => {
const globalUrl =
"https://s3.amazonaws.com/eu-central-1.example.bucket/data.bin";
const regionalUrl =
"https://s3.eu-central-1.amazonaws.com/eu-central-1.example.bucket/data.bin";
const body = new Uint8Array([4, 5, 6]);
const fetchSpy = vi
.spyOn(globalThis, "fetch")
.mockResolvedValueOnce(
mockFetchResponse(301, new Uint8Array(), {
"x-amz-bucket-region": "eu-central-1",
}),
)
.mockResolvedValue(mockFetchResponse(206, body));
const store = makeUrlStore({ url: globalUrl });

await store.getRange("/", { offset: 0, length: 3 });
await store.getRange("/", { offset: 0, length: 3 });

// First read redirects (global → regional); second read goes straight to regional.
expect(fetchSpy).toHaveBeenCalledTimes(3);
expect(fetchSpy).toHaveBeenNthCalledWith(3, regionalUrl, expect.anything());

fetchSpy.mockRestore();
});

it("retries both reads when concurrent requests hit the global endpoint", async () => {
const globalUrl =
"https://s3.amazonaws.com/us-west-2.opendata.source.coop/data.bin";
const regionalUrl =
"https://s3.us-west-2.amazonaws.com/us-west-2.opendata.source.coop/data.bin";
const body = new Uint8Array([1, 2, 3]);
// Mock by target URL, not call order: the global endpoint always 301s,
// the regional endpoint always serves. This reproduces two in-flight
// reads both starting against the global host before either resolves.
const fetchSpy = vi
.spyOn(globalThis, "fetch")
.mockImplementation(async (input) => {
const target = String(input);
if (target === globalUrl) {
return mockFetchResponse(301, new Uint8Array(), {
"x-amz-bucket-region": "us-west-2",
});
}
if (target === regionalUrl) return mockFetchResponse(206, body);
throw new Error(`unexpected fetch URL: ${target}`);
});
const store = makeUrlStore({ url: globalUrl });

// Both reads snapshot the global URL before either resolves its redirect;
// each must detect and retry against the regional host independently.
const [a, b] = await Promise.all([
store.getRange("/", { offset: 0, length: 3 }),
store.getRange("/", { offset: 0, length: 3 }),
]);

expect(a).toEqual(body);
expect(b).toEqual(body);

fetchSpy.mockRestore();
});

it("rejects URL offset 206 responses with mismatched body length", async () => {
const url = "https://example.com/data.bin";
const fetchSpy = vi
Expand Down
Loading