diff --git a/bb b/bb index 3e8df0e..bfbd0d5 100755 --- a/bb +++ b/bb @@ -611,12 +611,17 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None: stall_flags: list[str] = [] if params.stall_rate > 0: stall_flags = [ - "--stall-rate", str(params.stall_rate), - "--stall-duration", str(params.stall_duration), + "--stall-rate", + str(params.stall_rate), + "--stall-duration", + str(params.stall_duration), ] server = None if local: + server_kwargs: dict[str, Any] = {} + if params.print_counters: + server_kwargs["stdout"] = subprocess.PIPE server = start_process( "taskset", "-c", @@ -629,7 +634,9 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None: str(params.port), "--delay", str(params.delay), + *print_counters_flag, *verbose_flag, + **server_kwargs, ) wait_for_tcp_port(params.host, params.port) @@ -703,11 +710,20 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None: ] print(_perf_row(cells, _NP_WIDTH)) if params.print_counters: + print() + print("### client counters") _print_counters(data) finally: if server: server.terminate() - server.wait() + server_stdout, _ = server.communicate() + if params.print_counters and server_stdout: + try: + server_data = json.loads(server_stdout) + print("### server counters") + _print_counters(server_data) + except json.JSONDecodeError as e: + log.warning("could not parse server counters: %s", e) @dataclass @@ -1015,9 +1031,14 @@ def _start_internal_server( ] if _parse_duration_s(params.delay) > 0: args += ["--delay", params.delay] + if params.print_counters: + args += ["--print-counters"] if log.isEnabledFor(logging.DEBUG): args += ["--verbose"] - return start_process(*args) + server_kwargs: dict[str, Any] = {} + if params.print_counters: + server_kwargs["stdout"] = subprocess.PIPE + return start_process(*args, **server_kwargs) def cmd_http_perf(preset: str, params: HttpPerfParams) -> None: @@ -1110,10 +1131,22 @@ def cmd_http_perf(preset: str, params: HttpPerfParams) -> None: ] print(_perf_row(cells, _HP_WIDTHS)) if params.print_counters: + print() + print("### client counters") _print_counters(data) finally: server.terminate() - server.wait() + if params.print_counters and not params.nginx: + server_stdout, _ = server.communicate() + if server_stdout: + try: + server_data = json.loads(server_stdout) + print("### server counters") + _print_counters(server_data) + except json.JSONDecodeError as e: + log.warning("could not parse server counters: %s", e) + else: + server.wait() def _ensure_minio() -> tuple[str, str]: @@ -2002,8 +2035,16 @@ def main() -> None: targets = set(args.targets) if "all" in targets: targets = { - "file", "fio", "net", "net-asio", "net-epoll", - "http", "http-threads", "http-nginx", "s3", "s3-threads", + "file", + "fio", + "net", + "net-asio", + "net-epoll", + "http", + "http-threads", + "http-nginx", + "s3", + "s3-threads", } file_params = FilePerfParams( numjobs=[1, 16], diff --git a/include/silk/fibers/fiber.h b/include/silk/fibers/fiber.h index 4a2feb5..5c5a055 100644 --- a/include/silk/fibers/fiber.h +++ b/include/silk/fibers/fiber.h @@ -56,11 +56,13 @@ static_assert(sizeof(FiberId) == 8); */ enum class ProfileEventKind : uint8_t { - SUSPEND_WAIT = 0, // suspended -> enqueueReady: blocked-on-condition latency - IO_SUBMIT = 1, // submitIo: io_uring_submit cost per fiber-suspend flush - IO_WAIT = 2, // enqueueIo -> handleCompletion: IO latency - READY_WAIT = 3, // enqueueReady -> runFiber: ready-queue dwell - FIBER_RUN = 4, // switchToFiberContext -> return: on-CPU time per slice + READY_WAIT = 0, // enqueueReady -> runFiber: ready-queue dwell + FIBER_RUN = 1, // switchToFiberContext -> return: on-CPU time per slice + SUSPEND_WAIT = 2, // suspended -> enqueueReady: blocked-on-condition latency + IO_WAIT = 3, // enqueueIo -> handleCompletion: full IO latency + SQ_WAIT = 4, // enqueueIo -> io_uring_submit: SQE pending in silk's SQ ring before flush to kernel + SUBMIT_IO = 5, // io_uring_submit syscall cost + CQ_WAIT = 6, // wall-clock gap between consecutive non-empty CQ drains on a ring MAX }; @@ -91,32 +93,36 @@ class FiberScheduler { // Per-fiber stack size in bytes. Must be a multiple of the system page size. // The pool also reserves two guard pages adjacent to each stack. - uint64_t fiberStackSize = 64 * 1024; + uint32_t fiberStackSize = 64 * 1024; // Per-CPU ready queue capacity (fibers). Must be a power of two and >= 2. // Sized to absorb dispatch bursts without falling back to the global queue. - uint64_t readyQueueCapacity = 1024; + uint32_t readyQueueCapacity = 1024; + + // Hash-table size for futex-style waiter lookups. Must be a power of two. + uint32_t waiterTableSize = 4096; // Per-CPU io_uring SQ ring capacity. Must be a power of two; the kernel // rounds up to the nearest supported size. - uint64_t ioUringQueueSize = 256; + uint32_t ioUringQueueSize = 256; - // Mid-drain submit threshold: submitIo defers io_uring_enter until the SQ - // ring holds at least this many entries. Lower values approach per-fiber - // submit (better latency on inline-completion workloads); higher values - // amortize syscall cost across more SQEs (better throughput on networked - // workloads). Must be <= ioUringQueueSize. + // Upper bound on the number of SQEs that may sit in silk's SQ ring + // before being flushed to the kernel. Must be <= ioUringQueueSize. uint32_t ioUringFlushThreshold = 64; - // Hash-table size for futex-style waiter lookups. Must be a power of two. - uint64_t waiterTableSize = 4096; + // Upper bound on how long an SQE may sit in silk's SQ ring before + // being flushed to the kernel. + uint32_t ioUringFlushTimeout = 100'000; + + // Derived in initialize() from ioUringFlushTimeout; users do not set this. + uint64_t ioUringFlushTimeoutCycles = 0; // Scheduler park backoff (nanoseconds). The dispatch loop spins for up to // spinThresholdNs after going idle; past that it parks on the eventfd with // an exponential backoff starting at initialWaitNs and capped at maxWaitNs. - uint64_t initialWaitNs = 1'000; - uint64_t maxWaitNs = 10'000'000; - uint64_t spinThresholdNs = 20'000; + uint32_t initialWaitNs = 1'000; + uint32_t maxWaitNs = 10'000'000; + uint32_t spinThresholdNs = 20'000; // Allocate per-CPU latency profilers. bool enableProfiler = false; diff --git a/src/fibers/fiber.cpp b/src/fibers/fiber.cpp index c037f90..2597ba8 100644 --- a/src/fibers/fiber.cpp +++ b/src/fibers/fiber.cpp @@ -68,6 +68,7 @@ static constexpr uint16_t INVALID_PROCESSOR_NUMBER = (1 << 10); x(FIBER_STOLEN, "FiberStolen") \ x(IO_ENQUEUED, "IoEnqueued") \ x(IO_COMPLETED, "IoCompleted") \ + x(IO_SUBMITTED, "IoSubmitted") \ x(SQ_RING_OVERFLOW, "SQRingOverflow") \ x(CQ_RING_OVERFLOW, "CQRingOverflow") \ x(SLEEP_ENQUEUED, "SleepEnqueued") \ @@ -533,7 +534,7 @@ struct FiberScheduler::ProcessorState template bool enqueueIo(IoFuture * future, Setup && setup) noexcept; - void submitIo(bool flush) noexcept; + bool submitIo(bool flush) noexcept; void insertSuspended(Fiber * fiber) noexcept; void removeSuspended(Fiber * fiber) noexcept; @@ -579,6 +580,18 @@ struct FiberScheduler::ProcessorState // Deadline of the in-flight io_uring timeout SQE; 0 when none is pending. uint64_t wakeupDeadlineCycles = 0; + // Timestamp (TSC cycles) of the most recent successful CQ drain on + // this ring. Used to emit CQ_WAIT profile events bounding how long + // any CQE in the next drain batch could have sat in the ring. + uint64_t lastCqDrainCycles = 0; + + // Timestamp (TSC cycles) of the most recent io_uring_submit call. + // Read in submitIo (time-gate) and handleCompletionQueue (SQ_WAIT + // emit) under serviceLoopLock; written in submitIo under + // submissionLock. Relaxed atomic is sufficient: readers tolerate a + // slightly stale value. + std::atomic lastSubmitCycles{0}; + // Per-CPU latency profiler. Allocated only when Options::enableProfiler // is set; null otherwise. Co-located with the hot path so the null // check in reportFiberWait/reportIoWait costs no additional miss when @@ -653,6 +666,9 @@ void FiberScheduler::ProcessorState::initialize(uint32_t cpu) noexcept { profiler = std::make_unique(); } + + lastCqDrainCycles = Tsc::getCycles(); + lastSubmitCycles.store(lastCqDrainCycles, std::memory_order_relaxed); } void FiberScheduler::ProcessorState::destroy() noexcept @@ -841,59 +857,55 @@ bool FiberScheduler::ProcessorState::enqueueIo(IoFuture * future, Setup && setup return false; } -// Submit pending SQEs to the kernel. -// -// flush=false: pressure-relief mode for dispatch loops - only submit -// once at least options.ioUringFlushThreshold SQEs are queued, otherwise -// let the next fiber accumulate more work before paying the syscall. -// -// flush=true: force submit regardless of count. Used at end-of-drain, by -// the proxy/wakeup paths that need the SQE visible immediately, and by -// worker threads which have no batching boundary. -// -// IO_SUBMIT profile event represents the syscall cost; per-fiber latency -// lives in IO_WAIT. Category 0 because a single io_uring_enter covers SQEs -// from any fibers that contributed. -void FiberScheduler::ProcessorState::submitIo(bool flush) noexcept -{ - // Fast path: submitIo is always called on the same thread that just called - // enqueueIo, so our own enqueue is always visible here. - // Suppress the TSan report for the cross-CPU race on sqe_tail. +// Submit pending SQEs to the kernel. flush=true: unconditional flush. +// flush=false: gated by ioUringFlushThreshold (count) or ioUringFlushTimeout. +bool FiberScheduler::ProcessorState::submitIo(bool flush) noexcept +{ + // Fast path: read SQ tail outside the lock. TSAN_IGNORE_BEGIN(); uint32_t count = ::io_uring_sq_ready(&ring); TSAN_IGNORE_END(); + if (count == 0) + { + return false; + } + + uint64_t nowCycles = Tsc::getCycles(); - uint32_t minCount = flush ? 1 : options.ioUringFlushThreshold; - if (count < minCount) + if (!flush) { - return; + bool countMet = count >= options.ioUringFlushThreshold; + bool staleMet = nowCycles - lastSubmitCycles.load(std::memory_order_relaxed) > options.ioUringFlushTimeoutCycles; + if (!countMet && !staleMet) + { + return false; + } } std::lock_guard lock(submissionLock); count = ::io_uring_sq_ready(&ring); - if (count >= minCount) + if (count == 0) { - // TSan needs an explicit barrier between submission/completion. - TSAN_RELEASE(this); + return false; + } - if (profiler) - { - uint64_t startCycles = Tsc::getCycles(); + // TSan needs an explicit barrier between submission/completion. + TSAN_RELEASE(this); - int r = ::io_uring_submit(&ring); - SILK_ASSERT(r >= 0); + lastSubmitCycles.store(nowCycles, std::memory_order_relaxed); - profileEvent(ProfileEventKind::IO_SUBMIT, 0, Tsc::getCycles() - startCycles); - } - else - { - int r = ::io_uring_submit(&ring); - SILK_ASSERT(r >= 0); - } + int r = ::io_uring_submit(&ring); + SILK_ASSERT(r >= 0); - Perf::getSimpleCounter(simpleCounters[IO_ENQUEUED], number).increment(count); + if (profiler) + { + profileEvent(ProfileEventKind::SUBMIT_IO, 0, Tsc::getCycles() - nowCycles); } + + Perf::getSimpleCounter(simpleCounters[IO_ENQUEUED], number).increment(count); + Perf::getSimpleCounter(simpleCounters[IO_SUBMITTED], number).increment(); + return true; } void FiberScheduler::ProcessorState::insertSuspended(Fiber * fiber) noexcept @@ -996,6 +1008,7 @@ void FiberScheduler::initialize(const Options * userOptions) noexcept SILK_ASSERT(options.ioUringQueueSize >= 2 && (options.ioUringQueueSize & (options.ioUringQueueSize - 1)) == 0); SILK_ASSERT(options.ioUringFlushThreshold >= 1 && options.ioUringFlushThreshold <= options.ioUringQueueSize); SILK_ASSERT(options.waiterTableSize >= 2 && (options.waiterTableSize & (options.waiterTableSize - 1)) == 0); + options.ioUringFlushTimeoutCycles = Tsc::nanosecondsToCycles(options.ioUringFlushTimeout); scheduler = new SchedulerState(); @@ -1522,7 +1535,7 @@ void FiberScheduler::runScheduler(ProcessorState * processor) noexcept } else { - waitNs = waitNs ? std::min(waitNs * 2, options.maxWaitNs) : options.initialWaitNs; + waitNs = waitNs ? std::min(waitNs * 2, options.maxWaitNs) : options.initialWaitNs; } } } @@ -1662,6 +1675,12 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept // TSan needs an explicit barrier between submission/completion. TSAN_ACQUIRE(processor); + uint64_t entryCycles = 0; + if (processor->profiler) + { + entryCycles = Tsc::getCycles(); + } + for (;;) { // Handle completion entries in the ring. @@ -1704,7 +1723,10 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept if (processor->profiler) { - processor->profileEvent(ProfileEventKind::IO_WAIT, future->category, Tsc::getCycles() - future->submitTimestamp); + uint64_t nowCycles = Tsc::getCycles(); + uint64_t submitCycles = processor->lastSubmitCycles.load(std::memory_order_relaxed); + processor->profileEvent(ProfileEventKind::IO_WAIT, future->category, nowCycles - future->submitTimestamp); + processor->profileEvent(ProfileEventKind::SQ_WAIT, future->category, submitCycles - future->submitTimestamp); } int result = cqe->res; @@ -1742,6 +1764,12 @@ bool FiberScheduler::handleCompletionQueue(ProcessorState * processor) noexcept SILK_ASSERT(r >= 0); } + if (didWork && processor->profiler) + { + processor->profileEvent(ProfileEventKind::CQ_WAIT, 0, entryCycles - processor->lastCqDrainCycles); + processor->lastCqDrainCycles = entryCycles; + } + return didWork; } diff --git a/src/perf/common.cpp b/src/perf/common.cpp index ddcd0af..b6a13fc 100644 --- a/src/perf/common.cpp +++ b/src/perf/common.cpp @@ -112,16 +112,20 @@ static const char * profileEventKindName(silk::ProfileEventKind kind) noexcept { switch (kind) { - case silk::ProfileEventKind::SUSPEND_WAIT: - return "suspend_wait"; - case silk::ProfileEventKind::IO_SUBMIT: - return "io_submit"; - case silk::ProfileEventKind::IO_WAIT: - return "io_wait"; case silk::ProfileEventKind::READY_WAIT: return "ready_wait"; case silk::ProfileEventKind::FIBER_RUN: return "fiber_run"; + case silk::ProfileEventKind::SUSPEND_WAIT: + return "suspend_wait"; + case silk::ProfileEventKind::IO_WAIT: + return "io_wait"; + case silk::ProfileEventKind::CQ_WAIT: + return "cq_wait"; + case silk::ProfileEventKind::SQ_WAIT: + return "sq_wait"; + case silk::ProfileEventKind::SUBMIT_IO: + return "submit_io"; default: return "unknown"; } diff --git a/src/perf/http-perf.cpp b/src/perf/http-perf.cpp index 42a6657..d3e1a35 100644 --- a/src/perf/http-perf.cpp +++ b/src/perf/http-perf.cpp @@ -699,6 +699,18 @@ static void runServer(int argc, char ** argv) server.stop(); } + if (cfg.printCounters) + { + printf("{\n"); + if (!cfg.useThreads) + { + printSchedulerLatency(); + printf(","); + } + printCounters(); + printf("}\n"); + } + if (!cfg.useThreads) { silk::FiberScheduler::destroy(); diff --git a/src/perf/net-perf.cpp b/src/perf/net-perf.cpp index 9a88bc4..fb5e39c 100644 --- a/src/perf/net-perf.cpp +++ b/src/perf/net-perf.cpp @@ -795,6 +795,15 @@ static void runServer(int argc, char ** argv) SILK_INFO("stopping server"); server.stop(); + if (cfg.printCounters) + { + printf("{\n"); + printSchedulerLatency(); + printf(","); + printCounters(); + printf("}\n"); + } + silk::FiberScheduler::destroy(); silk::destroy(); }