From 026cab4131f90ae4d11a243d6f6329fd21633875 Mon Sep 17 00:00:00 2001 From: Kelvin Ng Date: Fri, 13 Mar 2026 01:03:08 +0000 Subject: [PATCH 1/5] SpikeAsync implementation --- spike/ASYNC_API.md | 350 ++++++++++++++++++++++ spike/src/include/nrt_wrapper.h | 84 ++++++ spike/src/include/spike.h | 265 ++++++++++++++++- spike/src/include/tensor.h | 3 + spike/src/include/tensor_set.h | 17 +- spike/src/nrt_wrapper.cpp | 9 + spike/src/python_bindings.cpp | 130 +++++++++ spike/src/spike.cpp | 503 +++++++++++++++++++++++++++++++- spike/src/spike/__init__.py | 17 ++ spike/src/spike/_spike.pyi | 85 +++++- spike/src/spike/spike_async.py | 416 ++++++++++++++++++++++++++ spike/src/tensor.cpp | 14 + spike/src/tensor_set.cpp | 45 ++- spike/tests/conftest.py | 16 + spike/tests/test_spike_async.py | 210 +++++++++++++ 15 files changed, 2146 insertions(+), 18 deletions(-) create mode 100644 spike/ASYNC_API.md create mode 100644 spike/src/spike/spike_async.py create mode 100644 spike/tests/conftest.py create mode 100644 spike/tests/test_spike_async.py diff --git a/spike/ASYNC_API.md b/spike/ASYNC_API.md new file mode 100644 index 0000000..b0fc8d6 --- /dev/null +++ b/spike/ASYNC_API.md @@ -0,0 +1,350 @@ +# Spike Async/Nonblock API + +This document describes the asynchronous and non-blocking APIs added to the Spike runtime. + +## Overview + +The Spike runtime supports two levels of asynchronous operation: + +1. **High-level Async API** (Recommended): Python asyncio-style interface with automatic dependency management +2. **Low-level Nonblock API** (Advanced): Direct control over non-blocking operations with manual polling + +**Most users should use the high-level Async API**, which provides a cleaner interface and automatic dependency management. + +## High-Level Async API + +The high-level API provides an asyncio-like interface with automatic dependency tracking and stream support. + +### Initialization + +```python +from spike import SpikeAsync + +spike_async = SpikeAsync(verbose_level=1) +# Nonblock mode is automatically initialized + +# Load models and allocate tensors +model = spike_async.load_model("model.neff", core_id=0) +input_tensor = spike_async.allocate_tensor(1024 * 1024, core_id=0) +output_tensor = spike_async.allocate_tensor(1024 * 1024, core_id=0) +``` + +### Basic Operations + +All operations return a Future that can be awaited: + +```python +# Tensor operations +write_fut = spike_async.tensor_write(tensor, data) +read_fut = spike_async.tensor_read(tensor) + +# Model execution +exec_fut = spike_async.execute(model, input_set, output_set) + +# Wait for completion (blocking) +data = read_fut.wait() +``` + +### Async/Await Style Programming (Recommended) + +The most powerful feature is using async/await to write sequential-looking code that runs asynchronously. You can `await` futures directly, making your code clean and readable: + +```python +async def inference_pipeline(model, input_data, input_tensor, output_tensor): + # Write sequential code, but it runs asynchronously! + # Each await releases control, allowing other operations to run + + # Prepare input data + await spike_async.tensor_write(input_tensor, input_data) + + # Run inference + input_set = spike_async.create_tensor_set({"input": input_tensor}) + output_set = spike_async.create_tensor_set({"output": output_tensor}) + await spike_async.execute(model, input_set, output_set) + + # Read results + output_data = await spike_async.tensor_read(output_tensor) + + return output_data + +# Run TWO pipelines concurrently +pipeline1 = spike_async.submit( + inference_pipeline(model1, data1, input_tensor1, output_tensor1) +) +pipeline2 = spike_async.submit( + inference_pipeline(model2, data2, input_tensor2, output_tensor2) +) + +# Both pipelines are now running concurrently! +# While pipeline1 is executing the model, pipeline2 can do tensor I/O +result1 = pipeline1.wait() +result2 = pipeline2.wait() +``` + +**Why this is powerful:** +- Code reads like synchronous operations, but executes asynchronously +- Multiple pipelines run concurrently without manual dependency tracking +- Different operation types (tensor I/O, model execution) can overlap, maximizing hardware utilization +- You can mix async Spike operations with regular Python async operations + +### Explicit Dependency Management + +Alternatively, you can explicitly specify dependencies between operations: + +```python +# Write, then read +write_fut = spike_async.tensor_write(tensor, data) +read_fut = spike_async.tensor_read(tensor, deps=[write_fut]) + +# Chain multiple operations +fut1 = spike_async.execute(model1, in1, out1) +fut2 = spike_async.execute(model2, in2, out2, deps=[fut1]) +fut3 = spike_async.execute(model3, in3, out3, deps=[fut1, fut2]) +``` + +This approach is useful when you need fine-grained control or want to build complex DAGs without writing async functions. + +### Streams + +Streams provide automatic sequencing of operations - all operations in a stream execute in order: + +```python +# All operations in a stream are sequenced +with spike_async.create_stream() as stream: + spike_async.tensor_write(tensor1, data1) + spike_async.tensor_write(tensor2, data2) + spike_async.execute(model, inputs, outputs) + +# Wait for stream to complete +stream.wait() +``` + +Multiple streams can run concurrently: + +```python +streams = [] +for i in range(num_parallel): + with spike_async.create_stream() as stream: + # Operations in this stream + spike_async.execute(models[i], inputs[i], outputs[i]) + streams.append(stream) + +# Wait for all streams +for stream in streams: + stream.wait() +``` + +### Stream Events + +Synchronize between streams using events: + +```python +with spike_async.create_stream() as stream1: + spike_async.tensor_write(tensor, data) + event = stream1.record_event() # Capture current point + +with spike_async.create_stream() as stream2: + stream2.wait_event(event) # Wait for stream1's event + spike_async.execute(model, inputs, outputs) +``` + +### Batched Operations + +For efficiency, you can batch multiple tensor operations: + +**Batched Writes:** +```python +# Prepare a batch of writes +batch_id = spike_async.spike.tensor_write_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + data_objs=[data1, data2, data3], + offsets=[0, 0, 0] # Optional +) + +# Start the batch operation (returns a single future) +write_fut = spike_async.tensor_write_batched_start(batch_id) +write_fut.wait() # Wait for all writes to complete +``` + +**Batched Reads:** +```python +# Prepare a batch of reads +batch_id = spike_async.spike.tensor_read_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + dests=[dest1, dest2, dest3], # Pre-allocated numpy arrays + offsets=[0, 0, 0], # Optional + sizes=[size1, size2, size3] # Optional +) + +# Start the batch operation (returns a single future) +read_fut = spike_async.tensor_read_batched_start(batch_id) +read_fut.wait() # Wait for all reads to complete +``` + +Batched operations reduce overhead when you have many small operations. + +### Waiting for Multiple Operations + +```python +# Launch multiple operations +futs = [] +for i in range(10): + fut = spike_async.execute(model, inputs[i], outputs[i]) + futs.append(fut) + +# Wait for all +all_results = await spike_async.all(futs) +``` + +## Low-Level Nonblock API + +The low-level API provides direct access to non-blocking operations. You must manually poll for completion. **Use this only if you need fine-grained control.** + +### Initialization + +```python +from spike import Spike + +spike = Spike(verbose_level=1) +spike.init_nonblock() # Initialize thread pools for async operations +``` + +### Nonblocking Operations + +**Tensor Write:** +```python +# Returns operation ID +write_id = spike.tensor_write_nonblock(tensor, data, offset=0) +``` + +**Tensor Read:** +```python +# Returns operation ID +read_id = spike.tensor_read_nonblock(tensor, offset=0, size=0) +``` + +**Model Execution:** +```python +input_set = spike.create_tensor_set({"input": input_tensor}) +output_set = spike.create_tensor_set({"output": output_tensor}) +exec_id = spike.execute_nonblock(model, input_set, output_set) +``` + +### Polling for Results + +```python +while True: + result = spike.try_poll() # Non-blocking poll + if result is not None: + # Check result type + if isinstance(result, NonBlockTensorWriteResult): + if result.err is None: + print(f"Write {result.id} completed successfully") + else: + print(f"Write {result.id} failed: {result.err}") + elif isinstance(result, NonBlockTensorReadResult): + if result.err is None: + data = result.data # Retrieved data + else: + print(f"Read {result.id} failed: {result.err}") + elif isinstance(result, NonBlockExecResult): + if result.err is None: + print(f"Execution {result.id} completed successfully") + else: + print(f"Execution {result.id} failed: {result.err}") + + # Check if this is the operation we're waiting for + if result.id == target_id: + break +``` + +### Batched Operations (Low-Level) + +**Batched Writes:** +```python +# Prepare batch +batch_id = spike.tensor_write_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + data_objs=[data1, data2, data3], + offsets=[0, 0, 0] # Optional +) + +# Start the batch (returns single operation ID) +write_id = spike.tensor_write_nonblock_batched_start(batch_id) + +# Poll for single completion +while True: + result = spike.try_poll() + if result is not None and result.id == write_id: + break +``` + +**Batched Reads:** +```python +# Prepare batch +batch_id = spike.tensor_read_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + dests=[dest1, dest2, dest3], # Pre-allocated numpy arrays + offsets=[0, 0, 0], # Optional + sizes=[size1, size2, size3] # Optional +) + +# Start the batch (returns single operation ID) +read_id = spike.tensor_read_nonblock_batched_start(batch_id) + +# Poll for completion +while True: + result = spike.try_poll() + if result is not None and result.id == read_id: + if result.err is None: + # All reads completed successfully + # Data is already in dest1, dest2, dest3 + pass + break +``` + +## Architecture + +### Thread Pools + +When `init_nonblock()` is called, Spike creates two thread pools: + +- **Tensor threads**: Handle tensor read/write operations (one per NeuronCore) +- **Execution threads**: Handle model execution (one per NeuronCore) + +Operations are dispatched to the appropriate thread based on the NeuronCore ID of the tensor or model. + +### Result Queue + +Completed operations are pushed to a lock-free notification queue. The `try_poll()` method (or the async event loop) checks this queue for results. + +### Async Event Loop + +The `SpikeAsyncEventLoop` integrates with the nonblock API: + +- Futures are registered with operation IDs +- The selector polls `try_poll()` periodically +- When results arrive, corresponding futures are resolved + +### InternalResult → Result Pattern + +To avoid GIL overheads, the worker threads use `InternalResult` structures that hold shared_ptrs to keep resources alive. When polled from the main thread (with GIL held), these are converted to `Result` structures. This ensures nanobind/Python object destructors run in GIL context, not in worker threads. + +## Best Practices + +1. **Use the high-level Async API**: It's simpler and handles dependency management automatically +2. **Use async/await for readable concurrent code**: Multiple pipelines can overlap tensor I/O with model execution +3. **Use streams for simple sequencing**: Streams are the easiest way to ensure operations execute in order +4. **Use explicit dependencies for complex graphs**: When you have multiple parallel streams that need to synchronize in complex patterns +5. **Batch operations when possible**: Batched operations reduce overhead for multiple small transfers +6. **Don't mix sync and async**: Once you submit async or non-blocking operations, submitting sync operations may cause unexpected behaviors. + +## Migration from Old API + +If you're migrating from the old `NeuronPy` codebase: + +- `SpikeCore` → `Spike` +- `spike_cpp` module → `_spike` module +- `device_id` → `core_id` (parameter rename) +- API is otherwise compatible at the low level +- High-level `SpikeAsync` API is unchanged diff --git a/spike/src/include/nrt_wrapper.h b/spike/src/include/nrt_wrapper.h index b5f3970..7064cfe 100644 --- a/spike/src/include/nrt_wrapper.h +++ b/spike/src/include/nrt_wrapper.h @@ -10,6 +10,89 @@ #include #include +extern "C" { + +// Underlying tensor and model declaration copied from NRT +// This is a temporary hack for implementing nonblocking/async operations, +// as I will need some underlying info that is not exposed +// Will not need these after explicit async is ready and stable from NRT + +#define DX_CACHE_ALIGNED __attribute__((aligned(64))) + +typedef enum nrt_tensor_mem_type { + NRT_TENSOR_MEM_TYPE_INVALID = 0, + NRT_TENSOR_MEM_TYPE_MALLOC, + NRT_TENSOR_MEM_TYPE_DMA, + NRT_TENSOR_MEM_TYPE_FAKE, +} nrt_tensor_mem_type_t; + +// Memory, host or device that is used by +// a tensor. The memory is ref counted and can be shared among +// multiple tensors. +typedef struct nrt_tensor_storage { + uint32_t hbm_idx; + size_t allocated_size; + nrt_tensor_mem_type_t type; + union { + void *dmem; // dmem associated with addr, for tensor type + // NRT_TENSOR_MEM_TYPE_DMA + uint8_t + *vmem; // malloc'ed memory for tensor type NRT_TENSOR_MEM_TYPE_MALLOC + }; + volatile uint64_t ref_count DX_CACHE_ALIGNED; + bool mem_owned_by_tensor; + + pthread_mutex_t tensor_op_cv_lock; // Lock for async exec. Used with + // `tensor_op_cv` to block the thread while + // there are still pending execs. If this + // is NULL we are not in async exec mode. + pthread_cond_t tensor_op_cv; // used to block tensor op vars + volatile uint64_t pending_exec_count_read + DX_CACHE_ALIGNED; // count of pending execs that reads this location + volatile uint64_t pending_exec_count_write + DX_CACHE_ALIGNED; // count of pending execs that writes to this location + int32_t vtpb_idx; // same as vcore->vtpb_idx but -1 if no vcore for tensor + // (used for trace api) +} nrt_tensor_storage_t; + +typedef struct nrt_tensor { + char *name; // optional name + nrt_tensor_storage_t *sto; // the actual memory represented by the tensor + // don't access directly, use helper functions to ensure correctness + // params below allow a tensor to represent a slice of the memory + // pointed by "sto" + size_t _offset; // offset within the storage + size_t _size; // tensor size + void *extra; // used to store any metadata needed by the runtime + + volatile uint64_t ref_count + DX_CACHE_ALIGNED; // refcount for tensor. Only when this is 0 can we free + // the tensor it is incremented by + // `tensor_get_reference` and decremented by + // `tensor_free`. Tensor will automatically be freed in + // `tensor_free` once ref_count is zero. + volatile uint64_t output_completion_count + DX_CACHE_ALIGNED; // used to track the completion count of an output + // tensor. 0 means not complete; 1 and above means the + // number of completions +} nrt_tensor_t; + +typedef struct H_NN { + uint32_t id; +} H_NN; + +struct nrt_model { + uint32_t start_vnc; // VirtualNeuronCore start index + uint32_t vnc_count; // number of VirtualNeuronCore(s) requested + uint32_t instance_index; // instance index which will execute on the next call + // to nrt_execute + uint32_t instance_count; // number of loaded instances + uint32_t gid; // global id, for debug + char name[256]; + H_NN h_nn[]; // kmgr model id (instance_count entries) +}; +} + namespace spike { // RAII wrapper for NRT runtime @@ -26,6 +109,7 @@ class NrtRuntime { NrtRuntime &operator=(NrtRuntime &&) = default; static uint32_t get_visible_nc_count(); + static uint32_t get_total_nc_count(); private: bool initialized_; diff --git a/spike/src/include/spike.h b/spike/src/include/spike.h index e9f1cff..60a7b44 100644 --- a/spike/src/include/spike.h +++ b/spike/src/include/spike.h @@ -4,11 +4,26 @@ #include "model.h" #include "nrt_wrapper.h" #include "tensor.h" +#include "tensor_set.h" + +#include +#include + +#include +#include +#include #include +#include #include +#include +#include +#include #include +#include #include +namespace nb = nanobind; + namespace spike { // Tensor metadata structure @@ -24,6 +39,183 @@ struct ModelTensorInfo { std::unordered_map outputs; }; +// NonBlock command structures +struct NonBlockCloseCmd {}; + +struct NonBlockTensorReadCmd { + uint64_t id; + std::shared_ptr tensor; + size_t offset; + size_t size; + void *data; + std::variant> data_obj; +}; + +struct NonBlockTensorWriteCmd { + uint64_t id; + std::shared_ptr tensor; + const void *data; + size_t size; + std::variant> data_obj; + size_t offset; +}; + +struct NonBlockTensorWriteBatchedCmd { + uint64_t id; + uint64_t batch_id; +}; + +struct NonBlockTensorReadBatchedCmd { + uint64_t id; + uint64_t batch_id; +}; + +typedef std::variant + NonBlockTensorCmd; + +struct NonBlockExecCmd { + uint64_t id; + std::shared_ptr model; + std::shared_ptr input_set; + std::shared_ptr output_set; + std::optional ntff_name; + bool save_trace; +}; + +typedef std::variant + NonBlockExecOrCloseCmd; + +// Thread-safe queue template (blocking and non-blocking versions) +template class LockedQueue { +public: + LockedQueue() { + if constexpr (!is_blocking) { + size_ = 0; + } + } + + void push(const T &value) { + std::unique_lock lk(mtx_); + q_.push(value); + lk.unlock(); + if constexpr (is_blocking) { + cv_.notify_one(); + } else { + size_.fetch_add(1, std::memory_order_release); + } + } + + void push(T &&value) { + std::unique_lock lk(mtx_); + q_.push(std::move(value)); + lk.unlock(); + if constexpr (is_blocking) { + cv_.notify_one(); + } else { + size_.fetch_add(1, std::memory_order_release); + } + } + + template void emplace(Args &&...args) { + std::unique_lock lk(mtx_); + q_.emplace(std::forward(args)...); + lk.unlock(); + if constexpr (is_blocking) { + cv_.notify_one(); + } else { + size_.fetch_add(1, std::memory_order_release); + } + } + + template std::enable_if_t pop() { + std::unique_lock lk(mtx_); + cv_.wait(lk, [this]() { return !q_.empty(); }); + T ret(std::move(q_.front())); + q_.pop(); + lk.unlock(); + return ret; + } + + // This function is only safe with one consumer + template + std::enable_if_t> try_pop() { + if (size_.load(std::memory_order_acquire) >= 1) { + size_.fetch_sub(1, std::memory_order_release); + std::lock_guard lk(mtx_); + T ret(std::move(q_.front())); + q_.pop(); + return ret; + } else { + return std::nullopt; + } + } + +private: + struct Empty {}; + + std::queue q_; + std::mutex mtx_; + std::conditional_t cv_; + std::conditional_t size_; +}; + +// NonBlock result structures (exposed to Python) +struct NonBlockTensorReadResult { + uint64_t id; + std::variant> data; + std::optional> err; +}; + +struct NonBlockTensorWriteResult { + uint64_t id; + std::optional> err; +}; + +struct NonBlockExecResult { + uint64_t id; + std::optional> err; +}; + +typedef std::variant + NonBlockResult; + +// NonBlock internal result structures (used in worker threads to avoid GIL) +// These hold shared_ptrs to keep resources alive until convert_internal_result() +// transfers nanobind objects to Result types. This two-phase pattern ensures +// nanobind/Python object destructors run in GIL context, not in worker threads. +struct NonBlockTensorReadInternalResult { + uint64_t id; + std::variant> data_obj; + std::optional> err; + + std::shared_ptr tensor; +}; + +struct NonBlockTensorWriteInternalResult { + uint64_t id; + std::optional> err; + + std::shared_ptr tensor; + std::variant> data_obj; +}; + +struct NonBlockExecInternalResult { + uint64_t id; + std::optional> err; + + std::shared_ptr model; + std::shared_ptr input_set; + std::shared_ptr output_set; +}; + +typedef std::variant + NonBlockInternalResult; + // Main Spike class - Python interface class Spike { public: @@ -72,6 +264,55 @@ class Spike { std::optional ntff_name = std::nullopt, bool save_trace = false); + // Nonblocking operations + void init_nonblock(); + + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + nb::bytes data_obj, size_t offset = 0); + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + nb::ndarray<> data_obj, + size_t offset = 0); + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + const void *data, size_t size, + size_t offset); + + uint64_t tensor_read_nonblock(std::shared_ptr tensor, + size_t offset = 0, size_t size = 0); + uint64_t tensor_read_nonblock(std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset = 0, + size_t size = 0); + + uint64_t tensor_write_nonblock_batched_prepare( + std::vector> tensors, + std::vector> data_objs, + std::optional> offsets); + uint64_t tensor_write_nonblock_batched_start(uint64_t batch_id); + + uint64_t tensor_read_nonblock_batched_prepare( + std::vector> tensors, + std::vector> dests, + std::optional> offsets, + std::optional> sizes); + uint64_t tensor_read_nonblock_batched_start(uint64_t batch_id); + + uint64_t + execute_nonblock(std::shared_ptr model, + std::shared_ptr input_set, + std::shared_ptr output_set, + std::optional ntff_name = std::nullopt, + bool save_trace = false); + + std::optional try_poll(); + + NrtTensorSet create_tensor_set( + const std::unordered_map> + &tensor_map); + + // Wrap existing NRT objects (for interop with external code) + NrtModel wrap_model(nrt_model_t *ptr); + NrtTensor wrap_tensor(nrt_tensor_t *ptr); + NrtTensorSet wrap_tensor_set(nrt_tensor_set_t *ptr); + // Model introspection ModelTensorInfo get_tensor_info(NrtModel &model); @@ -79,10 +320,32 @@ class Spike { int verbose_level_; std::unique_ptr runtime_; + // Nonblock support + uint64_t next_non_block_id_ = 0; + uint64_t next_batch_id_ = 0; + + std::vector tensor_threads_; + std::vector exec_threads_; + std::vector> tensor_queues_; + std::vector> exec_queues_; + LockedQueue noti_queue_; + + std::unordered_map> + tensor_write_batched_cmds_; + std::shared_mutex tensor_write_batched_cmds_mtx_; + + std::unordered_map> + tensor_read_batched_cmds_; + std::shared_mutex tensor_read_batched_cmds_mtx_; + + void loop_execute(int core_id); + void loop_tensor(int core_id); + // Helper methods - NrtTensorSet create_tensor_sets( + NrtTensorSet create_tensor_set( const std::unordered_map &tensor_map); std::string dtype_to_string(nrt_dtype_t dtype); + NonBlockResult convert_internal_result(NonBlockInternalResult &internal_res_); }; } // namespace spike diff --git a/spike/src/include/tensor.h b/spike/src/include/tensor.h index 6a8d190..1f2ea00 100644 --- a/spike/src/include/tensor.h +++ b/spike/src/include/tensor.h @@ -16,6 +16,9 @@ class NrtTensor { // This constructor creates an NrtTensor that owns the tensor NrtTensor(nrt_tensor_placement_t placement, uint32_t core_id, size_t size, const std::string &name, const Spike *spike); + // This constructor creates an NrtTensor that references an existing tensor + NrtTensor(nrt_tensor_t *ptr, uint32_t core_id, uint64_t size, + const std::string &name, const Spike *spike); NrtTensor(const NrtTensor &source, size_t offset, size_t size, const std::string &name); ~NrtTensor(); diff --git a/spike/src/include/tensor_set.h b/spike/src/include/tensor_set.h index 435b5ec..418817c 100644 --- a/spike/src/include/tensor_set.h +++ b/spike/src/include/tensor_set.h @@ -2,14 +2,21 @@ #define SPIKE_SRC_INCLUDE_TENSOR_SET_H #include "tensor.h" +#include #include +#include namespace spike { +class Spike; + // RAII wrapper for NRT tensor set class NrtTensorSet { public: - NrtTensorSet(); + // Constructor that creates an owned tensor set + explicit NrtTensorSet(const Spike *spike); + // Constructor to wrap an existing tensor set (non-owning) + explicit NrtTensorSet(nrt_tensor_set_t *ptr); ~NrtTensorSet(); // Non-copyable, movable @@ -18,11 +25,19 @@ class NrtTensorSet { NrtTensorSet(NrtTensorSet &&other) noexcept; NrtTensorSet &operator=(NrtTensorSet &&other) noexcept; + bool is_freed() const; + bool is_owner() const { return spike_ != nullptr; } + + void add_tensor(const std::string &name, + std::shared_ptr tensor); void add_tensor(const std::string &name, const NrtTensor &tensor); nrt_tensor_set_t *get_ptr() const { return ptr_; } + void free(); private: nrt_tensor_set_t *ptr_; + const Spike *spike_; + std::vector> tensors_; }; } // namespace spike diff --git a/spike/src/nrt_wrapper.cpp b/spike/src/nrt_wrapper.cpp index f2478fd..f6f19a0 100644 --- a/spike/src/nrt_wrapper.cpp +++ b/spike/src/nrt_wrapper.cpp @@ -74,4 +74,13 @@ uint32_t NrtRuntime::get_visible_nc_count() { return count; } +uint32_t NrtRuntime::get_total_nc_count() { + uint32_t count = 0; + NRT_STATUS status = nrt_get_total_nc_count(&count); + if (status != 0) { + throw NrtError(status, "Failed to get total NC count"); + } + return count; +} + } // namespace spike diff --git a/spike/src/python_bindings.cpp b/spike/src/python_bindings.cpp index 904f612..95ed53f 100644 --- a/spike/src/python_bindings.cpp +++ b/spike/src/python_bindings.cpp @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -97,6 +99,23 @@ NB_MODULE(_spike, m) { "Is collective model") .def("__repr__", &NrtModel::to_string); + // NrtTensorSet class + nb::class_(m, "NrtTensorSet"); + + // NonBlock result structures + nb::class_(m, "NonBlockTensorReadResult") + .def_ro("id", &NonBlockTensorReadResult::id) + .def_ro("data", &NonBlockTensorReadResult::data) + .def_ro("err", &NonBlockTensorReadResult::err); + + nb::class_(m, "NonBlockTensorWriteResult") + .def_ro("id", &NonBlockTensorWriteResult::id) + .def_ro("err", &NonBlockTensorWriteResult::err); + + nb::class_(m, "NonBlockExecResult") + .def_ro("id", &NonBlockExecResult::id) + .def_ro("err", &NonBlockExecResult::err); + // Spike class - uses keep_alive for safe shared ownership with tensors/models nb::class_(m, "Spike") .def(nb::init(), "verbose_level"_a = 0, @@ -215,6 +234,117 @@ NB_MODULE(_spike, m) { "Read data from tensor to Python buffer protocol object (bytearray, " "memoryview, etc.)") + // Nonblocking operations + .def("init_nonblock", &Spike::init_nonblock, + "Initialize for nonblocking operations") + + .def( + "tensor_read_nonblock", + [](Spike &self, std::shared_ptr tensor, + size_t offset, size_t size) { + return self.tensor_read_nonblock(std::move(tensor), offset, size); + }, + "tensor"_a, "offset"_a = 0, "size"_a = 0, + "Read data from tensor as bytes nonblockingly") + + .def( + "tensor_read_nonblock", + [](Spike &self, std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset, size_t size) { + return self.tensor_read_nonblock(std::move(tensor), std::move(dest), + offset, size); + }, + "tensor"_a, "dest"_a, "offset"_a = 0, "size"_a = 0, + "Read data from tensor into the provided destination nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, nb::bytes data_obj, + size_t offset) { + return self.tensor_write_nonblock(std::move(tensor), + std::move(data_obj), offset); + }, + "tensor"_a, "data"_a, "offset"_a = 0, + "Write bytes data to tensor nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, + nb::ndarray<> data_obj, size_t offset) { + return self.tensor_write_nonblock(std::move(tensor), + std::move(data_obj), offset); + }, + "tensor"_a, "data"_a, "offset"_a = 0, + "Write ndarray data to tensor nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, int64_t data, + size_t size, size_t offset) { + return self.tensor_write_nonblock( + std::move(tensor), reinterpret_cast(data), size, + offset); + }, + "tensor"_a, "data"_a, "size"_a, "offset"_a = 0, + "Write raw pointer data to tensor nonblockingly") + + .def("tensor_write_nonblock_batched_prepare", + &Spike::tensor_write_nonblock_batched_prepare, "tensors"_a, + "data_objs"_a, "offsets"_a = std::nullopt, + "Prepare a batched tensor write") + + .def("tensor_write_nonblock_batched_start", + &Spike::tensor_write_nonblock_batched_start, "batch_id"_a, + "Start a prepared batched tensor write") + + .def("tensor_read_nonblock_batched_prepare", + &Spike::tensor_read_nonblock_batched_prepare, "tensors"_a, "dests"_a, + "offsets"_a = std::nullopt, "sizes"_a = std::nullopt, + "Prepare a batched tensor read") + + .def("tensor_read_nonblock_batched_start", + &Spike::tensor_read_nonblock_batched_start, "batch_id"_a, + "Start a prepared batched tensor read") + + .def("execute_nonblock", &Spike::execute_nonblock, "model"_a, "inputs"_a, + "outputs"_a, nb::arg("ntff_name") = nb::none(), + "save_trace"_a = false, + "Execute a model with given inputs and outputs nonblockingly") + + .def("try_poll", &Spike::try_poll, "Try to poll for nonblocking results") + + .def( + "create_tensor_set", + [](Spike &self, + const std::unordered_map< + std::string, std::shared_ptr> &tensor_map) { + return self.create_tensor_set(tensor_map); + }, + "tensors"_a, "Create a tensor set with the tensors") + + // Wrap existing NRT objects (for interop with external code) + .def( + "wrap_model", + [](Spike &self, int64_t ptr) { + return self.wrap_model(reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT model pointer") + + .def( + "wrap_tensor", + [](Spike &self, int64_t ptr) { + return self.wrap_tensor(reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT tensor pointer") + + .def( + "wrap_tensor_set", + [](Spike &self, int64_t ptr) { + return self.wrap_tensor_set( + reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT tensor set pointer") + // Model introspection .def("get_tensor_info", &Spike::get_tensor_info, "model"_a, "Get tensor information for a model"); diff --git a/spike/src/spike.cpp b/spike/src/spike.cpp index 82efb6b..fdd14f3 100644 --- a/spike/src/spike.cpp +++ b/spike/src/spike.cpp @@ -1,5 +1,6 @@ #include "spike.h" #include +#include namespace spike { @@ -35,6 +36,24 @@ uint32_t Spike::get_visible_neuron_core_count() { } int Spike::close() { + // Shut down nonblock threads if initialized + for (auto &q : exec_queues_) { + q.emplace(NonBlockCloseCmd{}); + } + for (auto &q : tensor_queues_) { + q.emplace(NonBlockCloseCmd{}); + } + for (auto &t : exec_threads_) { + if (t.joinable()) { + t.join(); + } + } + for (auto &t : tensor_threads_) { + if (t.joinable()) { + t.join(); + } + } + runtime_.reset(); g_alive_spike_instance_exists = false; return 0; @@ -85,9 +104,11 @@ void Spike::tensor_write_from_pybuffer(NrtTensor &tensor, const void *data, tensor_write(tensor, data, data_size, offset); } -NrtTensorSet Spike::create_tensor_sets( +// Create tensor set with non-owning references (for synchronous operations). +// Caller must ensure tensors remain valid during use. Lighter weight alternative. +NrtTensorSet Spike::create_tensor_set( const std::unordered_map &tensor_map) { - NrtTensorSet tensor_set; + NrtTensorSet tensor_set(this); for (const auto &[name, tensor] : tensor_map) { // FIXME: consider time-of-check-time-of-use (TOCTOU) race condition @@ -102,18 +123,492 @@ NrtTensorSet Spike::create_tensor_sets( return tensor_set; } +// Create tensor set with shared ownership (for async/nonblocking operations). +// Takes shared_ptrs to prevent premature tensor destruction during deferred +// execution in worker threads. Essential for correct async lifetime management. +NrtTensorSet Spike::create_tensor_set( + const std::unordered_map> + &tensor_map) { + NrtTensorSet tensor_set(this); + + for (const auto &[name, tensor] : tensor_map) { + if (tensor->is_freed()) { + throw SpikeError("Tensor '" + name + + "' is freed. Unable to add it into the tensor set. " + "Please check the lifetime of the tensor."); + } + tensor_set.add_tensor(name, tensor); + } + + return tensor_set; +} + void Spike::execute(NrtModel &model, const std::unordered_map &inputs, const std::unordered_map &outputs, std::optional ntff_name, bool save_trace) { // Create tensor sets - NrtTensorSet input_set = create_tensor_sets(inputs); - NrtTensorSet output_set = create_tensor_sets(outputs); + NrtTensorSet input_set = create_tensor_set(inputs); + NrtTensorSet output_set = create_tensor_set(outputs); // Execute model.execute(input_set, output_set, ntff_name, save_trace); } +void Spike::init_nonblock() { + // Initialize nonblocking thread pools: one execution thread and one tensor + // thread per physical NeuronCore. Uses total core count (not just visible) + // to avoid visible-to-physical ID mapping. Extra threads remain blocked + // with no overhead. + + int num_nc = NrtRuntime::get_total_nc_count(); + exec_queues_ = std::vector>(num_nc); + tensor_queues_ = std::vector>(num_nc); + + for (int i = 0; i < num_nc; ++i) { + exec_threads_.emplace_back([this, i]() { loop_execute(i); }); + tensor_threads_.emplace_back([this, i]() { loop_tensor(i); }); + } +} + +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + nb::bytes data_obj, + size_t offset) { + uint64_t cmd_id = next_non_block_id_++; + + const void *data = data_obj.data(); + size_t size = data_obj.size(); + uint32_t core_id = tensor->get_core_id(); + + NonBlockTensorWriteCmd cmd{cmd_id, std::move(tensor), data, + size, std::move(data_obj), offset}; + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + nb::ndarray<> data_obj, + size_t offset) { + uint64_t cmd_id = next_non_block_id_++; + + const void *data = data_obj.data(); + size_t size = data_obj.nbytes(); + uint32_t core_id = tensor->get_core_id(); + + NonBlockTensorWriteCmd cmd{cmd_id, std::move(tensor), data, + size, std::move(data_obj), offset}; + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + const void *data, size_t size, + size_t offset) { + uint64_t cmd_id = next_non_block_id_++; + + uint32_t core_id = tensor->get_core_id(); + + NonBlockTensorWriteCmd cmd; + cmd.id = cmd_id; + cmd.tensor = std::move(tensor); + cmd.data = data; + cmd.size = size; + cmd.offset = offset; + + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, + size_t offset, size_t size) { + size = (size == 0) ? (tensor->get_size() - offset) : size; + + uint64_t cmd_id = next_non_block_id_++; + + uint32_t core_id = tensor->get_core_id(); + + nb::bytes dest(nullptr, size); + void *data = PyBytes_AsString(dest.ptr()); + + NonBlockTensorReadCmd cmd{cmd_id, std::move(tensor), offset, + size, data, std::move(dest)}; + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset, + size_t size) { + size = (size == 0) ? (tensor->get_size() - offset) : size; + + if (dest.nbytes() < size) { + throw SpikeError("The read operation exceeds the destination bound."); + } + + uint64_t cmd_id = next_non_block_id_++; + + uint32_t core_id = tensor->get_core_id(); + + void *data = dest.data(); + + NonBlockTensorReadCmd cmd{cmd_id, std::move(tensor), offset, + size, data, std::move(dest)}; + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock_batched_prepare( + std::vector> tensors, + std::vector> data_objs, + std::optional> offsets) { + if (tensors.size() == 0) { + throw SpikeError("The batched write operation needs at least one tensor."); + } + + if (tensors.size() != data_objs.size() || + (offsets.has_value() && data_objs.size() != offsets.value().size())) { + throw SpikeError("All parameters must be lists of same length."); + } + + uint64_t batch_id = next_batch_id_++; + + std::vector cmds; + cmds.reserve(tensors.size()); + + uint32_t core_id = tensors[0]->get_core_id(); + + for (size_t i = 0; i < tensors.size(); ++i) { + std::shared_ptr tensor = std::move(tensors[i]); + nb::ndarray<> data_obj = std::move(data_objs[i]); + size_t offset; + if (offsets.has_value()) { + offset = offsets.value()[i]; + } else { + offset = 0; + } + + const void *data = data_obj.data(); + size_t size = data_obj.nbytes(); + uint32_t tensor_core_id = tensor->get_core_id(); + if (core_id != tensor_core_id) { + throw SpikeError("All tensors must be on the same NeuronCore."); + } + + NonBlockTensorWriteCmd cmd{batch_id, std::move(tensor), data, + size, std::move(data_obj), offset}; + cmds.push_back(std::move(cmd)); + } + + std::scoped_lock lk(tensor_write_batched_cmds_mtx_); + tensor_write_batched_cmds_[batch_id] = std::move(cmds); + + return batch_id; +} + +uint64_t Spike::tensor_write_nonblock_batched_start(uint64_t batch_id) { + uint64_t cmd_id = next_non_block_id_++; + + NonBlockTensorWriteBatchedCmd cmd; + cmd.id = cmd_id; + cmd.batch_id = batch_id; + + std::shared_lock lk(tensor_write_batched_cmds_mtx_); + + auto it = tensor_write_batched_cmds_.find(batch_id); + if (it == tensor_write_batched_cmds_.end()) { + lk.unlock(); + throw SpikeError("The batch ID does not exist."); + } + uint32_t core_id = it->second[0].tensor->get_core_id(); + lk.unlock(); + + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock_batched_prepare( + std::vector> tensors, + std::vector> dests, + std::optional> offsets, + std::optional> sizes) { + if (tensors.size() == 0) { + throw SpikeError("The batched read operation needs at least one tensor."); + } + + if (tensors.size() != dests.size() || + (offsets.has_value() && dests.size() != offsets.value().size()) || + (sizes.has_value() && dests.size() != sizes.value().size())) { + throw SpikeError("All parameters must be lists of same length."); + } + + uint64_t batch_id = next_batch_id_++; + + std::vector cmds; + cmds.reserve(tensors.size()); + + uint32_t core_id = tensors[0]->get_core_id(); + + for (size_t i = 0; i < tensors.size(); ++i) { + std::shared_ptr tensor = std::move(tensors[i]); + nb::ndarray<> dest = std::move(dests[i]); + + size_t offset = offsets.has_value() ? offsets.value()[i] : 0; + size_t size = sizes.has_value() ? sizes.value()[i] : + (tensor->get_size() - offset); + + if (dest.nbytes() < size) { + throw SpikeError("The read operation exceeds the destination bound."); + } + + void *data = dest.data(); + uint32_t tensor_core_id = tensor->get_core_id(); + if (core_id != tensor_core_id) { + throw SpikeError("All tensors must be on the same NeuronCore."); + } + + NonBlockTensorReadCmd cmd{batch_id, std::move(tensor), offset, + size, data, std::move(dest)}; + cmds.push_back(std::move(cmd)); + } + + std::scoped_lock lk(tensor_read_batched_cmds_mtx_); + tensor_read_batched_cmds_[batch_id] = std::move(cmds); + + return batch_id; +} + +uint64_t Spike::tensor_read_nonblock_batched_start(uint64_t batch_id) { + uint64_t cmd_id = next_non_block_id_++; + + NonBlockTensorReadBatchedCmd cmd; + cmd.id = cmd_id; + cmd.batch_id = batch_id; + + std::shared_lock lk(tensor_read_batched_cmds_mtx_); + + auto it = tensor_read_batched_cmds_.find(batch_id); + if (it == tensor_read_batched_cmds_.end()) { + lk.unlock(); + throw SpikeError("The batch ID does not exist."); + } + uint32_t core_id = it->second[0].tensor->get_core_id(); + lk.unlock(); + + tensor_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +uint64_t Spike::execute_nonblock( + std::shared_ptr model, + std::shared_ptr input_set, + std::shared_ptr output_set, + std::optional ntff_name, bool save_trace) { + uint64_t cmd_id = next_non_block_id_++; + + uint32_t core_id = model->get_core_id(); + + NonBlockExecCmd cmd{cmd_id, std::move(model), std::move(input_set), + std::move(output_set), ntff_name, save_trace}; + exec_queues_[core_id].emplace(std::move(cmd)); + + return cmd_id; +} + +std::optional Spike::try_poll() { + std::optional internal_res = noti_queue_.try_pop(); + if (internal_res.has_value()) { + return convert_internal_result(internal_res.value()); + } else { + return std::nullopt; + } +} + +NrtModel Spike::wrap_model(nrt_model_t *ptr) { + uint32_t core_id = ptr->start_vnc; + uint32_t rank_id = ptr->gid; + uint32_t world_size = ptr->vnc_count; + // FIXME: Value of cc_enabled is fake, but no one uses it, so it is fine for + // the purpose. + return NrtModel(ptr, core_id, true, rank_id, world_size); +} + +NrtTensor Spike::wrap_tensor(nrt_tensor_t *ptr) { + if (ptr->sto->vtpb_idx == -1) { + // CPU tensor wrapping not supported + throw SpikeError("Wrapping a CPU tensor is not supported"); + } + + size_t size = ptr->_size; + const char *name = ptr->name; + // Use vtpb_idx as the core ID (this is the logical NC ID) + uint32_t core_id = ptr->sto->vtpb_idx; + return NrtTensor(ptr, core_id, size, name, this); +} + +NrtTensorSet Spike::wrap_tensor_set(nrt_tensor_set_t *ptr) { + return NrtTensorSet(ptr); +} + +void Spike::loop_tensor(int core_id) { + while (true) { + NonBlockTensorCmd cmd_ = tensor_queues_[core_id].pop(); + + if (std::holds_alternative(cmd_)) { + NonBlockTensorReadCmd &cmd = std::get(cmd_); + + NonBlockTensorReadInternalResult res; + + try { + cmd.tensor->read(cmd.data, cmd.size, cmd.offset); + res.err = std::nullopt; + } catch (SpikeError &err) { + res.err = std::move(err); + } catch (NrtError &err) { + res.err = std::move(err); + } + + res.id = cmd.id; + res.tensor = std::move(cmd.tensor); + res.data_obj = std::move(cmd.data_obj); + + noti_queue_.emplace(std::move(res)); + } else if (std::holds_alternative(cmd_)) { + NonBlockTensorWriteCmd &cmd = std::get(cmd_); + + NonBlockTensorWriteInternalResult res; + + try { + cmd.tensor->write(cmd.data, cmd.size, cmd.offset); + res.err = std::nullopt; + } catch (SpikeError &err) { + res.err = std::move(err); + } catch (NrtError &err) { + res.err = std::move(err); + } + + res.id = cmd.id; + res.tensor = std::move(cmd.tensor); + res.data_obj = std::move(cmd.data_obj); + + noti_queue_.emplace(std::move(res)); + } else if (std::holds_alternative(cmd_)) { + NonBlockTensorWriteBatchedCmd &batched_cmd = + std::get(cmd_); + + NonBlockTensorWriteInternalResult res; + res.id = batched_cmd.id; + + std::shared_lock lk(tensor_write_batched_cmds_mtx_); + + auto &cmds = tensor_write_batched_cmds_[batched_cmd.batch_id]; + + for (auto &cmd : cmds) { + try { + cmd.tensor->write(cmd.data, cmd.size, cmd.offset); + res.err = std::nullopt; + } catch (SpikeError &err) { + res.err = std::move(err); + } catch (NrtError &err) { + res.err = std::move(err); + } + + if (res.err.has_value()) { + break; + } + } + + noti_queue_.emplace(std::move(res)); + } else if (std::holds_alternative(cmd_)) { + NonBlockTensorReadBatchedCmd &batched_cmd = + std::get(cmd_); + + NonBlockTensorReadInternalResult res; + res.id = batched_cmd.id; + + std::shared_lock lk(tensor_read_batched_cmds_mtx_); + + auto &cmds = tensor_read_batched_cmds_[batched_cmd.batch_id]; + + for (auto &cmd : cmds) { + try { + cmd.tensor->read(cmd.data, cmd.size, cmd.offset); + res.err = std::nullopt; + } catch (SpikeError &err) { + res.err = std::move(err); + } catch (NrtError &err) { + res.err = std::move(err); + } + + if (res.err.has_value()) { + break; + } + } + + noti_queue_.emplace(std::move(res)); + } else { + break; + } + } +} + +void Spike::loop_execute(int core_id) { + while (true) { + NonBlockExecOrCloseCmd cmd_ = exec_queues_[core_id].pop(); + + if (std::holds_alternative(cmd_)) { + NonBlockExecCmd &cmd = std::get(cmd_); + + NonBlockExecInternalResult res; + + try { + cmd.model->execute(*cmd.input_set, *cmd.output_set, cmd.ntff_name, + cmd.save_trace); + res.err = std::nullopt; + } catch (SpikeError &err) { + res.err = std::move(err); + } catch (NrtError &err) { + res.err = std::move(err); + } + + res.id = cmd.id; + res.model = std::move(cmd.model); + res.input_set = std::move(cmd.input_set); + res.output_set = std::move(cmd.output_set); + + noti_queue_.emplace(std::move(res)); + } else { + break; + } + } +} + +// Convert InternalResult to Result in GIL context. +// Moves nanobind objects to Result, leaving shared_ptrs in InternalResult to be +// destroyed here (with GIL held). This avoids calling Python object destructors +// in worker threads, preventing GIL deadlocks and context switch overhead. +NonBlockResult Spike::convert_internal_result(NonBlockInternalResult &internal_res_) { + if (std::holds_alternative(internal_res_)) { + NonBlockTensorReadInternalResult &internal_res = std::get(internal_res_); + NonBlockTensorReadResult res{internal_res.id, std::move(internal_res.data_obj), std::move(internal_res.err)}; + return res; + } else if (std::holds_alternative(internal_res_)) { + NonBlockTensorWriteInternalResult &internal_res = std::get(internal_res_); + NonBlockTensorWriteResult res{internal_res.id, std::move(internal_res.err)}; + return res; + } else { + NonBlockExecInternalResult &internal_res = std::get(internal_res_); + return NonBlockExecResult{internal_res.id, std::move(internal_res.err)}; + } +} + + std::string Spike::dtype_to_string(nrt_dtype_t dtype) { switch (dtype) { case NRT_DTYPE_FLOAT32: diff --git a/spike/src/spike/__init__.py b/spike/src/spike/__init__.py index a8c6224..9570748 100644 --- a/spike/src/spike/__init__.py +++ b/spike/src/spike/__init__.py @@ -1,21 +1,33 @@ from ._spike import ( ModelTensorInfo, + NonBlockExecResult, + NonBlockTensorReadResult, + NonBlockTensorWriteResult, NrtError, NrtModel, NrtTensor, + NrtTensorSet, + Spike, SpikeError, SystemTraceSession, TensorMetadata, ) from .profiler_adapter import SpikeProfiler +from .spike_async import SpikeAsync, SpikeStream from .spike_model import BenchmarkResult, SpikeModel from .spike_singleton import configure, get_spike_singleton, reset from .spike_tensor import SpikeTensor +# Type alias for convenience +NonBlockResult = NonBlockTensorReadResult | NonBlockTensorWriteResult | NonBlockExecResult + __all__ = [ "SpikeModel", "SpikeTensor", "SpikeProfiler", + "SpikeAsync", + "SpikeStream", + "Spike", "configure", "reset", "get_spike_singleton", @@ -26,5 +38,10 @@ "ModelTensorInfo", "NrtModel", "NrtTensor", + "NrtTensorSet", "TensorMetadata", + "NonBlockTensorReadResult", + "NonBlockTensorWriteResult", + "NonBlockExecResult", + "NonBlockResult", ] diff --git a/spike/src/spike/_spike.pyi b/spike/src/spike/_spike.pyi index e2fc986..0f85892 100644 --- a/spike/src/spike/_spike.pyi +++ b/spike/src/spike/_spike.pyi @@ -1,6 +1,9 @@ """NKIPy Spike Runtime C++ bindings""" -from collections.abc import Mapping +from collections.abc import Mapping, Sequence +from typing import overload + +from numpy.typing import NDArray class SpikeRuntimeError(RuntimeError): @@ -99,6 +102,33 @@ class NrtModel: def __repr__(self) -> str: ... +class NrtTensorSet: + pass + +class NonBlockTensorReadResult: + @property + def id(self) -> int: ... + + @property + def data(self) -> bytes | NDArray: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + +class NonBlockTensorWriteResult: + @property + def id(self) -> int: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + +class NonBlockExecResult: + @property + def id(self) -> int: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + class Spike: def __init__(self, verbose_level: int = 0) -> None: """Initialize Spike with verbose level""" @@ -144,5 +174,58 @@ class Spike: Read data from tensor to Python buffer protocol object (bytearray, memoryview, etc.) """ + def init_nonblock(self) -> None: + """Initialize for nonblocking operations""" + + @overload + def tensor_read_nonblock(self, tensor: NrtTensor, offset: int = 0, size: int = 0) -> int: + """Read data from tensor as bytes nonblockingly""" + + @overload + def tensor_read_nonblock(self, tensor: NrtTensor, dest: NDArray, offset: int = 0, size: int = 0) -> int: + """Read data from tensor into the provided destination nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: bytes, offset: int = 0) -> int: + """Write bytes data to tensor nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: NDArray, offset: int = 0) -> int: + """Write ndarray data to tensor nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: int, size: int, offset: int = 0) -> int: + """Write raw pointer data to tensor nonblockingly""" + + def tensor_write_nonblock_batched_prepare(self, tensors: Sequence[NrtTensor], data_objs: Sequence[NDArray], offsets: Sequence[int] | None = None) -> int: + """Prepare a batched tensor write""" + + def tensor_write_nonblock_batched_start(self, batch_id: int) -> int: + """Start a prepared batched tensor write""" + + def tensor_read_nonblock_batched_prepare(self, tensors: Sequence[NrtTensor], dests: Sequence[NDArray], offsets: Sequence[int] | None = None, sizes: Sequence[int] | None = None) -> int: + """Prepare a batched tensor read""" + + def tensor_read_nonblock_batched_start(self, batch_id: int) -> int: + """Start a prepared batched tensor read""" + + def execute_nonblock(self, model: NrtModel, inputs: NrtTensorSet, outputs: NrtTensorSet, ntff_name: str | None = None, save_trace: bool = False) -> int: + """Execute a model with given inputs and outputs nonblockingly""" + + def try_poll(self) -> NonBlockTensorReadResult | NonBlockTensorWriteResult | NonBlockExecResult | None: + """Try to poll for nonblocking results""" + + def create_tensor_set(self, tensors: Mapping[str, NrtTensor]) -> NrtTensorSet: + """Create a tensor set with the tensors""" + + def wrap_model(self, ptr: int) -> NrtModel: + """Wrap an existing NRT model pointer""" + + def wrap_tensor(self, ptr: int) -> NrtTensor: + """Wrap an existing NRT tensor pointer""" + + def wrap_tensor_set(self, ptr: int) -> NrtTensorSet: + """Wrap an existing NRT tensor set pointer""" + def get_tensor_info(self, model: NrtModel) -> ModelTensorInfo: """Get tensor information for a model""" diff --git a/spike/src/spike/spike_async.py b/spike/src/spike/spike_async.py new file mode 100644 index 0000000..d9b7bda --- /dev/null +++ b/spike/src/spike/spike_async.py @@ -0,0 +1,416 @@ +"""Async/await interface for Spike runtime operations.""" + +from __future__ import annotations + +from typing import Any, Callable, Coroutine, Optional, Sequence +from ._spike import ( + NonBlockExecResult, + NonBlockTensorReadResult, + NonBlockTensorWriteResult, + NrtModel, + NrtTensor, + NrtTensorSet, + Spike, +) + +import asyncio +import numpy as np + + +class SpikeAsyncSelector: + """Event selector for async operations.""" + + def __init__(self, spike: Spike) -> None: + self._spike: Spike = spike + + def select(self, timeout: float | None) -> list[NonBlockResult]: + """Poll for completed operations.""" + res = self._spike.try_poll() + if res is None: + return [] + else: + return [res] + + +class SpikeAsyncFuture(asyncio.Future[Any]): + """Future that can be waited on synchronously.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(SpikeAsyncFuture, self).__init__(*args, **kwargs) + + def wait(self) -> Any: + """Wait for the future to complete (blocking).""" + return self._loop.run_until_complete(self) + + +class SpikeAsyncTask(asyncio.Task[Any]): + """Task that can be waited on synchronously.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(SpikeAsyncTask, self).__init__(*args, **kwargs) + + def wait(self) -> Any: + """Wait for the task to complete (blocking).""" + return self._loop.run_until_complete(self) + + +class SpikeAsyncEventLoop(asyncio.BaseEventLoop): + """Custom event loop for Spike async operations.""" + + def __init__(self, selector: SpikeAsyncSelector) -> None: + super().__init__() + self._selector: SpikeAsyncSelector = selector + + def task_factory( + loop: asyncio.AbstractEventLoop, + coro: Coroutine[Any, Any, Any], + **kwargs: Any + ) -> SpikeAsyncTask: + return SpikeAsyncTask(coro, loop=loop, **kwargs) + + self.set_task_factory(task_factory) + + self._tensor_ops: dict[int, SpikeAsyncFuture] = {} + self._exec_ops: dict[int, SpikeAsyncFuture] = {} + + def create_future(self) -> SpikeAsyncFuture: + """Create a future for this event loop.""" + return SpikeAsyncFuture(loop=self) + + def register_tensor_op(self, req_id: int, fut: SpikeAsyncFuture) -> None: + """Register a tensor operation future.""" + assert req_id not in self._tensor_ops + self._tensor_ops[req_id] = fut + + def register_exec_op(self, req_id: int, fut: SpikeAsyncFuture) -> None: + """Register an execution operation future.""" + assert req_id not in self._exec_ops + self._exec_ops[req_id] = fut + + def _process_events(self, event_list: list[NonBlockResult]) -> None: + """Process completed events.""" + for event in event_list: + match event: + case NonBlockTensorWriteResult(id=id, err=err): + fut = self._tensor_ops[id] + if err is None: + fut.set_result(None) + else: + fut.set_exception(err) + + del self._tensor_ops[id] + + case NonBlockTensorReadResult(id=id, data=data, err=err): + fut = self._tensor_ops[id] + if err is None: + fut.set_result(data) + else: + fut.set_exception(err) + + del self._tensor_ops[id] + + case NonBlockExecResult(id=id, err=err): + fut = self._exec_ops[id] + if err is None: + fut.set_result(None) + else: + fut.set_exception(err) + + del self._exec_ops[id] + + +class SpikeAsync: + """Async interface for Spike runtime.""" + + def __init__(self, verbose_level: int = 0) -> None: + self.spike: Spike = Spike(verbose_level=verbose_level) + self.spike.init_nonblock() + + self._selector: SpikeAsyncSelector = SpikeAsyncSelector(self.spike) + self._loop: SpikeAsyncEventLoop = SpikeAsyncEventLoop(self._selector) + + self._default_stream: SpikeStream | None = None + + def _wrapper( + self, + closure: Callable[[], SpikeAsyncFuture], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None, + stream: SpikeStream | None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Wrap an operation with dependency and stream management.""" + # Try to use default stream if no stream is explicitly specified + if stream is None: + stream = self._default_stream + + # If no explicit dependency, see if there is any stream induced dependency + if deps is None or len(deps) == 0: + if stream is None: + # No stream, fast path + return closure() + elif stream._last_fut is None: + # There is stream but no stream induced dependency + # Fast path, but also put the current op to the stream + fut = closure() + stream._last_fut = fut + return fut + + # If there is a stream induced dependency, add it to explicit dependency + if stream is not None and stream._last_fut is not None: + if deps is None: + deps = [] + else: + deps = list(deps) # Make mutable copy + + deps.append(stream._last_fut) + + # Heavy path for having any dependency + async def internal_async(): + for dep in deps: + await dep + return await closure() + + fut = self._loop.create_task(internal_async()) + + if stream is not None: + stream._last_fut = fut + + return fut + + def create_stream(self) -> SpikeStream: + """Create a new stream for operation sequencing.""" + return SpikeStream(self) + + def create_tensor_set( + self, + tensor_map: dict[str, NrtTensor] + ) -> NrtTensorSet: + """Create a tensor set from a dictionary of tensors. + + Convenience method for spike.create_tensor_set(). + """ + return self.spike.create_tensor_set(tensor_map) + + def load_model( + self, + neff_file: str, + core_id: int = 0, + cc_enabled: bool = False, + rank_id: int = 0, + world_size: int = 1 + ) -> NrtModel: + """Load a model from a NEFF file. + + Convenience method for spike.load_model(). + """ + return self.spike.load_model(neff_file, core_id, cc_enabled, rank_id, world_size) + + def allocate_tensor( + self, + size: int, + core_id: int = 0, + name: str | None = None + ) -> NrtTensor: + """Allocate a tensor on a NeuronCore. + + Convenience method for spike.allocate_tensor(). + """ + return self.spike.allocate_tensor(size, core_id, name) + + def tensor_write( + self, + tensor: NrtTensor, + data: Any, + offset: int = 0, + size: int = 0, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Write to a tensor asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + if size > 0: + req_id = self.spike.tensor_write_nonblock(tensor, data, size, offset) + else: + req_id = self.spike.tensor_write_nonblock(tensor, data, offset) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_write_batched_start( + self, + batch_id: int, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Start a batched tensor write asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.tensor_write_nonblock_batched_start(batch_id) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_read_batched_start( + self, + batch_id: int, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Start a batched tensor read asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.tensor_read_nonblock_batched_start(batch_id) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_read( + self, + tensor: NrtTensor, + dest: Any = None, + offset: int = 0, + size: int = 0, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Read from a tensor asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + if dest is None: + req_id = self.spike.tensor_read_nonblock(tensor, offset, size) + else: + req_id = self.spike.tensor_read_nonblock(tensor, dest, offset, size) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def execute( + self, + model: NrtModel, + inputs: NrtTensorSet, + outputs: NrtTensorSet, + ntff_name: str | None = None, + save_trace: bool = False, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Execute a model asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.execute_nonblock( + model, inputs, outputs, ntff_name, save_trace + ) + self._loop.register_exec_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def custom_op( + self, + coro: Coroutine[Any, Any, Any], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncTask: + """Submit a custom coroutine as an operation.""" + # Try to use default stream if no stream is explicitly specified + if stream is None: + stream = self._default_stream + + # If no explicit dependency, see if there is any stream induced dependency + if deps is None or len(deps) == 0: + if stream is None: + # No stream, fast path + return self._loop.create_task(coro) + elif stream._last_fut is None: + # There is stream but no stream induced dependency + # Fast path, but also put the current op to the stream + fut = self._loop.create_task(coro) + stream._last_fut = fut + return fut + + # If there is a stream induced dependency, add it to explicit dependency + if stream is not None and stream._last_fut is not None: + if deps is None: + deps = [] + else: + deps = list(deps) # Make mutable copy + + deps.append(stream._last_fut) + + # Heavy path for having any dependency + async def internal_async() -> Any: + for dep in deps: + await dep + return await coro + + fut = self._loop.create_task(internal_async()) + + if stream is not None: + stream._last_fut = fut + + return fut + + def submit( + self, + coro: Coroutine[Any, Any, Any], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncTask: + """Submit a coroutine to the event loop.""" + return self.custom_op(coro, deps, stream) + + @staticmethod + async def all(futs: Sequence[SpikeAsyncFuture | SpikeAsyncTask]) -> list[Any]: + """Wait for all futures to complete.""" + res: list[Any] = [] + for fut in futs: + res.append(await fut) + return res + + +class SpikeStream: + """Stream for sequencing operations.""" + + def __init__(self, spike_async: SpikeAsync) -> None: + self._last_fut: SpikeAsyncFuture | SpikeAsyncTask | None = None + self._spike_async: SpikeAsync = spike_async + self._prev_stream: SpikeStream | None = None + + def __enter__(self) -> SpikeStream: + """Enter context manager, making this the default stream.""" + self._prev_stream = self._spike_async._default_stream + self._spike_async._default_stream = self + return self + + def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: + """Exit context manager, restoring previous default stream.""" + self._spike_async._default_stream = self._prev_stream + self._prev_stream = None + + def wait(self) -> Any: + """Wait for all operations in this stream to complete.""" + if self._last_fut is not None: + return self._last_fut.wait() + + def record_event(self) -> SpikeAsyncFuture | SpikeAsyncTask | None: + """Record an event at the current point in the stream.""" + return self._last_fut + + def wait_event(self, event: SpikeAsyncFuture | SpikeAsyncTask) -> None: + """Wait for an event from another stream.""" + + async def internal(fut: SpikeAsyncFuture | SpikeAsyncTask | None) -> None: + if fut is not None: + await fut + await event + + self._last_fut = self._spike_async.custom_op(internal(self._last_fut)) diff --git a/spike/src/tensor.cpp b/spike/src/tensor.cpp index 1722fab..b5df6de 100644 --- a/spike/src/tensor.cpp +++ b/spike/src/tensor.cpp @@ -19,6 +19,20 @@ NrtTensor::NrtTensor(nrt_tensor_placement_t placement, uint32_t core_id, } } +NrtTensor::NrtTensor(nrt_tensor_t *ptr, uint32_t core_id, uint64_t size, + const std::string &name, const Spike *spike) + : ptr_(nullptr), core_id_(core_id), size_(size), name_(name), + spike_(spike) { + // Wrap an existing tensor by creating a slice + NRT_STATUS status = + nrt_tensor_allocate_slice(ptr, 0, size, name.c_str(), &ptr_); + if (status != 0) { + ptr_ = nullptr; + throw NrtError(status, + "Failed to wrap a raw tensor by allocating a tensor slice"); + } +} + NrtTensor::NrtTensor(const NrtTensor &source, size_t offset, size_t size, const std::string &name) : ptr_(nullptr), core_id_(source.core_id_), size_(size), name_(name), diff --git a/spike/src/tensor_set.cpp b/spike/src/tensor_set.cpp index 41ff30d..28d60a1 100644 --- a/spike/src/tensor_set.cpp +++ b/spike/src/tensor_set.cpp @@ -1,37 +1,50 @@ #include "tensor_set.h" +#include "spike.h" namespace spike { -NrtTensorSet::NrtTensorSet() : ptr_(nullptr) { +NrtTensorSet::NrtTensorSet(const Spike *spike) + : ptr_(nullptr), spike_(spike) { NRT_STATUS status = nrt_allocate_tensor_set(reinterpret_cast(&ptr_)); if (status != 0) { throw NrtError(status, "Failed to allocate tensor set"); } } -NrtTensorSet::~NrtTensorSet() { - if (ptr_) { - nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); - } -} +NrtTensorSet::NrtTensorSet(nrt_tensor_set_t *ptr) + : ptr_(ptr), spike_(nullptr) {} + +NrtTensorSet::~NrtTensorSet() { free(); } -NrtTensorSet::NrtTensorSet(NrtTensorSet &&other) noexcept : ptr_(other.ptr_) { +NrtTensorSet::NrtTensorSet(NrtTensorSet &&other) noexcept + : ptr_(other.ptr_), spike_(other.spike_), + tensors_(std::move(other.tensors_)) { other.ptr_ = nullptr; } NrtTensorSet &NrtTensorSet::operator=(NrtTensorSet &&other) noexcept { if (this != &other) { - if (ptr_) { - nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); - } + free(); ptr_ = other.ptr_; + spike_ = other.spike_; + tensors_ = std::move(other.tensors_); other.ptr_ = nullptr; } return *this; } +bool NrtTensorSet::is_freed() const { + return ptr_ == nullptr || (is_owner() && spike_->is_closed()); +} + +void NrtTensorSet::add_tensor(const std::string &name, + std::shared_ptr tensor) { + add_tensor(name, *tensor); + tensors_.push_back(std::move(tensor)); +} + void NrtTensorSet::add_tensor(const std::string &name, - const NrtTensor &tensor) { + const NrtTensor &tensor) { NRT_STATUS status = nrt_add_tensor_to_tensor_set(ptr_, name.c_str(), tensor.get_ptr()); if (status != 0) { @@ -39,4 +52,14 @@ void NrtTensorSet::add_tensor(const std::string &name, } } +void NrtTensorSet::free() { + if (is_freed() || !is_owner()) { + // Do not throw exception as this is perfectly fine + return; + } + + nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); + ptr_ = nullptr; +} + } // namespace spike diff --git a/spike/tests/conftest.py b/spike/tests/conftest.py new file mode 100644 index 0000000..78c8589 --- /dev/null +++ b/spike/tests/conftest.py @@ -0,0 +1,16 @@ +def pytest_addoption(parser): + parser.addoption( + "--test-mode", + default="correctness", + choices=["correctness", "overlapping"], + help=( + "Test mode: 'correctness' uses small random tensors to verify numerical accuracy; " + "'overlapping' uses large zero tensors to demonstrate async operation overlap." + ), + ) + parser.addoption( + "--num-pipelines", + type=int, + default=3, + help="Number of concurrent pipelines to allocate and run in test_spike_async.py tests.", + ) diff --git a/spike/tests/test_spike_async.py b/spike/tests/test_spike_async.py new file mode 100644 index 0000000..520a81c --- /dev/null +++ b/spike/tests/test_spike_async.py @@ -0,0 +1,210 @@ +""" +Tests for Spike async/nonblock functionality. + +NOTE: These tests require actual NeuronCore hardware and a compiled NEFF model. +They serve as integration tests and usage examples. +""" + +import os +import pytest +import numpy as np + +try: + from spike._spike import Spike + + from spike import reset as spike_reset + + available_core_count = Spike.get_visible_neuron_core_count() + if available_core_count < 1: + pytest.skip( + "Skipping all tests: No compatible Neuron hardware detected", + allow_module_level=True, + ) + +except ImportError as e: + pytest.skip(f"Required packages not available: {e}", allow_module_level=True) + +@pytest.fixture(scope="module") +def neff_path(request): + """Compile a matmul kernel for testing. + + Shape is chosen based on --test-mode: + correctness -- small (128x256) x (256x128), fast enough for CPU reference check + overlapping -- large (4096x8192) x (8192x4096), long enough to exhibit async overlap + """ + from nkipy.core.compile import compile_to_neff, trace + + def my_matmul(x, y): + return np.matmul(x, y) + + test_mode = request.config.getoption("--test-mode") + if test_mode == "overlapping": + x = np.zeros((4096, 8192), dtype=np.float16) + y = np.zeros((8192, 4096), dtype=np.float16) + else: # correctness + x = np.zeros((128, 256), dtype=np.float16) + y = np.zeros((256, 128), dtype=np.float16) + + traced_kernel = trace(my_matmul) + traced_kernel.specialize(x, y) + + output_dir = os.path.join(os.path.dirname(__file__), "artifacts") + neff_path = compile_to_neff(trace_kernel=traced_kernel, output_dir=output_dir) + + return neff_path + + +@pytest.fixture(scope="module") +def spike_async(): + """Initialize SpikeAsync for testing.""" + from spike import SpikeAsync + + spike_async = SpikeAsync() + yield spike_async + spike_async.spike.close() + + +@pytest.fixture(scope="module") +def model_and_tensors(request, neff_path, spike_async): + """Load model and prepare input/output tensors for matmul. + + Data and shape are chosen based on --test-mode: + correctness -- small (128x256) x (256x128) with random values for numerical verification + overlapping -- large (4096x8192) x (8192x4096) with zeros to demonstrate async overlap + """ + # Load model + model = spike_async.load_model(neff_path, core_id=0) + + # Create input arrays for matmul + test_mode = request.config.getoption("--test-mode") + if test_mode == "overlapping": + x = np.zeros((4096, 8192), dtype=np.float16) + y = np.zeros((8192, 4096), dtype=np.float16) + else: # correctness + x = np.random.rand(128, 256).astype(np.float16) + y = np.random.rand(256, 128).astype(np.float16) + + num_pipelines = request.config.getoption("--num-pipelines") + + # Allocate input tensors for concurrent pipelines + inputs_x = [] + inputs_y = [] + input_sets = [] + for i in range(num_pipelines): + input_x = spike_async.allocate_tensor(size=x.nbytes, core_id=0, name=f"input_x_{i}") + input_y = spike_async.allocate_tensor(size=y.nbytes, core_id=0, name=f"input_y_{i}") + inputs_x.append(input_x) + inputs_y.append(input_y) + input_sets.append(spike_async.create_tensor_set({"x": input_x, "y": input_y})) + + # Allocate output tensors for concurrent pipelines + output_shape = (x.shape[0], y.shape[1]) + output_size = np.prod(output_shape) * x.dtype.itemsize + outputs = [] + output_sets = [] + output_arrays = [] # Pre-allocated CPU buffers for reading outputs + for i in range(num_pipelines): + out = spike_async.allocate_tensor(size=output_size, core_id=0, name=f"output_{i}") + outputs.append(out) + output_sets.append(spike_async.create_tensor_set({"output0": out})) + # Pre-allocate CPU array for zero-copy tensor_read + output_arrays.append(np.empty(output_shape, dtype=x.dtype)) + + print('Done initializing inputs, outputs, and model.') + + return model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays + + +@pytest.fixture(scope="module") +def expected_output(model_and_tensors): + """Compute expected matmul output once for all tests.""" + _, x, y, *_ = model_and_tensors + return np.matmul(x, y) + + +def test_stream_api(spike_async, model_and_tensors, expected_output): + """Test stream-based API for operation sequencing.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + # Use stream APIs - each iteration uses different input and output tensors + streams = [] + + for i in range(len(inputs_x)): + with spike_async.create_stream() as stream: + spike_async.tensor_write(inputs_x[i], x) + spike_async.tensor_write(inputs_y[i], y) + spike_async.execute(model, input_sets[i], output_sets[i]) + spike_async.tensor_read(outputs[i], output_arrays[i]) + streams.append(stream) + + # Verify outputs - data is already in pre-allocated arrays + for i, stream in enumerate(streams): + stream.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_explicit_dependency_api(spike_async, model_and_tensors, expected_output): + """Test explicit dependency management API.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + # Use explicit dependency APIs - each iteration uses different input and output tensors + read_futs = [] + + for i in range(len(inputs_x)): + write_x_fut = spike_async.tensor_write(inputs_x[i], x) + write_y_fut = spike_async.tensor_write(inputs_y[i], y) + exec_fut = spike_async.execute(model, input_sets[i], output_sets[i], deps=[write_x_fut, write_y_fut]) + read_fut = spike_async.tensor_read(outputs[i], output_arrays[i], deps=[exec_fut]) + read_futs.append(read_fut) + + # Verify outputs - data is already in pre-allocated arrays + for i, read_fut in enumerate(read_futs): + read_fut.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_coroutine_api(spike_async, model_and_tensors, expected_output): + """Test coroutine/async-await API.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + async def inference_pipeline(pipeline_idx): + await spike_async.tensor_write(inputs_x[pipeline_idx], x) + await spike_async.tensor_write(inputs_y[pipeline_idx], y) + await spike_async.execute(model, input_sets[pipeline_idx], output_sets[pipeline_idx]) + await spike_async.tensor_read(outputs[pipeline_idx], output_arrays[pipeline_idx]) + + # Use coroutine APIs - each pipeline uses different input and output tensors + futs = [] + for i in range(len(inputs_x)): + fut = spike_async.submit(inference_pipeline(i)) + futs.append(fut) + + # Verify outputs - data is already in pre-allocated arrays + for i, fut in enumerate(futs): + fut.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_tensor_read_write(spike_async): + """Test basic tensor read/write operations.""" + # Allocate a test tensor + tensor_size = 1024 * 1024 # 1MB + tensor = spike_async.allocate_tensor(size=tensor_size, core_id=0, name="test_tensor") + + # Create test data + test_data = np.random.rand(tensor_size // 8).astype(np.float64) + test_bytes = test_data.tobytes() + + # Submit async write and read operations + write_fut = spike_async.tensor_write(tensor, test_bytes) + read_fut = spike_async.tensor_read(tensor, deps=[write_fut]) + + # Wait for completion + read_data = read_fut.wait() + + # Verify data + read_array = np.frombuffer(read_data, dtype=np.float64) + assert np.array_equal(read_array, test_data), "Tensor read/write data mismatch" + +if __name__ == "__main__": + pytest.main(["-v", __file__]) \ No newline at end of file From 5732e19ba50423f1b5cec6cab511a5fc33b86f09 Mon Sep 17 00:00:00 2001 From: Kelvin Ng Date: Mon, 23 Mar 2026 15:30:26 -0700 Subject: [PATCH 2/5] Add a section to explain when to use SpikeAsync --- spike/ASYNC_API.md | 152 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/spike/ASYNC_API.md b/spike/ASYNC_API.md index b0fc8d6..0ff1bd9 100644 --- a/spike/ASYNC_API.md +++ b/spike/ASYNC_API.md @@ -11,6 +11,158 @@ The Spike runtime supports two levels of asynchronous operation: **Most users should use the high-level Async API**, which provides a cleaner interface and automatic dependency management. +## When to Use SpikeAsync + +### Overlapping Operations for Better Hardware Utilization + +A typical inference task follows a sequential pattern: **tensor write → execute → tensor read**. Done naively, each step blocks the next, leaving hardware idle. + +SpikeAsync lets you overlap these stages across iterations so that, while one iteration is executing on the device, the next is writing its inputs and the previous is reading its outputs: + +``` +tensor_write -> execute -> tensor_read + tensor_write -> execute -> tensor_read + tensor_write -> execute -> tensor_read +``` + +A more advanced example is **dynamic multi-LoRA**, where you can overlap loading an adapter from host memory with the execution of the current request — hiding the adapter load latency entirely. + +With SpikeAsync, this looks like natural sequential code: + +```python +async def inference(model, input_data, input_tensor, output_tensor): + await spike_async.tensor_write(input_tensor, input_data) + await spike_async.execute(model, input_set, output_set) + return await spike_async.tensor_read(output_tensor) + +for i in range(n): + spike_async.submit(inference(model, data[i], in_tensor[i], out_tensor[i])) +``` + +### Why Not Just Use Stream APIs? + +You may wonder whether CUDA-style stream APIs already solve this. They do provide asynchronous dispatch, but two problems make them awkward for this pattern. + +#### Problem 1: CPU Work Cannot Overlap Naturally + +Streams sequence GPU/device operations, but inserting CPU logic between async operations breaks the overlap. Consider a loop that runs a matmul on device, copies the result to host, then runs a CPU function on it: + +```cpp +// Attempt with CUDA streams +for (int i = 0; i < n; ++i) { + cudaStream_t stream; + cudaStreamCreate(&stream); + + matmul<<<..., stream>>>(dev_out[i], dev_in[i]); + cudaMemcpyAsync(host_out[i], dev_out[i], stream); + cpu_function(host_out[i]); // WRONG: runs before matmul/copy finish +} +``` + +Adding `cudaStreamSynchronize` fixes correctness but kills overlap — the CPU waits for each iteration before the next begins: + +``` +Matmul -> Copy -> CPU + Matmul -> Copy -> CPU + Matmul -> Copy -> CPU +``` + +Pulling the CPU work out after a `cudaDeviceSynchronize` improves device overlap, but all CPU work is serialized at the end — you cannot overlap CPU work with device work: + +``` +Matmul -> Copy + Matmul -> Copy + Matmul -> Copy + CPU -> CPU -> CPU +``` + +The core issue is that **stream APIs have no mechanism to resume CPU-side logic precisely when a specific prior operation completes**, without callbacks or restructuring the program into a state machine. + +SpikeAsync achieves the fully-overlapped target pipeline naturally: + +``` +Matmul -> Copy -> CPU + Matmul -> Copy -> CPU + Matmul -> Copy -> CPU +``` + +```python +async def pipeline(dev_in, dev_out): + await spike_async.execute(matmul_model, dev_in, dev_out) + host_out = await spike_async.tensor_read(dev_out) + cpu_function(host_out) + +for i in range(n): + spike_async.submit(pipeline(dev_in[i], dev_out[i])) +``` + +Each `await` suspends only that coroutine until the awaited operation finishes, while other submitted coroutines continue to make progress — including their CPU work. + +#### Problem 2: Fine-Grained Dependency Control Is Unnatural + +Stream APIs model dependencies coarsely: all operations in a stream are ordered, and cross-stream synchronization requires explicit events between streams. Consider this dependency graph, which arises naturally when two independent models feed into a third, while a fourth model only needs the second: + +``` +model_a ──┐ + ├──► model_c +model_b ──┘ + │ + └─────────► model_d +``` + +With CUDA streams, you must manually create streams, record events, and wire up waits: + +```cpp +cudaStream_t s1, s2, s3, s4; +cudaStreamCreate(&s1); +cudaStreamCreate(&s2); +cudaStreamCreate(&s3); +cudaStreamCreate(&s4); + +// Run model_a and model_b in parallel +run_model<<<..., s1>>>(model_a, ...); +run_model<<<..., s2>>>(model_b, ...); + +// Record events to mark completion +cudaEvent_t event_a, event_b; +cudaEventCreate(&event_a); +cudaEventCreate(&event_b); +cudaEventRecord(event_a, s1); +cudaEventRecord(event_b, s2); + +// model_c waits for both a and b +cudaStreamWaitEvent(s3, event_a); +cudaStreamWaitEvent(s3, event_b); +run_model<<<..., s3>>>(model_c, ...); + +// model_d waits only for b — easy to accidentally add event_a here too +cudaStreamWaitEvent(s4, event_b); +run_model<<<..., s4>>>(model_d, ...); + +// Cleanup +cudaEventDestroy(event_a); +cudaEventDestroy(event_b); +cudaStreamDestroy(s1); cudaStreamDestroy(s2); +cudaStreamDestroy(s3); cudaStreamDestroy(s4); +``` + +The dependency structure is buried in scattered `cudaStreamWaitEvent` calls. A misplaced wait or a forgotten event silently introduces wrong ordering or unnecessary serialization. + +With SpikeAsync's `deps=` parameter, the same graph is expressed directly: + +```python +fut_a = spike_async.execute(model_a, in_a, out_a) +fut_b = spike_async.execute(model_b, in_b, out_b) + +# model_c starts only after both a and b finish +fut_c = spike_async.execute(model_c, in_c, out_c, deps=[fut_a, fut_b]) + +# model_d starts as soon as b finishes, independent of a or c +fut_d = spike_async.execute(model_d, in_d, out_d, deps=[fut_b]) +``` + +Each operation declares exactly what it depends on, co-located with the operation itself. There are no streams to create, no events to record, and no risk of accidentally over-constraining or under-constraining the graph. + ## High-Level Async API The high-level API provides an asyncio-like interface with automatic dependency tracking and stream support. From b5de23b36a17db383a288ce8a2846891588b92a3 Mon Sep 17 00:00:00 2001 From: Kelvin Ng Date: Thu, 18 Jun 2026 20:52:05 +0000 Subject: [PATCH 3/5] Add a performance test for SpikeAsync --- spike/tests/conftest.py | 12 ++ spike/tests/test_spike_async.py | 218 ++++++++++++++++++++++++++++++-- 2 files changed, 222 insertions(+), 8 deletions(-) diff --git a/spike/tests/conftest.py b/spike/tests/conftest.py index 78c8589..bf0e86a 100644 --- a/spike/tests/conftest.py +++ b/spike/tests/conftest.py @@ -14,3 +14,15 @@ def pytest_addoption(parser): default=3, help="Number of concurrent pipelines to allocate and run in test_spike_async.py tests.", ) + parser.addoption( + "--warmup-iters", + type=int, + default=3, + help="Number of untimed warmup iterations before measurement in the perf test.", + ) + parser.addoption( + "--measure-iters", + type=int, + default=10, + help="Number of timed measurement iterations in the perf test.", + ) diff --git a/spike/tests/test_spike_async.py b/spike/tests/test_spike_async.py index 520a81c..a209f8c 100644 --- a/spike/tests/test_spike_async.py +++ b/spike/tests/test_spike_async.py @@ -6,6 +6,7 @@ """ import os +import time import pytest import numpy as np @@ -24,13 +25,32 @@ except ImportError as e: pytest.skip(f"Required packages not available: {e}", allow_module_level=True) +# Matmul shapes per --test-mode, expressed as (x_shape, y_shape) for out = x @ y. +# +# The "overlapping" shape is tuned so that, with the weights `y` pre-written once +# (mimicking ML model parameters), a single iteration's `write x` + `read out` +# data-movement time balances the matmul execution time (~4 ms each on the +# reference hardware). The balance follows from the measured rates +# (write 0.3125 ms/MiB, read 0.09375 ms/MiB, exec 1.46e-11 ms/MAC): writing x +# (M*K) plus reading out (M*N) equals exec (M*K*N) when 1.79e-7/K + 5.96e-7/N ~= +# 1.46e-11. All dims are powers of 2: K=32768, N=65536 satisfies it (the M factor +# cancels, so M=128 just sets the absolute scale -- exec mid-range at ~4 ms). +# Then write x = 8 MiB (2.5 ms), read out = 16 MiB (1.5 ms), exec ~= 4 ms. +# Note: the pre-written weights y are 32768*65536 fp16 = 4 GiB per core. +def _matmul_shapes(test_mode): + if test_mode == "overlapping": + return (128, 32768), (32768, 65536) + else: # correctness -- small enough for a CPU reference check + return (128, 256), (256, 128) + + @pytest.fixture(scope="module") def neff_path(request): """Compile a matmul kernel for testing. - Shape is chosen based on --test-mode: + Shape is chosen based on --test-mode (see _matmul_shapes): correctness -- small (128x256) x (256x128), fast enough for CPU reference check - overlapping -- large (4096x8192) x (8192x4096), long enough to exhibit async overlap + overlapping -- large, balanced so write+read ~= exec with weights pre-written """ from nkipy.core.compile import compile_to_neff, trace @@ -38,12 +58,9 @@ def my_matmul(x, y): return np.matmul(x, y) test_mode = request.config.getoption("--test-mode") - if test_mode == "overlapping": - x = np.zeros((4096, 8192), dtype=np.float16) - y = np.zeros((8192, 4096), dtype=np.float16) - else: # correctness - x = np.zeros((128, 256), dtype=np.float16) - y = np.zeros((256, 128), dtype=np.float16) + x_shape, y_shape = _matmul_shapes(test_mode) + x = np.zeros(x_shape, dtype=np.float16) + y = np.zeros(y_shape, dtype=np.float16) traced_kernel = trace(my_matmul) traced_kernel.specialize(x, y) @@ -115,6 +132,92 @@ def model_and_tensors(request, neff_path, spike_async): return model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays +@pytest.fixture(scope="module") +def model_and_tensors_all_cores(request, neff_path, spike_async): + """Like model_and_tensors, but allocates one pipeline per visible NeuronCore. + + Loads the model and allocates input/output tensors on every core so that one + inference pipeline can run concurrently on each core. Mirrors the structure of + model_and_tensors, with all per-pipeline lists indexed by core_id. + + Each core hosts --num-pipelines independent pipelines so that multiple + inferences can overlap within a core (e.g. one pipeline's tensor_read can run + while another's execute is in flight). Per-pipeline lists are nested and + indexed [core_id][pipeline_idx]; per-core lists (models, inputs_y) are indexed + [core_id]. The weights `y` are pre-written once per core (mimicking ML model + parameters loaded ahead of inference) and shared by all pipelines on that core, + so each pipeline only writes its own `x` and reads its own output. Shapes come + from _matmul_shapes: + correctness -- small (128x256) x (256x128) with random values for numerical verification + overlapping -- large, balanced so write x + read out ~= exec (y excluded, pre-written) + """ + num_cores = Spike.get_visible_neuron_core_count() + num_pipelines = request.config.getoption("--num-pipelines") + + # Create input arrays for matmul + test_mode = request.config.getoption("--test-mode") + x_shape, y_shape = _matmul_shapes(test_mode) + if test_mode == "overlapping": + x = np.zeros(x_shape, dtype=np.float16) + y = np.zeros(y_shape, dtype=np.float16) + else: # correctness + x = np.random.rand(*x_shape).astype(np.float16) + y = np.random.rand(*y_shape).astype(np.float16) + + output_shape = (x.shape[0], y.shape[1]) + output_size = np.prod(output_shape) * x.dtype.itemsize + + # Per-core lists (one entry per core) + models = [] + inputs_y = [] + # Per-pipeline lists (nested: [core_id][pipeline_idx]) + inputs_x = [] + input_sets = [] + outputs = [] + output_sets = [] + output_arrays = [] # Pre-allocated CPU buffers for reading outputs + preload_futs = [] + for core_id in range(num_cores): + models.append(spike_async.load_model(neff_path, core_id=core_id)) + + # One shared weights tensor per core, pre-written once and reused by every + # pipeline on the core. + input_y = spike_async.allocate_tensor(size=y.nbytes, core_id=core_id, name=f"input_y_{core_id}") + inputs_y.append(input_y) + preload_futs.append(spike_async.tensor_write(input_y, y)) + + # Independent x/output tensors per pipeline so pipelines don't alias buffers. + core_inputs_x = [] + core_input_sets = [] + core_outputs = [] + core_output_sets = [] + core_output_arrays = [] + for p in range(num_pipelines): + input_x = spike_async.allocate_tensor(size=x.nbytes, core_id=core_id, name=f"input_x_{core_id}_{p}") + core_inputs_x.append(input_x) + core_input_sets.append(spike_async.create_tensor_set({"x": input_x, "y": input_y})) + + out = spike_async.allocate_tensor(size=output_size, core_id=core_id, name=f"output_{core_id}_{p}") + core_outputs.append(out) + core_output_sets.append(spike_async.create_tensor_set({"output0": out})) + # Pre-allocate CPU array for zero-copy tensor_read + core_output_arrays.append(np.empty(output_shape, dtype=x.dtype)) + + inputs_x.append(core_inputs_x) + input_sets.append(core_input_sets) + outputs.append(core_outputs) + output_sets.append(core_output_sets) + output_arrays.append(core_output_arrays) + + # Wait for all weight pre-writes to land before any test runs. + for fut in preload_futs: + fut.wait() + + print(f'Done initializing {num_pipelines} pipeline(s) each on {num_cores} cores.') + + return models, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays + + @pytest.fixture(scope="module") def expected_output(model_and_tensors): """Compute expected matmul output once for all tests.""" @@ -185,6 +288,105 @@ async def inference_pipeline(pipeline_idx): assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" +def test_all_cores_concurrent_perf(request, spike_async, model_and_tensors_all_cores): + """Performance test: run tensor_write -> execute -> tensor_read concurrently on all cores. + + Submits --num-pipelines coroutine pipelines per core (using tensors from the + model_and_tensors_all_cores fixture). Running multiple pipelines concurrently on + a single core lets independent operations overlap -- e.g. one pipeline's + tensor_read or tensor_write (DMA-bound) can run while another pipeline's execute + (compute-bound) is in flight -- which is the benefit of the async system. + + Two configurations are compared: + single-core -- all pipelines on core 0 only + all-cores -- the same per-core pipelines on every visible core, concurrently + + Because the cores execute independently in parallel, a zero-overhead coroutine + system would finish the all-cores run in the same time as the single-core run; + the difference quantifies the event-loop/coroutine scheduling overhead. + + Each configuration is run for --warmup-iters untimed warmup iterations followed + by --measure-iters timed iterations; the reported time is the mean over the + measured iterations. + """ + models, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors_all_cores + num_cores = len(models) + if num_cores < 1: + pytest.skip("No visible NeuronCores available") + + num_pipelines = request.config.getoption("--num-pipelines") + warmup_iters = request.config.getoption("--warmup-iters") + measure_iters = request.config.getoption("--measure-iters") + if measure_iters < 1: + pytest.skip("--measure-iters must be >= 1") + + expected = np.matmul(x, y) + + # Weights y are pre-written by the fixture (like ML model parameters), so each + # pipeline only writes its x, executes, and reads its output. With the balanced + # overlapping shape, write x + read out ~= exec time, so overlapping multiple + # pipelines on a core can hide DMA behind compute (and vice versa). + async def inference_pipeline(core_id, p): + await spike_async.tensor_write(inputs_x[core_id][p], x) + await spike_async.execute(models[core_id], input_sets[core_id][p], output_sets[core_id][p]) + await spike_async.tensor_read(outputs[core_id][p], output_arrays[core_id][p]) + + def run_cores(core_ids): + """One iteration: num_pipelines pipelines on each given core, all concurrent. + + Returns elapsed seconds for the whole batch to complete. + """ + start = time.perf_counter() + futs = [ + spike_async.submit(inference_pipeline(core_id, p)) + for core_id in core_ids + for p in range(num_pipelines) + ] + for fut in futs: + fut.wait() + return time.perf_counter() - start + + def measure(core_ids): + """Run warmup iterations (untimed), then measurement iterations (timed).""" + for _ in range(warmup_iters): + run_cores(core_ids) + return [run_cores(core_ids) for _ in range(measure_iters)] + + def check_outputs(core_ids, label): + for core_id in core_ids: + for p in range(num_pipelines): + assert np.allclose(output_arrays[core_id][p], expected, rtol=1e-3), \ + f"Output mismatch on core {core_id} pipeline {p} ({label})" + + # Baseline: all pipelines on core 0 only. Since the all-cores run executes the + # same per-core pipelines on every core concurrently, in a zero-overhead + # coroutine system the two wall-clock times should be identical. Any difference + # measures the overhead the coroutine/event-loop system adds when scheduling + # many concurrent pipelines across cores. + single_times = measure([0]) + check_outputs([0], "single-core baseline") + + all_core_ids = list(range(num_cores)) + all_times = measure(all_core_ids) + check_outputs(all_core_ids, "all-cores") + + single_mean = float(np.mean(single_times)) + all_mean = float(np.mean(all_times)) + overhead = all_mean - single_mean + print( + f"\n({num_pipelines} pipeline(s)/core; warmup={warmup_iters}, measure={measure_iters} iters; " + f"times are mean +/- std)\n" + f"Single-core (core 0, {num_pipelines} pipelines): " + f"{single_mean * 1e3:8.2f} +/- {np.std(single_times) * 1e3:6.2f} ms " + f"({num_pipelines / single_mean:.1f} pipelines/s)\n" + f"All cores ({num_cores}x{num_pipelines}={num_cores * num_pipelines} pipelines): " + f"{all_mean * 1e3:8.2f} +/- {np.std(all_times) * 1e3:6.2f} ms " + f"({num_cores * num_pipelines / all_mean:.1f} pipelines/s)\n" + f"Coroutine overhead: {overhead * 1e3:8.2f} ms " + f"({all_mean / single_mean:.2f}x single-core)" + ) + + def test_tensor_read_write(spike_async): """Test basic tensor read/write operations.""" # Allocate a test tensor From 8778fa539beb01b4bf5d1a321bf237ff0b008493 Mon Sep 17 00:00:00 2001 From: Kelvin Ng Date: Thu, 18 Jun 2026 22:56:48 +0000 Subject: [PATCH 4/5] Port SpikeAsync to use nrta instead of thread pool --- spike/src/include/spike.h | 265 ++++++-------- spike/src/spike.cpp | 704 +++++++++++++++++++++++--------------- 2 files changed, 542 insertions(+), 427 deletions(-) diff --git a/spike/src/include/spike.h b/spike/src/include/spike.h index 60a7b44..19249e2 100644 --- a/spike/src/include/spike.h +++ b/spike/src/include/spike.h @@ -6,18 +6,16 @@ #include "tensor.h" #include "tensor_set.h" +#include + #include #include -#include -#include +#include #include +#include #include -#include #include -#include -#include -#include #include #include #include @@ -39,11 +37,18 @@ struct ModelTensorInfo { std::unordered_map outputs; }; -// NonBlock command structures -struct NonBlockCloseCmd {}; +// Prepared-batch records. These only hold the arguments to be used when +// `_batched_start` is called; at start time we just submit one nrta_* request +// per entry and group them under a single cmd_id. +struct PreparedTensorWrite { + std::shared_ptr tensor; + const void *data; + size_t size; + size_t offset; + std::variant> data_obj; +}; -struct NonBlockTensorReadCmd { - uint64_t id; +struct PreparedTensorRead { std::shared_ptr tensor; size_t offset; size_t size; @@ -51,114 +56,62 @@ struct NonBlockTensorReadCmd { std::variant> data_obj; }; -struct NonBlockTensorWriteCmd { - uint64_t id; +// Pending nonblocking operations. Each cmd_id we hand back to Python maps to +// one PendingOp. The id and wait_seq are common to every kind of op, so they +// live directly on PendingOp; the per-kind payload is the variant below. The +// batched variants hold N sub-requests submitted back-to-back on the same +// (lnc, xu, queue=0); their seq numbers are consecutive, so wait_seq stores +// the last one and a completed wait_seq implies every prior sub-request is +// complete too. + +// The nrta_* APIs store the op's completion status through the `ret` pointer +// *after* submission, so `ret` must outlive the call. Callers therefore enqueue +// the PendingOp first and pass a pointer into the deque-resident copy (see +// enqueue_pending); std::deque never relocates existing elements on push_back, +// so that address stays valid until the op is harvested. +struct PendingTensorWrite { + NRT_STATUS ret; std::shared_ptr tensor; - const void *data; - size_t size; + // Anchors the Python-owned source buffer until the op completes. Empty for + // the raw-pointer overload (caller manages the buffer lifetime). + std::optional>> data_obj; +}; + +struct PendingTensorRead { + NRT_STATUS ret; + std::shared_ptr tensor; + // The destination buffer; also returned to Python via + // NonBlockTensorReadResult.data. std::variant> data_obj; - size_t offset; }; -struct NonBlockTensorWriteBatchedCmd { - uint64_t id; +struct PendingTensorWriteBatched { + // Lifetime anchors live in tensor_write_batched_prepared_[batch_id], which + // persists until close() (or a future explicit-release API) so the same + // prepared batch can be _start'd many times. uint64_t batch_id; + std::vector rets; }; -struct NonBlockTensorReadBatchedCmd { - uint64_t id; +struct PendingTensorReadBatched { uint64_t batch_id; + std::vector rets; }; -typedef std::variant - NonBlockTensorCmd; - -struct NonBlockExecCmd { - uint64_t id; +struct PendingExecute { + NRT_STATUS ret; std::shared_ptr model; std::shared_ptr input_set; std::shared_ptr output_set; - std::optional ntff_name; - bool save_trace; }; -typedef std::variant - NonBlockExecOrCloseCmd; - -// Thread-safe queue template (blocking and non-blocking versions) -template class LockedQueue { -public: - LockedQueue() { - if constexpr (!is_blocking) { - size_ = 0; - } - } - - void push(const T &value) { - std::unique_lock lk(mtx_); - q_.push(value); - lk.unlock(); - if constexpr (is_blocking) { - cv_.notify_one(); - } else { - size_.fetch_add(1, std::memory_order_release); - } - } - - void push(T &&value) { - std::unique_lock lk(mtx_); - q_.push(std::move(value)); - lk.unlock(); - if constexpr (is_blocking) { - cv_.notify_one(); - } else { - size_.fetch_add(1, std::memory_order_release); - } - } - - template void emplace(Args &&...args) { - std::unique_lock lk(mtx_); - q_.emplace(std::forward(args)...); - lk.unlock(); - if constexpr (is_blocking) { - cv_.notify_one(); - } else { - size_.fetch_add(1, std::memory_order_release); - } - } - - template std::enable_if_t pop() { - std::unique_lock lk(mtx_); - cv_.wait(lk, [this]() { return !q_.empty(); }); - T ret(std::move(q_.front())); - q_.pop(); - lk.unlock(); - return ret; - } - - // This function is only safe with one consumer - template - std::enable_if_t> try_pop() { - if (size_.load(std::memory_order_acquire) >= 1) { - size_.fetch_sub(1, std::memory_order_release); - std::lock_guard lk(mtx_); - T ret(std::move(q_.front())); - q_.pop(); - return ret; - } else { - return std::nullopt; - } - } - -private: - struct Empty {}; - - std::queue q_; - std::mutex mtx_; - std::conditional_t cv_; - std::conditional_t size_; +struct PendingOp { + uint64_t id; + nrta_seq_t wait_seq; + std::variant + op; }; // NonBlock result structures (exposed to Python) @@ -182,40 +135,6 @@ typedef std::variant NonBlockResult; -// NonBlock internal result structures (used in worker threads to avoid GIL) -// These hold shared_ptrs to keep resources alive until convert_internal_result() -// transfers nanobind objects to Result types. This two-phase pattern ensures -// nanobind/Python object destructors run in GIL context, not in worker threads. -struct NonBlockTensorReadInternalResult { - uint64_t id; - std::variant> data_obj; - std::optional> err; - - std::shared_ptr tensor; -}; - -struct NonBlockTensorWriteInternalResult { - uint64_t id; - std::optional> err; - - std::shared_ptr tensor; - std::variant> data_obj; -}; - -struct NonBlockExecInternalResult { - uint64_t id; - std::optional> err; - - std::shared_ptr model; - std::shared_ptr input_set; - std::shared_ptr output_set; -}; - -typedef std::variant - NonBlockInternalResult; - // Main Spike class - Python interface class Spike { public: @@ -320,32 +239,74 @@ class Spike { int verbose_level_; std::unique_ptr runtime_; - // Nonblock support + // Nonblock state uint64_t next_non_block_id_ = 0; uint64_t next_batch_id_ = 0; - std::vector tensor_threads_; - std::vector exec_threads_; - std::vector> tensor_queues_; - std::vector> exec_queues_; - LockedQueue noti_queue_; + // One pending-op queue per (lnc, xu) channel. Each queue is FIFO by + // submission order, and within a queue nrta_seq_t values are monotonically + // increasing, so a channel only needs its front op's wait_seq checked + // against nrta_get_sequence's latest-completed seq for that channel. + static constexpr uint32_t MAX_LNC = 128; + static constexpr uint32_t NUM_CHANNELS = MAX_LNC * NRTA_XU_TYPE_NUM; + std::array, NRTA_XU_TYPE_NUM>, MAX_LNC> + xu_queues_; + + // epoll-based completion multiplexing. Each (lnc, xu) channel that has ever + // had work gets one eventfd, registered with the runtime via + // nrta_event_register_xu_completion() and added to a single epoll instance. + // The runtime signals a channel's eventfd whenever that XU completes any + // sequence, so try_poll() can ask epoll which channels made progress instead + // of probing all MAX_LNC * NRTA_XU_TYPE_NUM channels every call. + int epoll_fd_ = -1; + // Per-channel eventfd, -1 until the channel is first registered. + std::array channel_event_fds_; + + // Channels whose front op may be ready to harvest. Populated by epoll_wait + // (each signaled eventfd) and by try_poll itself (a channel stays queued + // after a successful harvest, since one eventfd signal can cover several + // completed ops). Removed once the front is found not-yet-complete or the + // queue empties. + // + // A FIFO queue (rather than an ordered set) gives round-robin fairness: every + // try_poll() pops the front channel, and a channel that still has pollable + // work is pushed to the back, so a continuously-busy low-index channel can't + // starve the others. scan_channel_queued_ tracks membership in O(1) so a + // channel is never enqueued twice. + std::deque scan_channels_; + std::array scan_channel_queued_; + + static constexpr uint32_t channel_index(uint32_t lnc, uint32_t xu) { + return lnc * NRTA_XU_TYPE_NUM + xu; + } + + // Lazily creates the epoll instance and the channel's eventfd, registering + // the latter with the runtime and the epoll set. Idempotent per channel. + void ensure_channel_registered(uint32_t lnc, nrta_xu_t xu); - std::unordered_map> - tensor_write_batched_cmds_; - std::shared_mutex tensor_write_batched_cmds_mtx_; + // Prepared batches (data kept alive between _prepare and _start). + std::unordered_map> + tensor_write_batched_prepared_; - std::unordered_map> - tensor_read_batched_cmds_; - std::shared_mutex tensor_read_batched_cmds_mtx_; + std::unordered_map> + tensor_read_batched_prepared_; - void loop_execute(int core_id); - void loop_tensor(int core_id); + // Appends op to the (lnc, xu) queue and returns a reference to the enqueued + // element. std::deque never relocates existing elements on push_back, so the + // returned reference (and pointers into its ret field) stay valid until the + // op is popped in try_poll. Scalar callers enqueue *before* submitting so the + // nrta_* call can write completion status straight into the queued op. + PendingOp &enqueue_pending(uint32_t lnc, nrta_xu_t xu, PendingOp op); // Helper methods NrtTensorSet create_tensor_set( const std::unordered_map &tensor_map); std::string dtype_to_string(nrt_dtype_t dtype); - NonBlockResult convert_internal_result(NonBlockInternalResult &internal_res_); + + static std::optional> + ret_to_err(NRT_STATUS ret); + static std::optional> + rets_to_err(const std::vector &rets); }; } // namespace spike diff --git a/spike/src/spike.cpp b/spike/src/spike.cpp index fdd14f3..3eeb69c 100644 --- a/spike/src/spike.cpp +++ b/spike/src/spike.cpp @@ -1,6 +1,10 @@ #include "spike.h" -#include #include +#include +#include +#include +#include +#include namespace spike { @@ -16,6 +20,10 @@ Spike::Spike(int verbose_level) "tensors/models first."); } + // Channel eventfds are created lazily on first use; -1 means unregistered. + channel_event_fds_.fill(-1); + scan_channel_queued_.fill(false); + // RAII: Initialize NRT in constructor if (verbose_level_ > 0) { std::cout << "Initializing SPIKE Runtime" << std::endl; @@ -36,29 +44,105 @@ uint32_t Spike::get_visible_neuron_core_count() { } int Spike::close() { - // Shut down nonblock threads if initialized - for (auto &q : exec_queues_) { - q.emplace(NonBlockCloseCmd{}); - } - for (auto &q : tensor_queues_) { - q.emplace(NonBlockCloseCmd{}); - } - for (auto &t : exec_threads_) { - if (t.joinable()) { - t.join(); + for (auto &per_lnc : xu_queues_) { + for (auto &q : per_lnc) { + q.clear(); } } - for (auto &t : tensor_threads_) { - if (t.joinable()) { - t.join(); + tensor_write_batched_prepared_.clear(); + tensor_read_batched_prepared_.clear(); + + // Tear down the epoll multiplexing state. Deregister each channel's eventfd + // from the runtime (negative fd deregisters) before closing it. + for (uint32_t lnc = 0; lnc < MAX_LNC; ++lnc) { + for (uint32_t xu = 0; xu < NRTA_XU_TYPE_NUM; ++xu) { + int &fd = channel_event_fds_[channel_index(lnc, xu)]; + if (fd < 0) { + continue; + } + nrta_event_register_xu_completion(static_cast(lnc), + static_cast(xu), + /*queue=*/0, /*fd=*/-1); + ::close(fd); + fd = -1; } } + if (epoll_fd_ >= 0) { + ::close(epoll_fd_); + epoll_fd_ = -1; + } + scan_channels_.clear(); + scan_channel_queued_.fill(false); runtime_.reset(); g_alive_spike_instance_exists = false; return 0; } +void Spike::ensure_channel_registered(uint32_t lnc, nrta_xu_t xu) { + if (lnc >= MAX_LNC) { + throw SpikeError("LNC index exceeds MAX_LNC."); + } + if (xu >= NRTA_XU_TYPE_NUM) { + throw SpikeError("XU index out of range."); + } + + if (epoll_fd_ < 0) { + epoll_fd_ = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd_ < 0) { + throw SpikeError(std::string("epoll_create1 failed: ") + + std::strerror(errno)); + } + } + + int &fd = channel_event_fds_[channel_index(lnc, xu)]; + if (fd >= 0) { + return; // Already registered. + } + + fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (fd < 0) { + throw SpikeError(std::string("eventfd failed: ") + std::strerror(errno)); + } + + // The data carries the channel index so epoll_wait tells us exactly which + // (lnc, xu) channel signaled. + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.u32 = channel_index(lnc, xu); + if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) { + int err = errno; + ::close(fd); + fd = -1; + throw SpikeError(std::string("epoll_ctl ADD failed: ") + + std::strerror(err)); + } + + // Hand the eventfd to the runtime; it signals it whenever this XU completes + // any sequence on queue 0. + NRT_STATUS status = nrta_event_register_xu_completion( + static_cast(lnc), xu, /*queue=*/0, fd); + if (status != NRT_SUCCESS) { + epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); + ::close(fd); + fd = -1; + throw NrtError(status, "Failed to register XU completion event"); + } +} + +PendingOp &Spike::enqueue_pending(uint32_t lnc, nrta_xu_t xu, PendingOp op) { + // Channel registration happens before submission (in the calling + // tensor_*/execute_nonblock helpers), so we know an eventfd is in place for + // every completion. xu must be the same XU the channel was registered on so + // the queue index here matches the one try_poll derives from the epoll event. + if (lnc >= MAX_LNC || xu >= NRTA_XU_TYPE_NUM) { + throw SpikeError("Channel index out of range."); + } + std::deque &q = xu_queues_[lnc][xu]; + q.push_back(std::move(op)); + return q.back(); +} + NrtModel Spike::load_model(const std::string &neff_file, uint32_t core_id, bool cc_enabled, uint32_t rank_id, uint32_t world_size) { @@ -155,34 +239,37 @@ void Spike::execute(NrtModel &model, model.execute(input_set, output_set, ntff_name, save_trace); } -void Spike::init_nonblock() { - // Initialize nonblocking thread pools: one execution thread and one tensor - // thread per physical NeuronCore. Uses total core count (not just visible) - // to avoid visible-to-physical ID mapping. Extra threads remain blocked - // with no overhead. - - int num_nc = NrtRuntime::get_total_nc_count(); - exec_queues_ = std::vector>(num_nc); - tensor_queues_ = std::vector>(num_nc); - - for (int i = 0; i < num_nc; ++i) { - exec_threads_.emplace_back([this, i]() { loop_execute(i); }); - tensor_threads_.emplace_back([this, i]() { loop_tensor(i); }); - } -} +// Kept for backward compatibility with SpikeAsync. With the nrta_* backend we +// don't need any thread pools; there's nothing to initialize. +void Spike::init_nonblock() {} uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, nb::bytes data_obj, size_t offset) { - uint64_t cmd_id = next_non_block_id_++; - const void *data = data_obj.data(); size_t size = data_obj.size(); - uint32_t core_id = tensor->get_core_id(); - NonBlockTensorWriteCmd cmd{cmd_id, std::move(tensor), data, - size, std::move(data_obj), offset}; - tensor_queues_[core_id].emplace(std::move(cmd)); + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(data_obj); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // Enqueue first so nrta_* can write completion status into the deque-resident + // op (its address must outlive this call); roll back if scheduling fails. + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } return cmd_id; } @@ -190,15 +277,28 @@ uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, nb::ndarray<> data_obj, size_t offset) { - uint64_t cmd_id = next_non_block_id_++; - const void *data = data_obj.data(); size_t size = data_obj.nbytes(); - uint32_t core_id = tensor->get_core_id(); - NonBlockTensorWriteCmd cmd{cmd_id, std::move(tensor), data, - size, std::move(data_obj), offset}; - tensor_queues_[core_id].emplace(std::move(cmd)); + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(data_obj); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } return cmd_id; } @@ -207,17 +307,24 @@ uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, const void *data, size_t size, size_t offset) { uint64_t cmd_id = next_non_block_id_++; - - uint32_t core_id = tensor->get_core_id(); - - NonBlockTensorWriteCmd cmd; - cmd.id = cmd_id; - cmd.tensor = std::move(tensor); - cmd.data = data; - cmd.size = size; - cmd.offset = offset; - - tensor_queues_[core_id].emplace(std::move(cmd)); + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + // No data_obj: caller manages the raw buffer's lifetime. + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } return cmd_id; } @@ -227,15 +334,27 @@ uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, size = (size == 0) ? (tensor->get_size() - offset) : size; uint64_t cmd_id = next_non_block_id_++; - - uint32_t core_id = tensor->get_core_id(); + uint32_t lnc = tensor->get_core_id(); nb::bytes dest(nullptr, size); void *data = PyBytes_AsString(dest.ptr()); - NonBlockTensorReadCmd cmd{cmd_id, std::move(tensor), offset, - size, data, std::move(dest)}; - tensor_queues_[core_id].emplace(std::move(cmd)); + PendingTensorRead p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(dest); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorRead &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_read( + data, tensor->get_ptr(), offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor read"); + } return cmd_id; } @@ -250,14 +369,26 @@ uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, } uint64_t cmd_id = next_non_block_id_++; - - uint32_t core_id = tensor->get_core_id(); + uint32_t lnc = tensor->get_core_id(); void *data = dest.data(); - NonBlockTensorReadCmd cmd{cmd_id, std::move(tensor), offset, - size, data, std::move(dest)}; - tensor_queues_[core_id].emplace(std::move(cmd)); + PendingTensorRead p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(dest); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorRead &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_read( + data, tensor->get_ptr(), offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor read"); + } return cmd_id; } @@ -277,20 +408,15 @@ uint64_t Spike::tensor_write_nonblock_batched_prepare( uint64_t batch_id = next_batch_id_++; - std::vector cmds; - cmds.reserve(tensors.size()); + std::vector prepared; + prepared.reserve(tensors.size()); uint32_t core_id = tensors[0]->get_core_id(); for (size_t i = 0; i < tensors.size(); ++i) { std::shared_ptr tensor = std::move(tensors[i]); nb::ndarray<> data_obj = std::move(data_objs[i]); - size_t offset; - if (offsets.has_value()) { - offset = offsets.value()[i]; - } else { - offset = 0; - } + size_t offset = offsets.has_value() ? offsets.value()[i] : 0; const void *data = data_obj.data(); size_t size = data_obj.nbytes(); @@ -299,36 +425,56 @@ uint64_t Spike::tensor_write_nonblock_batched_prepare( throw SpikeError("All tensors must be on the same NeuronCore."); } - NonBlockTensorWriteCmd cmd{batch_id, std::move(tensor), data, - size, std::move(data_obj), offset}; - cmds.push_back(std::move(cmd)); + PreparedTensorWrite entry; + entry.tensor = std::move(tensor); + entry.data = data; + entry.size = size; + entry.offset = offset; + entry.data_obj = std::move(data_obj); + prepared.push_back(std::move(entry)); } - std::scoped_lock lk(tensor_write_batched_cmds_mtx_); - tensor_write_batched_cmds_[batch_id] = std::move(cmds); + tensor_write_batched_prepared_[batch_id] = std::move(prepared); return batch_id; } uint64_t Spike::tensor_write_nonblock_batched_start(uint64_t batch_id) { - uint64_t cmd_id = next_non_block_id_++; - - NonBlockTensorWriteBatchedCmd cmd; - cmd.id = cmd_id; - cmd.batch_id = batch_id; - - std::shared_lock lk(tensor_write_batched_cmds_mtx_); - - auto it = tensor_write_batched_cmds_.find(batch_id); - if (it == tensor_write_batched_cmds_.end()) { - lk.unlock(); + auto it = tensor_write_batched_prepared_.find(batch_id); + if (it == tensor_write_batched_prepared_.end()) { throw SpikeError("The batch ID does not exist."); } - uint32_t core_id = it->second[0].tensor->get_core_id(); - lk.unlock(); + const std::vector &prepared = it->second; - tensor_queues_[core_id].emplace(std::move(cmd)); + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = prepared[0].tensor->get_core_id(); + + PendingTensorWriteBatched p; + p.batch_id = batch_id; + p.rets.assign(prepared.size(), NRT_SUCCESS); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // p.rets is a heap vector; the runtime keeps &p.rets[i] and fills it on + // completion. Moving p into the deque below preserves that buffer, so the + // pointers stay valid. All sub-requests hit the same (lnc, xu=TENSOR_OP, + // queue=0) back-to-back, so seq numbers are consecutive and we only wait on + // the last. + nrta_seq_t wait_seq = 0; + for (size_t i = 0; i < prepared.size(); ++i) { + const PreparedTensorWrite &e = prepared[i]; + nrta_seq_t seq; + NRT_STATUS status = nrta_tensor_write( + e.tensor->get_ptr(), e.data, e.offset, e.size, static_cast(lnc), + /*queue=*/0, &p.rets[i], &seq); + if (status != NRT_SUCCESS) { + throw NrtError(status, + "Failed to schedule nonblocking batched tensor write"); + } + wait_seq = seq; + } + enqueue_pending(lnc, NRTA_XU_TENSOR_OP, + PendingOp{cmd_id, wait_seq, std::move(p)}); return cmd_id; } @@ -349,8 +495,8 @@ uint64_t Spike::tensor_read_nonblock_batched_prepare( uint64_t batch_id = next_batch_id_++; - std::vector cmds; - cmds.reserve(tensors.size()); + std::vector prepared; + prepared.reserve(tensors.size()); uint32_t core_id = tensors[0]->get_core_id(); @@ -359,8 +505,8 @@ uint64_t Spike::tensor_read_nonblock_batched_prepare( nb::ndarray<> dest = std::move(dests[i]); size_t offset = offsets.has_value() ? offsets.value()[i] : 0; - size_t size = sizes.has_value() ? sizes.value()[i] : - (tensor->get_size() - offset); + size_t size = sizes.has_value() ? sizes.value()[i] + : (tensor->get_size() - offset); if (dest.nbytes() < size) { throw SpikeError("The read operation exceeds the destination bound."); @@ -372,36 +518,54 @@ uint64_t Spike::tensor_read_nonblock_batched_prepare( throw SpikeError("All tensors must be on the same NeuronCore."); } - NonBlockTensorReadCmd cmd{batch_id, std::move(tensor), offset, - size, data, std::move(dest)}; - cmds.push_back(std::move(cmd)); + PreparedTensorRead entry; + entry.tensor = std::move(tensor); + entry.offset = offset; + entry.size = size; + entry.data = data; + entry.data_obj = std::move(dest); + prepared.push_back(std::move(entry)); } - std::scoped_lock lk(tensor_read_batched_cmds_mtx_); - tensor_read_batched_cmds_[batch_id] = std::move(cmds); + tensor_read_batched_prepared_[batch_id] = std::move(prepared); return batch_id; } uint64_t Spike::tensor_read_nonblock_batched_start(uint64_t batch_id) { - uint64_t cmd_id = next_non_block_id_++; - - NonBlockTensorReadBatchedCmd cmd; - cmd.id = cmd_id; - cmd.batch_id = batch_id; - - std::shared_lock lk(tensor_read_batched_cmds_mtx_); - - auto it = tensor_read_batched_cmds_.find(batch_id); - if (it == tensor_read_batched_cmds_.end()) { - lk.unlock(); + auto it = tensor_read_batched_prepared_.find(batch_id); + if (it == tensor_read_batched_prepared_.end()) { throw SpikeError("The batch ID does not exist."); } - uint32_t core_id = it->second[0].tensor->get_core_id(); - lk.unlock(); + const std::vector &prepared = it->second; - tensor_queues_[core_id].emplace(std::move(cmd)); + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = prepared[0].tensor->get_core_id(); + + PendingTensorReadBatched p; + p.batch_id = batch_id; + p.rets.assign(prepared.size(), NRT_SUCCESS); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // p.rets is a heap vector; the runtime keeps &p.rets[i] and fills it on + // completion. Moving p into the deque below preserves that buffer, so the + // pointers stay valid. Seq numbers are consecutive; wait on the last. + nrta_seq_t wait_seq = 0; + for (size_t i = 0; i < prepared.size(); ++i) { + const PreparedTensorRead &e = prepared[i]; + nrta_seq_t seq; + NRT_STATUS status = nrta_tensor_read( + e.data, e.tensor->get_ptr(), e.offset, e.size, static_cast(lnc), + /*queue=*/0, &p.rets[i], &seq); + if (status != NRT_SUCCESS) { + throw NrtError(status, + "Failed to schedule nonblocking batched tensor read"); + } + wait_seq = seq; + } + enqueue_pending(lnc, NRTA_XU_TENSOR_OP, + PendingOp{cmd_id, wait_seq, std::move(p)}); return cmd_id; } @@ -410,24 +574,167 @@ uint64_t Spike::execute_nonblock( std::shared_ptr input_set, std::shared_ptr output_set, std::optional ntff_name, bool save_trace) { + // nrta_execute_schedule has no profiling hook. The nrt_profile_start/stop + // pair in the sync path straddles a single synchronous nrt_execute, which + // can't be mapped safely onto an asynchronous schedule. + if (save_trace) { + throw SpikeError("save_trace is not supported with nonblocking execute."); + } + (void)ntff_name; + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = model->get_core_id(); + + PendingExecute p; + p.ret = NRT_SUCCESS; + p.model = model; + p.input_set = input_set; + p.output_set = output_set; + + ensure_channel_registered(lnc, NRTA_XU_COMPUTE); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_COMPUTE, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingExecute &q = std::get(enq.op); + NRT_STATUS status = + nrta_execute_schedule(model->get_ptr(), input_set->get_ptr(), + output_set->get_ptr(), /*queue=*/0, &q.ret, + &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_COMPUTE].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking execute"); + } - uint32_t core_id = model->get_core_id(); + return cmd_id; +} - NonBlockExecCmd cmd{cmd_id, std::move(model), std::move(input_set), - std::move(output_set), ntff_name, save_trace}; - exec_queues_[core_id].emplace(std::move(cmd)); +std::optional> +Spike::ret_to_err(NRT_STATUS ret) { + if (ret != NRT_SUCCESS) { + return NrtError(ret, "Nonblocking operation failed"); + } + return std::nullopt; +} - return cmd_id; +std::optional> +Spike::rets_to_err(const std::vector &rets) { + for (NRT_STATUS r : rets) { + if (r != NRT_SUCCESS) { + return NrtError(r, "Nonblocking operation failed"); + } + } + return std::nullopt; } std::optional Spike::try_poll() { - std::optional internal_res = noti_queue_.try_pop(); - if (internal_res.has_value()) { - return convert_internal_result(internal_res.value()); - } else { - return std::nullopt; + // The runtime signals each channel's eventfd whenever its XU completes a + // sequence. We ask epoll (non-blocking) which channels made progress and + // only scan those — rather than probing all MAX_LNC * NRTA_XU_TYPE_NUM + // channels every call. Registration is done before submission (see the + // tensor_*/execute_nonblock helpers), so every completion is guaranteed to + // signal epoll. + if (epoll_fd_ >= 0) { + struct epoll_event events[NUM_CHANNELS]; + int n; + do { + n = epoll_wait(epoll_fd_, events, NUM_CHANNELS, /*timeout=*/0); + } while (n < 0 && errno == EINTR); + if (n < 0) { + throw SpikeError(std::string("epoll_wait failed: ") + + std::strerror(errno)); + } + for (int i = 0; i < n; ++i) { + uint32_t ch = events[i].data.u32; + // Drain the eventfd so it only fires again on a future completion + // (level-triggered). A single read resets the counter regardless of how + // many completions it accumulated; nrta_get_sequence reports the latest. + uint64_t counter; + int fd = channel_event_fds_[ch]; + if (fd >= 0) { + ssize_t rc; + do { + rc = read(fd, &counter, sizeof(counter)); + } while (rc < 0 && errno == EINTR); + } + // Enqueue for scanning, guarding against duplicates in O(1). + if (!scan_channel_queued_[ch]) { + scan_channel_queued_[ch] = true; + scan_channels_.push_back(ch); + } + } + } + + // Each (lnc, xu) queue is FIFO in submission order, and within it nrta_seq_t + // values are monotonically increasing, so we only check the front op's + // wait_seq against nrta_get_sequence's latest-completed seq for that channel. + // + // scan_channels_ is a round-robin FIFO: we pop the front channel, and if it + // yields a completed op we re-queue it at the back before returning, so a + // continuously-busy channel can't starve the others. A channel that is empty + // or whose front is not yet complete is dropped (its eventfd will re-queue it + // on the next completion). + while (!scan_channels_.empty()) { + uint32_t ch = scan_channels_.front(); + scan_channels_.pop_front(); + scan_channel_queued_[ch] = false; + + uint32_t lnc = ch / NRTA_XU_TYPE_NUM; + uint32_t xu_idx = ch % NRTA_XU_TYPE_NUM; + std::deque &q = xu_queues_[lnc][xu_idx]; + + if (q.empty()) { + continue; + } + nrta_seq_t front_wait_seq = q.front().wait_seq; + + nrta_xu_t xu = static_cast(xu_idx); + nrta_seq_t latest = 0; + NRT_STATUS s = nrta_get_sequence(lnc, xu, /*queue=*/0, &latest); + if (s != NRT_SUCCESS || + NRTA_SEQ_GET_SEQ_NUM(front_wait_seq) > NRTA_SEQ_GET_SEQ_NUM(latest)) { + // Front not completed yet; stop scanning this channel until its eventfd + // signals the next completion. + continue; + } + + PendingOp op = std::move(q.front()); + q.pop_front(); + + // A single completion may have advanced the sequence past several queued + // ops, so re-queue this channel at the back: the next try_poll() re-checks + // its new front (and drops it if empty/not-ready) before trusting epoll. + if (!scan_channel_queued_[ch]) { + scan_channel_queued_[ch] = true; + scan_channels_.push_back(ch); + } + + uint64_t id = op.id; + return std::visit( + [id](auto &p) -> NonBlockResult { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return NonBlockTensorWriteResult{id, ret_to_err(p.ret)}; + } else if constexpr (std::is_same_v) { + return NonBlockTensorReadResult{id, std::move(p.data_obj), + ret_to_err(p.ret)}; + } else if constexpr (std::is_same_v) { + return NonBlockTensorWriteResult{id, rets_to_err(p.rets)}; + } else if constexpr (std::is_same_v) { + // Lifetime anchors stay in tensor_read_batched_prepared_[batch_id] + // so the same prepared batch can be _start'd again. Callers read + // the data via the dests array they passed to _prepare, so the + // result's data field is left default-constructed. + return NonBlockTensorReadResult{ + id, std::variant>{}, + rets_to_err(p.rets)}; + } else { + static_assert(std::is_same_v); + return NonBlockExecResult{id, ret_to_err(p.ret)}; + } + }, + op.op); } + + return std::nullopt; } NrtModel Spike::wrap_model(nrt_model_t *ptr) { @@ -456,159 +763,6 @@ NrtTensorSet Spike::wrap_tensor_set(nrt_tensor_set_t *ptr) { return NrtTensorSet(ptr); } -void Spike::loop_tensor(int core_id) { - while (true) { - NonBlockTensorCmd cmd_ = tensor_queues_[core_id].pop(); - - if (std::holds_alternative(cmd_)) { - NonBlockTensorReadCmd &cmd = std::get(cmd_); - - NonBlockTensorReadInternalResult res; - - try { - cmd.tensor->read(cmd.data, cmd.size, cmd.offset); - res.err = std::nullopt; - } catch (SpikeError &err) { - res.err = std::move(err); - } catch (NrtError &err) { - res.err = std::move(err); - } - - res.id = cmd.id; - res.tensor = std::move(cmd.tensor); - res.data_obj = std::move(cmd.data_obj); - - noti_queue_.emplace(std::move(res)); - } else if (std::holds_alternative(cmd_)) { - NonBlockTensorWriteCmd &cmd = std::get(cmd_); - - NonBlockTensorWriteInternalResult res; - - try { - cmd.tensor->write(cmd.data, cmd.size, cmd.offset); - res.err = std::nullopt; - } catch (SpikeError &err) { - res.err = std::move(err); - } catch (NrtError &err) { - res.err = std::move(err); - } - - res.id = cmd.id; - res.tensor = std::move(cmd.tensor); - res.data_obj = std::move(cmd.data_obj); - - noti_queue_.emplace(std::move(res)); - } else if (std::holds_alternative(cmd_)) { - NonBlockTensorWriteBatchedCmd &batched_cmd = - std::get(cmd_); - - NonBlockTensorWriteInternalResult res; - res.id = batched_cmd.id; - - std::shared_lock lk(tensor_write_batched_cmds_mtx_); - - auto &cmds = tensor_write_batched_cmds_[batched_cmd.batch_id]; - - for (auto &cmd : cmds) { - try { - cmd.tensor->write(cmd.data, cmd.size, cmd.offset); - res.err = std::nullopt; - } catch (SpikeError &err) { - res.err = std::move(err); - } catch (NrtError &err) { - res.err = std::move(err); - } - - if (res.err.has_value()) { - break; - } - } - - noti_queue_.emplace(std::move(res)); - } else if (std::holds_alternative(cmd_)) { - NonBlockTensorReadBatchedCmd &batched_cmd = - std::get(cmd_); - - NonBlockTensorReadInternalResult res; - res.id = batched_cmd.id; - - std::shared_lock lk(tensor_read_batched_cmds_mtx_); - - auto &cmds = tensor_read_batched_cmds_[batched_cmd.batch_id]; - - for (auto &cmd : cmds) { - try { - cmd.tensor->read(cmd.data, cmd.size, cmd.offset); - res.err = std::nullopt; - } catch (SpikeError &err) { - res.err = std::move(err); - } catch (NrtError &err) { - res.err = std::move(err); - } - - if (res.err.has_value()) { - break; - } - } - - noti_queue_.emplace(std::move(res)); - } else { - break; - } - } -} - -void Spike::loop_execute(int core_id) { - while (true) { - NonBlockExecOrCloseCmd cmd_ = exec_queues_[core_id].pop(); - - if (std::holds_alternative(cmd_)) { - NonBlockExecCmd &cmd = std::get(cmd_); - - NonBlockExecInternalResult res; - - try { - cmd.model->execute(*cmd.input_set, *cmd.output_set, cmd.ntff_name, - cmd.save_trace); - res.err = std::nullopt; - } catch (SpikeError &err) { - res.err = std::move(err); - } catch (NrtError &err) { - res.err = std::move(err); - } - - res.id = cmd.id; - res.model = std::move(cmd.model); - res.input_set = std::move(cmd.input_set); - res.output_set = std::move(cmd.output_set); - - noti_queue_.emplace(std::move(res)); - } else { - break; - } - } -} - -// Convert InternalResult to Result in GIL context. -// Moves nanobind objects to Result, leaving shared_ptrs in InternalResult to be -// destroyed here (with GIL held). This avoids calling Python object destructors -// in worker threads, preventing GIL deadlocks and context switch overhead. -NonBlockResult Spike::convert_internal_result(NonBlockInternalResult &internal_res_) { - if (std::holds_alternative(internal_res_)) { - NonBlockTensorReadInternalResult &internal_res = std::get(internal_res_); - NonBlockTensorReadResult res{internal_res.id, std::move(internal_res.data_obj), std::move(internal_res.err)}; - return res; - } else if (std::holds_alternative(internal_res_)) { - NonBlockTensorWriteInternalResult &internal_res = std::get(internal_res_); - NonBlockTensorWriteResult res{internal_res.id, std::move(internal_res.err)}; - return res; - } else { - NonBlockExecInternalResult &internal_res = std::get(internal_res_); - return NonBlockExecResult{internal_res.id, std::move(internal_res.err)}; - } -} - - std::string Spike::dtype_to_string(nrt_dtype_t dtype) { switch (dtype) { case NRT_DTYPE_FLOAT32: From dc7a734b3c67913fb7e8496a43b76fe05d6749ad Mon Sep 17 00:00:00 2001 From: Kelvin Ng Date: Fri, 19 Jun 2026 00:08:53 +0000 Subject: [PATCH 5/5] Update ASYNC_API.md to reflect new implementation; remove init_nonblock completely as it is no longer necessary --- spike/ASYNC_API.md | 72 ++++++++++++++++++++++++---------- spike/src/include/spike.h | 2 - spike/src/python_bindings.cpp | 3 -- spike/src/spike.cpp | 4 -- spike/src/spike/_spike.pyi | 3 -- spike/src/spike/spike_async.py | 1 - 6 files changed, 51 insertions(+), 34 deletions(-) diff --git a/spike/ASYNC_API.md b/spike/ASYNC_API.md index 0ff1bd9..3221850 100644 --- a/spike/ASYNC_API.md +++ b/spike/ASYNC_API.md @@ -173,7 +173,6 @@ The high-level API provides an asyncio-like interface with automatic dependency from spike import SpikeAsync spike_async = SpikeAsync(verbose_level=1) -# Nonblock mode is automatically initialized # Load models and allocate tensors model = spike_async.load_model("model.neff", core_id=0) @@ -358,7 +357,6 @@ The low-level API provides direct access to non-blocking operations. You must ma from spike import Spike spike = Spike(verbose_level=1) -spike.init_nonblock() # Initialize thread pools for async operations ``` ### Nonblocking Operations @@ -382,6 +380,11 @@ output_set = spike.create_tensor_set({"output": output_tensor}) exec_id = spike.execute_nonblock(model, input_set, output_set) ``` +> **Note:** `save_trace` is not supported for nonblocking execute. The +> `nrta_execute_schedule` API has no profiling hook, so the synchronous +> profiling path cannot be mapped onto an asynchronous schedule. Passing +> `save_trace=True` raises an error. + ### Polling for Results ```python @@ -457,18 +460,49 @@ while True: ## Architecture -### Thread Pools +The nonblock layer is built directly on the runtime's asynchronous `nrta_*` +APIs. Operations are submitted to the hardware from the calling thread, and +their completion is tracked with per-channel sequence numbers. + +### Direct Submission + +Each nonblocking call (`tensor_write_nonblock`, `tensor_read_nonblock`, +`execute_nonblock`, and the batched variants) submits its request synchronously +via the corresponding `nrta_*` API (`nrta_tensor_write`, `nrta_tensor_read`, +`nrta_execute_schedule`). Submission returns immediately with an operation ID; +the runtime carries out the work on the device and records completion status +through a pointer the caller supplies. + +### Per-Channel Sequence Queues -When `init_nonblock()` is called, Spike creates two thread pools: +Work is organized into `(lnc, xu)` channels — one per (NeuronCore, execution +unit) pair, where the execution unit is either `NRTA_XU_TENSOR_OP` (tensor +read/write) or `NRTA_XU_COMPUTE` (model execution). Each channel has a FIFO +queue of pending operations. -- **Tensor threads**: Handle tensor read/write operations (one per NeuronCore) -- **Execution threads**: Handle model execution (one per NeuronCore) +Within a channel, the runtime assigns monotonically increasing sequence numbers +to submitted requests. To check whether the operation at the front of a channel +queue is done, `try_poll()` compares its `wait_seq` against the latest completed +sequence reported by `nrta_get_sequence` for that channel. Because sequence +numbers are monotonic and queues are FIFO, only the front operation of each +channel needs checking. -Operations are dispatched to the appropriate thread based on the NeuronCore ID of the tensor or model. +A batched operation submits its N sub-requests back-to-back on the same channel, +so their sequence numbers are consecutive; the batch waits only on the last +sequence number, which implies all prior sub-requests have completed too. -### Result Queue +### epoll-Based Completion Multiplexing -Completed operations are pushed to a lock-free notification queue. The `try_poll()` method (or the async event loop) checks this queue for results. +To avoid probing every channel on each poll, each channel that has had work gets +an `eventfd` registered with the runtime via +`nrta_event_register_xu_completion()` and added to a single `epoll` instance. +The runtime signals a channel's eventfd whenever that channel completes a +sequence. + +`try_poll()` first calls `epoll_wait` (non-blocking) to learn which channels +made progress, then scans only those channels' front operations. Channels are +scanned in round-robin order so a continuously-busy channel cannot starve the +others. ### Async Event Loop @@ -478,9 +512,15 @@ The `SpikeAsyncEventLoop` integrates with the nonblock API: - The selector polls `try_poll()` periodically - When results arrive, corresponding futures are resolved -### InternalResult → Result Pattern +### Resource Lifetime -To avoid GIL overheads, the worker threads use `InternalResult` structures that hold shared_ptrs to keep resources alive. When polled from the main thread (with GIL held), these are converted to `Result` structures. This ensures nanobind/Python object destructors run in GIL context, not in worker threads. +Because the runtime writes completion status through a pointer after submission, +each pending operation lives in a `std::deque` (which never relocates existing +elements on `push_back`), keeping that pointer valid until the operation is +harvested. The pending op also anchors any Python-owned source/destination +buffers (and `shared_ptr`s to tensors/models) so they stay alive until +completion. All submission and harvesting runs on the calling thread with the +GIL held. ## Best Practices @@ -490,13 +530,3 @@ To avoid GIL overheads, the worker threads use `InternalResult` structures that 4. **Use explicit dependencies for complex graphs**: When you have multiple parallel streams that need to synchronize in complex patterns 5. **Batch operations when possible**: Batched operations reduce overhead for multiple small transfers 6. **Don't mix sync and async**: Once you submit async or non-blocking operations, submitting sync operations may cause unexpected behaviors. - -## Migration from Old API - -If you're migrating from the old `NeuronPy` codebase: - -- `SpikeCore` → `Spike` -- `spike_cpp` module → `_spike` module -- `device_id` → `core_id` (parameter rename) -- API is otherwise compatible at the low level -- High-level `SpikeAsync` API is unchanged diff --git a/spike/src/include/spike.h b/spike/src/include/spike.h index 19249e2..a968ea9 100644 --- a/spike/src/include/spike.h +++ b/spike/src/include/spike.h @@ -184,8 +184,6 @@ class Spike { bool save_trace = false); // Nonblocking operations - void init_nonblock(); - uint64_t tensor_write_nonblock(std::shared_ptr tensor, nb::bytes data_obj, size_t offset = 0); uint64_t tensor_write_nonblock(std::shared_ptr tensor, diff --git a/spike/src/python_bindings.cpp b/spike/src/python_bindings.cpp index 95ed53f..064eda1 100644 --- a/spike/src/python_bindings.cpp +++ b/spike/src/python_bindings.cpp @@ -235,9 +235,6 @@ NB_MODULE(_spike, m) { "memoryview, etc.)") // Nonblocking operations - .def("init_nonblock", &Spike::init_nonblock, - "Initialize for nonblocking operations") - .def( "tensor_read_nonblock", [](Spike &self, std::shared_ptr tensor, diff --git a/spike/src/spike.cpp b/spike/src/spike.cpp index 3eeb69c..d2a22bb 100644 --- a/spike/src/spike.cpp +++ b/spike/src/spike.cpp @@ -239,10 +239,6 @@ void Spike::execute(NrtModel &model, model.execute(input_set, output_set, ntff_name, save_trace); } -// Kept for backward compatibility with SpikeAsync. With the nrta_* backend we -// don't need any thread pools; there's nothing to initialize. -void Spike::init_nonblock() {} - uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, nb::bytes data_obj, size_t offset) { diff --git a/spike/src/spike/_spike.pyi b/spike/src/spike/_spike.pyi index 0f85892..01c8b2c 100644 --- a/spike/src/spike/_spike.pyi +++ b/spike/src/spike/_spike.pyi @@ -174,9 +174,6 @@ class Spike: Read data from tensor to Python buffer protocol object (bytearray, memoryview, etc.) """ - def init_nonblock(self) -> None: - """Initialize for nonblocking operations""" - @overload def tensor_read_nonblock(self, tensor: NrtTensor, offset: int = 0, size: int = 0) -> int: """Read data from tensor as bytes nonblockingly""" diff --git a/spike/src/spike/spike_async.py b/spike/src/spike/spike_async.py index d9b7bda..cab3096 100644 --- a/spike/src/spike/spike_async.py +++ b/spike/src/spike/spike_async.py @@ -124,7 +124,6 @@ class SpikeAsync: def __init__(self, verbose_level: int = 0) -> None: self.spike: Spike = Spike(verbose_level=verbose_level) - self.spike.init_nonblock() self._selector: SpikeAsyncSelector = SpikeAsyncSelector(self.spike) self._loop: SpikeAsyncEventLoop = SpikeAsyncEventLoop(self._selector)