diff --git a/spike/ASYNC_API.md b/spike/ASYNC_API.md new file mode 100644 index 0000000..3221850 --- /dev/null +++ b/spike/ASYNC_API.md @@ -0,0 +1,532 @@ +# Spike Async/Nonblock API + +This document describes the asynchronous and non-blocking APIs added to the Spike runtime. + +## Overview + +The Spike runtime supports two levels of asynchronous operation: + +1. **High-level Async API** (Recommended): Python asyncio-style interface with automatic dependency management +2. **Low-level Nonblock API** (Advanced): Direct control over non-blocking operations with manual polling + +**Most users should use the high-level Async API**, which provides a cleaner interface and automatic dependency management. + +## When to Use SpikeAsync + +### Overlapping Operations for Better Hardware Utilization + +A typical inference task follows a sequential pattern: **tensor write → execute → tensor read**. Done naively, each step blocks the next, leaving hardware idle. + +SpikeAsync lets you overlap these stages across iterations so that, while one iteration is executing on the device, the next is writing its inputs and the previous is reading its outputs: + +``` +tensor_write -> execute -> tensor_read + tensor_write -> execute -> tensor_read + tensor_write -> execute -> tensor_read +``` + +A more advanced example is **dynamic multi-LoRA**, where you can overlap loading an adapter from host memory with the execution of the current request — hiding the adapter load latency entirely. + +With SpikeAsync, this looks like natural sequential code: + +```python +async def inference(model, input_data, input_tensor, output_tensor): + await spike_async.tensor_write(input_tensor, input_data) + await spike_async.execute(model, input_set, output_set) + return await spike_async.tensor_read(output_tensor) + +for i in range(n): + spike_async.submit(inference(model, data[i], in_tensor[i], out_tensor[i])) +``` + +### Why Not Just Use Stream APIs? + +You may wonder whether CUDA-style stream APIs already solve this. They do provide asynchronous dispatch, but two problems make them awkward for this pattern. + +#### Problem 1: CPU Work Cannot Overlap Naturally + +Streams sequence GPU/device operations, but inserting CPU logic between async operations breaks the overlap. Consider a loop that runs a matmul on device, copies the result to host, then runs a CPU function on it: + +```cpp +// Attempt with CUDA streams +for (int i = 0; i < n; ++i) { + cudaStream_t stream; + cudaStreamCreate(&stream); + + matmul<<<..., stream>>>(dev_out[i], dev_in[i]); + cudaMemcpyAsync(host_out[i], dev_out[i], stream); + cpu_function(host_out[i]); // WRONG: runs before matmul/copy finish +} +``` + +Adding `cudaStreamSynchronize` fixes correctness but kills overlap — the CPU waits for each iteration before the next begins: + +``` +Matmul -> Copy -> CPU + Matmul -> Copy -> CPU + Matmul -> Copy -> CPU +``` + +Pulling the CPU work out after a `cudaDeviceSynchronize` improves device overlap, but all CPU work is serialized at the end — you cannot overlap CPU work with device work: + +``` +Matmul -> Copy + Matmul -> Copy + Matmul -> Copy + CPU -> CPU -> CPU +``` + +The core issue is that **stream APIs have no mechanism to resume CPU-side logic precisely when a specific prior operation completes**, without callbacks or restructuring the program into a state machine. + +SpikeAsync achieves the fully-overlapped target pipeline naturally: + +``` +Matmul -> Copy -> CPU + Matmul -> Copy -> CPU + Matmul -> Copy -> CPU +``` + +```python +async def pipeline(dev_in, dev_out): + await spike_async.execute(matmul_model, dev_in, dev_out) + host_out = await spike_async.tensor_read(dev_out) + cpu_function(host_out) + +for i in range(n): + spike_async.submit(pipeline(dev_in[i], dev_out[i])) +``` + +Each `await` suspends only that coroutine until the awaited operation finishes, while other submitted coroutines continue to make progress — including their CPU work. + +#### Problem 2: Fine-Grained Dependency Control Is Unnatural + +Stream APIs model dependencies coarsely: all operations in a stream are ordered, and cross-stream synchronization requires explicit events between streams. Consider this dependency graph, which arises naturally when two independent models feed into a third, while a fourth model only needs the second: + +``` +model_a ──┐ + ├──► model_c +model_b ──┘ + │ + └─────────► model_d +``` + +With CUDA streams, you must manually create streams, record events, and wire up waits: + +```cpp +cudaStream_t s1, s2, s3, s4; +cudaStreamCreate(&s1); +cudaStreamCreate(&s2); +cudaStreamCreate(&s3); +cudaStreamCreate(&s4); + +// Run model_a and model_b in parallel +run_model<<<..., s1>>>(model_a, ...); +run_model<<<..., s2>>>(model_b, ...); + +// Record events to mark completion +cudaEvent_t event_a, event_b; +cudaEventCreate(&event_a); +cudaEventCreate(&event_b); +cudaEventRecord(event_a, s1); +cudaEventRecord(event_b, s2); + +// model_c waits for both a and b +cudaStreamWaitEvent(s3, event_a); +cudaStreamWaitEvent(s3, event_b); +run_model<<<..., s3>>>(model_c, ...); + +// model_d waits only for b — easy to accidentally add event_a here too +cudaStreamWaitEvent(s4, event_b); +run_model<<<..., s4>>>(model_d, ...); + +// Cleanup +cudaEventDestroy(event_a); +cudaEventDestroy(event_b); +cudaStreamDestroy(s1); cudaStreamDestroy(s2); +cudaStreamDestroy(s3); cudaStreamDestroy(s4); +``` + +The dependency structure is buried in scattered `cudaStreamWaitEvent` calls. A misplaced wait or a forgotten event silently introduces wrong ordering or unnecessary serialization. + +With SpikeAsync's `deps=` parameter, the same graph is expressed directly: + +```python +fut_a = spike_async.execute(model_a, in_a, out_a) +fut_b = spike_async.execute(model_b, in_b, out_b) + +# model_c starts only after both a and b finish +fut_c = spike_async.execute(model_c, in_c, out_c, deps=[fut_a, fut_b]) + +# model_d starts as soon as b finishes, independent of a or c +fut_d = spike_async.execute(model_d, in_d, out_d, deps=[fut_b]) +``` + +Each operation declares exactly what it depends on, co-located with the operation itself. There are no streams to create, no events to record, and no risk of accidentally over-constraining or under-constraining the graph. + +## High-Level Async API + +The high-level API provides an asyncio-like interface with automatic dependency tracking and stream support. + +### Initialization + +```python +from spike import SpikeAsync + +spike_async = SpikeAsync(verbose_level=1) + +# Load models and allocate tensors +model = spike_async.load_model("model.neff", core_id=0) +input_tensor = spike_async.allocate_tensor(1024 * 1024, core_id=0) +output_tensor = spike_async.allocate_tensor(1024 * 1024, core_id=0) +``` + +### Basic Operations + +All operations return a Future that can be awaited: + +```python +# Tensor operations +write_fut = spike_async.tensor_write(tensor, data) +read_fut = spike_async.tensor_read(tensor) + +# Model execution +exec_fut = spike_async.execute(model, input_set, output_set) + +# Wait for completion (blocking) +data = read_fut.wait() +``` + +### Async/Await Style Programming (Recommended) + +The most powerful feature is using async/await to write sequential-looking code that runs asynchronously. You can `await` futures directly, making your code clean and readable: + +```python +async def inference_pipeline(model, input_data, input_tensor, output_tensor): + # Write sequential code, but it runs asynchronously! + # Each await releases control, allowing other operations to run + + # Prepare input data + await spike_async.tensor_write(input_tensor, input_data) + + # Run inference + input_set = spike_async.create_tensor_set({"input": input_tensor}) + output_set = spike_async.create_tensor_set({"output": output_tensor}) + await spike_async.execute(model, input_set, output_set) + + # Read results + output_data = await spike_async.tensor_read(output_tensor) + + return output_data + +# Run TWO pipelines concurrently +pipeline1 = spike_async.submit( + inference_pipeline(model1, data1, input_tensor1, output_tensor1) +) +pipeline2 = spike_async.submit( + inference_pipeline(model2, data2, input_tensor2, output_tensor2) +) + +# Both pipelines are now running concurrently! +# While pipeline1 is executing the model, pipeline2 can do tensor I/O +result1 = pipeline1.wait() +result2 = pipeline2.wait() +``` + +**Why this is powerful:** +- Code reads like synchronous operations, but executes asynchronously +- Multiple pipelines run concurrently without manual dependency tracking +- Different operation types (tensor I/O, model execution) can overlap, maximizing hardware utilization +- You can mix async Spike operations with regular Python async operations + +### Explicit Dependency Management + +Alternatively, you can explicitly specify dependencies between operations: + +```python +# Write, then read +write_fut = spike_async.tensor_write(tensor, data) +read_fut = spike_async.tensor_read(tensor, deps=[write_fut]) + +# Chain multiple operations +fut1 = spike_async.execute(model1, in1, out1) +fut2 = spike_async.execute(model2, in2, out2, deps=[fut1]) +fut3 = spike_async.execute(model3, in3, out3, deps=[fut1, fut2]) +``` + +This approach is useful when you need fine-grained control or want to build complex DAGs without writing async functions. + +### Streams + +Streams provide automatic sequencing of operations - all operations in a stream execute in order: + +```python +# All operations in a stream are sequenced +with spike_async.create_stream() as stream: + spike_async.tensor_write(tensor1, data1) + spike_async.tensor_write(tensor2, data2) + spike_async.execute(model, inputs, outputs) + +# Wait for stream to complete +stream.wait() +``` + +Multiple streams can run concurrently: + +```python +streams = [] +for i in range(num_parallel): + with spike_async.create_stream() as stream: + # Operations in this stream + spike_async.execute(models[i], inputs[i], outputs[i]) + streams.append(stream) + +# Wait for all streams +for stream in streams: + stream.wait() +``` + +### Stream Events + +Synchronize between streams using events: + +```python +with spike_async.create_stream() as stream1: + spike_async.tensor_write(tensor, data) + event = stream1.record_event() # Capture current point + +with spike_async.create_stream() as stream2: + stream2.wait_event(event) # Wait for stream1's event + spike_async.execute(model, inputs, outputs) +``` + +### Batched Operations + +For efficiency, you can batch multiple tensor operations: + +**Batched Writes:** +```python +# Prepare a batch of writes +batch_id = spike_async.spike.tensor_write_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + data_objs=[data1, data2, data3], + offsets=[0, 0, 0] # Optional +) + +# Start the batch operation (returns a single future) +write_fut = spike_async.tensor_write_batched_start(batch_id) +write_fut.wait() # Wait for all writes to complete +``` + +**Batched Reads:** +```python +# Prepare a batch of reads +batch_id = spike_async.spike.tensor_read_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + dests=[dest1, dest2, dest3], # Pre-allocated numpy arrays + offsets=[0, 0, 0], # Optional + sizes=[size1, size2, size3] # Optional +) + +# Start the batch operation (returns a single future) +read_fut = spike_async.tensor_read_batched_start(batch_id) +read_fut.wait() # Wait for all reads to complete +``` + +Batched operations reduce overhead when you have many small operations. + +### Waiting for Multiple Operations + +```python +# Launch multiple operations +futs = [] +for i in range(10): + fut = spike_async.execute(model, inputs[i], outputs[i]) + futs.append(fut) + +# Wait for all +all_results = await spike_async.all(futs) +``` + +## Low-Level Nonblock API + +The low-level API provides direct access to non-blocking operations. You must manually poll for completion. **Use this only if you need fine-grained control.** + +### Initialization + +```python +from spike import Spike + +spike = Spike(verbose_level=1) +``` + +### Nonblocking Operations + +**Tensor Write:** +```python +# Returns operation ID +write_id = spike.tensor_write_nonblock(tensor, data, offset=0) +``` + +**Tensor Read:** +```python +# Returns operation ID +read_id = spike.tensor_read_nonblock(tensor, offset=0, size=0) +``` + +**Model Execution:** +```python +input_set = spike.create_tensor_set({"input": input_tensor}) +output_set = spike.create_tensor_set({"output": output_tensor}) +exec_id = spike.execute_nonblock(model, input_set, output_set) +``` + +> **Note:** `save_trace` is not supported for nonblocking execute. The +> `nrta_execute_schedule` API has no profiling hook, so the synchronous +> profiling path cannot be mapped onto an asynchronous schedule. Passing +> `save_trace=True` raises an error. + +### Polling for Results + +```python +while True: + result = spike.try_poll() # Non-blocking poll + if result is not None: + # Check result type + if isinstance(result, NonBlockTensorWriteResult): + if result.err is None: + print(f"Write {result.id} completed successfully") + else: + print(f"Write {result.id} failed: {result.err}") + elif isinstance(result, NonBlockTensorReadResult): + if result.err is None: + data = result.data # Retrieved data + else: + print(f"Read {result.id} failed: {result.err}") + elif isinstance(result, NonBlockExecResult): + if result.err is None: + print(f"Execution {result.id} completed successfully") + else: + print(f"Execution {result.id} failed: {result.err}") + + # Check if this is the operation we're waiting for + if result.id == target_id: + break +``` + +### Batched Operations (Low-Level) + +**Batched Writes:** +```python +# Prepare batch +batch_id = spike.tensor_write_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + data_objs=[data1, data2, data3], + offsets=[0, 0, 0] # Optional +) + +# Start the batch (returns single operation ID) +write_id = spike.tensor_write_nonblock_batched_start(batch_id) + +# Poll for single completion +while True: + result = spike.try_poll() + if result is not None and result.id == write_id: + break +``` + +**Batched Reads:** +```python +# Prepare batch +batch_id = spike.tensor_read_nonblock_batched_prepare( + tensors=[tensor1, tensor2, tensor3], + dests=[dest1, dest2, dest3], # Pre-allocated numpy arrays + offsets=[0, 0, 0], # Optional + sizes=[size1, size2, size3] # Optional +) + +# Start the batch (returns single operation ID) +read_id = spike.tensor_read_nonblock_batched_start(batch_id) + +# Poll for completion +while True: + result = spike.try_poll() + if result is not None and result.id == read_id: + if result.err is None: + # All reads completed successfully + # Data is already in dest1, dest2, dest3 + pass + break +``` + +## Architecture + +The nonblock layer is built directly on the runtime's asynchronous `nrta_*` +APIs. Operations are submitted to the hardware from the calling thread, and +their completion is tracked with per-channel sequence numbers. + +### Direct Submission + +Each nonblocking call (`tensor_write_nonblock`, `tensor_read_nonblock`, +`execute_nonblock`, and the batched variants) submits its request synchronously +via the corresponding `nrta_*` API (`nrta_tensor_write`, `nrta_tensor_read`, +`nrta_execute_schedule`). Submission returns immediately with an operation ID; +the runtime carries out the work on the device and records completion status +through a pointer the caller supplies. + +### Per-Channel Sequence Queues + +Work is organized into `(lnc, xu)` channels — one per (NeuronCore, execution +unit) pair, where the execution unit is either `NRTA_XU_TENSOR_OP` (tensor +read/write) or `NRTA_XU_COMPUTE` (model execution). Each channel has a FIFO +queue of pending operations. + +Within a channel, the runtime assigns monotonically increasing sequence numbers +to submitted requests. To check whether the operation at the front of a channel +queue is done, `try_poll()` compares its `wait_seq` against the latest completed +sequence reported by `nrta_get_sequence` for that channel. Because sequence +numbers are monotonic and queues are FIFO, only the front operation of each +channel needs checking. + +A batched operation submits its N sub-requests back-to-back on the same channel, +so their sequence numbers are consecutive; the batch waits only on the last +sequence number, which implies all prior sub-requests have completed too. + +### epoll-Based Completion Multiplexing + +To avoid probing every channel on each poll, each channel that has had work gets +an `eventfd` registered with the runtime via +`nrta_event_register_xu_completion()` and added to a single `epoll` instance. +The runtime signals a channel's eventfd whenever that channel completes a +sequence. + +`try_poll()` first calls `epoll_wait` (non-blocking) to learn which channels +made progress, then scans only those channels' front operations. Channels are +scanned in round-robin order so a continuously-busy channel cannot starve the +others. + +### Async Event Loop + +The `SpikeAsyncEventLoop` integrates with the nonblock API: + +- Futures are registered with operation IDs +- The selector polls `try_poll()` periodically +- When results arrive, corresponding futures are resolved + +### Resource Lifetime + +Because the runtime writes completion status through a pointer after submission, +each pending operation lives in a `std::deque` (which never relocates existing +elements on `push_back`), keeping that pointer valid until the operation is +harvested. The pending op also anchors any Python-owned source/destination +buffers (and `shared_ptr`s to tensors/models) so they stay alive until +completion. All submission and harvesting runs on the calling thread with the +GIL held. + +## Best Practices + +1. **Use the high-level Async API**: It's simpler and handles dependency management automatically +2. **Use async/await for readable concurrent code**: Multiple pipelines can overlap tensor I/O with model execution +3. **Use streams for simple sequencing**: Streams are the easiest way to ensure operations execute in order +4. **Use explicit dependencies for complex graphs**: When you have multiple parallel streams that need to synchronize in complex patterns +5. **Batch operations when possible**: Batched operations reduce overhead for multiple small transfers +6. **Don't mix sync and async**: Once you submit async or non-blocking operations, submitting sync operations may cause unexpected behaviors. diff --git a/spike/src/include/nrt_wrapper.h b/spike/src/include/nrt_wrapper.h index b5f3970..7064cfe 100644 --- a/spike/src/include/nrt_wrapper.h +++ b/spike/src/include/nrt_wrapper.h @@ -10,6 +10,89 @@ #include #include +extern "C" { + +// Underlying tensor and model declaration copied from NRT +// This is a temporary hack for implementing nonblocking/async operations, +// as I will need some underlying info that is not exposed +// Will not need these after explicit async is ready and stable from NRT + +#define DX_CACHE_ALIGNED __attribute__((aligned(64))) + +typedef enum nrt_tensor_mem_type { + NRT_TENSOR_MEM_TYPE_INVALID = 0, + NRT_TENSOR_MEM_TYPE_MALLOC, + NRT_TENSOR_MEM_TYPE_DMA, + NRT_TENSOR_MEM_TYPE_FAKE, +} nrt_tensor_mem_type_t; + +// Memory, host or device that is used by +// a tensor. The memory is ref counted and can be shared among +// multiple tensors. +typedef struct nrt_tensor_storage { + uint32_t hbm_idx; + size_t allocated_size; + nrt_tensor_mem_type_t type; + union { + void *dmem; // dmem associated with addr, for tensor type + // NRT_TENSOR_MEM_TYPE_DMA + uint8_t + *vmem; // malloc'ed memory for tensor type NRT_TENSOR_MEM_TYPE_MALLOC + }; + volatile uint64_t ref_count DX_CACHE_ALIGNED; + bool mem_owned_by_tensor; + + pthread_mutex_t tensor_op_cv_lock; // Lock for async exec. Used with + // `tensor_op_cv` to block the thread while + // there are still pending execs. If this + // is NULL we are not in async exec mode. + pthread_cond_t tensor_op_cv; // used to block tensor op vars + volatile uint64_t pending_exec_count_read + DX_CACHE_ALIGNED; // count of pending execs that reads this location + volatile uint64_t pending_exec_count_write + DX_CACHE_ALIGNED; // count of pending execs that writes to this location + int32_t vtpb_idx; // same as vcore->vtpb_idx but -1 if no vcore for tensor + // (used for trace api) +} nrt_tensor_storage_t; + +typedef struct nrt_tensor { + char *name; // optional name + nrt_tensor_storage_t *sto; // the actual memory represented by the tensor + // don't access directly, use helper functions to ensure correctness + // params below allow a tensor to represent a slice of the memory + // pointed by "sto" + size_t _offset; // offset within the storage + size_t _size; // tensor size + void *extra; // used to store any metadata needed by the runtime + + volatile uint64_t ref_count + DX_CACHE_ALIGNED; // refcount for tensor. Only when this is 0 can we free + // the tensor it is incremented by + // `tensor_get_reference` and decremented by + // `tensor_free`. Tensor will automatically be freed in + // `tensor_free` once ref_count is zero. + volatile uint64_t output_completion_count + DX_CACHE_ALIGNED; // used to track the completion count of an output + // tensor. 0 means not complete; 1 and above means the + // number of completions +} nrt_tensor_t; + +typedef struct H_NN { + uint32_t id; +} H_NN; + +struct nrt_model { + uint32_t start_vnc; // VirtualNeuronCore start index + uint32_t vnc_count; // number of VirtualNeuronCore(s) requested + uint32_t instance_index; // instance index which will execute on the next call + // to nrt_execute + uint32_t instance_count; // number of loaded instances + uint32_t gid; // global id, for debug + char name[256]; + H_NN h_nn[]; // kmgr model id (instance_count entries) +}; +} + namespace spike { // RAII wrapper for NRT runtime @@ -26,6 +109,7 @@ class NrtRuntime { NrtRuntime &operator=(NrtRuntime &&) = default; static uint32_t get_visible_nc_count(); + static uint32_t get_total_nc_count(); private: bool initialized_; diff --git a/spike/src/include/spike.h b/spike/src/include/spike.h index e9f1cff..a968ea9 100644 --- a/spike/src/include/spike.h +++ b/spike/src/include/spike.h @@ -4,11 +4,24 @@ #include "model.h" #include "nrt_wrapper.h" #include "tensor.h" +#include "tensor_set.h" + +#include + +#include +#include + +#include +#include +#include #include #include #include +#include #include +namespace nb = nanobind; + namespace spike { // Tensor metadata structure @@ -24,6 +37,104 @@ struct ModelTensorInfo { std::unordered_map outputs; }; +// Prepared-batch records. These only hold the arguments to be used when +// `_batched_start` is called; at start time we just submit one nrta_* request +// per entry and group them under a single cmd_id. +struct PreparedTensorWrite { + std::shared_ptr tensor; + const void *data; + size_t size; + size_t offset; + std::variant> data_obj; +}; + +struct PreparedTensorRead { + std::shared_ptr tensor; + size_t offset; + size_t size; + void *data; + std::variant> data_obj; +}; + +// Pending nonblocking operations. Each cmd_id we hand back to Python maps to +// one PendingOp. The id and wait_seq are common to every kind of op, so they +// live directly on PendingOp; the per-kind payload is the variant below. The +// batched variants hold N sub-requests submitted back-to-back on the same +// (lnc, xu, queue=0); their seq numbers are consecutive, so wait_seq stores +// the last one and a completed wait_seq implies every prior sub-request is +// complete too. + +// The nrta_* APIs store the op's completion status through the `ret` pointer +// *after* submission, so `ret` must outlive the call. Callers therefore enqueue +// the PendingOp first and pass a pointer into the deque-resident copy (see +// enqueue_pending); std::deque never relocates existing elements on push_back, +// so that address stays valid until the op is harvested. +struct PendingTensorWrite { + NRT_STATUS ret; + std::shared_ptr tensor; + // Anchors the Python-owned source buffer until the op completes. Empty for + // the raw-pointer overload (caller manages the buffer lifetime). + std::optional>> data_obj; +}; + +struct PendingTensorRead { + NRT_STATUS ret; + std::shared_ptr tensor; + // The destination buffer; also returned to Python via + // NonBlockTensorReadResult.data. + std::variant> data_obj; +}; + +struct PendingTensorWriteBatched { + // Lifetime anchors live in tensor_write_batched_prepared_[batch_id], which + // persists until close() (or a future explicit-release API) so the same + // prepared batch can be _start'd many times. + uint64_t batch_id; + std::vector rets; +}; + +struct PendingTensorReadBatched { + uint64_t batch_id; + std::vector rets; +}; + +struct PendingExecute { + NRT_STATUS ret; + std::shared_ptr model; + std::shared_ptr input_set; + std::shared_ptr output_set; +}; + +struct PendingOp { + uint64_t id; + nrta_seq_t wait_seq; + std::variant + op; +}; + +// NonBlock result structures (exposed to Python) +struct NonBlockTensorReadResult { + uint64_t id; + std::variant> data; + std::optional> err; +}; + +struct NonBlockTensorWriteResult { + uint64_t id; + std::optional> err; +}; + +struct NonBlockExecResult { + uint64_t id; + std::optional> err; +}; + +typedef std::variant + NonBlockResult; + // Main Spike class - Python interface class Spike { public: @@ -72,6 +183,53 @@ class Spike { std::optional ntff_name = std::nullopt, bool save_trace = false); + // Nonblocking operations + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + nb::bytes data_obj, size_t offset = 0); + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + nb::ndarray<> data_obj, + size_t offset = 0); + uint64_t tensor_write_nonblock(std::shared_ptr tensor, + const void *data, size_t size, + size_t offset); + + uint64_t tensor_read_nonblock(std::shared_ptr tensor, + size_t offset = 0, size_t size = 0); + uint64_t tensor_read_nonblock(std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset = 0, + size_t size = 0); + + uint64_t tensor_write_nonblock_batched_prepare( + std::vector> tensors, + std::vector> data_objs, + std::optional> offsets); + uint64_t tensor_write_nonblock_batched_start(uint64_t batch_id); + + uint64_t tensor_read_nonblock_batched_prepare( + std::vector> tensors, + std::vector> dests, + std::optional> offsets, + std::optional> sizes); + uint64_t tensor_read_nonblock_batched_start(uint64_t batch_id); + + uint64_t + execute_nonblock(std::shared_ptr model, + std::shared_ptr input_set, + std::shared_ptr output_set, + std::optional ntff_name = std::nullopt, + bool save_trace = false); + + std::optional try_poll(); + + NrtTensorSet create_tensor_set( + const std::unordered_map> + &tensor_map); + + // Wrap existing NRT objects (for interop with external code) + NrtModel wrap_model(nrt_model_t *ptr); + NrtTensor wrap_tensor(nrt_tensor_t *ptr); + NrtTensorSet wrap_tensor_set(nrt_tensor_set_t *ptr); + // Model introspection ModelTensorInfo get_tensor_info(NrtModel &model); @@ -79,10 +237,74 @@ class Spike { int verbose_level_; std::unique_ptr runtime_; + // Nonblock state + uint64_t next_non_block_id_ = 0; + uint64_t next_batch_id_ = 0; + + // One pending-op queue per (lnc, xu) channel. Each queue is FIFO by + // submission order, and within a queue nrta_seq_t values are monotonically + // increasing, so a channel only needs its front op's wait_seq checked + // against nrta_get_sequence's latest-completed seq for that channel. + static constexpr uint32_t MAX_LNC = 128; + static constexpr uint32_t NUM_CHANNELS = MAX_LNC * NRTA_XU_TYPE_NUM; + std::array, NRTA_XU_TYPE_NUM>, MAX_LNC> + xu_queues_; + + // epoll-based completion multiplexing. Each (lnc, xu) channel that has ever + // had work gets one eventfd, registered with the runtime via + // nrta_event_register_xu_completion() and added to a single epoll instance. + // The runtime signals a channel's eventfd whenever that XU completes any + // sequence, so try_poll() can ask epoll which channels made progress instead + // of probing all MAX_LNC * NRTA_XU_TYPE_NUM channels every call. + int epoll_fd_ = -1; + // Per-channel eventfd, -1 until the channel is first registered. + std::array channel_event_fds_; + + // Channels whose front op may be ready to harvest. Populated by epoll_wait + // (each signaled eventfd) and by try_poll itself (a channel stays queued + // after a successful harvest, since one eventfd signal can cover several + // completed ops). Removed once the front is found not-yet-complete or the + // queue empties. + // + // A FIFO queue (rather than an ordered set) gives round-robin fairness: every + // try_poll() pops the front channel, and a channel that still has pollable + // work is pushed to the back, so a continuously-busy low-index channel can't + // starve the others. scan_channel_queued_ tracks membership in O(1) so a + // channel is never enqueued twice. + std::deque scan_channels_; + std::array scan_channel_queued_; + + static constexpr uint32_t channel_index(uint32_t lnc, uint32_t xu) { + return lnc * NRTA_XU_TYPE_NUM + xu; + } + + // Lazily creates the epoll instance and the channel's eventfd, registering + // the latter with the runtime and the epoll set. Idempotent per channel. + void ensure_channel_registered(uint32_t lnc, nrta_xu_t xu); + + // Prepared batches (data kept alive between _prepare and _start). + std::unordered_map> + tensor_write_batched_prepared_; + + std::unordered_map> + tensor_read_batched_prepared_; + + // Appends op to the (lnc, xu) queue and returns a reference to the enqueued + // element. std::deque never relocates existing elements on push_back, so the + // returned reference (and pointers into its ret field) stay valid until the + // op is popped in try_poll. Scalar callers enqueue *before* submitting so the + // nrta_* call can write completion status straight into the queued op. + PendingOp &enqueue_pending(uint32_t lnc, nrta_xu_t xu, PendingOp op); + // Helper methods - NrtTensorSet create_tensor_sets( + NrtTensorSet create_tensor_set( const std::unordered_map &tensor_map); std::string dtype_to_string(nrt_dtype_t dtype); + + static std::optional> + ret_to_err(NRT_STATUS ret); + static std::optional> + rets_to_err(const std::vector &rets); }; } // namespace spike diff --git a/spike/src/include/tensor.h b/spike/src/include/tensor.h index 6a8d190..1f2ea00 100644 --- a/spike/src/include/tensor.h +++ b/spike/src/include/tensor.h @@ -16,6 +16,9 @@ class NrtTensor { // This constructor creates an NrtTensor that owns the tensor NrtTensor(nrt_tensor_placement_t placement, uint32_t core_id, size_t size, const std::string &name, const Spike *spike); + // This constructor creates an NrtTensor that references an existing tensor + NrtTensor(nrt_tensor_t *ptr, uint32_t core_id, uint64_t size, + const std::string &name, const Spike *spike); NrtTensor(const NrtTensor &source, size_t offset, size_t size, const std::string &name); ~NrtTensor(); diff --git a/spike/src/include/tensor_set.h b/spike/src/include/tensor_set.h index 435b5ec..418817c 100644 --- a/spike/src/include/tensor_set.h +++ b/spike/src/include/tensor_set.h @@ -2,14 +2,21 @@ #define SPIKE_SRC_INCLUDE_TENSOR_SET_H #include "tensor.h" +#include #include +#include namespace spike { +class Spike; + // RAII wrapper for NRT tensor set class NrtTensorSet { public: - NrtTensorSet(); + // Constructor that creates an owned tensor set + explicit NrtTensorSet(const Spike *spike); + // Constructor to wrap an existing tensor set (non-owning) + explicit NrtTensorSet(nrt_tensor_set_t *ptr); ~NrtTensorSet(); // Non-copyable, movable @@ -18,11 +25,19 @@ class NrtTensorSet { NrtTensorSet(NrtTensorSet &&other) noexcept; NrtTensorSet &operator=(NrtTensorSet &&other) noexcept; + bool is_freed() const; + bool is_owner() const { return spike_ != nullptr; } + + void add_tensor(const std::string &name, + std::shared_ptr tensor); void add_tensor(const std::string &name, const NrtTensor &tensor); nrt_tensor_set_t *get_ptr() const { return ptr_; } + void free(); private: nrt_tensor_set_t *ptr_; + const Spike *spike_; + std::vector> tensors_; }; } // namespace spike diff --git a/spike/src/nrt_wrapper.cpp b/spike/src/nrt_wrapper.cpp index f2478fd..f6f19a0 100644 --- a/spike/src/nrt_wrapper.cpp +++ b/spike/src/nrt_wrapper.cpp @@ -74,4 +74,13 @@ uint32_t NrtRuntime::get_visible_nc_count() { return count; } +uint32_t NrtRuntime::get_total_nc_count() { + uint32_t count = 0; + NRT_STATUS status = nrt_get_total_nc_count(&count); + if (status != 0) { + throw NrtError(status, "Failed to get total NC count"); + } + return count; +} + } // namespace spike diff --git a/spike/src/python_bindings.cpp b/spike/src/python_bindings.cpp index 904f612..064eda1 100644 --- a/spike/src/python_bindings.cpp +++ b/spike/src/python_bindings.cpp @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -97,6 +99,23 @@ NB_MODULE(_spike, m) { "Is collective model") .def("__repr__", &NrtModel::to_string); + // NrtTensorSet class + nb::class_(m, "NrtTensorSet"); + + // NonBlock result structures + nb::class_(m, "NonBlockTensorReadResult") + .def_ro("id", &NonBlockTensorReadResult::id) + .def_ro("data", &NonBlockTensorReadResult::data) + .def_ro("err", &NonBlockTensorReadResult::err); + + nb::class_(m, "NonBlockTensorWriteResult") + .def_ro("id", &NonBlockTensorWriteResult::id) + .def_ro("err", &NonBlockTensorWriteResult::err); + + nb::class_(m, "NonBlockExecResult") + .def_ro("id", &NonBlockExecResult::id) + .def_ro("err", &NonBlockExecResult::err); + // Spike class - uses keep_alive for safe shared ownership with tensors/models nb::class_(m, "Spike") .def(nb::init(), "verbose_level"_a = 0, @@ -215,6 +234,114 @@ NB_MODULE(_spike, m) { "Read data from tensor to Python buffer protocol object (bytearray, " "memoryview, etc.)") + // Nonblocking operations + .def( + "tensor_read_nonblock", + [](Spike &self, std::shared_ptr tensor, + size_t offset, size_t size) { + return self.tensor_read_nonblock(std::move(tensor), offset, size); + }, + "tensor"_a, "offset"_a = 0, "size"_a = 0, + "Read data from tensor as bytes nonblockingly") + + .def( + "tensor_read_nonblock", + [](Spike &self, std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset, size_t size) { + return self.tensor_read_nonblock(std::move(tensor), std::move(dest), + offset, size); + }, + "tensor"_a, "dest"_a, "offset"_a = 0, "size"_a = 0, + "Read data from tensor into the provided destination nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, nb::bytes data_obj, + size_t offset) { + return self.tensor_write_nonblock(std::move(tensor), + std::move(data_obj), offset); + }, + "tensor"_a, "data"_a, "offset"_a = 0, + "Write bytes data to tensor nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, + nb::ndarray<> data_obj, size_t offset) { + return self.tensor_write_nonblock(std::move(tensor), + std::move(data_obj), offset); + }, + "tensor"_a, "data"_a, "offset"_a = 0, + "Write ndarray data to tensor nonblockingly") + + .def( + "tensor_write_nonblock", + [](Spike &self, std::shared_ptr tensor, int64_t data, + size_t size, size_t offset) { + return self.tensor_write_nonblock( + std::move(tensor), reinterpret_cast(data), size, + offset); + }, + "tensor"_a, "data"_a, "size"_a, "offset"_a = 0, + "Write raw pointer data to tensor nonblockingly") + + .def("tensor_write_nonblock_batched_prepare", + &Spike::tensor_write_nonblock_batched_prepare, "tensors"_a, + "data_objs"_a, "offsets"_a = std::nullopt, + "Prepare a batched tensor write") + + .def("tensor_write_nonblock_batched_start", + &Spike::tensor_write_nonblock_batched_start, "batch_id"_a, + "Start a prepared batched tensor write") + + .def("tensor_read_nonblock_batched_prepare", + &Spike::tensor_read_nonblock_batched_prepare, "tensors"_a, "dests"_a, + "offsets"_a = std::nullopt, "sizes"_a = std::nullopt, + "Prepare a batched tensor read") + + .def("tensor_read_nonblock_batched_start", + &Spike::tensor_read_nonblock_batched_start, "batch_id"_a, + "Start a prepared batched tensor read") + + .def("execute_nonblock", &Spike::execute_nonblock, "model"_a, "inputs"_a, + "outputs"_a, nb::arg("ntff_name") = nb::none(), + "save_trace"_a = false, + "Execute a model with given inputs and outputs nonblockingly") + + .def("try_poll", &Spike::try_poll, "Try to poll for nonblocking results") + + .def( + "create_tensor_set", + [](Spike &self, + const std::unordered_map< + std::string, std::shared_ptr> &tensor_map) { + return self.create_tensor_set(tensor_map); + }, + "tensors"_a, "Create a tensor set with the tensors") + + // Wrap existing NRT objects (for interop with external code) + .def( + "wrap_model", + [](Spike &self, int64_t ptr) { + return self.wrap_model(reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT model pointer") + + .def( + "wrap_tensor", + [](Spike &self, int64_t ptr) { + return self.wrap_tensor(reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT tensor pointer") + + .def( + "wrap_tensor_set", + [](Spike &self, int64_t ptr) { + return self.wrap_tensor_set( + reinterpret_cast(ptr)); + }, + "ptr"_a, "Wrap an existing NRT tensor set pointer") + // Model introspection .def("get_tensor_info", &Spike::get_tensor_info, "model"_a, "Get tensor information for a model"); diff --git a/spike/src/spike.cpp b/spike/src/spike.cpp index 82efb6b..d2a22bb 100644 --- a/spike/src/spike.cpp +++ b/spike/src/spike.cpp @@ -1,5 +1,10 @@ #include "spike.h" +#include +#include #include +#include +#include +#include namespace spike { @@ -15,6 +20,10 @@ Spike::Spike(int verbose_level) "tensors/models first."); } + // Channel eventfds are created lazily on first use; -1 means unregistered. + channel_event_fds_.fill(-1); + scan_channel_queued_.fill(false); + // RAII: Initialize NRT in constructor if (verbose_level_ > 0) { std::cout << "Initializing SPIKE Runtime" << std::endl; @@ -35,11 +44,105 @@ uint32_t Spike::get_visible_neuron_core_count() { } int Spike::close() { + for (auto &per_lnc : xu_queues_) { + for (auto &q : per_lnc) { + q.clear(); + } + } + tensor_write_batched_prepared_.clear(); + tensor_read_batched_prepared_.clear(); + + // Tear down the epoll multiplexing state. Deregister each channel's eventfd + // from the runtime (negative fd deregisters) before closing it. + for (uint32_t lnc = 0; lnc < MAX_LNC; ++lnc) { + for (uint32_t xu = 0; xu < NRTA_XU_TYPE_NUM; ++xu) { + int &fd = channel_event_fds_[channel_index(lnc, xu)]; + if (fd < 0) { + continue; + } + nrta_event_register_xu_completion(static_cast(lnc), + static_cast(xu), + /*queue=*/0, /*fd=*/-1); + ::close(fd); + fd = -1; + } + } + if (epoll_fd_ >= 0) { + ::close(epoll_fd_); + epoll_fd_ = -1; + } + scan_channels_.clear(); + scan_channel_queued_.fill(false); + runtime_.reset(); g_alive_spike_instance_exists = false; return 0; } +void Spike::ensure_channel_registered(uint32_t lnc, nrta_xu_t xu) { + if (lnc >= MAX_LNC) { + throw SpikeError("LNC index exceeds MAX_LNC."); + } + if (xu >= NRTA_XU_TYPE_NUM) { + throw SpikeError("XU index out of range."); + } + + if (epoll_fd_ < 0) { + epoll_fd_ = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fd_ < 0) { + throw SpikeError(std::string("epoll_create1 failed: ") + + std::strerror(errno)); + } + } + + int &fd = channel_event_fds_[channel_index(lnc, xu)]; + if (fd >= 0) { + return; // Already registered. + } + + fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (fd < 0) { + throw SpikeError(std::string("eventfd failed: ") + std::strerror(errno)); + } + + // The data carries the channel index so epoll_wait tells us exactly which + // (lnc, xu) channel signaled. + struct epoll_event ev; + ev.events = EPOLLIN; + ev.data.u32 = channel_index(lnc, xu); + if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) { + int err = errno; + ::close(fd); + fd = -1; + throw SpikeError(std::string("epoll_ctl ADD failed: ") + + std::strerror(err)); + } + + // Hand the eventfd to the runtime; it signals it whenever this XU completes + // any sequence on queue 0. + NRT_STATUS status = nrta_event_register_xu_completion( + static_cast(lnc), xu, /*queue=*/0, fd); + if (status != NRT_SUCCESS) { + epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); + ::close(fd); + fd = -1; + throw NrtError(status, "Failed to register XU completion event"); + } +} + +PendingOp &Spike::enqueue_pending(uint32_t lnc, nrta_xu_t xu, PendingOp op) { + // Channel registration happens before submission (in the calling + // tensor_*/execute_nonblock helpers), so we know an eventfd is in place for + // every completion. xu must be the same XU the channel was registered on so + // the queue index here matches the one try_poll derives from the epoll event. + if (lnc >= MAX_LNC || xu >= NRTA_XU_TYPE_NUM) { + throw SpikeError("Channel index out of range."); + } + std::deque &q = xu_queues_[lnc][xu]; + q.push_back(std::move(op)); + return q.back(); +} + NrtModel Spike::load_model(const std::string &neff_file, uint32_t core_id, bool cc_enabled, uint32_t rank_id, uint32_t world_size) { @@ -85,9 +188,11 @@ void Spike::tensor_write_from_pybuffer(NrtTensor &tensor, const void *data, tensor_write(tensor, data, data_size, offset); } -NrtTensorSet Spike::create_tensor_sets( +// Create tensor set with non-owning references (for synchronous operations). +// Caller must ensure tensors remain valid during use. Lighter weight alternative. +NrtTensorSet Spike::create_tensor_set( const std::unordered_map &tensor_map) { - NrtTensorSet tensor_set; + NrtTensorSet tensor_set(this); for (const auto &[name, tensor] : tensor_map) { // FIXME: consider time-of-check-time-of-use (TOCTOU) race condition @@ -102,18 +207,558 @@ NrtTensorSet Spike::create_tensor_sets( return tensor_set; } +// Create tensor set with shared ownership (for async/nonblocking operations). +// Takes shared_ptrs to prevent premature tensor destruction during deferred +// execution in worker threads. Essential for correct async lifetime management. +NrtTensorSet Spike::create_tensor_set( + const std::unordered_map> + &tensor_map) { + NrtTensorSet tensor_set(this); + + for (const auto &[name, tensor] : tensor_map) { + if (tensor->is_freed()) { + throw SpikeError("Tensor '" + name + + "' is freed. Unable to add it into the tensor set. " + "Please check the lifetime of the tensor."); + } + tensor_set.add_tensor(name, tensor); + } + + return tensor_set; +} + void Spike::execute(NrtModel &model, const std::unordered_map &inputs, const std::unordered_map &outputs, std::optional ntff_name, bool save_trace) { // Create tensor sets - NrtTensorSet input_set = create_tensor_sets(inputs); - NrtTensorSet output_set = create_tensor_sets(outputs); + NrtTensorSet input_set = create_tensor_set(inputs); + NrtTensorSet output_set = create_tensor_set(outputs); // Execute model.execute(input_set, output_set, ntff_name, save_trace); } +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + nb::bytes data_obj, + size_t offset) { + const void *data = data_obj.data(); + size_t size = data_obj.size(); + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(data_obj); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // Enqueue first so nrta_* can write completion status into the deque-resident + // op (its address must outlive this call); roll back if scheduling fails. + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + nb::ndarray<> data_obj, + size_t offset) { + const void *data = data_obj.data(); + size_t size = data_obj.nbytes(); + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(data_obj); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock(std::shared_ptr tensor, + const void *data, size_t size, + size_t offset) { + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + PendingTensorWrite p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + // No data_obj: caller manages the raw buffer's lifetime. + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorWrite &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_write( + tensor->get_ptr(), data, offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor write"); + } + + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, + size_t offset, size_t size) { + size = (size == 0) ? (tensor->get_size() - offset) : size; + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + nb::bytes dest(nullptr, size); + void *data = PyBytes_AsString(dest.ptr()); + + PendingTensorRead p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(dest); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorRead &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_read( + data, tensor->get_ptr(), offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor read"); + } + + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock(std::shared_ptr tensor, + nb::ndarray<> dest, size_t offset, + size_t size) { + size = (size == 0) ? (tensor->get_size() - offset) : size; + + if (dest.nbytes() < size) { + throw SpikeError("The read operation exceeds the destination bound."); + } + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = tensor->get_core_id(); + + void *data = dest.data(); + + PendingTensorRead p; + p.ret = NRT_SUCCESS; + p.tensor = tensor; + p.data_obj = std::move(dest); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_TENSOR_OP, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingTensorRead &q = std::get(enq.op); + NRT_STATUS status = nrta_tensor_read( + data, tensor->get_ptr(), offset, size, static_cast(lnc), + /*queue=*/0, &q.ret, &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_TENSOR_OP].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking tensor read"); + } + + return cmd_id; +} + +uint64_t Spike::tensor_write_nonblock_batched_prepare( + std::vector> tensors, + std::vector> data_objs, + std::optional> offsets) { + if (tensors.size() == 0) { + throw SpikeError("The batched write operation needs at least one tensor."); + } + + if (tensors.size() != data_objs.size() || + (offsets.has_value() && data_objs.size() != offsets.value().size())) { + throw SpikeError("All parameters must be lists of same length."); + } + + uint64_t batch_id = next_batch_id_++; + + std::vector prepared; + prepared.reserve(tensors.size()); + + uint32_t core_id = tensors[0]->get_core_id(); + + for (size_t i = 0; i < tensors.size(); ++i) { + std::shared_ptr tensor = std::move(tensors[i]); + nb::ndarray<> data_obj = std::move(data_objs[i]); + size_t offset = offsets.has_value() ? offsets.value()[i] : 0; + + const void *data = data_obj.data(); + size_t size = data_obj.nbytes(); + uint32_t tensor_core_id = tensor->get_core_id(); + if (core_id != tensor_core_id) { + throw SpikeError("All tensors must be on the same NeuronCore."); + } + + PreparedTensorWrite entry; + entry.tensor = std::move(tensor); + entry.data = data; + entry.size = size; + entry.offset = offset; + entry.data_obj = std::move(data_obj); + prepared.push_back(std::move(entry)); + } + + tensor_write_batched_prepared_[batch_id] = std::move(prepared); + + return batch_id; +} + +uint64_t Spike::tensor_write_nonblock_batched_start(uint64_t batch_id) { + auto it = tensor_write_batched_prepared_.find(batch_id); + if (it == tensor_write_batched_prepared_.end()) { + throw SpikeError("The batch ID does not exist."); + } + const std::vector &prepared = it->second; + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = prepared[0].tensor->get_core_id(); + + PendingTensorWriteBatched p; + p.batch_id = batch_id; + p.rets.assign(prepared.size(), NRT_SUCCESS); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // p.rets is a heap vector; the runtime keeps &p.rets[i] and fills it on + // completion. Moving p into the deque below preserves that buffer, so the + // pointers stay valid. All sub-requests hit the same (lnc, xu=TENSOR_OP, + // queue=0) back-to-back, so seq numbers are consecutive and we only wait on + // the last. + nrta_seq_t wait_seq = 0; + for (size_t i = 0; i < prepared.size(); ++i) { + const PreparedTensorWrite &e = prepared[i]; + nrta_seq_t seq; + NRT_STATUS status = nrta_tensor_write( + e.tensor->get_ptr(), e.data, e.offset, e.size, static_cast(lnc), + /*queue=*/0, &p.rets[i], &seq); + if (status != NRT_SUCCESS) { + throw NrtError(status, + "Failed to schedule nonblocking batched tensor write"); + } + wait_seq = seq; + } + + enqueue_pending(lnc, NRTA_XU_TENSOR_OP, + PendingOp{cmd_id, wait_seq, std::move(p)}); + return cmd_id; +} + +uint64_t Spike::tensor_read_nonblock_batched_prepare( + std::vector> tensors, + std::vector> dests, + std::optional> offsets, + std::optional> sizes) { + if (tensors.size() == 0) { + throw SpikeError("The batched read operation needs at least one tensor."); + } + + if (tensors.size() != dests.size() || + (offsets.has_value() && dests.size() != offsets.value().size()) || + (sizes.has_value() && dests.size() != sizes.value().size())) { + throw SpikeError("All parameters must be lists of same length."); + } + + uint64_t batch_id = next_batch_id_++; + + std::vector prepared; + prepared.reserve(tensors.size()); + + uint32_t core_id = tensors[0]->get_core_id(); + + for (size_t i = 0; i < tensors.size(); ++i) { + std::shared_ptr tensor = std::move(tensors[i]); + nb::ndarray<> dest = std::move(dests[i]); + + size_t offset = offsets.has_value() ? offsets.value()[i] : 0; + size_t size = sizes.has_value() ? sizes.value()[i] + : (tensor->get_size() - offset); + + if (dest.nbytes() < size) { + throw SpikeError("The read operation exceeds the destination bound."); + } + + void *data = dest.data(); + uint32_t tensor_core_id = tensor->get_core_id(); + if (core_id != tensor_core_id) { + throw SpikeError("All tensors must be on the same NeuronCore."); + } + + PreparedTensorRead entry; + entry.tensor = std::move(tensor); + entry.offset = offset; + entry.size = size; + entry.data = data; + entry.data_obj = std::move(dest); + prepared.push_back(std::move(entry)); + } + + tensor_read_batched_prepared_[batch_id] = std::move(prepared); + + return batch_id; +} + +uint64_t Spike::tensor_read_nonblock_batched_start(uint64_t batch_id) { + auto it = tensor_read_batched_prepared_.find(batch_id); + if (it == tensor_read_batched_prepared_.end()) { + throw SpikeError("The batch ID does not exist."); + } + const std::vector &prepared = it->second; + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = prepared[0].tensor->get_core_id(); + + PendingTensorReadBatched p; + p.batch_id = batch_id; + p.rets.assign(prepared.size(), NRT_SUCCESS); + + ensure_channel_registered(lnc, NRTA_XU_TENSOR_OP); + // p.rets is a heap vector; the runtime keeps &p.rets[i] and fills it on + // completion. Moving p into the deque below preserves that buffer, so the + // pointers stay valid. Seq numbers are consecutive; wait on the last. + nrta_seq_t wait_seq = 0; + for (size_t i = 0; i < prepared.size(); ++i) { + const PreparedTensorRead &e = prepared[i]; + nrta_seq_t seq; + NRT_STATUS status = nrta_tensor_read( + e.data, e.tensor->get_ptr(), e.offset, e.size, static_cast(lnc), + /*queue=*/0, &p.rets[i], &seq); + if (status != NRT_SUCCESS) { + throw NrtError(status, + "Failed to schedule nonblocking batched tensor read"); + } + wait_seq = seq; + } + + enqueue_pending(lnc, NRTA_XU_TENSOR_OP, + PendingOp{cmd_id, wait_seq, std::move(p)}); + return cmd_id; +} + +uint64_t Spike::execute_nonblock( + std::shared_ptr model, + std::shared_ptr input_set, + std::shared_ptr output_set, + std::optional ntff_name, bool save_trace) { + // nrta_execute_schedule has no profiling hook. The nrt_profile_start/stop + // pair in the sync path straddles a single synchronous nrt_execute, which + // can't be mapped safely onto an asynchronous schedule. + if (save_trace) { + throw SpikeError("save_trace is not supported with nonblocking execute."); + } + (void)ntff_name; + + uint64_t cmd_id = next_non_block_id_++; + uint32_t lnc = model->get_core_id(); + + PendingExecute p; + p.ret = NRT_SUCCESS; + p.model = model; + p.input_set = input_set; + p.output_set = output_set; + + ensure_channel_registered(lnc, NRTA_XU_COMPUTE); + PendingOp &enq = enqueue_pending( + lnc, NRTA_XU_COMPUTE, PendingOp{cmd_id, /*wait_seq=*/0, std::move(p)}); + PendingExecute &q = std::get(enq.op); + NRT_STATUS status = + nrta_execute_schedule(model->get_ptr(), input_set->get_ptr(), + output_set->get_ptr(), /*queue=*/0, &q.ret, + &enq.wait_seq); + if (status != NRT_SUCCESS) { + xu_queues_[lnc][NRTA_XU_COMPUTE].pop_back(); + throw NrtError(status, "Failed to schedule nonblocking execute"); + } + + return cmd_id; +} + +std::optional> +Spike::ret_to_err(NRT_STATUS ret) { + if (ret != NRT_SUCCESS) { + return NrtError(ret, "Nonblocking operation failed"); + } + return std::nullopt; +} + +std::optional> +Spike::rets_to_err(const std::vector &rets) { + for (NRT_STATUS r : rets) { + if (r != NRT_SUCCESS) { + return NrtError(r, "Nonblocking operation failed"); + } + } + return std::nullopt; +} + +std::optional Spike::try_poll() { + // The runtime signals each channel's eventfd whenever its XU completes a + // sequence. We ask epoll (non-blocking) which channels made progress and + // only scan those — rather than probing all MAX_LNC * NRTA_XU_TYPE_NUM + // channels every call. Registration is done before submission (see the + // tensor_*/execute_nonblock helpers), so every completion is guaranteed to + // signal epoll. + if (epoll_fd_ >= 0) { + struct epoll_event events[NUM_CHANNELS]; + int n; + do { + n = epoll_wait(epoll_fd_, events, NUM_CHANNELS, /*timeout=*/0); + } while (n < 0 && errno == EINTR); + if (n < 0) { + throw SpikeError(std::string("epoll_wait failed: ") + + std::strerror(errno)); + } + for (int i = 0; i < n; ++i) { + uint32_t ch = events[i].data.u32; + // Drain the eventfd so it only fires again on a future completion + // (level-triggered). A single read resets the counter regardless of how + // many completions it accumulated; nrta_get_sequence reports the latest. + uint64_t counter; + int fd = channel_event_fds_[ch]; + if (fd >= 0) { + ssize_t rc; + do { + rc = read(fd, &counter, sizeof(counter)); + } while (rc < 0 && errno == EINTR); + } + // Enqueue for scanning, guarding against duplicates in O(1). + if (!scan_channel_queued_[ch]) { + scan_channel_queued_[ch] = true; + scan_channels_.push_back(ch); + } + } + } + + // Each (lnc, xu) queue is FIFO in submission order, and within it nrta_seq_t + // values are monotonically increasing, so we only check the front op's + // wait_seq against nrta_get_sequence's latest-completed seq for that channel. + // + // scan_channels_ is a round-robin FIFO: we pop the front channel, and if it + // yields a completed op we re-queue it at the back before returning, so a + // continuously-busy channel can't starve the others. A channel that is empty + // or whose front is not yet complete is dropped (its eventfd will re-queue it + // on the next completion). + while (!scan_channels_.empty()) { + uint32_t ch = scan_channels_.front(); + scan_channels_.pop_front(); + scan_channel_queued_[ch] = false; + + uint32_t lnc = ch / NRTA_XU_TYPE_NUM; + uint32_t xu_idx = ch % NRTA_XU_TYPE_NUM; + std::deque &q = xu_queues_[lnc][xu_idx]; + + if (q.empty()) { + continue; + } + nrta_seq_t front_wait_seq = q.front().wait_seq; + + nrta_xu_t xu = static_cast(xu_idx); + nrta_seq_t latest = 0; + NRT_STATUS s = nrta_get_sequence(lnc, xu, /*queue=*/0, &latest); + if (s != NRT_SUCCESS || + NRTA_SEQ_GET_SEQ_NUM(front_wait_seq) > NRTA_SEQ_GET_SEQ_NUM(latest)) { + // Front not completed yet; stop scanning this channel until its eventfd + // signals the next completion. + continue; + } + + PendingOp op = std::move(q.front()); + q.pop_front(); + + // A single completion may have advanced the sequence past several queued + // ops, so re-queue this channel at the back: the next try_poll() re-checks + // its new front (and drops it if empty/not-ready) before trusting epoll. + if (!scan_channel_queued_[ch]) { + scan_channel_queued_[ch] = true; + scan_channels_.push_back(ch); + } + + uint64_t id = op.id; + return std::visit( + [id](auto &p) -> NonBlockResult { + using T = std::decay_t; + if constexpr (std::is_same_v) { + return NonBlockTensorWriteResult{id, ret_to_err(p.ret)}; + } else if constexpr (std::is_same_v) { + return NonBlockTensorReadResult{id, std::move(p.data_obj), + ret_to_err(p.ret)}; + } else if constexpr (std::is_same_v) { + return NonBlockTensorWriteResult{id, rets_to_err(p.rets)}; + } else if constexpr (std::is_same_v) { + // Lifetime anchors stay in tensor_read_batched_prepared_[batch_id] + // so the same prepared batch can be _start'd again. Callers read + // the data via the dests array they passed to _prepare, so the + // result's data field is left default-constructed. + return NonBlockTensorReadResult{ + id, std::variant>{}, + rets_to_err(p.rets)}; + } else { + static_assert(std::is_same_v); + return NonBlockExecResult{id, ret_to_err(p.ret)}; + } + }, + op.op); + } + + return std::nullopt; +} + +NrtModel Spike::wrap_model(nrt_model_t *ptr) { + uint32_t core_id = ptr->start_vnc; + uint32_t rank_id = ptr->gid; + uint32_t world_size = ptr->vnc_count; + // FIXME: Value of cc_enabled is fake, but no one uses it, so it is fine for + // the purpose. + return NrtModel(ptr, core_id, true, rank_id, world_size); +} + +NrtTensor Spike::wrap_tensor(nrt_tensor_t *ptr) { + if (ptr->sto->vtpb_idx == -1) { + // CPU tensor wrapping not supported + throw SpikeError("Wrapping a CPU tensor is not supported"); + } + + size_t size = ptr->_size; + const char *name = ptr->name; + // Use vtpb_idx as the core ID (this is the logical NC ID) + uint32_t core_id = ptr->sto->vtpb_idx; + return NrtTensor(ptr, core_id, size, name, this); +} + +NrtTensorSet Spike::wrap_tensor_set(nrt_tensor_set_t *ptr) { + return NrtTensorSet(ptr); +} + std::string Spike::dtype_to_string(nrt_dtype_t dtype) { switch (dtype) { case NRT_DTYPE_FLOAT32: diff --git a/spike/src/spike/__init__.py b/spike/src/spike/__init__.py index a8c6224..9570748 100644 --- a/spike/src/spike/__init__.py +++ b/spike/src/spike/__init__.py @@ -1,21 +1,33 @@ from ._spike import ( ModelTensorInfo, + NonBlockExecResult, + NonBlockTensorReadResult, + NonBlockTensorWriteResult, NrtError, NrtModel, NrtTensor, + NrtTensorSet, + Spike, SpikeError, SystemTraceSession, TensorMetadata, ) from .profiler_adapter import SpikeProfiler +from .spike_async import SpikeAsync, SpikeStream from .spike_model import BenchmarkResult, SpikeModel from .spike_singleton import configure, get_spike_singleton, reset from .spike_tensor import SpikeTensor +# Type alias for convenience +NonBlockResult = NonBlockTensorReadResult | NonBlockTensorWriteResult | NonBlockExecResult + __all__ = [ "SpikeModel", "SpikeTensor", "SpikeProfiler", + "SpikeAsync", + "SpikeStream", + "Spike", "configure", "reset", "get_spike_singleton", @@ -26,5 +38,10 @@ "ModelTensorInfo", "NrtModel", "NrtTensor", + "NrtTensorSet", "TensorMetadata", + "NonBlockTensorReadResult", + "NonBlockTensorWriteResult", + "NonBlockExecResult", + "NonBlockResult", ] diff --git a/spike/src/spike/_spike.pyi b/spike/src/spike/_spike.pyi index e2fc986..01c8b2c 100644 --- a/spike/src/spike/_spike.pyi +++ b/spike/src/spike/_spike.pyi @@ -1,6 +1,9 @@ """NKIPy Spike Runtime C++ bindings""" -from collections.abc import Mapping +from collections.abc import Mapping, Sequence +from typing import overload + +from numpy.typing import NDArray class SpikeRuntimeError(RuntimeError): @@ -99,6 +102,33 @@ class NrtModel: def __repr__(self) -> str: ... +class NrtTensorSet: + pass + +class NonBlockTensorReadResult: + @property + def id(self) -> int: ... + + @property + def data(self) -> bytes | NDArray: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + +class NonBlockTensorWriteResult: + @property + def id(self) -> int: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + +class NonBlockExecResult: + @property + def id(self) -> int: ... + + @property + def err(self) -> "spike::SpikeError" | "spike::NrtError" | None: ... + class Spike: def __init__(self, verbose_level: int = 0) -> None: """Initialize Spike with verbose level""" @@ -144,5 +174,55 @@ class Spike: Read data from tensor to Python buffer protocol object (bytearray, memoryview, etc.) """ + @overload + def tensor_read_nonblock(self, tensor: NrtTensor, offset: int = 0, size: int = 0) -> int: + """Read data from tensor as bytes nonblockingly""" + + @overload + def tensor_read_nonblock(self, tensor: NrtTensor, dest: NDArray, offset: int = 0, size: int = 0) -> int: + """Read data from tensor into the provided destination nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: bytes, offset: int = 0) -> int: + """Write bytes data to tensor nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: NDArray, offset: int = 0) -> int: + """Write ndarray data to tensor nonblockingly""" + + @overload + def tensor_write_nonblock(self, tensor: NrtTensor, data: int, size: int, offset: int = 0) -> int: + """Write raw pointer data to tensor nonblockingly""" + + def tensor_write_nonblock_batched_prepare(self, tensors: Sequence[NrtTensor], data_objs: Sequence[NDArray], offsets: Sequence[int] | None = None) -> int: + """Prepare a batched tensor write""" + + def tensor_write_nonblock_batched_start(self, batch_id: int) -> int: + """Start a prepared batched tensor write""" + + def tensor_read_nonblock_batched_prepare(self, tensors: Sequence[NrtTensor], dests: Sequence[NDArray], offsets: Sequence[int] | None = None, sizes: Sequence[int] | None = None) -> int: + """Prepare a batched tensor read""" + + def tensor_read_nonblock_batched_start(self, batch_id: int) -> int: + """Start a prepared batched tensor read""" + + def execute_nonblock(self, model: NrtModel, inputs: NrtTensorSet, outputs: NrtTensorSet, ntff_name: str | None = None, save_trace: bool = False) -> int: + """Execute a model with given inputs and outputs nonblockingly""" + + def try_poll(self) -> NonBlockTensorReadResult | NonBlockTensorWriteResult | NonBlockExecResult | None: + """Try to poll for nonblocking results""" + + def create_tensor_set(self, tensors: Mapping[str, NrtTensor]) -> NrtTensorSet: + """Create a tensor set with the tensors""" + + def wrap_model(self, ptr: int) -> NrtModel: + """Wrap an existing NRT model pointer""" + + def wrap_tensor(self, ptr: int) -> NrtTensor: + """Wrap an existing NRT tensor pointer""" + + def wrap_tensor_set(self, ptr: int) -> NrtTensorSet: + """Wrap an existing NRT tensor set pointer""" + def get_tensor_info(self, model: NrtModel) -> ModelTensorInfo: """Get tensor information for a model""" diff --git a/spike/src/spike/spike_async.py b/spike/src/spike/spike_async.py new file mode 100644 index 0000000..cab3096 --- /dev/null +++ b/spike/src/spike/spike_async.py @@ -0,0 +1,415 @@ +"""Async/await interface for Spike runtime operations.""" + +from __future__ import annotations + +from typing import Any, Callable, Coroutine, Optional, Sequence +from ._spike import ( + NonBlockExecResult, + NonBlockTensorReadResult, + NonBlockTensorWriteResult, + NrtModel, + NrtTensor, + NrtTensorSet, + Spike, +) + +import asyncio +import numpy as np + + +class SpikeAsyncSelector: + """Event selector for async operations.""" + + def __init__(self, spike: Spike) -> None: + self._spike: Spike = spike + + def select(self, timeout: float | None) -> list[NonBlockResult]: + """Poll for completed operations.""" + res = self._spike.try_poll() + if res is None: + return [] + else: + return [res] + + +class SpikeAsyncFuture(asyncio.Future[Any]): + """Future that can be waited on synchronously.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(SpikeAsyncFuture, self).__init__(*args, **kwargs) + + def wait(self) -> Any: + """Wait for the future to complete (blocking).""" + return self._loop.run_until_complete(self) + + +class SpikeAsyncTask(asyncio.Task[Any]): + """Task that can be waited on synchronously.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super(SpikeAsyncTask, self).__init__(*args, **kwargs) + + def wait(self) -> Any: + """Wait for the task to complete (blocking).""" + return self._loop.run_until_complete(self) + + +class SpikeAsyncEventLoop(asyncio.BaseEventLoop): + """Custom event loop for Spike async operations.""" + + def __init__(self, selector: SpikeAsyncSelector) -> None: + super().__init__() + self._selector: SpikeAsyncSelector = selector + + def task_factory( + loop: asyncio.AbstractEventLoop, + coro: Coroutine[Any, Any, Any], + **kwargs: Any + ) -> SpikeAsyncTask: + return SpikeAsyncTask(coro, loop=loop, **kwargs) + + self.set_task_factory(task_factory) + + self._tensor_ops: dict[int, SpikeAsyncFuture] = {} + self._exec_ops: dict[int, SpikeAsyncFuture] = {} + + def create_future(self) -> SpikeAsyncFuture: + """Create a future for this event loop.""" + return SpikeAsyncFuture(loop=self) + + def register_tensor_op(self, req_id: int, fut: SpikeAsyncFuture) -> None: + """Register a tensor operation future.""" + assert req_id not in self._tensor_ops + self._tensor_ops[req_id] = fut + + def register_exec_op(self, req_id: int, fut: SpikeAsyncFuture) -> None: + """Register an execution operation future.""" + assert req_id not in self._exec_ops + self._exec_ops[req_id] = fut + + def _process_events(self, event_list: list[NonBlockResult]) -> None: + """Process completed events.""" + for event in event_list: + match event: + case NonBlockTensorWriteResult(id=id, err=err): + fut = self._tensor_ops[id] + if err is None: + fut.set_result(None) + else: + fut.set_exception(err) + + del self._tensor_ops[id] + + case NonBlockTensorReadResult(id=id, data=data, err=err): + fut = self._tensor_ops[id] + if err is None: + fut.set_result(data) + else: + fut.set_exception(err) + + del self._tensor_ops[id] + + case NonBlockExecResult(id=id, err=err): + fut = self._exec_ops[id] + if err is None: + fut.set_result(None) + else: + fut.set_exception(err) + + del self._exec_ops[id] + + +class SpikeAsync: + """Async interface for Spike runtime.""" + + def __init__(self, verbose_level: int = 0) -> None: + self.spike: Spike = Spike(verbose_level=verbose_level) + + self._selector: SpikeAsyncSelector = SpikeAsyncSelector(self.spike) + self._loop: SpikeAsyncEventLoop = SpikeAsyncEventLoop(self._selector) + + self._default_stream: SpikeStream | None = None + + def _wrapper( + self, + closure: Callable[[], SpikeAsyncFuture], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None, + stream: SpikeStream | None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Wrap an operation with dependency and stream management.""" + # Try to use default stream if no stream is explicitly specified + if stream is None: + stream = self._default_stream + + # If no explicit dependency, see if there is any stream induced dependency + if deps is None or len(deps) == 0: + if stream is None: + # No stream, fast path + return closure() + elif stream._last_fut is None: + # There is stream but no stream induced dependency + # Fast path, but also put the current op to the stream + fut = closure() + stream._last_fut = fut + return fut + + # If there is a stream induced dependency, add it to explicit dependency + if stream is not None and stream._last_fut is not None: + if deps is None: + deps = [] + else: + deps = list(deps) # Make mutable copy + + deps.append(stream._last_fut) + + # Heavy path for having any dependency + async def internal_async(): + for dep in deps: + await dep + return await closure() + + fut = self._loop.create_task(internal_async()) + + if stream is not None: + stream._last_fut = fut + + return fut + + def create_stream(self) -> SpikeStream: + """Create a new stream for operation sequencing.""" + return SpikeStream(self) + + def create_tensor_set( + self, + tensor_map: dict[str, NrtTensor] + ) -> NrtTensorSet: + """Create a tensor set from a dictionary of tensors. + + Convenience method for spike.create_tensor_set(). + """ + return self.spike.create_tensor_set(tensor_map) + + def load_model( + self, + neff_file: str, + core_id: int = 0, + cc_enabled: bool = False, + rank_id: int = 0, + world_size: int = 1 + ) -> NrtModel: + """Load a model from a NEFF file. + + Convenience method for spike.load_model(). + """ + return self.spike.load_model(neff_file, core_id, cc_enabled, rank_id, world_size) + + def allocate_tensor( + self, + size: int, + core_id: int = 0, + name: str | None = None + ) -> NrtTensor: + """Allocate a tensor on a NeuronCore. + + Convenience method for spike.allocate_tensor(). + """ + return self.spike.allocate_tensor(size, core_id, name) + + def tensor_write( + self, + tensor: NrtTensor, + data: Any, + offset: int = 0, + size: int = 0, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Write to a tensor asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + if size > 0: + req_id = self.spike.tensor_write_nonblock(tensor, data, size, offset) + else: + req_id = self.spike.tensor_write_nonblock(tensor, data, offset) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_write_batched_start( + self, + batch_id: int, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Start a batched tensor write asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.tensor_write_nonblock_batched_start(batch_id) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_read_batched_start( + self, + batch_id: int, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Start a batched tensor read asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.tensor_read_nonblock_batched_start(batch_id) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def tensor_read( + self, + tensor: NrtTensor, + dest: Any = None, + offset: int = 0, + size: int = 0, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Read from a tensor asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + if dest is None: + req_id = self.spike.tensor_read_nonblock(tensor, offset, size) + else: + req_id = self.spike.tensor_read_nonblock(tensor, dest, offset, size) + self._loop.register_tensor_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def execute( + self, + model: NrtModel, + inputs: NrtTensorSet, + outputs: NrtTensorSet, + ntff_name: str | None = None, + save_trace: bool = False, + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncFuture | SpikeAsyncTask: + """Execute a model asynchronously.""" + + def internal() -> SpikeAsyncFuture: + fut = self._loop.create_future() + req_id = self.spike.execute_nonblock( + model, inputs, outputs, ntff_name, save_trace + ) + self._loop.register_exec_op(req_id, fut) + return fut + + return self._wrapper(internal, deps, stream) + + def custom_op( + self, + coro: Coroutine[Any, Any, Any], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncTask: + """Submit a custom coroutine as an operation.""" + # Try to use default stream if no stream is explicitly specified + if stream is None: + stream = self._default_stream + + # If no explicit dependency, see if there is any stream induced dependency + if deps is None or len(deps) == 0: + if stream is None: + # No stream, fast path + return self._loop.create_task(coro) + elif stream._last_fut is None: + # There is stream but no stream induced dependency + # Fast path, but also put the current op to the stream + fut = self._loop.create_task(coro) + stream._last_fut = fut + return fut + + # If there is a stream induced dependency, add it to explicit dependency + if stream is not None and stream._last_fut is not None: + if deps is None: + deps = [] + else: + deps = list(deps) # Make mutable copy + + deps.append(stream._last_fut) + + # Heavy path for having any dependency + async def internal_async() -> Any: + for dep in deps: + await dep + return await coro + + fut = self._loop.create_task(internal_async()) + + if stream is not None: + stream._last_fut = fut + + return fut + + def submit( + self, + coro: Coroutine[Any, Any, Any], + deps: Sequence[SpikeAsyncFuture | SpikeAsyncTask | Coroutine[Any, Any, Any]] | None = None, + stream: SpikeStream | None = None + ) -> SpikeAsyncTask: + """Submit a coroutine to the event loop.""" + return self.custom_op(coro, deps, stream) + + @staticmethod + async def all(futs: Sequence[SpikeAsyncFuture | SpikeAsyncTask]) -> list[Any]: + """Wait for all futures to complete.""" + res: list[Any] = [] + for fut in futs: + res.append(await fut) + return res + + +class SpikeStream: + """Stream for sequencing operations.""" + + def __init__(self, spike_async: SpikeAsync) -> None: + self._last_fut: SpikeAsyncFuture | SpikeAsyncTask | None = None + self._spike_async: SpikeAsync = spike_async + self._prev_stream: SpikeStream | None = None + + def __enter__(self) -> SpikeStream: + """Enter context manager, making this the default stream.""" + self._prev_stream = self._spike_async._default_stream + self._spike_async._default_stream = self + return self + + def __exit__(self, _exc_type: Any, _exc_value: Any, _traceback: Any) -> None: + """Exit context manager, restoring previous default stream.""" + self._spike_async._default_stream = self._prev_stream + self._prev_stream = None + + def wait(self) -> Any: + """Wait for all operations in this stream to complete.""" + if self._last_fut is not None: + return self._last_fut.wait() + + def record_event(self) -> SpikeAsyncFuture | SpikeAsyncTask | None: + """Record an event at the current point in the stream.""" + return self._last_fut + + def wait_event(self, event: SpikeAsyncFuture | SpikeAsyncTask) -> None: + """Wait for an event from another stream.""" + + async def internal(fut: SpikeAsyncFuture | SpikeAsyncTask | None) -> None: + if fut is not None: + await fut + await event + + self._last_fut = self._spike_async.custom_op(internal(self._last_fut)) diff --git a/spike/src/tensor.cpp b/spike/src/tensor.cpp index 1722fab..b5df6de 100644 --- a/spike/src/tensor.cpp +++ b/spike/src/tensor.cpp @@ -19,6 +19,20 @@ NrtTensor::NrtTensor(nrt_tensor_placement_t placement, uint32_t core_id, } } +NrtTensor::NrtTensor(nrt_tensor_t *ptr, uint32_t core_id, uint64_t size, + const std::string &name, const Spike *spike) + : ptr_(nullptr), core_id_(core_id), size_(size), name_(name), + spike_(spike) { + // Wrap an existing tensor by creating a slice + NRT_STATUS status = + nrt_tensor_allocate_slice(ptr, 0, size, name.c_str(), &ptr_); + if (status != 0) { + ptr_ = nullptr; + throw NrtError(status, + "Failed to wrap a raw tensor by allocating a tensor slice"); + } +} + NrtTensor::NrtTensor(const NrtTensor &source, size_t offset, size_t size, const std::string &name) : ptr_(nullptr), core_id_(source.core_id_), size_(size), name_(name), diff --git a/spike/src/tensor_set.cpp b/spike/src/tensor_set.cpp index 41ff30d..28d60a1 100644 --- a/spike/src/tensor_set.cpp +++ b/spike/src/tensor_set.cpp @@ -1,37 +1,50 @@ #include "tensor_set.h" +#include "spike.h" namespace spike { -NrtTensorSet::NrtTensorSet() : ptr_(nullptr) { +NrtTensorSet::NrtTensorSet(const Spike *spike) + : ptr_(nullptr), spike_(spike) { NRT_STATUS status = nrt_allocate_tensor_set(reinterpret_cast(&ptr_)); if (status != 0) { throw NrtError(status, "Failed to allocate tensor set"); } } -NrtTensorSet::~NrtTensorSet() { - if (ptr_) { - nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); - } -} +NrtTensorSet::NrtTensorSet(nrt_tensor_set_t *ptr) + : ptr_(ptr), spike_(nullptr) {} + +NrtTensorSet::~NrtTensorSet() { free(); } -NrtTensorSet::NrtTensorSet(NrtTensorSet &&other) noexcept : ptr_(other.ptr_) { +NrtTensorSet::NrtTensorSet(NrtTensorSet &&other) noexcept + : ptr_(other.ptr_), spike_(other.spike_), + tensors_(std::move(other.tensors_)) { other.ptr_ = nullptr; } NrtTensorSet &NrtTensorSet::operator=(NrtTensorSet &&other) noexcept { if (this != &other) { - if (ptr_) { - nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); - } + free(); ptr_ = other.ptr_; + spike_ = other.spike_; + tensors_ = std::move(other.tensors_); other.ptr_ = nullptr; } return *this; } +bool NrtTensorSet::is_freed() const { + return ptr_ == nullptr || (is_owner() && spike_->is_closed()); +} + +void NrtTensorSet::add_tensor(const std::string &name, + std::shared_ptr tensor) { + add_tensor(name, *tensor); + tensors_.push_back(std::move(tensor)); +} + void NrtTensorSet::add_tensor(const std::string &name, - const NrtTensor &tensor) { + const NrtTensor &tensor) { NRT_STATUS status = nrt_add_tensor_to_tensor_set(ptr_, name.c_str(), tensor.get_ptr()); if (status != 0) { @@ -39,4 +52,14 @@ void NrtTensorSet::add_tensor(const std::string &name, } } +void NrtTensorSet::free() { + if (is_freed() || !is_owner()) { + // Do not throw exception as this is perfectly fine + return; + } + + nrt_destroy_tensor_set(reinterpret_cast(&ptr_)); + ptr_ = nullptr; +} + } // namespace spike diff --git a/spike/tests/conftest.py b/spike/tests/conftest.py new file mode 100644 index 0000000..bf0e86a --- /dev/null +++ b/spike/tests/conftest.py @@ -0,0 +1,28 @@ +def pytest_addoption(parser): + parser.addoption( + "--test-mode", + default="correctness", + choices=["correctness", "overlapping"], + help=( + "Test mode: 'correctness' uses small random tensors to verify numerical accuracy; " + "'overlapping' uses large zero tensors to demonstrate async operation overlap." + ), + ) + parser.addoption( + "--num-pipelines", + type=int, + default=3, + help="Number of concurrent pipelines to allocate and run in test_spike_async.py tests.", + ) + parser.addoption( + "--warmup-iters", + type=int, + default=3, + help="Number of untimed warmup iterations before measurement in the perf test.", + ) + parser.addoption( + "--measure-iters", + type=int, + default=10, + help="Number of timed measurement iterations in the perf test.", + ) diff --git a/spike/tests/test_spike_async.py b/spike/tests/test_spike_async.py new file mode 100644 index 0000000..a209f8c --- /dev/null +++ b/spike/tests/test_spike_async.py @@ -0,0 +1,412 @@ +""" +Tests for Spike async/nonblock functionality. + +NOTE: These tests require actual NeuronCore hardware and a compiled NEFF model. +They serve as integration tests and usage examples. +""" + +import os +import time +import pytest +import numpy as np + +try: + from spike._spike import Spike + + from spike import reset as spike_reset + + available_core_count = Spike.get_visible_neuron_core_count() + if available_core_count < 1: + pytest.skip( + "Skipping all tests: No compatible Neuron hardware detected", + allow_module_level=True, + ) + +except ImportError as e: + pytest.skip(f"Required packages not available: {e}", allow_module_level=True) + +# Matmul shapes per --test-mode, expressed as (x_shape, y_shape) for out = x @ y. +# +# The "overlapping" shape is tuned so that, with the weights `y` pre-written once +# (mimicking ML model parameters), a single iteration's `write x` + `read out` +# data-movement time balances the matmul execution time (~4 ms each on the +# reference hardware). The balance follows from the measured rates +# (write 0.3125 ms/MiB, read 0.09375 ms/MiB, exec 1.46e-11 ms/MAC): writing x +# (M*K) plus reading out (M*N) equals exec (M*K*N) when 1.79e-7/K + 5.96e-7/N ~= +# 1.46e-11. All dims are powers of 2: K=32768, N=65536 satisfies it (the M factor +# cancels, so M=128 just sets the absolute scale -- exec mid-range at ~4 ms). +# Then write x = 8 MiB (2.5 ms), read out = 16 MiB (1.5 ms), exec ~= 4 ms. +# Note: the pre-written weights y are 32768*65536 fp16 = 4 GiB per core. +def _matmul_shapes(test_mode): + if test_mode == "overlapping": + return (128, 32768), (32768, 65536) + else: # correctness -- small enough for a CPU reference check + return (128, 256), (256, 128) + + +@pytest.fixture(scope="module") +def neff_path(request): + """Compile a matmul kernel for testing. + + Shape is chosen based on --test-mode (see _matmul_shapes): + correctness -- small (128x256) x (256x128), fast enough for CPU reference check + overlapping -- large, balanced so write+read ~= exec with weights pre-written + """ + from nkipy.core.compile import compile_to_neff, trace + + def my_matmul(x, y): + return np.matmul(x, y) + + test_mode = request.config.getoption("--test-mode") + x_shape, y_shape = _matmul_shapes(test_mode) + x = np.zeros(x_shape, dtype=np.float16) + y = np.zeros(y_shape, dtype=np.float16) + + traced_kernel = trace(my_matmul) + traced_kernel.specialize(x, y) + + output_dir = os.path.join(os.path.dirname(__file__), "artifacts") + neff_path = compile_to_neff(trace_kernel=traced_kernel, output_dir=output_dir) + + return neff_path + + +@pytest.fixture(scope="module") +def spike_async(): + """Initialize SpikeAsync for testing.""" + from spike import SpikeAsync + + spike_async = SpikeAsync() + yield spike_async + spike_async.spike.close() + + +@pytest.fixture(scope="module") +def model_and_tensors(request, neff_path, spike_async): + """Load model and prepare input/output tensors for matmul. + + Data and shape are chosen based on --test-mode: + correctness -- small (128x256) x (256x128) with random values for numerical verification + overlapping -- large (4096x8192) x (8192x4096) with zeros to demonstrate async overlap + """ + # Load model + model = spike_async.load_model(neff_path, core_id=0) + + # Create input arrays for matmul + test_mode = request.config.getoption("--test-mode") + if test_mode == "overlapping": + x = np.zeros((4096, 8192), dtype=np.float16) + y = np.zeros((8192, 4096), dtype=np.float16) + else: # correctness + x = np.random.rand(128, 256).astype(np.float16) + y = np.random.rand(256, 128).astype(np.float16) + + num_pipelines = request.config.getoption("--num-pipelines") + + # Allocate input tensors for concurrent pipelines + inputs_x = [] + inputs_y = [] + input_sets = [] + for i in range(num_pipelines): + input_x = spike_async.allocate_tensor(size=x.nbytes, core_id=0, name=f"input_x_{i}") + input_y = spike_async.allocate_tensor(size=y.nbytes, core_id=0, name=f"input_y_{i}") + inputs_x.append(input_x) + inputs_y.append(input_y) + input_sets.append(spike_async.create_tensor_set({"x": input_x, "y": input_y})) + + # Allocate output tensors for concurrent pipelines + output_shape = (x.shape[0], y.shape[1]) + output_size = np.prod(output_shape) * x.dtype.itemsize + outputs = [] + output_sets = [] + output_arrays = [] # Pre-allocated CPU buffers for reading outputs + for i in range(num_pipelines): + out = spike_async.allocate_tensor(size=output_size, core_id=0, name=f"output_{i}") + outputs.append(out) + output_sets.append(spike_async.create_tensor_set({"output0": out})) + # Pre-allocate CPU array for zero-copy tensor_read + output_arrays.append(np.empty(output_shape, dtype=x.dtype)) + + print('Done initializing inputs, outputs, and model.') + + return model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays + + +@pytest.fixture(scope="module") +def model_and_tensors_all_cores(request, neff_path, spike_async): + """Like model_and_tensors, but allocates one pipeline per visible NeuronCore. + + Loads the model and allocates input/output tensors on every core so that one + inference pipeline can run concurrently on each core. Mirrors the structure of + model_and_tensors, with all per-pipeline lists indexed by core_id. + + Each core hosts --num-pipelines independent pipelines so that multiple + inferences can overlap within a core (e.g. one pipeline's tensor_read can run + while another's execute is in flight). Per-pipeline lists are nested and + indexed [core_id][pipeline_idx]; per-core lists (models, inputs_y) are indexed + [core_id]. The weights `y` are pre-written once per core (mimicking ML model + parameters loaded ahead of inference) and shared by all pipelines on that core, + so each pipeline only writes its own `x` and reads its own output. Shapes come + from _matmul_shapes: + correctness -- small (128x256) x (256x128) with random values for numerical verification + overlapping -- large, balanced so write x + read out ~= exec (y excluded, pre-written) + """ + num_cores = Spike.get_visible_neuron_core_count() + num_pipelines = request.config.getoption("--num-pipelines") + + # Create input arrays for matmul + test_mode = request.config.getoption("--test-mode") + x_shape, y_shape = _matmul_shapes(test_mode) + if test_mode == "overlapping": + x = np.zeros(x_shape, dtype=np.float16) + y = np.zeros(y_shape, dtype=np.float16) + else: # correctness + x = np.random.rand(*x_shape).astype(np.float16) + y = np.random.rand(*y_shape).astype(np.float16) + + output_shape = (x.shape[0], y.shape[1]) + output_size = np.prod(output_shape) * x.dtype.itemsize + + # Per-core lists (one entry per core) + models = [] + inputs_y = [] + # Per-pipeline lists (nested: [core_id][pipeline_idx]) + inputs_x = [] + input_sets = [] + outputs = [] + output_sets = [] + output_arrays = [] # Pre-allocated CPU buffers for reading outputs + preload_futs = [] + for core_id in range(num_cores): + models.append(spike_async.load_model(neff_path, core_id=core_id)) + + # One shared weights tensor per core, pre-written once and reused by every + # pipeline on the core. + input_y = spike_async.allocate_tensor(size=y.nbytes, core_id=core_id, name=f"input_y_{core_id}") + inputs_y.append(input_y) + preload_futs.append(spike_async.tensor_write(input_y, y)) + + # Independent x/output tensors per pipeline so pipelines don't alias buffers. + core_inputs_x = [] + core_input_sets = [] + core_outputs = [] + core_output_sets = [] + core_output_arrays = [] + for p in range(num_pipelines): + input_x = spike_async.allocate_tensor(size=x.nbytes, core_id=core_id, name=f"input_x_{core_id}_{p}") + core_inputs_x.append(input_x) + core_input_sets.append(spike_async.create_tensor_set({"x": input_x, "y": input_y})) + + out = spike_async.allocate_tensor(size=output_size, core_id=core_id, name=f"output_{core_id}_{p}") + core_outputs.append(out) + core_output_sets.append(spike_async.create_tensor_set({"output0": out})) + # Pre-allocate CPU array for zero-copy tensor_read + core_output_arrays.append(np.empty(output_shape, dtype=x.dtype)) + + inputs_x.append(core_inputs_x) + input_sets.append(core_input_sets) + outputs.append(core_outputs) + output_sets.append(core_output_sets) + output_arrays.append(core_output_arrays) + + # Wait for all weight pre-writes to land before any test runs. + for fut in preload_futs: + fut.wait() + + print(f'Done initializing {num_pipelines} pipeline(s) each on {num_cores} cores.') + + return models, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays + + +@pytest.fixture(scope="module") +def expected_output(model_and_tensors): + """Compute expected matmul output once for all tests.""" + _, x, y, *_ = model_and_tensors + return np.matmul(x, y) + + +def test_stream_api(spike_async, model_and_tensors, expected_output): + """Test stream-based API for operation sequencing.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + # Use stream APIs - each iteration uses different input and output tensors + streams = [] + + for i in range(len(inputs_x)): + with spike_async.create_stream() as stream: + spike_async.tensor_write(inputs_x[i], x) + spike_async.tensor_write(inputs_y[i], y) + spike_async.execute(model, input_sets[i], output_sets[i]) + spike_async.tensor_read(outputs[i], output_arrays[i]) + streams.append(stream) + + # Verify outputs - data is already in pre-allocated arrays + for i, stream in enumerate(streams): + stream.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_explicit_dependency_api(spike_async, model_and_tensors, expected_output): + """Test explicit dependency management API.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + # Use explicit dependency APIs - each iteration uses different input and output tensors + read_futs = [] + + for i in range(len(inputs_x)): + write_x_fut = spike_async.tensor_write(inputs_x[i], x) + write_y_fut = spike_async.tensor_write(inputs_y[i], y) + exec_fut = spike_async.execute(model, input_sets[i], output_sets[i], deps=[write_x_fut, write_y_fut]) + read_fut = spike_async.tensor_read(outputs[i], output_arrays[i], deps=[exec_fut]) + read_futs.append(read_fut) + + # Verify outputs - data is already in pre-allocated arrays + for i, read_fut in enumerate(read_futs): + read_fut.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_coroutine_api(spike_async, model_and_tensors, expected_output): + """Test coroutine/async-await API.""" + model, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors + + async def inference_pipeline(pipeline_idx): + await spike_async.tensor_write(inputs_x[pipeline_idx], x) + await spike_async.tensor_write(inputs_y[pipeline_idx], y) + await spike_async.execute(model, input_sets[pipeline_idx], output_sets[pipeline_idx]) + await spike_async.tensor_read(outputs[pipeline_idx], output_arrays[pipeline_idx]) + + # Use coroutine APIs - each pipeline uses different input and output tensors + futs = [] + for i in range(len(inputs_x)): + fut = spike_async.submit(inference_pipeline(i)) + futs.append(fut) + + # Verify outputs - data is already in pre-allocated arrays + for i, fut in enumerate(futs): + fut.wait() + assert np.allclose(output_arrays[i], expected_output, rtol=1e-3), f"Output mismatch for pipeline {i}" + + +def test_all_cores_concurrent_perf(request, spike_async, model_and_tensors_all_cores): + """Performance test: run tensor_write -> execute -> tensor_read concurrently on all cores. + + Submits --num-pipelines coroutine pipelines per core (using tensors from the + model_and_tensors_all_cores fixture). Running multiple pipelines concurrently on + a single core lets independent operations overlap -- e.g. one pipeline's + tensor_read or tensor_write (DMA-bound) can run while another pipeline's execute + (compute-bound) is in flight -- which is the benefit of the async system. + + Two configurations are compared: + single-core -- all pipelines on core 0 only + all-cores -- the same per-core pipelines on every visible core, concurrently + + Because the cores execute independently in parallel, a zero-overhead coroutine + system would finish the all-cores run in the same time as the single-core run; + the difference quantifies the event-loop/coroutine scheduling overhead. + + Each configuration is run for --warmup-iters untimed warmup iterations followed + by --measure-iters timed iterations; the reported time is the mean over the + measured iterations. + """ + models, x, y, inputs_x, inputs_y, input_sets, outputs, output_sets, output_arrays = model_and_tensors_all_cores + num_cores = len(models) + if num_cores < 1: + pytest.skip("No visible NeuronCores available") + + num_pipelines = request.config.getoption("--num-pipelines") + warmup_iters = request.config.getoption("--warmup-iters") + measure_iters = request.config.getoption("--measure-iters") + if measure_iters < 1: + pytest.skip("--measure-iters must be >= 1") + + expected = np.matmul(x, y) + + # Weights y are pre-written by the fixture (like ML model parameters), so each + # pipeline only writes its x, executes, and reads its output. With the balanced + # overlapping shape, write x + read out ~= exec time, so overlapping multiple + # pipelines on a core can hide DMA behind compute (and vice versa). + async def inference_pipeline(core_id, p): + await spike_async.tensor_write(inputs_x[core_id][p], x) + await spike_async.execute(models[core_id], input_sets[core_id][p], output_sets[core_id][p]) + await spike_async.tensor_read(outputs[core_id][p], output_arrays[core_id][p]) + + def run_cores(core_ids): + """One iteration: num_pipelines pipelines on each given core, all concurrent. + + Returns elapsed seconds for the whole batch to complete. + """ + start = time.perf_counter() + futs = [ + spike_async.submit(inference_pipeline(core_id, p)) + for core_id in core_ids + for p in range(num_pipelines) + ] + for fut in futs: + fut.wait() + return time.perf_counter() - start + + def measure(core_ids): + """Run warmup iterations (untimed), then measurement iterations (timed).""" + for _ in range(warmup_iters): + run_cores(core_ids) + return [run_cores(core_ids) for _ in range(measure_iters)] + + def check_outputs(core_ids, label): + for core_id in core_ids: + for p in range(num_pipelines): + assert np.allclose(output_arrays[core_id][p], expected, rtol=1e-3), \ + f"Output mismatch on core {core_id} pipeline {p} ({label})" + + # Baseline: all pipelines on core 0 only. Since the all-cores run executes the + # same per-core pipelines on every core concurrently, in a zero-overhead + # coroutine system the two wall-clock times should be identical. Any difference + # measures the overhead the coroutine/event-loop system adds when scheduling + # many concurrent pipelines across cores. + single_times = measure([0]) + check_outputs([0], "single-core baseline") + + all_core_ids = list(range(num_cores)) + all_times = measure(all_core_ids) + check_outputs(all_core_ids, "all-cores") + + single_mean = float(np.mean(single_times)) + all_mean = float(np.mean(all_times)) + overhead = all_mean - single_mean + print( + f"\n({num_pipelines} pipeline(s)/core; warmup={warmup_iters}, measure={measure_iters} iters; " + f"times are mean +/- std)\n" + f"Single-core (core 0, {num_pipelines} pipelines): " + f"{single_mean * 1e3:8.2f} +/- {np.std(single_times) * 1e3:6.2f} ms " + f"({num_pipelines / single_mean:.1f} pipelines/s)\n" + f"All cores ({num_cores}x{num_pipelines}={num_cores * num_pipelines} pipelines): " + f"{all_mean * 1e3:8.2f} +/- {np.std(all_times) * 1e3:6.2f} ms " + f"({num_cores * num_pipelines / all_mean:.1f} pipelines/s)\n" + f"Coroutine overhead: {overhead * 1e3:8.2f} ms " + f"({all_mean / single_mean:.2f}x single-core)" + ) + + +def test_tensor_read_write(spike_async): + """Test basic tensor read/write operations.""" + # Allocate a test tensor + tensor_size = 1024 * 1024 # 1MB + tensor = spike_async.allocate_tensor(size=tensor_size, core_id=0, name="test_tensor") + + # Create test data + test_data = np.random.rand(tensor_size // 8).astype(np.float64) + test_bytes = test_data.tobytes() + + # Submit async write and read operations + write_fut = spike_async.tensor_write(tensor, test_bytes) + read_fut = spike_async.tensor_read(tensor, deps=[write_fut]) + + # Wait for completion + read_data = read_fut.wait() + + # Verify data + read_array = np.frombuffer(read_data, dtype=np.float64) + assert np.array_equal(read_array, test_data), "Tensor read/write data mismatch" + +if __name__ == "__main__": + pytest.main(["-v", __file__]) \ No newline at end of file