Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 48 additions & 7 deletions bb
Original file line number Diff line number Diff line change
Expand Up @@ -611,12 +611,17 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
stall_flags: list[str] = []
if params.stall_rate > 0:
stall_flags = [
"--stall-rate", str(params.stall_rate),
"--stall-duration", str(params.stall_duration),
"--stall-rate",
str(params.stall_rate),
"--stall-duration",
str(params.stall_duration),
]

server = None
if local:
server_kwargs: dict[str, Any] = {}
if params.print_counters:
server_kwargs["stdout"] = subprocess.PIPE
server = start_process(
"taskset",
"-c",
Expand All @@ -629,7 +634,9 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
str(params.port),
"--delay",
str(params.delay),
*print_counters_flag,
*verbose_flag,
**server_kwargs,
)
wait_for_tcp_port(params.host, params.port)

Expand Down Expand Up @@ -703,11 +710,20 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
]
print(_perf_row(cells, _NP_WIDTH))
if params.print_counters:
print()
print("### client counters")
_print_counters(data)
finally:
if server:
server.terminate()
server.wait()
server_stdout, _ = server.communicate()
if params.print_counters and server_stdout:
try:
server_data = json.loads(server_stdout)
print("### server counters")
_print_counters(server_data)
except json.JSONDecodeError as e:
log.warning("could not parse server counters: %s", e)


@dataclass
Expand Down Expand Up @@ -1015,9 +1031,14 @@ def _start_internal_server(
]
if _parse_duration_s(params.delay) > 0:
args += ["--delay", params.delay]
if params.print_counters:
args += ["--print-counters"]
if log.isEnabledFor(logging.DEBUG):
args += ["--verbose"]
return start_process(*args)
server_kwargs: dict[str, Any] = {}
if params.print_counters:
server_kwargs["stdout"] = subprocess.PIPE
return start_process(*args, **server_kwargs)


def cmd_http_perf(preset: str, params: HttpPerfParams) -> None:
Expand Down Expand Up @@ -1110,10 +1131,22 @@ def cmd_http_perf(preset: str, params: HttpPerfParams) -> None:
]
print(_perf_row(cells, _HP_WIDTHS))
if params.print_counters:
print()
print("### client counters")
_print_counters(data)
finally:
server.terminate()
server.wait()
if params.print_counters and not params.nginx:
server_stdout, _ = server.communicate()
if server_stdout:
try:
server_data = json.loads(server_stdout)
print("### server counters")
_print_counters(server_data)
except json.JSONDecodeError as e:
log.warning("could not parse server counters: %s", e)
else:
server.wait()


