Skip to content
This repository was archived by the owner on Mar 24, 2026. It is now read-only.

Commit 0cbc1b0

Browse files
Privacy Sandbox Teamcopybara-github
authored andcommitted
feat: Copy curl queue and worker from B&A to common repo
This is part of a series of CLs to copy the KV BYOS clients from B&A to common repo. Bug: 411430242 Change-Id: I55de66cb464d93b69726d9aa8b73f02b4c5b7b3c GitOrigin-RevId: b33aeef6c0e19112a9c55326db158fe2dfdc3292
1 parent e351dd5 commit 0cbc1b0

7 files changed

Lines changed: 785 additions & 1 deletion

File tree

src/clients/http_fetcher_async/BUILD.bazel

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
load("@rules_cc//cc:defs.bzl", "cc_library")
15+
load("@rules_cc//cc:defs.bzl", "cc_library", "cc_test")
1616

1717
cc_library(
1818
name = "http_fetcher_async",
@@ -47,6 +47,50 @@ cc_library(
4747
],
4848
)
4949

50+
cc_library(
51+
name = "curl_request_queue",
52+
srcs = [
53+
"curl_request_queue.cc",
54+
],
55+
hdrs = [
56+
"curl_request_queue.h",
57+
],
58+
deps = [
59+
":curl_request_data",
60+
"//src/concurrent:executor",
61+
"//src/core/event",
62+
"//src/core/event:event_base",
63+
"//src/util/containers:doubly_linked_list",
64+
"@com_google_absl//absl/container:flat_hash_map",
65+
"@com_google_absl//absl/log:check",
66+
"@com_google_absl//absl/status",
67+
"@com_google_absl//absl/strings",
68+
"@com_google_absl//absl/synchronization",
69+
"@com_google_absl//absl/time",
70+
],
71+
)
72+
73+
cc_library(
74+
name = "curl_request_worker",
75+
srcs = [
76+
"curl_request_worker.cc",
77+
],
78+
hdrs = [
79+
"curl_request_worker.h",
80+
],
81+
deps = [
82+
":curl_request_data",
83+
":curl_request_queue",
84+
":multi_curl_request_manager",
85+
"//src/concurrent:executor",
86+
"@com_google_absl//absl/log:check",
87+
"@com_google_absl//absl/status",
88+
"@com_google_absl//absl/strings",
89+
"@com_google_absl//absl/synchronization",
90+
"@com_google_absl//absl/time",
91+
],
92+
)
93+
5094
cc_library(
5195
name = "multi_curl_request_manager",
5296
srcs = [
@@ -70,3 +114,47 @@ cc_library(
70114
"@libevent//:event",
71115
],
72116
)
117+
118+
cc_test(
119+
name = "curl_request_queue_test",
120+
size = "small",
121+
srcs = ["curl_request_queue_test.cc"],
122+
deps = [
123+
":curl_request_data",
124+
":curl_request_queue",
125+
"//src/concurrent:executor",
126+
"//src/logger:request_context_logger",
127+
"@com_github_grpc_grpc//:grpc++",
128+
"@com_google_absl//absl/status",
129+
"@com_google_absl//absl/strings",
130+
"@com_google_absl//absl/synchronization",
131+
"@com_google_absl//absl/time",
132+
"@com_google_googletest//:gtest",
133+
"@com_google_googletest//:gtest_main",
134+
"@rapidjson",
135+
],
136+
)
137+
138+
cc_test(
139+
name = "curl_request_worker_test",
140+
size = "small",
141+
srcs = ["curl_request_worker_test.cc"],
142+
tags = [
143+
"notsan",
144+
],
145+
deps = [
146+
":curl_request_data",
147+
":curl_request_queue",
148+
":curl_request_worker",
149+
"//src/concurrent:executor",
150+
"//src/logger:request_context_logger",
151+
"@com_github_grpc_grpc//:grpc++",
152+
"@com_google_absl//absl/status",
153+
"@com_google_absl//absl/strings",
154+
"@com_google_absl//absl/synchronization",
155+
"@com_google_absl//absl/time",
156+
"@com_google_googletest//:gtest",
157+
"@com_google_googletest//:gtest_main",
158+
"@rapidjson",
159+
],
160+
)
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "src/clients/http_fetcher_async/curl_request_queue.h"
16+
17+
#include <utility>
18+
19+
#include "absl/log/check.h"
20+
#include "absl/synchronization/mutex.h"
21+
#include "src/core/event/event.h"
22+
#include "src/util/containers/doubly_linked_list.h"
23+
24+
namespace privacy_sandbox::server_common::clients {
25+
namespace {
26+
// Timer used by the event on event loop to signal that event loop has
27+
// successfully started.
28+
inline static timeval OneMicrosecond = {0, 1};
29+
30+
} // namespace
31+
32+
CurlRequestQueue::CurlRequestQueue(server_common::Executor* executor,
33+
int capacity, absl::Duration max_wait_time)
34+
: executor_(executor),
35+
capacity_(capacity),
36+
max_wait_(max_wait_time),
37+
expiry_(absl::ToTimeval(max_wait_time)),
38+
eventloop_started_event_(Event(event_base_.get(), /* fd= */ -1,
39+
/* event_type= */ 0,
40+
/* event_callback= */ StartedEventLoop,
41+
/* arg= */ this,
42+
/* priority= */ kNumEventPriorities / 2,
43+
&OneMicrosecond)),
44+
ticker_(Event(event_base_.get(), /*fd=*/-1,
45+
/*event_type=*/EV_PERSIST,
46+
/*event_callback=*/RemoveExpiredEntries,
47+
/*arg=*/this,
48+
/*priority=*/kNumEventPriorities / 2, &expiry_)) {
49+
DCHECK(executor_ != nullptr);
50+
// Start the event loop - used to remove expired request from the request
51+
// queue.
52+
executor_->Run([this]() { event_base_dispatch(event_base_.get()); });
53+
eventloop_started_.WaitForNotification();
54+
}
55+
56+
absl::Mutex& CurlRequestQueue::Mu() { return mu_; }
57+
58+
bool CurlRequestQueue::Full() { return size_ == capacity_; }
59+
60+
bool CurlRequestQueue::Empty() { return size_ == 0; }
61+
62+
std::unique_ptr<CurlRequestData> CurlRequestQueue::Dequeue() {
63+
if (Empty()) {
64+
return nullptr;
65+
}
66+
67+
--size_;
68+
auto* node = dll_.Tail();
69+
std::unique_ptr<CurlRequestData> curl_request_data =
70+
std::move(node->data->key);
71+
dll_.Remove(curl_handles_[curl_request_data.get()]);
72+
curl_handles_.erase(curl_request_data.get());
73+
return curl_request_data;
74+
}
75+
76+
// static
77+
void CurlRequestQueue::StartedEventLoop(int fd, int16_t event_type, void* arg) {
78+
auto* self = reinterpret_cast<CurlRequestQueue*>(arg);
79+
PS_VLOG(5) << "Notifying event loop start";
80+
self->eventloop_started_.Notify();
81+
}
82+
83+
// static
84+
void CurlRequestQueue::RemoveExpiredEntries(int fd, int16_t event_type,
85+
void* arg) {
86+
auto* self = reinterpret_cast<CurlRequestQueue*>(arg);
87+
if (self->shutting_down_.HasBeenNotified()) {
88+
absl::MutexLock lock(&self->Mu());
89+
while (auto request = self->Dequeue()) {
90+
std::move(request->done_callback)(absl::InternalError("Shutting down."));
91+
}
92+
event_base_loopbreak(self->event_base_.get());
93+
self->shutdown_completed_.Notify();
94+
return;
95+
}
96+
97+
absl::MutexLock lock(&self->mu_);
98+
while (auto* node = self->dll_.Head()) {
99+
auto* curl_request_data_ptr = node->data->key.get();
100+
if (absl::Now() - curl_request_data_ptr->start_time < self->max_wait_) {
101+
return;
102+
}
103+
104+
--self->size_;
105+
std::unique_ptr<CurlRequestData> curl_request_data =
106+
std::move(node->data->key);
107+
self->dll_.Remove(self->curl_handles_[curl_request_data_ptr]);
108+
self->curl_handles_.erase(curl_request_data_ptr);
109+
110+
// Move the callback to a different thread.
111+
self->executor_->Run([curl_request_data = std::move(curl_request_data)]() {
112+
std::move(curl_request_data->done_callback)(
113+
absl::InternalError("Request timed out waiting in the queue"));
114+
});
115+
}
116+
}
117+
118+
void CurlRequestQueue::Enqueue(std::unique_ptr<CurlRequestData> request) {
119+
if (shutting_down_.HasBeenNotified()) {
120+
std::move(request->done_callback)(absl::InternalError("Shutting down."));
121+
return;
122+
}
123+
124+
auto* curl_request_data = request.get();
125+
curl_request_data->start_time = absl::Now();
126+
auto data = std::make_unique<DLLData>(DLLData{.key = std::move(request)});
127+
auto* dll_node = dll_.InsertAtFront(std::move(data));
128+
curl_handles_[curl_request_data] = dll_node;
129+
++size_;
130+
}
131+
132+
CurlRequestQueue::~CurlRequestQueue() {
133+
shutting_down_.Notify();
134+
event_active(ticker_.get(), /*res=*/0, /*ncalls=*/0);
135+
shutdown_completed_.WaitForNotification();
136+
}
137+
138+
} // namespace privacy_sandbox::server_common::clients
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef SRC_CLIENTS_HTTP_FETCHER_ASYNC_CURL_REQUEST_QUEUE_H_
16+
#define SRC_CLIENTS_HTTP_FETCHER_ASYNC_CURL_REQUEST_QUEUE_H_
17+
18+
#include <memory>
19+
20+
#include "absl/base/thread_annotations.h"
21+
#include "absl/container/flat_hash_map.h"
22+
#include "absl/synchronization/mutex.h"
23+
#include "absl/synchronization/notification.h"
24+
#include "absl/time/time.h"
25+
#include "src/clients/http_fetcher_async/curl_request_data.h"
26+
#include "src/concurrent/executor.h"
27+
#include "src/core/event/event.h"
28+
#include "src/core/event/event_base.h"
29+
#include "src/util/containers/doubly_linked_list.h"
30+
31+
namespace privacy_sandbox::server_common::clients {
32+
33+
// Request queue used to decouple the producers/reactors who want to make curl
34+
// calls from the executor (a thread that will run the curl transfers).
35+
// Idea is that queuing/dequeuing will be a fast operation and reactor threads
36+
// don't have to block (for long periods) to add requests even if the executor
37+
// is taking longer to process any request.
38+
//
39+
// Note: Mutex `Mu()` should be held before accessing any public interface of
40+
// this class.
41+
class CurlRequestQueue {
42+
public:
43+
explicit CurlRequestQueue(server_common::Executor* executor, int capacity,
44+
absl::Duration max_wait_time);
45+
~CurlRequestQueue();
46+
47+
// Callers can check the fullness before attempting to queue. This is
48+
// helpful since otherwise callers will try to queue and lose the request
49+
// data (including any callbacks), if the queue is full.
50+
bool Full() ABSL_EXCLUSIVE_LOCKS_REQUIRED(Mu());
51+
52+
// Returns a bool to reflect emptiness of the queue. It helps the executor
53+
// thread to setup a condition variable and wait for new requests.
54+
bool Empty() ABSL_EXCLUSIVE_LOCKS_REQUIRED(Mu());
55+
56+
// Removes element from a queue. Returns nullptr if no element is present.
57+
// Will be used by the executor thread to get tasks off the queue and
58+
// timeout thread to remove tasks that have been on the queue for a
59+
// (configurable) timeout period.
60+
std::unique_ptr<CurlRequestData> Dequeue()
61+
ABSL_EXCLUSIVE_LOCKS_REQUIRED(Mu());
62+
63+
// Adds an element to a queue. Caller must ensure that the queue is not
64+
// full before calling this method.
65+
void Enqueue(std::unique_ptr<CurlRequestData> request)
66+
ABSL_EXCLUSIVE_LOCKS_REQUIRED(Mu());
67+
68+
// Controls concurrent access to the queue.
69+
absl::Mutex& Mu();
70+
71+
private:
72+
using DLLData = CacheHashData<std::unique_ptr<CurlRequestData>,
73+
/*unused=*/char>;
74+
using NodeType = Node<std::unique_ptr<CurlRequestData>, /*unused=*/char>;
75+
76+
// Executor is used to run a thread for expiring entries from the queue.
77+
server_common::Executor* executor_;
78+
79+
// Signals successful start of event loop.
80+
static void StartedEventLoop(int fd, int16_t event_type, void* arg);
81+
82+
// Callback to remove expired entries.
83+
static void RemoveExpiredEntries(int fd, int16_t event_type, void* arg);
84+
85+
// Maps from curl request to the node in the doubly linked list.
86+
absl::flat_hash_map<CurlRequestData*, NodeType*> curl_handles_;
87+
88+
// Doubly linked list is used to queue/dequeue requests.
89+
DoublyLinkedList<std::unique_ptr<CurlRequestData>, /*unused=*/char> dll_;
90+
91+
// Number of elements currently in the queue.
92+
int size_ = 0;
93+
94+
// Maximum number of elements that the queue will hold.
95+
const int capacity_;
96+
97+
// Mutex used to expire entries from the queue.
98+
absl::Mutex mu_;
99+
100+
// Event base used to register the request expiry events.
101+
EventBase event_base_ = EventBase(/*use_pthreads=*/true);
102+
103+
// Max time a request is allowed to wait in the queue before it is expired.
104+
absl::Duration max_wait_;
105+
106+
// Maximum time to wait for the request to be dequeued before the request
107+
// forcibly expired from the queue.
108+
struct timeval expiry_;
109+
110+
// Activates on event loop start and signals that event loop has successfully
111+
// started.
112+
Event eventloop_started_event_;
113+
114+
// Callback executed every `max_wait_` time to remove expired entries off
115+
// the queue.
116+
Event ticker_;
117+
118+
// Notification to sync the caller with the event loop start.
119+
absl::Notification eventloop_started_;
120+
121+
// Once set all the incoming enqueue requests will be rejected right away.
122+
absl::Notification shutting_down_;
123+
absl::Notification shutdown_completed_;
124+
};
125+
126+
} // namespace privacy_sandbox::server_common::clients
127+
128+
#endif // SRC_CLIENTS_HTTP_FETCHER_ASYNC_CURL_REQUEST_QUEUE_H_

0 commit comments

Comments
 (0)