Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions bb
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ class NetPerfParams:
warmup: str = "2s"
connections: list[int] = field(default_factory=lambda: [1000])
delay: str = "0"
stall_rate: float = 0.0
stall_duration: str = "0"
flamegraph: bool = False
print_counters: bool = False
timeout: int = 180
Expand Down Expand Up @@ -606,6 +608,12 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
local = params.host in ("127.0.0.1", "localhost")
verbose_flag = ["--verbose"] if log.isEnabledFor(logging.DEBUG) else []
print_counters_flag = ["--print-counters"] if params.print_counters else []
stall_flags: list[str] = []
if params.stall_rate > 0:
stall_flags = [
"--stall-rate", str(params.stall_rate),
"--stall-duration", str(params.stall_duration),
]

server = None
if local:
Expand Down Expand Up @@ -648,6 +656,7 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
str(params.duration),
"--warmup",
str(params.warmup),
*stall_flags,
*verbose_flag,
],
)
Expand All @@ -674,6 +683,7 @@ def cmd_net_perf(preset: str, params: NetPerfParams) -> None:
str(params.duration),
"--warmup",
str(params.warmup),
*stall_flags,
*print_counters_flag,
*verbose_flag,
timeout=params.timeout or None,
Expand Down Expand Up @@ -1693,6 +1703,21 @@ def _build_parser() -> argparse.ArgumentParser:
metavar="DURATION",
help="server-side delay per message (e.g. 1ms, 100us)",
)
parser.add_argument(
"--stall-rate",
dest="net_stall_rate",
default=net_params.stall_rate,
type=float,
metavar="HZ",
help="per-connection Poisson rate of stall messages (Hz, 0 disables)",
)
parser.add_argument(
"--stall-duration",
dest="net_stall_duration",
default=net_params.stall_duration,
metavar="DURATION",
help="stall duration per stall event (e.g. 100us, 1ms)",
)
parser.add_argument(
"--print-counters",
dest="net_print_counters",
Expand Down
14 changes: 13 additions & 1 deletion include/silk/fibers/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class FiberScheduler

// Allocate per-CPU latency profilers.
bool enableProfiler = false;

// Disable work-stealing. Set to study the effect of work-stealing in
// isolation (e.g. head-of-line blocking benchmarks).
// Production should leave this off.
bool disableWorkStealing = false;
};

/**
Expand Down Expand Up @@ -476,7 +481,14 @@ class FiberScheduler

/**
 * Ordering predicate for steal candidates: lower costCycles sorts first, and
 * candidates with equal cost are ordered by ascending processorNumber.
 *
 * The explicit tie-break makes the result of sorting independent of the
 * candidates' initial arrangement (assuming processorNumber is unique per
 * candidate — confirm against where the candidates are built).
 *
 * Only this single operator() may exist: a second overload with the same
 * signature (the earlier cost-only one-liner) would make the struct
 * ill-formed.
 */
struct CompareStealCost
{
    bool operator()(const StealCandidate & l, const StealCandidate & r) const noexcept
    {
        if (l.costCycles != r.costCycles)
        {
            return l.costCycles < r.costCycles;
        }
        // Equal cost: fall back to the processor number for a stable, reproducible order.
        return l.processorNumber < r.processorNumber;
    }
};

//
Expand Down
19 changes: 14 additions & 5 deletions src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <format>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <utility>

Expand Down Expand Up @@ -1089,8 +1088,10 @@ void FiberScheduler::buildStealCandidates() noexcept

std::sort(processor->stealCandidates.get(), processor->stealCandidates.get() + candidateCount, CompareStealCost{});

// Shuffle CPUs in the same cost group.
std::mt19937 rng(cpu);
// Spread first-choice steal targets within each cost-tie group via a
// deterministic rotation by cpu % groupSize. Avoids the thundering
// herd of every CPU racing the same first target while keeping the
// candidate order reproducible across runs.
for (uint32_t start = 0; start < candidateCount;)
{
uint64_t groupCost = processor->stealCandidates[start].costCycles;
Expand All @@ -1099,7 +1100,15 @@ void FiberScheduler::buildStealCandidates() noexcept
{
++end;
}
std::shuffle(processor->stealCandidates.get() + start, processor->stealCandidates.get() + end, rng);
uint32_t groupSize = end - start;
if (groupSize > 1)
{
uint32_t rotation = cpu % groupSize;
std::rotate(
processor->stealCandidates.get() + start,
processor->stealCandidates.get() + start + rotation,
processor->stealCandidates.get() + end);
}
start = end;
}
}
Expand Down Expand Up @@ -1501,7 +1510,7 @@ void FiberScheduler::runScheduler(ProcessorState * processor) noexcept
didWork |= runServiceLoop(processor, waitNs, &timer);