def _ensure_minio() -> tuple[str, str]:
Expand Down Expand Up @@ -2002,8 +2035,16 @@ def main() -> None:
targets = set(args.targets)
if "all" in targets:
targets = {
"file", "fio", "net", "net-asio", "net-epoll",
"http", "http-threads", "http-nginx", "s3", "s3-threads",
"file",
"fio",
"net",
"net-asio",
"net-epoll",
"http",
"http-threads",
"http-nginx",
"s3",
"s3-threads",
}
file_params = FilePerfParams(
numjobs=[1, 16],
Expand Down
42 changes: 24 additions & 18 deletions include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ static_assert(sizeof(FiberId) == 8);
*/
enum class ProfileEventKind : uint8_t
{
SUSPEND_WAIT = 0, // suspended -> enqueueReady: blocked-on-condition latency
IO_SUBMIT = 1, // submitIo: io_uring_submit cost per fiber-suspend flush
IO_WAIT = 2, // enqueueIo -> handleCompletion: IO latency
READY_WAIT = 3, // enqueueReady -> runFiber: ready-queue dwell
FIBER_RUN = 4, // switchToFiberContext -> return: on-CPU time per slice
READY_WAIT = 0, // enqueueReady -> runFiber: ready-queue dwell
FIBER_RUN = 1, // switchToFiberContext -> return: on-CPU time per slice
SUSPEND_WAIT = 2, // suspended -> enqueueReady: blocked-on-condition latency
IO_WAIT = 3, // enqueueIo -> handleCompletion: full IO latency
SQ_WAIT = 4, // enqueueIo -> io_uring_submit: SQE pending in silk's SQ ring before flush to kernel
SUBMIT_IO = 5, // io_uring_submit syscall cost
CQ_WAIT = 6, // wall-clock gap between consecutive non-empty CQ drains on a ring
MAX
};

Expand Down Expand Up @@ -91,32 +93,36 @@ class FiberScheduler
{
// Per-fiber stack size in bytes. Must be a multiple of the system page size.
// The pool also reserves two guard pages adjacent to each stack.
uint64_t fiberStackSize = 64 * 1024;
uint32_t fiberStackSize = 64 * 1024;

// Per-CPU ready queue capacity (fibers). Must be a power of two and >= 2.
// Sized to absorb dispatch bursts without falling back to the global queue.
uint64_t readyQueueCapacity = 1024;
uint32_t readyQueueCapacity = 1024;

// Hash-table size for futex-style waiter lookups. Must be a power of two.
uint32_t waiterTableSize = 4096;

// Per-CPU io_uring SQ ring capacity. Must be a power of two; the kernel
// rounds up to the nearest supported size.
uint64_t ioUringQueueSize = 256;
uint32_t ioUringQueueSize = 256;

// Mid-drain submit threshold: submitIo defers io_uring_enter until the SQ
// ring holds at least this many entries. Lower values approach per-fiber
// submit (better latency on inline-completion workloads); higher values
// amortize syscall cost across more SQEs (better throughput on networked
// workloads). Must be <= ioUringQueueSize.
// Upper bound on the number of SQEs that may sit in silk's SQ ring
// before being flushed to the kernel. Must be <= ioUringQueueSize.
uint32_t ioUringFlushThreshold = 64;

// Hash-table size for futex-style waiter lookups. Must be a power of two.
uint64_t waiterTableSize = 4096;
// Upper bound on how long (in nanoseconds) an SQE may sit in silk's
// SQ ring before being flushed to the kernel.
uint32_t ioUringFlushTimeout = 100'000;

// Derived in initialize() from ioUringFlushTimeout; users do not set this.
uint64_t ioUringFlushTimeoutCycles = 0;

// Scheduler park backoff (nanoseconds). The dispatch loop spins for up to
// spinThresholdNs after going idle; past that it parks on the eventfd with
// an exponential backoff starting at initialWaitNs and capped at maxWaitNs.
uint64_t initialWaitNs = 1'000;
uint64_t maxWaitNs = 10'000'000;
uint64_t spinThresholdNs = 20'000;
uint32_t initialWaitNs = 1'000;
uint32_t maxWaitNs = 10'000'000;
uint32_t spinThresholdNs = 20'000;

// Allocate per-CPU latency profilers.
bool enableProfiler = false;
Expand Down
108 changes: 68 additions & 40 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10);
x(FIBER_STOLEN, "FiberStolen") \
x(IO_ENQUEUED, "IoEnqueued") \
x(IO_COMPLETED, "IoCompleted") \
x(IO_SUBMITTED, "IoSubmitted") \
x(SQ_RING_OVERFLOW, "SQRingOverflow") \
x(CQ_RING_OVERFLOW, "CQRingOverflow") \
x(SLEEP_ENQUEUED, "SleepEnqueued") \
Expand Down Expand Up @@ -533,7 +534,7 @@ struct FiberScheduler::ProcessorState

template <typename Setup>
bool enqueueIo(IoFuture * future, Setup && setup) noexcept;
void submitIo(bool flush) noexcept;
bool submitIo(bool flush) noexcept;

void insertSuspended(Fiber * fiber) noexcept;
void removeSuspended(Fiber * fiber) noexcept;
Expand Down Expand Up @@ -579,6 +580,18 @@ struct FiberScheduler::ProcessorState
// Deadline of the in-flight io_uring timeout SQE; 0 when none is pending.
uint64_t wakeupDeadlineCycles = 0;

// Timestamp (TSC cycles) of the most recent successful CQ drain on
// this ring. Used to emit CQ_WAIT profile events bounding how long
// any CQE in the next drain batch could have sat in the ring.
uint64_t lastCqDrainCycles = 0;

// Timestamp (TSC cycles) of the most recent io_uring_submit call.
// Read in submitIo (time-gate) and handleCompletionQueue (SQ_WAIT
// emit) under serviceLoopLock; written in submitIo under
// submissionLock. Relaxed atomic is sufficient: readers tolerate a
// slightly stale value.
std::atomic<uint64_t> lastSubmitCycles{0};

// Per-CPU latency profiler. Allocated only when Options::enableProfiler
// is set; null otherwise. Co-located with the hot path so the null
// check in reportFiberWait/reportIoWait costs no additional miss when
Expand Down Expand Up @@ -653,6 +666,9 @@ void FiberScheduler::ProcessorState::initialize(uint32_t cpu) noexcept
{
profiler = std::make_unique<Profiler>();
}

lastCqDrainCycles = Tsc::getCycles();
lastSubmitCycles.store(lastCqDrainCycles, std::memory_order_relaxed);
}

void FiberScheduler::ProcessorState::destroy() noexcept
Expand Down Expand Up @@ -841,59 +857,55 @@ bool FiberScheduler::ProcessorState::enqueueIo(IoFuture * future, Setup && setup
return false;
}

