Skip to content
This repository was archived by the owner on Mar 24, 2026. It is now read-only.

Commit 2ca722d

Browse files
Privacy Sandbox Teamcopybara-github
authored andcommitted
feat: Copy multi_curl_http_fetcher_async and file_util from B&A
This is part of a series of CLs to copy the KV BYOS clients from B&A to common repo. Bug: 411430242 Change-Id: Icf105e520ce1a068548e94db092a3464e2448853 GitOrigin-RevId: d587666a2fb3f2768089f93515530dc45c72c6b2
1 parent 0cbc1b0 commit 2ca722d

12 files changed

Lines changed: 985 additions & 3 deletions

.bazelrc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ build --config=otel
1414
build --@com_google_googleurl//build_config:system_icu=0
1515

1616
test --test_output=errors
17+
test --test_tag_filters=-flaky
1718

1819
build:run_all_tests --cache_test_results=no
1920
test:run_all_tests --test_verbose_timeout_warnings

src/clients/http_fetcher_async/BUILD.bazel

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ cc_library(
3434
hdrs = [
3535
"curl_request_data.h",
3636
],
37+
visibility = ["//visibility:public"],
3738
deps = [
3839
"@com_google_absl//absl/container:flat_hash_map",
3940
"@com_google_absl//absl/functional:any_invocable",
@@ -115,6 +116,58 @@ cc_library(
115116
],
116117
)
117118

119+
cc_library(
120+
name = "multi_curl_http_fetcher_async",
121+
srcs = [
122+
"multi_curl_http_fetcher_async.cc",
123+
"multi_curl_request_manager.cc",
124+
"multi_curl_request_manager.h",
125+
],
126+
hdrs = [
127+
"multi_curl_http_fetcher_async.h",
128+
],
129+
deps = [
130+
":curl_request_data",
131+
":curl_request_queue",
132+
":curl_request_worker",
133+
":http_fetcher_async",
134+
"//src/concurrent:executor",
135+
"//src/util/file:file_util",
136+
"@com_google_absl//absl/container:flat_hash_map",
137+
"@com_google_absl//absl/log:check",
138+
"@com_google_absl//absl/status",
139+
"@com_google_absl//absl/strings",
140+
"@com_google_absl//absl/time",
141+
"@curl",
142+
"@libevent//:event",
143+
],
144+
)
145+
146+
cc_test(
147+
name = "multi_curl_http_fetcher_async_test",
148+
size = "medium",
149+
srcs = ["multi_curl_http_fetcher_async_test.cc"],
150+
flaky = 1,
151+
tags = [
152+
"noasan", # TODO(b/411430242) - Fix ASAN errors in this test.
153+
"notsan", # TSAN cannot detect event library locking mechanism
154+
"requires-network",
155+
],
156+
deps = [
157+
":multi_curl_http_fetcher_async",
158+
"//src/concurrent:executor",
159+
"//src/logger:request_context_logger",
160+
"@com_github_grpc_grpc//:grpc++",
161+
"@com_google_absl//absl/status",
162+
"@com_google_absl//absl/strings",
163+
"@com_google_absl//absl/synchronization",
164+
"@com_google_absl//absl/time",
165+
"@com_google_googletest//:gtest",
166+
"@com_google_googletest//:gtest_main",
167+
"@rapidjson",
168+
],
169+
)
170+
118171
cc_test(
119172
name = "curl_request_queue_test",
120173
size = "small",

src/clients/http_fetcher_async/curl_request_data.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ struct MultiCurlHttpFetcherAsyncOptions {
112112
// TODO(b/412330778): Rename to `curl_max_wait_time_ms`
113113
absl::Duration curl_max_wait_time = kDefaultMaxRequestWaitTime;
114114
int curl_queue_length = kDefaultMaxCurlPendingRequests;
115+
bool skip_tls_verification = false;
115116
};
116117

117118
// This struct maintains the data related to a Curl request, some of which

src/clients/http_fetcher_async/http_fetcher_async.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 Google LLC
1+
// Copyright 2025 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "src/clients/http_fetcher_async/multi_curl_http_fetcher_async.h"
16+
17+
#include <stdlib.h>
18+
19+
#include <algorithm>
20+
#include <random>
21+
#include <sstream>
22+
#include <string>
23+
#include <utility>
24+
25+
#include <curl/curl.h>
26+
27+
#include "absl/log/check.h"
28+
#include "absl/strings/str_cat.h"
29+
#include "absl/time/time.h"
30+
#include "event2/thread.h"
31+
#include "src/util/file/file_util.h"
32+
33+
namespace privacy_sandbox::server_common::clients {
34+
using ::grpc_event_engine::experimental::EventEngine;
35+
36+
MultiCurlHttpFetcherAsync::MultiCurlHttpFetcherAsync(
37+
server_common::Executor* executor,
38+
const MultiCurlHttpFetcherAsyncOptions& options)
39+
: executor_(executor),
40+
keepalive_idle_sec_(options.keepalive_idle_sec),
41+
keepalive_interval_sec_(options.keepalive_interval_sec),
42+
skip_tls_verification_(options.skip_tls_verification),
43+
ca_cert_(options.ca_cert),
44+
num_curl_workers_(options.num_curl_workers) {
45+
DCHECK_GT(num_curl_workers_, 0);
46+
auto ca_cert_blob = GetFileContent(ca_cert_, /*log_on_error=*/true);
47+
CHECK_OK(ca_cert_blob);
48+
ca_cert_blob_ = *std::move(ca_cert_blob);
49+
50+
// Setup curl workers and their work queues.
51+
curl_request_workers_.reserve(num_curl_workers_);
52+
for (int i = 0; i < options.num_curl_workers; ++i) {
53+
auto request_queue = std::make_unique<CurlRequestQueue>(
54+
executor_, options.curl_queue_length, options.curl_max_wait_time);
55+
curl_request_workers_.push_back(std::make_unique<CurlRequestWorker>(
56+
executor_, *request_queue, options.curlmopt_maxconnects,
57+
options.curlmopt_max_total_connections,
58+
options.curlmopt_max_host_connections));
59+
request_queues_.emplace_back(std::move(request_queue));
60+
}
61+
}
62+
63+
// The function declaration of WriteCallback is specified by libcurl.
64+
// Please do not modify the parameter types or ordering, although you
65+
// may modify the function name and body.
66+
// libcurl documentation: https://curl.se/libcurl/c/CURLOPT_WRITEFUNCTION.html
67+
//
68+
// data: A pointer to the data that was delivered over the wire.
69+
// size: (legacy) size is always 1. Represents 1 byte.
70+
// number_elements: the number of elements (each of size 1 byte) to write
71+
// output: a libcurl-client-provided pointer of where to save the data
72+
// return: number of bytes actually written to output
73+
static size_t WriteCallback(char* data, size_t size, size_t number_elements,
74+
std::string* output) {
75+
output->append(reinterpret_cast<char*>(data), size * number_elements);
76+
return size * number_elements;
77+
}
78+
79+
// The function declaration of ReadCallback is specified by libcurl.
80+
// Please do not modify the parameter types or ordering, although you
81+
// may modify the function name and body.
82+
// libcurl documentation: https://curl.se/libcurl/c/CURLOPT_READFUNCTION.html
83+
static size_t ReadCallback(char* data, size_t size, size_t num_items,
84+
void* userdata) {
85+
auto* to_upload = static_cast<DataToUpload*>(userdata);
86+
if (to_upload->offset >= to_upload->data.size()) {
87+
// No more data to upload.
88+
return 0;
89+
}
90+
size_t num_bytes_to_upload =
91+
std::min(to_upload->data.size() - to_upload->offset, num_items * size);
92+
memcpy(data, to_upload->data.c_str() + to_upload->offset,
93+
num_bytes_to_upload);
94+
PS_VLOG(8) << "PUT-ing data (offset: " << to_upload->offset
95+
<< ", chunk size: " << num_bytes_to_upload
96+
<< "): " << to_upload->data;
97+
to_upload->offset += num_bytes_to_upload;
98+
return num_bytes_to_upload;
99+
}
100+
101+
void MultiCurlHttpFetcherAsync::FetchUrls(
102+
const std::vector<HTTPRequest>& requests, absl::Duration timeout,
103+
OnDoneFetchUrls done_callback) {
104+
FetchUrlsWithMetadata(
105+
requests, timeout,
106+
[done_callback = std::move(done_callback)](
107+
std::vector<absl::StatusOr<HTTPResponse>> response_vector) mutable {
108+
std::vector<absl::StatusOr<std::string>> results;
109+
results.reserve(response_vector.size());
110+
for (auto& response : response_vector) {
111+
if (response.ok()) {
112+
results.emplace_back(std::move(response)->body);
113+
} else {
114+
results.emplace_back(std::move(response).status());
115+
}
116+
}
117+
std::move(done_callback)(std::move(results));
118+
});
119+
}
120+
121+
void MultiCurlHttpFetcherAsync::FetchUrlsWithMetadata(
122+
const std::vector<HTTPRequest>& requests, absl::Duration timeout,
123+
OnDoneFetchUrlsWithMetadata done_callback) {
124+
// The FetchUrl lambdas are the owners of the underlying FetchUrlsLifetime and
125+
// this shared_ptr will be destructed once the last FetchUrl lambda finishes.
126+
// Using a shared_ptr here allows us to avoid making MultiCurlHttpFetcherAsync
127+
// the owner of the FetchUrlsLifetime, which would complicate cleanup on
128+
// MultiCurlHttpFetcherAsync destruction during a pending FetchUrls call.
129+
auto shared_lifetime = std::make_shared<
130+
MultiCurlHttpFetcherAsync::FetchUrlsWithMetadataLifetime>();
131+
shared_lifetime->pending_results = requests.size();
132+
shared_lifetime->all_done_callback = std::move(done_callback);
133+
shared_lifetime->results =
134+
std::vector<absl::StatusOr<HTTPResponse>>(requests.size());
135+
if (requests.empty()) {
136+
// Execute callback immediately if there are no requests.
137+
std::move(shared_lifetime->all_done_callback)(
138+
std::move(shared_lifetime->results));
139+
return;
140+
}
141+
for (int i = 0; i < requests.size(); i++) {
142+
FetchUrlWithMetadata(
143+
requests.at(i), absl::ToInt64Milliseconds(timeout),
144+
[i, shared_lifetime](absl::StatusOr<HTTPResponse> result) {
145+
absl::MutexLock lock_results(&shared_lifetime->results_mu);
146+
shared_lifetime->results[i] = std::move(result);
147+
if (--shared_lifetime->pending_results == 0) {
148+
std::move(shared_lifetime->all_done_callback)(
149+
std::move(shared_lifetime->results));
150+
}
151+
});
152+
}
153+
}
154+
155+
std::unique_ptr<CurlRequestData> MultiCurlHttpFetcherAsync::CreateCurlRequest(
156+
const HTTPRequest& request, int timeout_ms, int64_t keepalive_idle_sec,
157+
int64_t keepalive_interval_sec, OnDoneFetchUrlWithMetadata done_callback) {
158+
auto curl_request_data = std::make_unique<CurlRequestData>(
159+
request.headers, std::move(done_callback), request.include_headers,
160+
request.redirect_config.get_redirect_url);
161+
CURL* req_handle = curl_request_data->req_handle;
162+
curl_easy_setopt(req_handle, CURLOPT_WRITEFUNCTION, WriteCallback);
163+
curl_easy_setopt(req_handle, CURLOPT_URL, request.url.begin());
164+
curl_easy_setopt(req_handle, CURLOPT_WRITEDATA,
165+
&curl_request_data->response_with_metadata.body);
166+
curl_easy_setopt(req_handle, CURLOPT_FOLLOWLOCATION, 1);
167+
if (request.redirect_config.strict_http) {
168+
curl_easy_setopt(req_handle, CURLOPT_REDIR_PROTOCOLS_STR, "http,https");
169+
}
170+
171+
curl_easy_setopt(req_handle, CURLOPT_TIMEOUT_MS, timeout_ms);
172+
// Enable TCP keep-alive to keep connection warm.
173+
curl_easy_setopt(req_handle, CURLOPT_TCP_KEEPALIVE, 1L);
174+
curl_easy_setopt(req_handle, CURLOPT_TCP_KEEPIDLE,
175+
static_cast<int64_t>(keepalive_idle_sec));
176+
curl_easy_setopt(req_handle, CURLOPT_TCP_KEEPINTVL,
177+
static_cast<int64_t>(keepalive_interval_sec));
178+
// Allow upto 1200 seconds idle time.
179+
curl_easy_setopt(req_handle, CURLOPT_MAXAGE_CONN, 1200L);
180+
// Set CURLOPT_ACCEPT_ENCODING to an empty string to pass all supported
181+
// encodings. See https://curl.se/libcurl/c/CURLOPT_ACCEPT_ENCODING.html.
182+
curl_easy_setopt(req_handle, CURLOPT_ACCEPT_ENCODING, "");
183+
184+
if (skip_tls_verification_) {
185+
curl_easy_setopt(req_handle, CURLOPT_SSL_VERIFYPEER, 0L);
186+
curl_easy_setopt(req_handle, CURLOPT_SSL_VERIFYHOST, 0L);
187+
curl_easy_setopt(req_handle, CURLOPT_PROXY_SSL_VERIFYPEER, 0L);
188+
curl_easy_setopt(req_handle, CURLOPT_PROXY_SSL_VERIFYHOST, 0L);
189+
} else {
190+
curl_easy_setopt(req_handle, CURLOPT_CAINFO_BLOB, ca_cert_blob_.c_str());
191+
}
192+
// Set HTTP headers.
193+
if (!request.headers.empty()) {
194+
curl_easy_setopt(req_handle, CURLOPT_HTTPHEADER,
195+
curl_request_data->headers_list_ptr);
196+
}
197+
return curl_request_data;
198+
}
199+
200+
void MultiCurlHttpFetcherAsync::ScheduleAsyncCurlRequest(
201+
std::unique_ptr<CurlRequestData> request) {
202+
// Spreads the requests in a uniformly distributed (random) manner across
203+
// the available queues/workers.
204+
static std::random_device random_device;
205+
static std::mt19937 generator(random_device());
206+
std::uniform_int_distribution<int> distribute(0, num_curl_workers_ - 1);
207+
int num_worker = distribute(generator);
208+
209+
auto& request_queue = request_queues_[num_worker];
210+
absl::MutexLock lock(&request_queue->Mu());
211+
if (request_queue->Full()) {
212+
std::move(request->done_callback)(
213+
absl::ResourceExhaustedError("Request Queue Limit Exceeded."));
214+
return;
215+
}
216+
217+
request_queue->Enqueue(std::move(request));
218+
}
219+
220+
void MultiCurlHttpFetcherAsync::FetchUrl(const HTTPRequest& request,
221+
int timeout_ms,
222+
OnDoneFetchUrl done_callback) {
223+
FetchUrlWithMetadata(request, timeout_ms,
224+
[callback = std::move(done_callback)](
225+
absl::StatusOr<HTTPResponse> response) mutable {
226+
if (response.ok()) {
227+
absl::StatusOr<std::string> string_response =
228+
std::move(response->body);
229+
std::move(callback)(std::move(string_response));
230+
} else {
231+
std::move(callback)(response.status());
232+
}
233+
});
234+
}
235+
236+
void MultiCurlHttpFetcherAsync::FetchUrlWithMetadata(
237+
const HTTPRequest& request, int timeout_ms,
238+
OnDoneFetchUrlWithMetadata done_callback) {
239+
ScheduleAsyncCurlRequest(
240+
CreateCurlRequest(request, timeout_ms, keepalive_idle_sec_,
241+
keepalive_interval_sec_, std::move(done_callback)));
242+
}
243+
244+
void MultiCurlHttpFetcherAsync::PutUrl(const HTTPRequest& http_request,
245+
int timeout_ms,
246+
OnDoneFetchUrl done_callback) {
247+
absl::AnyInvocable<void(absl::StatusOr<HTTPResponse>)> on_done_with_metadata =
248+
[callback = std::move(done_callback)](
249+
absl::StatusOr<HTTPResponse> response) mutable {
250+
if (response.ok()) {
251+
absl::StatusOr<std::string> string_response =
252+
std::move(response->body);
253+
std::move(callback)(std::move(string_response));
254+
} else {
255+
std::move(callback)(response.status());
256+
}
257+
};
258+
auto request = CreateCurlRequest(http_request, timeout_ms,
259+
keepalive_idle_sec_, keepalive_interval_sec_,
260+
std::move(on_done_with_metadata));
261+
262+
request->body =
263+
std::make_unique<DataToUpload>(DataToUpload{http_request.body});
264+
curl_easy_setopt(request->req_handle, CURLOPT_UPLOAD, 1L);
265+
curl_easy_setopt(request->req_handle, CURLOPT_PUT, 1L);
266+
curl_easy_setopt(request->req_handle, CURLOPT_POSTFIELDSIZE_LARGE,
267+
static_cast<curl_off_t>(http_request.body.size()));
268+
curl_easy_setopt(request->req_handle, CURLOPT_READDATA, request->body.get());
269+
curl_easy_setopt(request->req_handle, CURLOPT_READFUNCTION, ReadCallback);
270+
271+
ScheduleAsyncCurlRequest(std::move(request));
272+
}
273+
274+
} // namespace privacy_sandbox::server_common::clients

0 commit comments

Comments
 (0)