// Steal work only when there is nothing to do on own CPU.
if (!didWork)
if (!didWork && !options.disableWorkStealing)
{
didWork |= runStealLoop(processor, idleSinceCycles, &timer);
}
Expand Down
34 changes: 34 additions & 0 deletions src/perf/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#include <silk/util/assert.h>
#include <silk/util/perf.h>
#include <silk/util/platform.h>
#include <silk/util/tsc.h>

#include <algorithm>
#include <cerrno>
#include <cmath>
#include <cstdio>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>
Expand Down Expand Up @@ -257,3 +259,35 @@ std::string formatDuration(uint64_t ns)
}
return std::to_string(ns) + "ns";
}

/**
 * Decide whether the message about to be sent should carry a stall.
 *
 * Returns 0 when the scheduler is disabled (rateHz <= 0) or when the current
 * TSC reading is still before nextStallCycles. Otherwise it samples a new
 * exponentially distributed inter-arrival gap at rateHz, re-arms
 * nextStallCycles relative to the current reading (not the previous
 * deadline), and returns the configured stall duration in nanoseconds.
 *
 * The duration is stored as uint64_t but returned as uint32_t, so values
 * above UINT32_MAX ns (~4.29 s) are saturated rather than silently wrapped
 * to a much shorter stall.
 */
uint32_t StallScheduler::next() noexcept
{
    if (rateHz <= 0.0)
    {
        return 0;
    }
    const uint64_t now = silk::Tsc::getCycles();
    if (now < nextStallCycles)
    {
        return 0;
    }
    // Sample the gap to the following stall (seconds -> nanoseconds).
    std::exponential_distribution<double> dist(rateHz);
    const double gapNs = dist(rng) * 1'000'000'000.0;
    nextStallCycles = now + silk::Tsc::nanosecondsToCycles(static_cast<uint64_t>(gapNs));

    // Saturate instead of truncating: a plain narrowing cast would reduce the
    // value modulo 2^32 and turn e.g. a 5 s stall into roughly 0.7 s.
    constexpr uint64_t maxReturnableNs = UINT32_MAX;
    return stallNs > maxReturnableNs ? static_cast<uint32_t>(maxReturnableNs) : static_cast<uint32_t>(stallNs);
}

/**
 * Spin for the number of nanoseconds encoded in the first sizeof(uint32_t)
 * bytes of buf. A zero prefix returns immediately without reading the TSC.
 */
void busyLoopForStall(const char * buf) noexcept
{
    // Copy the prefix out with memcpy so buf needs no particular alignment.
    uint32_t budgetNs = 0;
    std::memcpy(&budgetNs, buf, sizeof(budgetNs));

    if (budgetNs != 0)
    {
        const uint64_t deadline = silk::Tsc::getCycles() + silk::Tsc::nanosecondsToCycles(budgetNs);
        for (;;)
        {
            if (silk::Tsc::getCycles() >= deadline)
            {
                break;
            }
            silk::cpuPause();
        }
    }
}
50 changes: 50 additions & 0 deletions src/perf/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <cerrno>
#include <cstdint>
#include <random>
#include <string>
#include <vector>

Expand Down Expand Up @@ -59,3 +60,52 @@ static inline bool isExpectedShutdown(int r)
{
return r == ECONNRESET || r == ECANCELED || r == EBADF || r == EPIPE || r == EINVAL;
}

/**
 * Poisson source of stall messages for a single net-perf connection.
 *
 * Each call to next() yields the stall budget, in nanoseconds, to attach to
 * the message about to be sent. The client writes that value into the first
 * 4 bytes of the message and the server consumes it via busyLoopForStall.
 * Gaps between stalls follow an exponential distribution at the configured
 * rate; a rate of 0 turns stalls off altogether.
 */
class StallScheduler
{
public:
    /** A default-constructed scheduler has rate 0, so next() always returns 0. */
    StallScheduler() noexcept = default;

    /**
     * Build a scheduler with the given rate (Hz), per-stall duration (ns)
     * and RNG seed. When the rate is positive, the constructor draws the
     * first inter-arrival gap itself, so the caller's first next() normally
     * returns 0 (the first message carries no stall). A rate of 0 leaves
     * the scheduler disabled.
     */
    StallScheduler(double rateHz_, uint64_t stallNs_, uint64_t seed) noexcept
        : rateHz(rateHz_)
        , stallNs(stallNs_)
        , rng(seed)
    {
        // Priming call: its result is intentionally dropped; the point is
        // the side effect of arming nextStallCycles.
        static_cast<void>(next());
    }