// Submit pending SQEs to the kernel.
//
// flush=false: pressure-relief mode for dispatch loops - only submit
// once at least options.ioUringFlushThreshold SQEs are queued, otherwise
// let the next fiber accumulate more work before paying the syscall.
//
// flush=true: force submit regardless of count. Used at end-of-drain, by
// the proxy/wakeup paths that need the SQE visible immediately, and by
// worker threads which have no batching boundary.
//
// IO_SUBMIT profile event represents the syscall cost; per-fiber latency
// lives in IO_WAIT. Category 0 because a single io_uring_enter covers SQEs
// from any fibers that contributed.
void FiberScheduler::ProcessorState::submitIo(bool flush) noexcept
{
// Fast path: submitIo is always called on the same thread that just called
// enqueueIo, so our own enqueue is always visible here.
// Suppress the TSan report for the cross-CPU race on sqe_tail.
// Submit pending SQEs to the kernel. flush=true: unconditional flush.
// flush=false: gated by ioUringFlushThreshold (count) or ioUringFlushTimeout.
// Returns true iff an io_uring_submit syscall was actually performed.
bool FiberScheduler::ProcessorState::submitIo(bool flush) noexcept
{
// Fast path: read SQ tail outside the lock.
TSAN_IGNORE_BEGIN();
uint32_t count = ::io_uring_sq_ready(&ring);
TSAN_IGNORE_END();
if (count == 0)
{
return false;
}

uint64_t nowCycles = Tsc::getCycles();

uint32_t minCount = flush ? 1 : options.ioUringFlushThreshold;
if (count < minCount)
if (!flush)
{
return;
bool countMet = count >= options.ioUringFlushThreshold;
bool staleMet = nowCycles - lastSubmitCycles.load(std::memory_order_relaxed) > options.ioUringFlushTimeoutCycles;
if (!countMet && !staleMet)
{
return false;
}
}

std::lock_guard lock(submissionLock);

count = ::io_uring_sq_ready(&ring);
if (count >= minCount)
if (count == 0)
{
// TSan needs an explicit barrier between submission/completion.
TSAN_RELEASE(this);
return false;
}

if (profiler)
{
uint64_t startCycles = Tsc::getCycles();
// TSan needs an explicit barrier between submission/completion.
TSAN_RELEASE(this);

int r = ::io_uring_submit(&ring);
SILK_ASSERT(r >= 0);
lastSubmitCycles.store(nowCycles, std::memory_order_relaxed);

profileEvent(ProfileEventKind::IO_SUBMIT, 0, Tsc::getCycles() - startCycles);
}
else
{
int r = ::io_uring_submit(&ring);
SILK_ASSERT(r >= 0);
}
int r = ::io_uring_submit(&ring);
SILK_ASSERT(r >= 0);

Perf::getSimpleCounter(simpleCounters[IO_ENQUEUED], number).increment(count);
if (profiler)
{
profileEvent(ProfileEventKind::SUBMIT_IO, 0, Tsc::getCycles() - nowCycles);
}

Perf::getSimpleCounter(simpleCounters[IO_ENQUEUED], number).increment(count);
Perf::getSimpleCounter(simpleCounters[IO_SUBMITTED], number).increment();
return true;
}

void FiberScheduler::ProcessorState::insertSuspended(Fiber * fiber) noexcept
Expand Down Expand Up @@ -996,6 +1008,7 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept
SILK_ASSERT(options.ioUringQueueSize >= 2 && (options.ioUringQueueSize & (options.ioUringQueueSize - 1)) == 0);
SILK_ASSERT(options.ioUringFlushThreshold >= 1 && options.ioUringFlushThreshold <= options.ioUringQueueSize);
SILK_ASSERT(options.waiterTableSize >= 2 && (options.waiterTableSize & (options.waiterTableSize - 1)) == 0);
options.ioUringFlushTimeoutCycles = Tsc::nanosecondsToCycles(options.ioUringFlushTimeout);

scheduler = new SchedulerState();

Expand Down Expand Up @@ -1522,7 +1535,7 @@ void FiberScheduler::runScheduler(ProcessorState * processor) noexcept
}
else
{
waitNs = waitNs ? std::min(waitNs * 2, options.maxWaitNs) : options.initialWaitNs;
waitNs = waitNs ? std::min<uint64_t>(waitNs * 2, options.maxWaitNs) : options.initialWaitNs;
}
}
}
Expand Down Expand Up @@ -1662,6 +1675,12 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept
// TSan needs an explicit barrier between submission/completion.
TSAN_ACQUIRE(processor);

uint64_t entryCycles = 0;
if (processor->profiler)
{
entryCycles = Tsc::getCycles();
}

for (;;)
{
// Handle completion entries in the ring.
Expand Down Expand Up @@ -1704,7 +1723,10 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept

if (processor->profiler)
{
processor->profileEvent(ProfileEventKind::IO_WAIT, future->category, Tsc::getCycles() - future->submitTimestamp);
uint64_t nowCycles = Tsc::getCycles();
uint64_t submitCycles = processor->lastSubmitCycles.load(std::memory_order_relaxed);
processor->profileEvent(ProfileEventKind::IO_WAIT, future->category, nowCycles - future->submitTimestamp);
processor->profileEvent(ProfileEventKind::SQ_WAIT, future->category, submitCycles - future->submitTimestamp);
}

int result = cqe->res;
Expand Down Expand Up @@ -1742,6 +1764,12 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept
SILK_ASSERT(r >= 0);
}

if (didWork && processor->profiler)
{
processor->profileEvent(ProfileEventKind::CQ_WAIT, 0, entryCycles - processor->lastCqDrainCycles);
processor->lastCqDrainCycles = entryCycles;
}

return didWork;
}

Expand Down
Loading
Loading