    /** Whether this scheduler can ever produce a non-zero stall. */
    bool enabled() const noexcept { return 0.0 < rateHz; }

    /** Stall budget (ns) for the upcoming message; 0 means send it without a stall. */
    uint32_t next() noexcept;

private:
    double rateHz{0.0};
    uint64_t stallNs{0};
    std::mt19937_64 rng;
    uint64_t nextStallCycles{0};
};

/**
 * Read the leading uint32_t stall_ns prefix from a fully-received message
 * buffer and busy-loop (polling the TSC with cpuPause between reads) for
 * that duration. Intended to be called by net-perf server handlers
 * immediately after reading a complete message, before echoing.
 *
 * buf must point to at least sizeof(uint32_t) readable bytes; the prefix is
 * copied out bytewise, so no particular alignment is required. A prefix of
 * 0 returns immediately without spinning.
 */
void busyLoopForStall(const char * buf) noexcept;
19 changes: 19 additions & 0 deletions src/perf/net-perf-asio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ asio::awaitable<void> Server::acceptLoop()

asio::awaitable<void> Server::handleConnection(Server * server, std::shared_ptr<tcp::socket> socket)
{
SILK_ASSERT(server->cfg.msgSize >= sizeof(uint32_t));
auto buf = std::make_unique<char[]>(server->cfg.msgSize);
boost::system::error_code ec;
for (;;)
Expand All @@ -153,6 +154,11 @@ asio::awaitable<void> Server::handleConnection(Server * server, std::shared_ptr<
n += got;
}

// First 4 bytes carry a per-message stall budget in nanoseconds.
// Busy-loop for that long; this thread's executor cannot service
// other connections during the stall, demonstrating HOL blocking.
busyLoopForStall(buf.get());

if (server->cfg.delayNs)
{
asio::steady_timer timer(co_await asio::this_coro::executor);
Expand Down Expand Up @@ -198,6 +204,8 @@ struct ClientConfig
uint32_t msgSize = 64;
uint64_t durationNs = 10'000'000'000ULL;
uint64_t warmupNs = 2'000'000'000ULL;
double stallRateHz = 0.0;
uint64_t stallNs = 0;
bool printCounters = false;
};

Expand Down Expand Up @@ -266,11 +274,18 @@ void Client::stop()

asio::awaitable<void> Client::clientConnection(Client * client, Connection * connection)
{
SILK_ASSERT(client->cfg.msgSize >= sizeof(uint32_t));
auto buf = std::make_unique<char[]>(client->cfg.msgSize);
std::memset(buf.get(), 0xAB, client->cfg.msgSize);
boost::system::error_code ec;

StallScheduler stalls(client->cfg.stallRateHz, client->cfg.stallNs, static_cast<uint64_t>(reinterpret_cast<uintptr_t>(connection)));

for (;;)
{
uint32_t stallNs = stalls.next();
std::memcpy(buf.get(), &stallNs, sizeof(stallNs));

uint64_t start = silk::Tsc::getCycles();

std::size_t n = 0;
Expand Down Expand Up @@ -439,6 +454,7 @@ static void runClient(int argc, char ** argv)

std::string durationStr = "10s";
std::string warmupStr = "2s";
std::string stallDurationStr = "0";

// clang-format off
desc.add_options()
Expand All @@ -449,6 +465,8 @@ static void runClient(int argc, char ** argv)
("msg-size", po::value(&cfg.msgSize), "message size in bytes")
("duration", po::value(&durationStr), "measurement duration (e.g. 10s, 500ms)")
("warmup", po::value(&warmupStr), "warmup duration (e.g. 2s, 500ms)")
("stall-rate", po::value(&cfg.stallRateHz), "per-connection Poisson rate of stall messages (Hz, 0 disables)")
("stall-duration", po::value(&stallDurationStr), "stall duration per stall event (e.g. 100us, 1ms)")
("print-counters", po::bool_switch(&cfg.printCounters), "include counters in the JSON report")
("verbose,v", po::bool_switch(&verbose), "enable debug logging")
;
Expand All @@ -466,6 +484,7 @@ static void runClient(int argc, char ** argv)
po::notify(vm);
cfg.durationNs = parseDuration(durationStr);
cfg.warmupNs = parseDuration(warmupStr);
cfg.stallNs = parseDuration(stallDurationStr);
if (verbose)
{
silk::Logger::setLevel(silk::LogLevel::DEBUG);
Expand Down
Loading
Loading