diff --git a/barretenberg/cpp/src/CMakeLists.txt b/barretenberg/cpp/src/CMakeLists.txt index f511f5a29c5b..20cfb695c920 100644 --- a/barretenberg/cpp/src/CMakeLists.txt +++ b/barretenberg/cpp/src/CMakeLists.txt @@ -127,6 +127,8 @@ if(NOT FUZZING AND NOT WASM AND NOT BB_LITE) # NOTE: Do not conditionally base this on the AVM flag as it defines a necessary vm2_sim library. add_subdirectory(barretenberg/vm2) add_subdirectory(barretenberg/ipc) + add_subdirectory(barretenberg/wsdb) + add_subdirectory(barretenberg/wsdb_client) add_subdirectory(barretenberg/nodejs_module) endif() diff --git a/barretenberg/cpp/src/barretenberg/wsdb/CMakeLists.txt b/barretenberg/cpp/src/barretenberg/wsdb/CMakeLists.txt new file mode 100644 index 000000000000..f7fead2146fe --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/CMakeLists.txt @@ -0,0 +1,44 @@ +if(NOT(FUZZING) AND NOT(WASM)) + # IPC client library (used by AVM simulator to talk to aztec-wsdb) + # Generated via: cd barretenberg/ts && npx tsx src/aztec-wsdb/generate.ts + add_library( + wsdb_ipc_client + STATIC + wsdb_ipc_client_generated.cpp + ) + target_link_libraries( + wsdb_ipc_client + PUBLIC + barretenberg + ipc + ) + + # aztec-wsdb binary (standalone world state database server) + add_executable( + aztec-wsdb + main.cpp + cli.cpp + wsdb_execute.cpp + wsdb_ipc_server.cpp + ) + target_link_libraries( + aztec-wsdb + PRIVATE + barretenberg + world_state + ipc + env + ) + if(ENABLE_STACKTRACES) + target_link_libraries( + aztec-wsdb + PUBLIC + Backward::Interface + ) + target_link_options( + aztec-wsdb + PRIVATE + -ldw -lelf + ) + endif() +endif() diff --git a/barretenberg/cpp/src/barretenberg/wsdb/cli.cpp b/barretenberg/cpp/src/barretenberg/wsdb/cli.cpp new file mode 100644 index 000000000000..7f9fd1899886 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/cli.cpp @@ -0,0 +1,136 @@ +#include "barretenberg/wsdb/cli.hpp" +#include "barretenberg/common/log.hpp" +#include "barretenberg/common/throw_or_abort.hpp" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/wsdb/wsdb_execute.hpp" +#include "barretenberg/wsdb/wsdb_ipc_server.hpp" + +#include "barretenberg/bb/deps/cli11.hpp" +#include +#include +#include +#include +#include + +namespace bb::wsdb { + +using namespace bb::world_state; +using namespace bb::crypto::merkle_tree; + +namespace { + +struct WsdbApi { + WsdbCommand commands; + WsdbCommandResponse responses; + SERIALIZATION_FIELDS(commands, responses); +}; + +std::string get_wsdb_schema_as_json() +{ + return msgpack_schema_to_string(WsdbApi{}); +} + +} // namespace + +int parse_and_run_wsdb(int argc, char* argv[]) +{ + CLI::App app{ "aztec-wsdb: Standalone world state database server" }; + app.require_subcommand(1); + + // ----------------------------------------------------------------------- + // Subcommand: msgpack + // ----------------------------------------------------------------------- + CLI::App* msgpack_command = app.add_subcommand("msgpack", "Msgpack API interface."); + + // msgpack schema + CLI::App* msgpack_schema_command = + msgpack_command->add_subcommand("schema", "Output a msgpack schema encoded as JSON to stdout."); + + // msgpack run + CLI::App* msgpack_run_command = + msgpack_command->add_subcommand("run", "Start the world state database IPC server."); + + std::string input_path; + msgpack_run_command->add_option( + "-i,--input", input_path, "IPC socket/shm path (.sock for UDS, .shm for shared memory)"); + + std::string data_dir; + msgpack_run_command->add_option("-d,--data-dir", data_dir, "Data directory for LMDB stores")->required(); + + // Tree heights (JSON map: treeId -> height) + std::string tree_heights_json; + msgpack_run_command->add_option("--tree-heights", tree_heights_json, "Tree heights as JSON: {0:40,1:32,...}"); + + // Tree prefill sizes + std::string tree_prefill_json; + msgpack_run_command->add_option( + "--tree-prefill", tree_prefill_json, "Tree prefill sizes as JSON: {0:128,2:128,...}"); + + // Map sizes (KB) + std::string map_sizes_json; + msgpack_run_command->add_option("--map-sizes", map_sizes_json, "LMDB map sizes in KB as JSON: {0:1024,...}"); + + uint32_t threads = 16; + msgpack_run_command->add_option("-t,--threads", threads, "Thread pool size (default: 16)") + ->check(CLI::PositiveNumber); + + uint32_t initial_header_generator_point = 0; + msgpack_run_command->add_option( + "--initial-header-generator-point", initial_header_generator_point, "Header generator point (default: 0)"); + + // Prefilled public data as JSON array of [slot_hex, value_hex] pairs + std::string prefilled_public_data_json; + msgpack_run_command->add_option( + "--prefilled-public-data", prefilled_public_data_json, "Prefilled public data as JSON array"); + + uint64_t genesis_timestamp = 0; + msgpack_run_command->add_option("--genesis-timestamp", genesis_timestamp, "Genesis block timestamp (default: 0)"); + + size_t request_ring_size = 1024 * 1024; + msgpack_run_command + ->add_option( + "--request-ring-size", request_ring_size, "Request ring buffer size for shared memory IPC (default: 1MB)") + ->check(CLI::PositiveNumber); + + size_t response_ring_size = 1024 * 1024; + msgpack_run_command + ->add_option("--response-ring-size", + response_ring_size, + "Response ring buffer size for shared memory IPC (default: 1MB)") + ->check(CLI::PositiveNumber); + + // Parse CLI + try { + app.parse(argc, argv); + } catch (const CLI::ParseError& e) { + return app.exit(e); + } + + try { + if (msgpack_schema_command->parsed()) { + std::cout << get_wsdb_schema_as_json() << std::endl; + return 0; + } + + if (msgpack_run_command->parsed()) { + return execute_wsdb_server(input_path, + data_dir, + tree_heights_json, + tree_prefill_json, + map_sizes_json, + threads, + initial_header_generator_point, + prefilled_public_data_json, + genesis_timestamp, + request_ring_size, + response_ring_size); + } + } catch (const std::exception& e) { + std::cerr << "Error: " << e.what() << '\n'; + return 1; + } + + return 0; +} + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/cli.hpp b/barretenberg/cpp/src/barretenberg/wsdb/cli.hpp new file mode 100644 index 000000000000..02b8705a5f4d --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/cli.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace bb::wsdb { + +int parse_and_run_wsdb(int argc, char* argv[]); + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/main.cpp b/barretenberg/cpp/src/barretenberg/wsdb/main.cpp new file mode 100644 index 000000000000..8857922c678a --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/main.cpp @@ -0,0 +1,6 @@ +#include "barretenberg/wsdb/cli.hpp" + +int main(int argc, char* argv[]) +{ + return bb::wsdb::parse_and_run_wsdb(argc, argv); +} diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_commands.hpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_commands.hpp new file mode 100644 index 000000000000..5ccf8cc760bc --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_commands.hpp @@ -0,0 +1,536 @@ +#pragma once +/** + * @file wsdb_commands.hpp + * @brief NamedUnion command structs for the aztec-wsdb world state database API. + * + * Each command follows the bbapi pattern: + * - static constexpr MSGPACK_SCHEMA_NAME for NamedUnion dispatch + * - Nested Response struct with its own MSGPACK_SCHEMA_NAME + * - Request fields with SERIALIZATION_FIELDS + * - execute(WsdbRequest&) && method (implemented in wsdb_execute.cpp) + */ + +#include "barretenberg/crypto/merkle_tree/hash_path.hpp" +#include "barretenberg/crypto/merkle_tree/indexed_tree/indexed_leaf.hpp" +#include "barretenberg/crypto/merkle_tree/response.hpp" +#include "barretenberg/crypto/merkle_tree/types.hpp" +#include "barretenberg/ecc/curves/bn254/fr.hpp" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/world_state/fork.hpp" +#include "barretenberg/world_state/types.hpp" +#include +#include +#include +#include + +namespace bb::wsdb { + +using namespace bb::world_state; +using namespace bb::crypto::merkle_tree; + +// Forward declaration +struct WsdbRequest; + +// --------------------------------------------------------------------------- +// Tree info / state queries +// --------------------------------------------------------------------------- + +struct WsdbGetTreeInfo { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetTreeInfo"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetTreeInfoResponse"; + MerkleTreeId treeId; + fr root; + index_t size; + uint32_t depth; + SERIALIZATION_FIELDS(treeId, root, size, depth); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision); + bool operator==(const WsdbGetTreeInfo&) const = default; +}; + +struct WsdbGetStateReference { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetStateReference"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetStateReferenceResponse"; + StateReference state; + SERIALIZATION_FIELDS(state); + bool operator==(const Response&) const = default; + }; + WorldStateRevision revision; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(revision); + bool operator==(const WsdbGetStateReference&) const = default; +}; + +struct WsdbGetInitialStateReference { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetInitialStateReference"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetInitialStateReferenceResponse"; + StateReference state; + SERIALIZATION_FIELDS(state); + bool operator==(const Response&) const = default; + }; + Response execute(WsdbRequest& request) &&; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const WsdbGetInitialStateReference&) const = default; +}; + +// --------------------------------------------------------------------------- +// Leaf queries +// --------------------------------------------------------------------------- + +struct WsdbGetLeafValue { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetLeafValue"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetLeafValueResponse"; + // Polymorphic: Fr, NullifierLeafValue, or PublicDataLeafValue serialized as bytes + std::optional> value; + SERIALIZATION_FIELDS(value); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + index_t leafIndex; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leafIndex); + bool operator==(const WsdbGetLeafValue&) const = default; +}; + +struct WsdbGetLeafPreimage { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetLeafPreimage"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetLeafPreimageResponse"; + // Serialized indexed leaf (NullifierLeafValue or PublicDataLeafValue) + std::optional> preimage; + SERIALIZATION_FIELDS(preimage); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + index_t leafIndex; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leafIndex); + bool operator==(const WsdbGetLeafPreimage&) const = default; +}; + +struct WsdbGetSiblingPath { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetSiblingPath"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetSiblingPathResponse"; + fr_sibling_path path; + SERIALIZATION_FIELDS(path); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + index_t leafIndex; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leafIndex); + bool operator==(const WsdbGetSiblingPath&) const = default; +}; + +struct WsdbGetBlockNumbersForLeafIndices { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetBlockNumbersForLeafIndices"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetBlockNumbersForLeafIndicesResponse"; + std::vector> blockNumbers; + SERIALIZATION_FIELDS(blockNumbers); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + std::vector leafIndices; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leafIndices); + bool operator==(const WsdbGetBlockNumbersForLeafIndices&) const = default; +}; + +// --------------------------------------------------------------------------- +// Leaf search operations +// --------------------------------------------------------------------------- + +struct WsdbFindLeafIndices { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindLeafIndices"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindLeafIndicesResponse"; + std::vector> indices; + SERIALIZATION_FIELDS(indices); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + // Polymorphic leaves: each leaf is serialized as bytes + std::vector> leaves; + index_t startIndex; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leaves, startIndex); + bool operator==(const WsdbFindLeafIndices&) const = default; +}; + +struct WsdbFindLowLeaf { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindLowLeaf"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindLowLeafResponse"; + bool alreadyPresent; + index_t index; + SERIALIZATION_FIELDS(alreadyPresent, index); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + fr key; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, key); + bool operator==(const WsdbFindLowLeaf&) const = default; +}; + +struct WsdbFindSiblingPaths { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindSiblingPaths"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFindSiblingPathsResponse"; + std::vector> paths; + SERIALIZATION_FIELDS(paths); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + WorldStateRevision revision; + // Polymorphic leaves + std::vector> leaves; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, revision, leaves); + bool operator==(const WsdbFindSiblingPaths&) const = default; +}; + +// --------------------------------------------------------------------------- +// Tree mutation operations +// --------------------------------------------------------------------------- + +struct WsdbAppendLeaves { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbAppendLeaves"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbAppendLeavesResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + // Polymorphic leaves + std::vector> leaves; + Fork::Id forkId{ CANONICAL_FORK_ID }; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, leaves, forkId); + bool operator==(const WsdbAppendLeaves&) const = default; +}; + +struct WsdbBatchInsert { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbBatchInsert"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbBatchInsertResponse"; + // Serialized BatchInsertionResult + std::vector result; + SERIALIZATION_FIELDS(result); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + std::vector> leaves; + uint32_t subtreeDepth; + Fork::Id forkId{ CANONICAL_FORK_ID }; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, leaves, subtreeDepth, forkId); + bool operator==(const WsdbBatchInsert&) const = default; +}; + +struct WsdbSequentialInsert { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbSequentialInsert"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbSequentialInsertResponse"; + // Serialized SequentialInsertionResult + std::vector result; + SERIALIZATION_FIELDS(result); + bool operator==(const Response&) const = default; + }; + MerkleTreeId treeId; + std::vector> leaves; + Fork::Id forkId{ CANONICAL_FORK_ID }; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(treeId, leaves, forkId); + bool operator==(const WsdbSequentialInsert&) const = default; +}; + +struct WsdbUpdateArchive { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbUpdateArchive"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbUpdateArchiveResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + StateReference blockStateRef; + bb::fr blockHeaderHash; + Fork::Id forkId{ CANONICAL_FORK_ID }; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(blockStateRef, blockHeaderHash, forkId); + bool operator==(const WsdbUpdateArchive&) const = default; +}; + +// --------------------------------------------------------------------------- +// Transaction operations +// --------------------------------------------------------------------------- + +struct WsdbCommit { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommit"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommitResponse"; + WorldStateStatusFull status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + Response execute(WsdbRequest& request) &&; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const WsdbCommit&) const = default; +}; + +struct WsdbRollback { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRollback"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRollbackResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + Response execute(WsdbRequest& request) &&; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const WsdbRollback&) const = default; +}; + +// --------------------------------------------------------------------------- +// Block synchronization +// --------------------------------------------------------------------------- + +struct WsdbSyncBlock { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbSyncBlock"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbSyncBlockResponse"; + WorldStateStatusFull status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + block_number_t blockNumber; + StateReference blockStateRef; + bb::fr blockHeaderHash; + std::vector paddedNoteHashes; + std::vector paddedL1ToL2Messages; + std::vector paddedNullifiers; + std::vector publicDataWrites; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(blockNumber, + blockStateRef, + blockHeaderHash, + paddedNoteHashes, + paddedL1ToL2Messages, + paddedNullifiers, + publicDataWrites); + bool operator==(const WsdbSyncBlock&) const = default; +}; + +// --------------------------------------------------------------------------- +// Fork management +// --------------------------------------------------------------------------- + +struct WsdbCreateFork { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCreateFork"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCreateForkResponse"; + uint64_t forkId; + SERIALIZATION_FIELDS(forkId); + bool operator==(const Response&) const = default; + }; + bool latest; + block_number_t blockNumber; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(latest, blockNumber); + bool operator==(const WsdbCreateFork&) const = default; +}; + +struct WsdbDeleteFork { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbDeleteFork"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbDeleteForkResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbDeleteFork&) const = default; +}; + +// --------------------------------------------------------------------------- +// Block management +// --------------------------------------------------------------------------- + +struct WsdbFinalizeBlocks { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFinalizeBlocks"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbFinalizeBlocksResponse"; + WorldStateStatusSummary status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + block_number_t toBlockNumber; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(toBlockNumber); + bool operator==(const WsdbFinalizeBlocks&) const = default; +}; + +struct WsdbUnwindBlocks { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbUnwindBlocks"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbUnwindBlocksResponse"; + WorldStateStatusFull status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + block_number_t toBlockNumber; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(toBlockNumber); + bool operator==(const WsdbUnwindBlocks&) const = default; +}; + +struct WsdbRemoveHistoricalBlocks { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRemoveHistoricalBlocks"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRemoveHistoricalBlocksResponse"; + WorldStateStatusFull status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + block_number_t toBlockNumber; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(toBlockNumber); + bool operator==(const WsdbRemoveHistoricalBlocks&) const = default; +}; + +// --------------------------------------------------------------------------- +// Status +// --------------------------------------------------------------------------- + +struct WsdbGetStatus { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetStatus"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbGetStatusResponse"; + WorldStateStatusSummary status; + SERIALIZATION_FIELDS(status); + bool operator==(const Response&) const = default; + }; + Response execute(WsdbRequest& request) &&; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const WsdbGetStatus&) const = default; +}; + +// --------------------------------------------------------------------------- +// Checkpoint operations +// --------------------------------------------------------------------------- + +struct WsdbCreateCheckpoint { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCreateCheckpoint"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCreateCheckpointResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbCreateCheckpoint&) const = default; +}; + +struct WsdbCommitCheckpoint { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommitCheckpoint"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommitCheckpointResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbCommitCheckpoint&) const = default; +}; + +struct WsdbRevertCheckpoint { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRevertCheckpoint"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRevertCheckpointResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbRevertCheckpoint&) const = default; +}; + +struct WsdbCommitAllCheckpoints { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommitAllCheckpoints"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCommitAllCheckpointsResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbCommitAllCheckpoints&) const = default; +}; + +struct WsdbRevertAllCheckpoints { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRevertAllCheckpoints"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbRevertAllCheckpointsResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + uint64_t forkId; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(forkId); + bool operator==(const WsdbRevertAllCheckpoints&) const = default; +}; + +// --------------------------------------------------------------------------- +// Database operations +// --------------------------------------------------------------------------- + +struct WsdbCopyStores { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCopyStores"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbCopyStoresResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + std::string dstPath; + std::optional compact; + Response execute(WsdbRequest& request) &&; + SERIALIZATION_FIELDS(dstPath, compact); + bool operator==(const WsdbCopyStores&) const = default; +}; + +// --------------------------------------------------------------------------- +// Lifecycle +// --------------------------------------------------------------------------- + +struct WsdbShutdown { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbShutdown"; + struct Response { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbShutdownResponse"; + void msgpack(auto&& pack_fn) { pack_fn(); } + bool operator==(const Response&) const = default; + }; + void msgpack(auto&& pack_fn) { pack_fn(); } + Response execute(WsdbRequest& request) &&; + bool operator==(const WsdbShutdown&) const = default; +}; + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.cpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.cpp new file mode 100644 index 000000000000..5a6282b9de8f --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.cpp @@ -0,0 +1,414 @@ +#include "barretenberg/wsdb/wsdb_execute.hpp" +#include "barretenberg/crypto/merkle_tree/indexed_tree/indexed_leaf.hpp" +#include "barretenberg/crypto/merkle_tree/response.hpp" +#include "barretenberg/world_state/world_state.hpp" +#include +#include + +namespace bb::wsdb { + +using namespace bb::world_state; +using namespace bb::crypto::merkle_tree; + +// --------------------------------------------------------------------------- +// Helper: serialize a value to msgpack bytes +// --------------------------------------------------------------------------- + +template static std::vector serialize_to_msgpack(const T& value) +{ + msgpack::sbuffer buf; + msgpack::pack(buf, value); + return std::vector(buf.data(), buf.data() + buf.size()); +} + +// --------------------------------------------------------------------------- +// Helper: deserialize leaves from raw bytes based on tree type +// --------------------------------------------------------------------------- + +template +static std::vector deserialize_leaves(const std::vector>& raw_leaves) +{ + std::vector leaves; + leaves.reserve(raw_leaves.size()); + for (const auto& raw : raw_leaves) { + auto unpacked = msgpack::unpack(reinterpret_cast(raw.data()), raw.size()); + LeafType leaf; + unpacked.get().convert(leaf); + leaves.push_back(std::move(leaf)); + } + return leaves; +} + +// --------------------------------------------------------------------------- +// Top-level dispatch +// --------------------------------------------------------------------------- + +WsdbCommandResponse wsdb(WsdbRequest& request, WsdbCommand&& command) +{ + return execute(request, std::move(command)); +} + +// --------------------------------------------------------------------------- +// Tree info / state queries +// --------------------------------------------------------------------------- + +WsdbGetTreeInfo::Response WsdbGetTreeInfo::execute(WsdbRequest& request) && +{ + auto info = request.world_state.get_tree_info(revision, treeId); + return Response{ .treeId = treeId, .root = info.meta.root, .size = info.meta.size, .depth = info.meta.depth }; +} + +WsdbGetStateReference::Response WsdbGetStateReference::execute(WsdbRequest& request) && +{ + auto state = request.world_state.get_state_reference(revision); + return Response{ .state = state }; +} + +WsdbGetInitialStateReference::Response WsdbGetInitialStateReference::execute(WsdbRequest& request) && +{ + auto state = request.world_state.get_initial_state_reference(); + return Response{ .state = state }; +} + +// --------------------------------------------------------------------------- +// Leaf queries +// --------------------------------------------------------------------------- + +WsdbGetLeafValue::Response WsdbGetLeafValue::execute(WsdbRequest& request) && +{ + switch (treeId) { + case MerkleTreeId::NOTE_HASH_TREE: + case MerkleTreeId::L1_TO_L2_MESSAGE_TREE: + case MerkleTreeId::ARCHIVE: { + auto leaf = request.world_state.get_leaf(revision, treeId, leafIndex); + if (!leaf.has_value()) { + return Response{ .value = std::nullopt }; + } + return Response{ .value = serialize_to_msgpack(leaf.value()) }; + } + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto leaf = request.world_state.get_leaf(revision, treeId, leafIndex); + if (!leaf.has_value()) { + return Response{ .value = std::nullopt }; + } + return Response{ .value = serialize_to_msgpack(leaf.value()) }; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto leaf = request.world_state.get_leaf(revision, treeId, leafIndex); + if (!leaf.has_value()) { + return Response{ .value = std::nullopt }; + } + return Response{ .value = serialize_to_msgpack(leaf.value()) }; + } + default: + throw std::runtime_error("Unsupported tree type for get_leaf_value"); + } +} + +WsdbGetLeafPreimage::Response WsdbGetLeafPreimage::execute(WsdbRequest& request) && +{ + switch (treeId) { + case MerkleTreeId::NULLIFIER_TREE: { + auto leaf = request.world_state.get_indexed_leaf(revision, treeId, leafIndex); + if (!leaf.has_value()) { + return Response{ .preimage = std::nullopt }; + } + return Response{ .preimage = serialize_to_msgpack(leaf.value()) }; + } + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto leaf = request.world_state.get_indexed_leaf(revision, treeId, leafIndex); + if (!leaf.has_value()) { + return Response{ .preimage = std::nullopt }; + } + return Response{ .preimage = serialize_to_msgpack(leaf.value()) }; + } + default: + throw std::runtime_error("Unsupported tree type for get_leaf_preimage"); + } +} + +WsdbGetSiblingPath::Response WsdbGetSiblingPath::execute(WsdbRequest& request) && +{ + fr_sibling_path path = request.world_state.get_sibling_path(revision, treeId, leafIndex); + return Response{ .path = path }; +} + +WsdbGetBlockNumbersForLeafIndices::Response WsdbGetBlockNumbersForLeafIndices::execute(WsdbRequest& request) && +{ + Response response; + request.world_state.get_block_numbers_for_leaf_indices(revision, treeId, leafIndices, response.blockNumbers); + return response; +} + +// --------------------------------------------------------------------------- +// Leaf search operations +// --------------------------------------------------------------------------- + +WsdbFindLeafIndices::Response WsdbFindLeafIndices::execute(WsdbRequest& request) && +{ + Response response; + switch (treeId) { + case MerkleTreeId::NOTE_HASH_TREE: + case MerkleTreeId::L1_TO_L2_MESSAGE_TREE: + case MerkleTreeId::ARCHIVE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_leaf_indices(revision, treeId, typed_leaves, response.indices, startIndex); + break; + } + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_leaf_indices( + revision, treeId, typed_leaves, response.indices, startIndex); + break; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_leaf_indices( + revision, treeId, typed_leaves, response.indices, startIndex); + break; + } + default: + throw std::runtime_error("Unsupported tree type for find_leaf_indices"); + } + return response; +} + +WsdbFindLowLeaf::Response WsdbFindLowLeaf::execute(WsdbRequest& request) && +{ + auto low_leaf_info = request.world_state.find_low_leaf_index(revision, treeId, key); + return Response{ .alreadyPresent = low_leaf_info.is_already_present, .index = low_leaf_info.index }; +} + +WsdbFindSiblingPaths::Response WsdbFindSiblingPaths::execute(WsdbRequest& request) && +{ + Response response; + switch (treeId) { + case MerkleTreeId::NOTE_HASH_TREE: + case MerkleTreeId::L1_TO_L2_MESSAGE_TREE: + case MerkleTreeId::ARCHIVE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_sibling_paths(revision, treeId, typed_leaves, response.paths); + break; + } + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_sibling_paths(revision, treeId, typed_leaves, response.paths); + break; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.find_sibling_paths(revision, treeId, typed_leaves, response.paths); + break; + } + default: + throw std::runtime_error("Unsupported tree type for find_sibling_paths"); + } + return response; +} + +// --------------------------------------------------------------------------- +// Tree mutation operations +// --------------------------------------------------------------------------- + +WsdbAppendLeaves::Response WsdbAppendLeaves::execute(WsdbRequest& request) && +{ + switch (treeId) { + case MerkleTreeId::NOTE_HASH_TREE: + case MerkleTreeId::L1_TO_L2_MESSAGE_TREE: + case MerkleTreeId::ARCHIVE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.append_leaves(treeId, typed_leaves, forkId); + break; + } + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.append_leaves(treeId, typed_leaves, forkId); + break; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + request.world_state.append_leaves(treeId, typed_leaves, forkId); + break; + } + default: + throw std::runtime_error("Unsupported tree type for append_leaves"); + } + return Response{}; +} + +WsdbBatchInsert::Response WsdbBatchInsert::execute(WsdbRequest& request) && +{ + switch (treeId) { + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + auto result = request.world_state.batch_insert_indexed_leaves( + treeId, typed_leaves, subtreeDepth, forkId); + return Response{ .result = serialize_to_msgpack(result) }; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + auto result = request.world_state.batch_insert_indexed_leaves( + treeId, typed_leaves, subtreeDepth, forkId); + return Response{ .result = serialize_to_msgpack(result) }; + } + default: + throw std::runtime_error("Unsupported tree type for batch_insert"); + } +} + +WsdbSequentialInsert::Response WsdbSequentialInsert::execute(WsdbRequest& request) && +{ + switch (treeId) { + case MerkleTreeId::PUBLIC_DATA_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + auto result = request.world_state.insert_indexed_leaves(treeId, typed_leaves, forkId); + return Response{ .result = serialize_to_msgpack(result) }; + } + case MerkleTreeId::NULLIFIER_TREE: { + auto typed_leaves = deserialize_leaves(leaves); + auto result = request.world_state.insert_indexed_leaves(treeId, typed_leaves, forkId); + return Response{ .result = serialize_to_msgpack(result) }; + } + default: + throw std::runtime_error("Unsupported tree type for sequential_insert"); + } +} + +WsdbUpdateArchive::Response WsdbUpdateArchive::execute(WsdbRequest& request) && +{ + request.world_state.update_archive(blockStateRef, blockHeaderHash, forkId); + return Response{}; +} + +// --------------------------------------------------------------------------- +// Transaction operations +// --------------------------------------------------------------------------- + +WsdbCommit::Response WsdbCommit::execute(WsdbRequest& request) && +{ + WorldStateStatusFull status; + request.world_state.commit(status); + return Response{ .status = status }; +} + +WsdbRollback::Response WsdbRollback::execute(WsdbRequest& request) && +{ + request.world_state.rollback(); + return Response{}; +} + +// --------------------------------------------------------------------------- +// Block synchronization +// --------------------------------------------------------------------------- + +WsdbSyncBlock::Response WsdbSyncBlock::execute(WsdbRequest& request) && +{ + WorldStateStatusFull status = request.world_state.sync_block( + blockStateRef, blockHeaderHash, paddedNoteHashes, paddedL1ToL2Messages, paddedNullifiers, publicDataWrites); + return Response{ .status = status }; +} + +// --------------------------------------------------------------------------- +// Fork management +// --------------------------------------------------------------------------- + +WsdbCreateFork::Response WsdbCreateFork::execute(WsdbRequest& request) && +{ + std::optional block = latest ? std::nullopt : std::optional(blockNumber); + uint64_t id = request.world_state.create_fork(block); + return Response{ .forkId = id }; +} + +WsdbDeleteFork::Response WsdbDeleteFork::execute(WsdbRequest& request) && +{ + request.world_state.delete_fork(forkId); + return Response{}; +} + +// --------------------------------------------------------------------------- +// Block management +// --------------------------------------------------------------------------- + +WsdbFinalizeBlocks::Response WsdbFinalizeBlocks::execute(WsdbRequest& request) && +{ + WorldStateStatusSummary status = request.world_state.set_finalized_blocks(toBlockNumber); + return Response{ .status = status }; +} + +WsdbUnwindBlocks::Response WsdbUnwindBlocks::execute(WsdbRequest& request) && +{ + WorldStateStatusFull status = request.world_state.unwind_blocks(toBlockNumber); + return Response{ .status = status }; +} + +WsdbRemoveHistoricalBlocks::Response WsdbRemoveHistoricalBlocks::execute(WsdbRequest& request) && +{ + WorldStateStatusFull status = request.world_state.remove_historical_blocks(toBlockNumber); + return Response{ .status = status }; +} + +// --------------------------------------------------------------------------- +// Status +// --------------------------------------------------------------------------- + +WsdbGetStatus::Response WsdbGetStatus::execute(WsdbRequest& request) && +{ + WorldStateStatusSummary status; + request.world_state.get_status_summary(status); + return Response{ .status = status }; +} + +// --------------------------------------------------------------------------- +// Checkpoint operations +// --------------------------------------------------------------------------- + +WsdbCreateCheckpoint::Response WsdbCreateCheckpoint::execute(WsdbRequest& request) && +{ + request.world_state.checkpoint(forkId); + return Response{}; +} + +WsdbCommitCheckpoint::Response WsdbCommitCheckpoint::execute(WsdbRequest& request) && +{ + request.world_state.commit_checkpoint(forkId); + return Response{}; +} + +WsdbRevertCheckpoint::Response WsdbRevertCheckpoint::execute(WsdbRequest& request) && +{ + request.world_state.revert_checkpoint(forkId); + return Response{}; +} + +WsdbCommitAllCheckpoints::Response WsdbCommitAllCheckpoints::execute(WsdbRequest& request) && +{ + request.world_state.commit_all_checkpoints_to(forkId, 0); + return Response{}; +} + +WsdbRevertAllCheckpoints::Response WsdbRevertAllCheckpoints::execute(WsdbRequest& request) && +{ + request.world_state.revert_all_checkpoints_to(forkId, 0); + return Response{}; +} + +// --------------------------------------------------------------------------- +// Database operations +// --------------------------------------------------------------------------- + +WsdbCopyStores::Response WsdbCopyStores::execute(WsdbRequest& request) && +{ + request.world_state.copy_stores(dstPath, compact.value_or(false)); + return Response{}; +} + +// --------------------------------------------------------------------------- +// Lifecycle +// --------------------------------------------------------------------------- + +WsdbShutdown::Response WsdbShutdown::execute(WsdbRequest& /* request */) && +{ + return Response{}; +} + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.hpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.hpp new file mode 100644 index 000000000000..20de7d738c4c --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_execute.hpp @@ -0,0 +1,115 @@ +#pragma once +/** + * @file wsdb_execute.hpp + * @brief WsdbCommand NamedUnion, WsdbRequest context, and dispatch function. + */ + +#include "barretenberg/common/named_union.hpp" +#include "barretenberg/world_state/world_state.hpp" +#include "barretenberg/wsdb/wsdb_commands.hpp" + +namespace bb::wsdb { + +/** + * @brief Context passed to each command's execute() method, providing access to the WorldState. + */ +struct WsdbRequest { + world_state::WorldState& world_state; +}; + +/** + * @brief Error response returned when a command fails. + */ +struct WsdbErrorResponse { + static constexpr const char MSGPACK_SCHEMA_NAME[] = "WsdbErrorResponse"; + std::string message; + SERIALIZATION_FIELDS(message); + bool operator==(const WsdbErrorResponse&) const = default; +}; + +/** + * @brief Union of all wsdb commands (request types). + */ +using WsdbCommand = NamedUnion; + +/** + * @brief Union of all wsdb response types. + */ +using WsdbCommandResponse = NamedUnion; + +/** + * @brief Execute a wsdb command using the visitor pattern. + */ +inline WsdbCommandResponse execute(WsdbRequest& request, WsdbCommand&& command) +{ + return std::move(command).visit([&request](auto&& cmd) -> WsdbCommandResponse { + using CmdType = std::decay_t; + return std::forward(cmd).execute(request); + }); +} + +/** + * @brief Top-level wsdb API entry point. Takes a WsdbRequest and dispatches the command. + */ +WsdbCommandResponse wsdb(WsdbRequest& request, WsdbCommand&& command); + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.cpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.cpp new file mode 100644 index 000000000000..08da0e939635 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.cpp @@ -0,0 +1,225 @@ +// AUTOGENERATED FILE - DO NOT EDIT + +#include "barretenberg/wsdb/wsdb_ipc_client_generated.hpp" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/serialize/msgpack_impl.hpp" +#include "barretenberg/wsdb/wsdb_execute.hpp" + +#include +#include + +namespace bb::wsdb { + +WsdbIpcClient::WsdbIpcClient(const std::string& socket_path) + : client_(ipc::IpcClient::create_socket(socket_path)) +{ + if (!client_->connect()) { + throw std::runtime_error("Failed to connect to server at " + socket_path); + } +} + +WsdbIpcClient::~WsdbIpcClient() +{ + if (client_) { + client_->close(); + } +} + +template typename Cmd::Response WsdbIpcClient::send(Cmd&& cmd) const +{ + // Wrap command in WsdbCommand NamedUnion, then in a 1-element tuple (matches server expectations) + WsdbCommand command = std::forward(cmd); + auto wrapped = std::make_tuple(std::move(command)); + + // Serialize to msgpack + msgpack::sbuffer send_buffer; + msgpack::pack(send_buffer, wrapped); + + // Send to server + constexpr uint64_t timeout_ns = 30'000'000'000ULL; // 30 seconds + if (!client_->send(send_buffer.data(), send_buffer.size(), timeout_ns)) { + throw std::runtime_error("Failed to send command to server"); + } + + // Receive response + auto response_span = client_->receive(timeout_ns); + if (response_span.empty()) { + throw std::runtime_error("Empty response from server"); + } + + // Deserialize response + auto unpacked = msgpack::unpack(reinterpret_cast(response_span.data()), response_span.size()); + auto response_obj = unpacked.get(); + + WsdbCommandResponse response; + response_obj.convert(response); + + // Release the receive buffer + client_->release(response_span.size()); + + // Check for error response + return std::move(response).visit([](auto&& resp) -> typename Cmd::Response { + using RespType = std::decay_t; + + if constexpr (std::is_same_v) { + throw std::runtime_error("Server error: " + resp.message); + } else if constexpr (std::is_same_v) { + return std::forward(resp); + } else { + throw std::runtime_error("Unexpected response type from server"); + } + }); +} + +WsdbGetTreeInfo::Response WsdbIpcClient::get_tree_info(WsdbGetTreeInfo cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetStateReference::Response WsdbIpcClient::get_state_reference(WsdbGetStateReference cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetInitialStateReference::Response WsdbIpcClient::get_initial_state_reference() const +{ + return send(WsdbGetInitialStateReference{}); +} + +WsdbGetLeafValue::Response WsdbIpcClient::get_leaf_value(WsdbGetLeafValue cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetLeafPreimage::Response WsdbIpcClient::get_leaf_preimage(WsdbGetLeafPreimage cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetSiblingPath::Response WsdbIpcClient::get_sibling_path(WsdbGetSiblingPath cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetBlockNumbersForLeafIndices::Response WsdbIpcClient::get_block_numbers_for_leaf_indices( + WsdbGetBlockNumbersForLeafIndices cmd) const +{ + return send(std::move(cmd)); +} + +WsdbFindLeafIndices::Response WsdbIpcClient::find_leaf_indices(WsdbFindLeafIndices cmd) const +{ + return send(std::move(cmd)); +} + +WsdbFindLowLeaf::Response WsdbIpcClient::find_low_leaf(WsdbFindLowLeaf cmd) const +{ + return send(std::move(cmd)); +} + +WsdbFindSiblingPaths::Response WsdbIpcClient::find_sibling_paths(WsdbFindSiblingPaths cmd) const +{ + return send(std::move(cmd)); +} + +void WsdbIpcClient::append_leaves(WsdbAppendLeaves cmd) const +{ + send(std::move(cmd)); +} + +WsdbBatchInsert::Response WsdbIpcClient::batch_insert(WsdbBatchInsert cmd) const +{ + return send(std::move(cmd)); +} + +WsdbSequentialInsert::Response WsdbIpcClient::sequential_insert(WsdbSequentialInsert cmd) const +{ + return send(std::move(cmd)); +} + +void WsdbIpcClient::update_archive(WsdbUpdateArchive cmd) const +{ + send(std::move(cmd)); +} + +WsdbCommit::Response WsdbIpcClient::commit() +{ + return send(WsdbCommit{}); +} + +void WsdbIpcClient::rollback() +{ + send(WsdbRollback{}); +} + +WsdbSyncBlock::Response WsdbIpcClient::sync_block(WsdbSyncBlock cmd) +{ + return send(std::move(cmd)); +} + +WsdbCreateFork::Response WsdbIpcClient::create_fork(WsdbCreateFork cmd) +{ + return send(std::move(cmd)); +} + +void WsdbIpcClient::delete_fork(WsdbDeleteFork cmd) +{ + send(std::move(cmd)); +} + +WsdbFinalizeBlocks::Response WsdbIpcClient::finalize_blocks(WsdbFinalizeBlocks cmd) const +{ + return send(std::move(cmd)); +} + +WsdbUnwindBlocks::Response WsdbIpcClient::unwind_blocks(WsdbUnwindBlocks cmd) +{ + return send(std::move(cmd)); +} + +WsdbRemoveHistoricalBlocks::Response WsdbIpcClient::remove_historical_blocks(WsdbRemoveHistoricalBlocks cmd) const +{ + return send(std::move(cmd)); +} + +WsdbGetStatus::Response WsdbIpcClient::get_status() const +{ + return send(WsdbGetStatus{}); +} + +void WsdbIpcClient::create_checkpoint(WsdbCreateCheckpoint cmd) +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::commit_checkpoint(WsdbCommitCheckpoint cmd) +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::revert_checkpoint(WsdbRevertCheckpoint cmd) +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::commit_all_checkpoints(WsdbCommitAllCheckpoints cmd) +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::revert_all_checkpoints(WsdbRevertAllCheckpoints cmd) +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::copy_stores(WsdbCopyStores cmd) const +{ + send(std::move(cmd)); +} + +void WsdbIpcClient::shutdown() +{ + send(WsdbShutdown{}); +} + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.hpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.hpp new file mode 100644 index 000000000000..cc5fede28845 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.hpp @@ -0,0 +1,65 @@ +// AUTOGENERATED FILE - DO NOT EDIT +#pragma once + +#include "barretenberg/common/try_catch_shim.hpp" +#include "barretenberg/ipc/ipc_client.hpp" +#include "barretenberg/wsdb/wsdb_execute.hpp" + +#include +#include + +namespace bb::wsdb { + +/** + * @brief Auto-generated IPC client. + * + * Each method sends a msgpack-serialized command to the server over UDS + * and returns the typed response. All methods block until the response arrives. + */ +class WsdbIpcClient { + public: + explicit WsdbIpcClient(const std::string& socket_path); + ~WsdbIpcClient(); + + WsdbIpcClient(const WsdbIpcClient&) = delete; + WsdbIpcClient& operator=(const WsdbIpcClient&) = delete; + + WsdbGetTreeInfo::Response get_tree_info(WsdbGetTreeInfo cmd) const; + WsdbGetStateReference::Response get_state_reference(WsdbGetStateReference cmd) const; + WsdbGetInitialStateReference::Response get_initial_state_reference() const; + WsdbGetLeafValue::Response get_leaf_value(WsdbGetLeafValue cmd) const; + WsdbGetLeafPreimage::Response get_leaf_preimage(WsdbGetLeafPreimage cmd) const; + WsdbGetSiblingPath::Response get_sibling_path(WsdbGetSiblingPath cmd) const; + WsdbGetBlockNumbersForLeafIndices::Response get_block_numbers_for_leaf_indices( + WsdbGetBlockNumbersForLeafIndices cmd) const; + WsdbFindLeafIndices::Response find_leaf_indices(WsdbFindLeafIndices cmd) const; + WsdbFindLowLeaf::Response find_low_leaf(WsdbFindLowLeaf cmd) const; + WsdbFindSiblingPaths::Response find_sibling_paths(WsdbFindSiblingPaths cmd) const; + void append_leaves(WsdbAppendLeaves cmd) const; + WsdbBatchInsert::Response batch_insert(WsdbBatchInsert cmd) const; + WsdbSequentialInsert::Response sequential_insert(WsdbSequentialInsert cmd) const; + void update_archive(WsdbUpdateArchive cmd) const; + WsdbCommit::Response commit(); + void rollback(); + WsdbSyncBlock::Response sync_block(WsdbSyncBlock cmd); + WsdbCreateFork::Response create_fork(WsdbCreateFork cmd); + void delete_fork(WsdbDeleteFork cmd); + WsdbFinalizeBlocks::Response finalize_blocks(WsdbFinalizeBlocks cmd) const; + WsdbUnwindBlocks::Response unwind_blocks(WsdbUnwindBlocks cmd); + WsdbRemoveHistoricalBlocks::Response remove_historical_blocks(WsdbRemoveHistoricalBlocks cmd) const; + WsdbGetStatus::Response get_status() const; + void create_checkpoint(WsdbCreateCheckpoint cmd); + void commit_checkpoint(WsdbCommitCheckpoint cmd); + void revert_checkpoint(WsdbRevertCheckpoint cmd); + void commit_all_checkpoints(WsdbCommitAllCheckpoints cmd); + void revert_all_checkpoints(WsdbRevertAllCheckpoints cmd); + void copy_stores(WsdbCopyStores cmd) const; + void shutdown(); + + private: + template typename Cmd::Response send(Cmd&& cmd) const; + + mutable std::unique_ptr client_; +}; + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.cpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.cpp new file mode 100644 index 000000000000..b124326e8347 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.cpp @@ -0,0 +1,348 @@ +#include "barretenberg/wsdb/wsdb_ipc_server.hpp" +#include "barretenberg/common/log.hpp" +#include "barretenberg/crypto/merkle_tree/indexed_tree/indexed_leaf.hpp" +#include "barretenberg/ipc/ipc_server.hpp" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/world_state/world_state.hpp" +#include "barretenberg/wsdb/wsdb_execute.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __linux__ +#include +#elif defined(__APPLE__) +#include +#endif + +// Use nlohmann/json if available, otherwise minimal parsing +#include + +namespace bb::wsdb { + +using namespace bb::world_state; +using namespace bb::crypto::merkle_tree; + +// --------------------------------------------------------------------------- +// Platform-specific parent death monitoring +// (Same pattern as api_msgpack.cpp) +// --------------------------------------------------------------------------- + +static void setup_parent_death_monitoring() +{ +#ifdef __linux__ + if (prctl(PR_SET_PDEATHSIG, SIGTERM) == -1) { + std::cerr << "Warning: Could not set parent death signal" << '\n'; + } +#elif defined(__APPLE__) + pid_t parent_pid = getppid(); + std::thread([parent_pid]() { + int kq = kqueue(); + if (kq == -1) { + std::cerr << "Warning: Could not create kqueue for parent monitoring" << '\n'; + return; + } + struct kevent change; + EV_SET(&change, parent_pid, EVFILT_PROC, EV_ADD | EV_ENABLE, NOTE_EXIT, 0, nullptr); + if (kevent(kq, &change, 1, nullptr, 0, nullptr) == -1) { + std::cerr << "Warning: Could not monitor parent process" << '\n'; + close(kq); + return; + } + struct kevent event; + kevent(kq, nullptr, 0, &event, 1, nullptr); + std::cerr << "Parent process exited, shutting down..." << '\n'; + close(kq); + std::exit(0); + }).detach(); +#endif +} + +// --------------------------------------------------------------------------- +// Simple JSON-like parsing for config maps +// Parses "{0:1024,1:2048,...}" into unordered_map +// --------------------------------------------------------------------------- + +static std::unordered_map parse_tree_uint64_map(const std::string& json) +{ + std::unordered_map result; + if (json.empty()) { + return result; + } + std::string cleaned; + for (char c : json) { + if (c != '{' && c != '}' && c != ' ') { + cleaned += c; + } + } + std::istringstream ss(cleaned); + std::string pair; + while (std::getline(ss, pair, ',')) { + auto colon_pos = pair.find(':'); + if (colon_pos != std::string::npos) { + auto key = static_cast(std::stoi(pair.substr(0, colon_pos))); + auto value = static_cast(std::stoull(pair.substr(colon_pos + 1))); + result[key] = value; + } + } + return result; +} + +static std::unordered_map parse_tree_uint32_map(const std::string& json) +{ + std::unordered_map result; + if (json.empty()) { + return result; + } + auto u64_map = parse_tree_uint64_map(json); + for (const auto& [k, v] : u64_map) { + result[k] = static_cast(v); + } + return result; +} + +static std::unordered_map parse_tree_index_map(const std::string& json) +{ + std::unordered_map result; + if (json.empty()) { + return result; + } + auto u64_map = parse_tree_uint64_map(json); + for (const auto& [k, v] : u64_map) { + result[k] = static_cast(v); + } + return result; +} + +// --------------------------------------------------------------------------- +// Parse prefilled public data from JSON: [["slot_hex","value_hex"],...] +// Each hex string is a 64-char (32-byte) hex-encoded field element. +// --------------------------------------------------------------------------- + +static fr hex_to_fr(const std::string& hex) +{ + std::string cleaned = hex; + if (cleaned.size() >= 2 && cleaned[0] == '0' && (cleaned[1] == 'x' || cleaned[1] == 'X')) { + cleaned = cleaned.substr(2); + } + return fr(cleaned); +} + +static std::vector parse_prefilled_public_data(const std::string& json) +{ + std::vector result; + if (json.empty() || json == "[]") { + return result; + } + + // Simple state-machine parser for [["hex","hex"],["hex","hex"],...] + std::vector hex_values; + std::string current; + bool in_string = false; + + for (char c : json) { + if (c == '"') { + in_string = !in_string; + } else if (in_string) { + current += c; + } else if ((c == ',' || c == ']') && !current.empty()) { + hex_values.push_back(std::move(current)); + current.clear(); + } + } + + // hex_values should have pairs: slot, value, slot, value, ... + if (hex_values.size() % 2 != 0) { + std::cerr << "Warning: odd number of hex values in prefilled public data, ignoring last" << '\n'; + } + for (size_t i = 0; i + 1 < hex_values.size(); i += 2) { + result.emplace_back(hex_to_fr(hex_values[i]), hex_to_fr(hex_values[i + 1])); + } + return result; +} + +// --------------------------------------------------------------------------- +// IPC server execution +// --------------------------------------------------------------------------- + +int execute_wsdb_server(const std::string& input_path, + const std::string& data_dir, + const std::string& tree_heights_json, + const std::string& tree_prefill_json, + const std::string& map_sizes_json, + uint32_t threads, + uint32_t initial_header_generator_point, + const std::string& prefilled_public_data_json, + uint64_t genesis_timestamp, + size_t request_ring_size, + size_t response_ring_size) +{ + const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024; + + // Parse config + auto tree_height = parse_tree_uint32_map(tree_heights_json); + auto tree_prefill = parse_tree_index_map(tree_prefill_json); + + std::unordered_map map_size{ + { MerkleTreeId::ARCHIVE, DEFAULT_MAP_SIZE }, + { MerkleTreeId::NULLIFIER_TREE, DEFAULT_MAP_SIZE }, + { MerkleTreeId::NOTE_HASH_TREE, DEFAULT_MAP_SIZE }, + { MerkleTreeId::PUBLIC_DATA_TREE, DEFAULT_MAP_SIZE }, + { MerkleTreeId::L1_TO_L2_MESSAGE_TREE, DEFAULT_MAP_SIZE }, + }; + if (!map_sizes_json.empty()) { + auto parsed = parse_tree_uint64_map(map_sizes_json); + for (const auto& [k, v] : parsed) { + map_size[k] = v; + } + } + + // Parse prefilled public data: JSON array of ["slot_hex","value_hex"] pairs + std::vector prefilled_public_data; + if (!prefilled_public_data_json.empty()) { + prefilled_public_data = parse_prefilled_public_data(prefilled_public_data_json); + std::cerr << "Parsed " << prefilled_public_data.size() << " prefilled public data entries" << '\n'; + } + + // Create WorldState + std::cerr << "Creating WorldState at " << data_dir << " with " << threads << " threads" << '\n'; + auto ws = std::make_unique(threads, + data_dir, + map_size, + tree_height, + tree_prefill, + prefilled_public_data, + initial_header_generator_point, + genesis_timestamp); + + WsdbRequest request{ .world_state = *ws }; + + // Create IPC server based on path suffix + std::unique_ptr server; + + if (input_path.size() >= 4 && input_path.substr(input_path.size() - 4) == ".shm") { + std::string base_name = input_path.substr(0, input_path.size() - 4); + constexpr size_t MAX_SHM_CLIENTS = 2; // TS backend (client 0) + AVM binary (client 1) + server = ipc::IpcServer::create_mpsc_shm(base_name, MAX_SHM_CLIENTS, request_ring_size, response_ring_size); + std::cerr << "MPSC shared memory server at " << base_name << " (max " << MAX_SHM_CLIENTS << " clients)\n"; + } else if (input_path.size() >= 5 && input_path.substr(input_path.size() - 5) == ".sock") { + server = ipc::IpcServer::create_socket(input_path, 1); + std::cerr << "Socket server at " << input_path << '\n'; + } else { + std::cerr << "Error: --input path must end with .sock or .shm" << '\n'; + return 1; + } + + // Set up signal handlers + static ipc::IpcServer* global_server = server.get(); + + auto graceful_shutdown_handler = [](int signal) { + std::cerr << "\nReceived signal " << signal << ", shutting down gracefully..." << '\n'; + if (global_server) { + global_server->request_shutdown(); + } + }; + + auto fatal_error_handler = [](int signal) { + const char* signal_name = (signal == SIGBUS) ? "SIGBUS" : (signal == SIGSEGV) ? "SIGSEGV" : "UNKNOWN"; + std::cerr << "\nFatal error: received " << signal_name << '\n'; + if (global_server) { + global_server->close(); + } + std::exit(1); + }; + + (void)std::signal(SIGTERM, graceful_shutdown_handler); + (void)std::signal(SIGINT, graceful_shutdown_handler); + (void)std::signal(SIGBUS, fatal_error_handler); + (void)std::signal(SIGSEGV, fatal_error_handler); + + setup_parent_death_monitoring(); + + if (!server->listen()) { + std::cerr << "Error: Could not start IPC server" << '\n'; + return 1; + } + + std::cerr << "aztec-wsdb IPC server ready" << '\n'; + + // Run server with wsdb command handler + server->run([&request](int client_id, std::span raw_request) -> std::vector { + try { + // Deserialize msgpack command + // Format: [["CommandName", {payload}]] - a 1-element tuple containing the NamedUnion + auto unpacked = msgpack::unpack(reinterpret_cast(raw_request.data()), raw_request.size()); + auto obj = unpacked.get(); + + // Expect array of size 1 (tuple wrapping) + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + if (obj.type != msgpack::type::ARRAY || obj.via.array.size != 1) { + std::cerr << "Error: Expected array of size 1 from client " << client_id << '\n'; + return {}; + } + + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + auto& command_obj = obj.via.array.ptr[0]; + + // Check for shutdown before converting + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + if (command_obj.type == msgpack::type::ARRAY && command_obj.via.array.size == 2 && + command_obj.via.array.ptr[0].type == msgpack::type::STR) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + std::string_view command_name(command_obj.via.array.ptr[0].via.str.ptr, + command_obj.via.array.ptr[0].via.str.size); + bool is_shutdown = (command_name == "WsdbShutdown"); + + // Convert and execute + WsdbCommand command; + command_obj.convert(command); + auto response = wsdb(request, std::move(command)); + + // Serialize response + msgpack::sbuffer response_buffer; + msgpack::pack(response_buffer, response); + std::vector result(response_buffer.data(), response_buffer.data() + response_buffer.size()); + + if (is_shutdown) { + throw ipc::ShutdownRequested(std::move(result)); + } + + return result; + } + + // Fallback: try converting directly + WsdbCommand command; + command_obj.convert(command); + auto response = wsdb(request, std::move(command)); + + msgpack::sbuffer response_buffer; + msgpack::pack(response_buffer, response); + return std::vector(response_buffer.data(), response_buffer.data() + response_buffer.size()); + + } catch (const ipc::ShutdownRequested&) { + throw; + } catch (const std::exception& e) { + std::cerr << "Error processing request from client " << client_id << ": " << e.what() << '\n'; + std::cerr.flush(); + + WsdbErrorResponse error_response{ .message = std::string(e.what()) }; + WsdbCommandResponse response = error_response; + + msgpack::sbuffer response_buffer; + msgpack::pack(response_buffer, response); + return std::vector(response_buffer.data(), response_buffer.data() + response_buffer.size()); + } + }); + + server->close(); + return 0; +} + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.hpp b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.hpp new file mode 100644 index 000000000000..afb03cba3512 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb/wsdb_ipc_server.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include + +namespace bb::wsdb { + +/** + * @brief Start the aztec-wsdb IPC server. + * + * Creates a WorldState instance and runs the IPC server loop, dispatching + * incoming msgpack commands via the WsdbCommand NamedUnion. + */ +int execute_wsdb_server(const std::string& input_path, + const std::string& data_dir, + const std::string& tree_heights_json, + const std::string& tree_prefill_json, + const std::string& map_sizes_json, + uint32_t threads, + uint32_t initial_header_generator_point, + const std::string& prefilled_public_data_json, + uint64_t genesis_timestamp, + size_t request_ring_size, + size_t response_ring_size); + +} // namespace bb::wsdb diff --git a/barretenberg/cpp/src/barretenberg/wsdb_client/CMakeLists.txt b/barretenberg/cpp/src/barretenberg/wsdb_client/CMakeLists.txt new file mode 100644 index 000000000000..f7647019cce0 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb_client/CMakeLists.txt @@ -0,0 +1,19 @@ +if(NOT(FUZZING) AND NOT(WASM)) + # WSDB IPC client library - bridges callers to aztec-wsdb over IPC. + # Implements LowLevelMerkleDBInterface so the AVM simulator can talk to a + # standalone aztec-wsdb process instead of an in-process WorldState. + add_library( + wsdb_client + STATIC + wsdb_ipc_merkle_db.cpp + ) + target_link_libraries( + wsdb_client + PUBLIC + barretenberg + wsdb_ipc_client + ipc + vm2_sim + ) + set_target_properties(wsdb_client PROPERTIES POSITION_INDEPENDENT_CODE ON) +endif() diff --git a/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.cpp b/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.cpp new file mode 100644 index 000000000000..a6c5c7df4fd5 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.cpp @@ -0,0 +1,231 @@ +#include "barretenberg/wsdb_client/wsdb_ipc_merkle_db.hpp" +#include "barretenberg/aztec/aztec_constants.hpp" +#include "barretenberg/common/log.hpp" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/serialize/msgpack_impl.hpp" +#include "barretenberg/wsdb/wsdb_commands.hpp" + +namespace bb::wsdb_client { + +// Use avm2::simulation for interface types, but NOT world_state (it transitively +// imports crypto::merkle_tree which conflicts with avm2::simulation aliases). +using namespace avm2::simulation; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +template std::vector WsdbIpcMerkleDB::serialize_to_msgpack(const T& value) +{ + msgpack::sbuffer buf; + msgpack::pack(buf, value); + return std::vector(buf.data(), buf.data() + buf.size()); +} + +template T WsdbIpcMerkleDB::deserialize_from_msgpack(const std::vector& bytes) +{ + auto unpacked = msgpack::unpack(reinterpret_cast(bytes.data()), bytes.size()); + T value; + unpacked.get().convert(value); + return value; +} + +// --------------------------------------------------------------------------- +// Constructor +// --------------------------------------------------------------------------- + +WsdbIpcMerkleDB::WsdbIpcMerkleDB(wsdb::WsdbIpcClient& client, world_state::WorldStateRevision revision) + : client_(client) + , revision_(revision) +{} + +// --------------------------------------------------------------------------- +// Tree roots +// --------------------------------------------------------------------------- + +avm2::TreeSnapshots WsdbIpcMerkleDB::get_tree_roots() const +{ + if (cached_tree_roots_.has_value()) { + return cached_tree_roots_.value(); + } + + auto l1_info = client_.get_tree_info( + wsdb::WsdbGetTreeInfo{ .treeId = MerkleTreeId::L1_TO_L2_MESSAGE_TREE, .revision = revision_ }); + auto nh_info = + client_.get_tree_info(wsdb::WsdbGetTreeInfo{ .treeId = MerkleTreeId::NOTE_HASH_TREE, .revision = revision_ }); + auto null_info = + client_.get_tree_info(wsdb::WsdbGetTreeInfo{ .treeId = MerkleTreeId::NULLIFIER_TREE, .revision = revision_ }); + auto pd_info = + client_.get_tree_info(wsdb::WsdbGetTreeInfo{ .treeId = MerkleTreeId::PUBLIC_DATA_TREE, .revision = revision_ }); + + avm2::TreeSnapshots snapshots{ + .l1_to_l2_message_tree = + avm2::AppendOnlyTreeSnapshot{ .root = l1_info.root, .next_available_leaf_index = l1_info.size }, + .note_hash_tree = + avm2::AppendOnlyTreeSnapshot{ .root = nh_info.root, .next_available_leaf_index = nh_info.size }, + .nullifier_tree = + avm2::AppendOnlyTreeSnapshot{ .root = null_info.root, .next_available_leaf_index = null_info.size }, + .public_data_tree = + avm2::AppendOnlyTreeSnapshot{ .root = pd_info.root, .next_available_leaf_index = pd_info.size }, + }; + cached_tree_roots_ = snapshots; + return snapshots; +} + +void WsdbIpcMerkleDB::invalidate_tree_roots_cache() +{ + cached_tree_roots_ = std::nullopt; +} + +// --------------------------------------------------------------------------- +// Query methods +// --------------------------------------------------------------------------- + +SiblingPath WsdbIpcMerkleDB::get_sibling_path(MerkleTreeId tree_id, index_t leaf_index) const +{ + auto resp = client_.get_sibling_path( + wsdb::WsdbGetSiblingPath{ .treeId = tree_id, .revision = revision_, .leafIndex = leaf_index }); + return resp.path; +} + +crypto::merkle_tree::GetLowIndexedLeafResponse WsdbIpcMerkleDB::get_low_indexed_leaf(MerkleTreeId tree_id, + const avm2::FF& value) const +{ + auto resp = client_.find_low_leaf(wsdb::WsdbFindLowLeaf{ .treeId = tree_id, .revision = revision_, .key = value }); + return GetLowIndexedLeafResponse(resp.alreadyPresent, resp.index); +} + +avm2::FF WsdbIpcMerkleDB::get_leaf_value(MerkleTreeId tree_id, index_t leaf_index) const +{ + auto resp = client_.get_leaf_value( + wsdb::WsdbGetLeafValue{ .treeId = tree_id, .revision = revision_, .leafIndex = leaf_index }); + if (!resp.value.has_value()) { + throw std::runtime_error("Invalid get_leaf_value request for tree " + + std::to_string(static_cast(tree_id)) + " index " + + std::to_string(leaf_index)); + } + return deserialize_from_msgpack(resp.value.value()); +} + +IndexedLeaf WsdbIpcMerkleDB::get_leaf_preimage_public_data_tree(index_t leaf_index) const +{ + auto resp = client_.get_leaf_preimage(wsdb::WsdbGetLeafPreimage{ + .treeId = MerkleTreeId::PUBLIC_DATA_TREE, .revision = revision_, .leafIndex = leaf_index }); + if (!resp.preimage.has_value()) { + throw std::runtime_error("Invalid get_leaf_preimage_public_data_tree request for index " + + std::to_string(leaf_index)); + } + return deserialize_from_msgpack>(resp.preimage.value()); +} + +IndexedLeaf WsdbIpcMerkleDB::get_leaf_preimage_nullifier_tree(index_t leaf_index) const +{ + auto resp = client_.get_leaf_preimage(wsdb::WsdbGetLeafPreimage{ + .treeId = MerkleTreeId::NULLIFIER_TREE, .revision = revision_, .leafIndex = leaf_index }); + if (!resp.preimage.has_value()) { + throw std::runtime_error("Invalid get_leaf_preimage_nullifier_tree request for index " + + std::to_string(leaf_index)); + } + return deserialize_from_msgpack>(resp.preimage.value()); +} + +// --------------------------------------------------------------------------- +// State modification methods +// --------------------------------------------------------------------------- + +SequentialInsertionResult WsdbIpcMerkleDB::insert_indexed_leaves_public_data_tree( + const PublicDataLeafValue& leaf_value) +{ + std::vector> serialized_leaves = { serialize_to_msgpack(leaf_value) }; + auto resp = client_.sequential_insert(wsdb::WsdbSequentialInsert{ + .treeId = MerkleTreeId::PUBLIC_DATA_TREE, .leaves = std::move(serialized_leaves), .forkId = revision_.forkId }); + invalidate_tree_roots_cache(); + return deserialize_from_msgpack>(resp.result); +} + +SequentialInsertionResult WsdbIpcMerkleDB::insert_indexed_leaves_nullifier_tree( + const NullifierLeafValue& leaf_value) +{ + std::vector> serialized_leaves = { serialize_to_msgpack(leaf_value) }; + auto resp = client_.sequential_insert(wsdb::WsdbSequentialInsert{ + .treeId = MerkleTreeId::NULLIFIER_TREE, .leaves = std::move(serialized_leaves), .forkId = revision_.forkId }); + invalidate_tree_roots_cache(); + return deserialize_from_msgpack>(resp.result); +} + +void WsdbIpcMerkleDB::append_leaves(MerkleTreeId tree_id, std::span leaves) +{ + std::vector> serialized_leaves; + serialized_leaves.reserve(leaves.size()); + for (const auto& leaf : leaves) { + serialized_leaves.push_back(serialize_to_msgpack(leaf)); + } + client_.append_leaves(wsdb::WsdbAppendLeaves{ + .treeId = tree_id, .leaves = std::move(serialized_leaves), .forkId = revision_.forkId }); + invalidate_tree_roots_cache(); +} + +void WsdbIpcMerkleDB::pad_tree(MerkleTreeId tree_id, size_t num_leaves) +{ + switch (tree_id) { + case MerkleTreeId::NULLIFIER_TREE: { + std::vector> padding_leaves; + padding_leaves.reserve(num_leaves); + auto empty_leaf = NullifierLeafValue::empty(); + for (size_t i = 0; i < num_leaves; i++) { + padding_leaves.push_back(serialize_to_msgpack(empty_leaf)); + } + client_.batch_insert(wsdb::WsdbBatchInsert{ .treeId = MerkleTreeId::NULLIFIER_TREE, + .leaves = std::move(padding_leaves), + .subtreeDepth = NULLIFIER_SUBTREE_HEIGHT, + .forkId = revision_.forkId }); + break; + } + case MerkleTreeId::NOTE_HASH_TREE: { + std::vector> padding_leaves; + padding_leaves.reserve(num_leaves); + auto zero = avm2::FF(0); + for (size_t i = 0; i < num_leaves; i++) { + padding_leaves.push_back(serialize_to_msgpack(zero)); + } + client_.append_leaves(wsdb::WsdbAppendLeaves{ + .treeId = MerkleTreeId::NOTE_HASH_TREE, .leaves = std::move(padding_leaves), .forkId = revision_.forkId }); + break; + } + default: + throw std::runtime_error("Padding not supported for tree " + std::to_string(static_cast(tree_id))); + } + invalidate_tree_roots_cache(); +} + +// --------------------------------------------------------------------------- +// Checkpoint methods +// --------------------------------------------------------------------------- + +void WsdbIpcMerkleDB::create_checkpoint() +{ + client_.create_checkpoint(wsdb::WsdbCreateCheckpoint{ .forkId = revision_.forkId }); + uint32_t current_id = checkpoint_stack_.top(); + checkpoint_stack_.push(current_id + 1); +} + +void WsdbIpcMerkleDB::commit_checkpoint() +{ + client_.commit_checkpoint(wsdb::WsdbCommitCheckpoint{ .forkId = revision_.forkId }); + invalidate_tree_roots_cache(); + checkpoint_stack_.pop(); +} + +void WsdbIpcMerkleDB::revert_checkpoint() +{ + client_.revert_checkpoint(wsdb::WsdbRevertCheckpoint{ .forkId = revision_.forkId }); + invalidate_tree_roots_cache(); + checkpoint_stack_.pop(); +} + +uint32_t WsdbIpcMerkleDB::get_checkpoint_id() const +{ + return checkpoint_stack_.top(); +} + +} // namespace bb::wsdb_client diff --git a/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.hpp b/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.hpp new file mode 100644 index 000000000000..becfbf4d5b75 --- /dev/null +++ b/barretenberg/cpp/src/barretenberg/wsdb_client/wsdb_ipc_merkle_db.hpp @@ -0,0 +1,72 @@ +#pragma once +/** + * @file wsdb_ipc_merkle_db.hpp + * @brief LowLevelMerkleDBInterface implementation backed by WSDB IPC. + * + * Connects to an aztec-wsdb process over Unix Domain Socket and translates + * each LowLevelMerkleDBInterface call into the corresponding WSDB IPC command. + */ + +#include "barretenberg/vm2/simulation/interfaces/db.hpp" +#include "barretenberg/world_state/types.hpp" +#include "barretenberg/wsdb/wsdb_commands.hpp" +#include "barretenberg/wsdb/wsdb_execute.hpp" +#include "barretenberg/wsdb/wsdb_ipc_client_generated.hpp" + +#include +#include + +namespace bb::wsdb_client { + +class WsdbIpcMerkleDB final : public avm2::simulation::LowLevelMerkleDBInterface { + public: + /** + * @brief Construct from a connected WSDB IPC client and world state revision. + * @param client Reference to a connected WsdbIpcClient. + * @param revision The world state revision (includes forkId) to use for queries. + */ + WsdbIpcMerkleDB(wsdb::WsdbIpcClient& client, world_state::WorldStateRevision revision); + + avm2::TreeSnapshots get_tree_roots() const override; + + // Query methods + avm2::simulation::SiblingPath get_sibling_path(avm2::simulation::MerkleTreeId tree_id, + avm2::simulation::index_t leaf_index) const override; + crypto::merkle_tree::GetLowIndexedLeafResponse get_low_indexed_leaf(avm2::simulation::MerkleTreeId tree_id, + const avm2::FF& value) const override; + avm2::FF get_leaf_value(avm2::simulation::MerkleTreeId tree_id, + avm2::simulation::index_t leaf_index) const override; + avm2::simulation::IndexedLeaf get_leaf_preimage_public_data_tree( + avm2::simulation::index_t leaf_index) const override; + avm2::simulation::IndexedLeaf get_leaf_preimage_nullifier_tree( + avm2::simulation::index_t leaf_index) const override; + + // State modification methods + avm2::simulation::SequentialInsertionResult + insert_indexed_leaves_public_data_tree(const avm2::simulation::PublicDataLeafValue& leaf_value) override; + avm2::simulation::SequentialInsertionResult + insert_indexed_leaves_nullifier_tree(const avm2::simulation::NullifierLeafValue& leaf_value) override; + void append_leaves(avm2::simulation::MerkleTreeId tree_id, std::span leaves) override; + void pad_tree(avm2::simulation::MerkleTreeId tree_id, size_t num_leaves) override; + + // Checkpoint methods + void create_checkpoint() override; + void commit_checkpoint() override; + void revert_checkpoint() override; + uint32_t get_checkpoint_id() const override; + + private: + template static std::vector serialize_to_msgpack(const T& value); + template static T deserialize_from_msgpack(const std::vector& bytes); + + /** Invalidate the cached tree roots (call after any write operation). */ + void invalidate_tree_roots_cache(); + + wsdb::WsdbIpcClient& client_; + world_state::WorldStateRevision revision_; + std::stack checkpoint_stack_{ { 0 } }; + /** Cached tree roots — avoids 5 IPC round trips per get_tree_roots() call. */ + mutable std::optional cached_tree_roots_; +}; + +} // namespace bb::wsdb_client diff --git a/barretenberg/ts/.gitignore b/barretenberg/ts/.gitignore index cc254d3c8714..c4cb49ce017e 100644 --- a/barretenberg/ts/.gitignore +++ b/barretenberg/ts/.gitignore @@ -12,3 +12,4 @@ package # Generated files src/cbind/generated/ +src/aztec-wsdb/generated/ diff --git a/barretenberg/ts/package.json b/barretenberg/ts/package.json index d44208ca1147..c6118f0d423e 100644 --- a/barretenberg/ts/package.json +++ b/barretenberg/ts/package.json @@ -11,6 +11,12 @@ "require": "./dest/node-cjs/index.js", "browser": "./dest/browser/index.js", "default": "./dest/node/index.js" + }, + "./aztec-wsdb": { + "default": "./dest/node/aztec-wsdb/index.js" + }, + "./platform": { + "default": "./dest/node/bb_backends/node/platform.js" } }, "bin": { @@ -23,14 +29,14 @@ "README.md" ], "scripts": { - "clean": "rm -rf ./dest .tsbuildinfo .tsbuildinfo.cjs ./src/cbind/generated", + "clean": "rm -rf ./dest .tsbuildinfo .tsbuildinfo.cjs ./src/cbind/generated ./src/aztec-wsdb/generated", "build": "yarn clean && yarn generate && yarn build:wasm && yarn build:native && yarn build:esm && yarn build:cjs && yarn build:browser", "build:wasm": "./scripts/copy_wasm.sh", "build:native": "./scripts/copy_native.sh", "build:esm": "tsgo -b tsconfig.esm.json && chmod +x ./dest/node/bin/index.js", "build:cjs": "tsgo -b tsconfig.cjs.json && ./scripts/cjs_postprocess.sh", "build:browser": "tsgo -b tsconfig.browser.json && ./scripts/browser_postprocess.sh", - "generate": "NODE_OPTIONS='--loader ts-node/esm' NODE_NO_WARNINGS=1 ts-node src/cbind/generate.ts", + "generate": "NODE_OPTIONS='--loader ts-node/esm' NODE_NO_WARNINGS=1 ts-node src/cbind/generate.ts && npx tsx src/aztec-wsdb/generate.ts", "formatting": "prettier --check ./src && eslint --max-warnings 0 ./src", "formatting:fix": "prettier -w ./src", "test": "NODE_OPTIONS='--loader ts-node/esm' NODE_NO_WARNINGS=1 node --experimental-vm-modules $(yarn bin jest) --no-cache --passWithNoTests", diff --git a/barretenberg/ts/scripts/copy_native.sh b/barretenberg/ts/scripts/copy_native.sh index 3fdd7aa2f10e..07ed0065f37f 100755 --- a/barretenberg/ts/scripts/copy_native.sh +++ b/barretenberg/ts/scripts/copy_native.sh @@ -9,10 +9,11 @@ cd $(dirname $0)/.. target="$(arch)-$(os)" if [ "${BUILD_CPP:-0}" -eq 1 ]; then - ../cpp/bootstrap.sh build_preset clang20 --target bb --target nodejs_module + ../cpp/bootstrap.sh build_preset clang20 --target bb --target nodejs_module --target aztec-wsdb fi mkdir -p ./build/$target cp ../cpp/build/bin/bb ./build/$target +cp ../cpp/build/bin/aztec-wsdb ./build/$target cp ../cpp/build/lib/nodejs_module.node ./build/$target diff --git a/barretenberg/ts/src/aztec-wsdb/generate.ts b/barretenberg/ts/src/aztec-wsdb/generate.ts new file mode 100644 index 000000000000..510a0179eb54 --- /dev/null +++ b/barretenberg/ts/src/aztec-wsdb/generate.ts @@ -0,0 +1,89 @@ +/** + * Code generation for aztec-wsdb TypeScript bindings. + * + * Uses the same codegen pipeline as bb.js but targets the aztec-wsdb binary schema. + * Run: npx tsx src/aztec-wsdb/generate.ts + */ + +import { writeFileSync, mkdirSync } from 'fs'; +import { dirname, join } from 'path'; +import { exec } from 'child_process'; +import { promisify } from 'util'; +import { fileURLToPath } from 'url'; +import { SchemaVisitor } from '../cbind/schema_visitor.js'; +import { TypeScriptCodegen } from '../cbind/typescript_codegen.js'; +import { CppCodegen } from '../cbind/cpp_codegen.js'; + +const execAsync = promisify(exec); + +// @ts-ignore +const __dirname = dirname(fileURLToPath(import.meta.url)); + +async function generate() { + const wsdbBuildPath = process.env.WSDB_BINARY_PATH || join(__dirname, '../../../cpp/build/bin/aztec-wsdb'); + + // Get schema from aztec-wsdb + console.log('Fetching msgpack schema from aztec-wsdb...'); + const { stdout } = await execAsync(`${wsdbBuildPath} msgpack schema`); + const schema = JSON.parse(stdout.trim()); + + if (!schema.commands || !schema.responses) { + throw new Error('Invalid schema: missing commands or responses'); + } + + // Compile schema using the shared visitor + console.log('Compiling schema...'); + const visitor = new SchemaVisitor(); + const compiled = visitor.visit(schema.commands, schema.responses); + + console.log(`Found ${compiled.commands.length} commands, ${compiled.structs.size} structs\n`); + + // Generate TypeScript bindings + const tsGen = new TypeScriptCodegen(); + + // Generate C++ IPC client + const cppGen = new CppCodegen({ + namespace: 'bb::wsdb', + prefix: 'Wsdb', + executeHeader: 'barretenberg/wsdb/wsdb_execute.hpp', + commandsHeader: 'barretenberg/wsdb/wsdb_commands.hpp', + }); + + const files = [ + { path: 'generated/api_types.ts', content: tsGen.generateTypes(compiled) }, + { path: 'generated/async.ts', content: tsGen.generateAsyncApi(compiled) }, + { path: '../../../cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.hpp', content: cppGen.generateHeader(compiled) }, + { path: '../../../cpp/src/barretenberg/wsdb/wsdb_ipc_client_generated.cpp', content: cppGen.generateImpl(compiled) }, + ]; + + // Ensure output directory exists + const outputDir = join(__dirname, 'generated'); + mkdirSync(outputDir, { recursive: true }); + + const cppFiles: string[] = []; + for (const file of files) { + const outputPath = join(__dirname, file.path); + mkdirSync(dirname(outputPath), { recursive: true }); + writeFileSync(outputPath, file.content); + console.log(` ${outputPath}`); + if (file.path.endsWith('.hpp') || file.path.endsWith('.cpp')) { + cppFiles.push(outputPath); + } + } + + // Run clang-format on generated C++ files + if (cppFiles.length > 0) { + try { + await execAsync(`clang-format-20 -i ${cppFiles.join(' ')}`); + } catch { + // clang-format-20 may not be available in all environments + } + } + + console.log('\nWsdb codegen complete.'); +} + +generate().catch(error => { + console.error('Generation failed:', error); + process.exit(1); +}); diff --git a/barretenberg/ts/src/aztec-wsdb/index.ts b/barretenberg/ts/src/aztec-wsdb/index.ts new file mode 100644 index 000000000000..6260156cdeb8 --- /dev/null +++ b/barretenberg/ts/src/aztec-wsdb/index.ts @@ -0,0 +1,449 @@ +/** + * aztec-wsdb TypeScript client. + * + * Spawns the aztec-wsdb binary and communicates via Unix Domain Socket or + * shared memory IPC. Implements IMsgpackBackendAsync so it can be used with + * the generated WsdbAsyncApi. + */ + +import { spawn, ChildProcess } from 'child_process'; +import { createRequire } from 'module'; +import * as net from 'net'; +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { IMsgpackBackendAsync } from '../bb_backends/interface.js'; +import { findNapiBinary, findPackageRoot } from '../bb_backends/node/platform.js'; +import { threadId } from 'worker_threads'; + +let instanceCounter = 0; + +export interface WsdbOptions { + /** Path to the aztec-wsdb binary */ + binaryPath: string; + /** Data directory for LMDB stores */ + dataDir: string; + /** Tree heights map: { treeId: height } */ + treeHeights?: Record; + /** Tree prefill sizes: { treeId: size } */ + treePrefill?: Record; + /** LMDB map sizes in KB: { treeId: sizeKb } */ + mapSizes?: Record; + /** Thread pool size */ + threads?: number; + /** Initial header generator point */ + initialHeaderGeneratorPoint?: number; + /** Prefilled public data as array of [slotBuffer, valueBuffer] pairs */ + prefilledPublicData?: Array<[Buffer, Buffer]>; + /** Genesis block timestamp (must match TS-side buildInitialHeader) */ + genesisTimestamp?: number; + /** Optional logger function */ + logger?: (msg: string) => void; + /** Use shared memory instead of UDS for IPC (lower latency). */ + useShm?: boolean; + /** Path to NAPI binary (required when useShm=true, auto-detected if omitted). */ + napiPath?: string; +} + +/** + * Formats a Record as a CLI-friendly JSON string: {0:1024,1:2048,...} + */ +function formatMap(map: Record | undefined): string | undefined { + if (!map || Object.keys(map).length === 0) { + return undefined; + } + const entries = Object.entries(map).map(([k, v]) => `${k}:${v}`); + return `{${entries.join(',')}}`; +} + +/** Build CLI args common to both UDS and SHM modes. */ +function buildWsdbArgs(inputPath: string, options: WsdbOptions, threads: number): string[] { + const args = [ + 'msgpack', + 'run', + '--input', + inputPath, + '--data-dir', + options.dataDir, + '--threads', + threads.toString(), + ]; + + if (options.initialHeaderGeneratorPoint !== undefined) { + args.push('--initial-header-generator-point', options.initialHeaderGeneratorPoint.toString()); + } + + const treeHeightsStr = formatMap(options.treeHeights); + if (treeHeightsStr) { + args.push('--tree-heights', treeHeightsStr); + } + + const treePrefillStr = formatMap(options.treePrefill); + if (treePrefillStr) { + args.push('--tree-prefill', treePrefillStr); + } + + const mapSizesStr = formatMap(options.mapSizes); + if (mapSizesStr) { + args.push('--map-sizes', mapSizesStr); + } + + if (options.prefilledPublicData && options.prefilledPublicData.length > 0) { + const pairs = options.prefilledPublicData.map(([slot, value]) => [slot.toString('hex'), value.toString('hex')]); + args.push('--prefilled-public-data', JSON.stringify(pairs)); + } + + if (options.genesisTimestamp !== undefined && options.genesisTimestamp !== 0) { + args.push('--genesis-timestamp', options.genesisTimestamp.toString()); + } + + return args; +} + +export { AsyncApi } from './generated/async.js'; +export * from './generated/api_types.js'; + +/** + * IPC backend that communicates with the aztec-wsdb binary. + * Supports both Unix Domain Socket and shared memory transports. + */ +export class WsdbBackend implements IMsgpackBackendAsync { + private process: ChildProcess; + /** For UDS mode */ + private socket: net.Socket | null = null; + /** For SHM mode */ + private shmClient: any = null; + private inputPath: string; + private useShm: boolean; + private connectionPromise: Promise; + private connectionTimeout: NodeJS.Timeout | null = null; + /** Resolves when the child process exits (for clean destroy). */ + private processExitPromise: Promise; + + private pendingCallbacks: Array<{ + resolve: (data: Uint8Array) => void; + reject: (error: Error) => void; + }> = []; + + // State machine for reading UDS responses + private readingLength: boolean = true; + private lengthBuffer: Buffer = Buffer.alloc(4); + private lengthBytesRead: number = 0; + private responseLength: number = 0; + private responseBuffer: Buffer | null = null; + private responseBytesRead: number = 0; + + constructor(options: WsdbOptions) { + this.useShm = options.useShm ?? false; + const instanceId = `wsdb-${process.pid}-${threadId}-${instanceCounter++}`; + + if (this.useShm) { + // SHM mode: use shared memory name (no path, just a name for /dev/shm/) + this.inputPath = `${instanceId}.shm`; + } else { + // UDS mode: use socket file in tmpdir + this.inputPath = path.join(os.tmpdir(), `${instanceId}.sock`); + if (fs.existsSync(this.inputPath)) { + fs.unlinkSync(this.inputPath); + } + } + + let connectionResolve: (() => void) | null = null; + let connectionReject: ((error: Error) => void) | null = null; + + this.connectionPromise = new Promise((resolve, reject) => { + connectionResolve = resolve; + connectionReject = reject; + }); + + const threads = options.threads ?? Math.min(16, os.cpus().length); + const env = { ...process.env, HARDWARE_CONCURRENCY: threads.toString() }; + + const args = buildWsdbArgs(this.inputPath, options, threads); + + // SHM mode needs larger ring buffers for pipelining + if (this.useShm) { + args.push('--request-ring-size', `${1024 * 1024 * 4}`); + args.push('--response-ring-size', `${1024 * 1024 * 4}`); + } + + this.process = spawn(options.binaryPath, args, { + stdio: ['ignore', options.logger ? 'pipe' : 'ignore', options.logger ? 'pipe' : 'ignore'], + env, + }); + + if (options.logger) { + const logger = options.logger; + if (this.process.stdout) { + this.process.stdout.on('data', (data: Buffer) => logger(`[wsdb stdout] ${data.toString().trimEnd()}`)); + } + if (this.process.stderr) { + this.process.stderr.on('data', (data: Buffer) => logger(`[wsdb stderr] ${data.toString().trimEnd()}`)); + } + } + + this.process.on('error', (err: Error) => { + for (const cb of this.pendingCallbacks) { + cb.reject(new Error(`aztec-wsdb process error: ${err.message}`)); + } + this.pendingCallbacks = []; + connectionReject?.(err); + }); + + this.processExitPromise = new Promise(resolve => { + this.process.on('exit', (code: number | null) => { + const error = new Error(`aztec-wsdb process exited with code ${code}`); + for (const cb of this.pendingCallbacks) { + cb.reject(error); + } + this.pendingCallbacks = []; + resolve(); + }); + }); + + if (this.useShm) { + this.connectShm(connectionResolve!, connectionReject!, options.napiPath); + } else { + this.connectUdsPoll(connectionResolve!, connectionReject!); + } + } + + /** Returns the IPC path for the running wsdb server (for other IPC clients to connect). */ + getSocketPath(): string { + return this.inputPath; + } + + /** Wait until the backend is connected and ready to accept commands. */ + waitUntilReady(): Promise { + return this.connectionPromise; + } + + // ——— SHM connection ——— + + private connectShm( + resolve: () => void, + reject: (error: Error) => void, + napiPath?: string, + ) { + const shmName = this.inputPath.replace(/\.shm$/, ''); + const addonPath = findNapiBinary(napiPath); + if (!addonPath) { + reject(new Error('NAPI binary not found — required for shared memory mode')); + return; + } + + let addon: any; + try { + const require = createRequire(findPackageRoot()!); + addon = require(addonPath); + } catch (err: any) { + reject(new Error(`Failed to load NAPI module for SHM: ${err.message}`)); + return; + } + + // Retry connecting until wsdb creates the shared memory region + const retryInterval = 100; + const maxAttempts = 100; // 10s total + let attempt = 0; + + const tryConnect = () => { + attempt++; + try { + // TS backend is client 0 in the MPSC SHM system (AVM is client 1) + this.shmClient = new addon.MsgpackClientAsync(shmName, 0); + // Register response callback + this.shmClient.setResponseCallback((responseBuffer: Buffer) => { + const callback = this.pendingCallbacks.shift(); + if (callback) { + callback.resolve(new Uint8Array(responseBuffer)); + } + if (this.pendingCallbacks.length === 0) { + this.shmClient.release(); + } + }); + resolve(); + } catch (e: any) { + if (attempt >= maxAttempts) { + reject(new Error(`Timeout connecting to wsdb shared memory after ${maxAttempts * retryInterval}ms: ${e?.message ?? e}`)); + } else { + this.connectionTimeout = setTimeout(tryConnect, retryInterval); + } + } + }; + + this.connectionTimeout = setTimeout(tryConnect, retryInterval); + } + + // ——— UDS connection ——— + + private connectUdsPoll(resolve: () => void, reject: (error: Error) => void) { + const pollInterval = 50; + const maxWait = 10000; + let waited = 0; + + const poll = () => { + if (fs.existsSync(this.inputPath)) { + this.connectUds(resolve, reject); + } else if (waited >= maxWait) { + reject(new Error(`Timeout waiting for aztec-wsdb socket at ${this.inputPath}`)); + } else { + waited += pollInterval; + this.connectionTimeout = setTimeout(poll, pollInterval); + } + }; + + this.connectionTimeout = setTimeout(poll, pollInterval); + } + + private connectUds(resolve: () => void, reject: (error: Error) => void) { + this.socket = net.createConnection(this.inputPath); + + this.socket.on('connect', () => { + resolve(); + }); + + this.socket.on('error', (err: Error) => { + reject(err); + for (const cb of this.pendingCallbacks) { + cb.reject(err); + } + this.pendingCallbacks = []; + }); + + this.socket.on('data', (chunk: Buffer) => { + this.handleData(chunk); + }); + + this.socket.on('close', () => { + const error = new Error('aztec-wsdb socket closed'); + for (const cb of this.pendingCallbacks) { + cb.reject(error); + } + this.pendingCallbacks = []; + }); + } + + private handleData(chunk: Buffer) { + let offset = 0; + + while (offset < chunk.length) { + if (this.readingLength) { + const bytesNeeded = 4 - this.lengthBytesRead; + const bytesAvailable = chunk.length - offset; + const bytesToCopy = Math.min(bytesNeeded, bytesAvailable); + + chunk.copy(this.lengthBuffer, this.lengthBytesRead, offset, offset + bytesToCopy); + this.lengthBytesRead += bytesToCopy; + offset += bytesToCopy; + + if (this.lengthBytesRead === 4) { + this.responseLength = this.lengthBuffer.readUInt32LE(0); + this.responseBuffer = Buffer.alloc(this.responseLength); + this.responseBytesRead = 0; + this.readingLength = false; + } + } else { + const bytesNeeded = this.responseLength - this.responseBytesRead; + const bytesAvailable = chunk.length - offset; + const bytesToCopy = Math.min(bytesNeeded, bytesAvailable); + + chunk.copy(this.responseBuffer!, this.responseBytesRead, offset, offset + bytesToCopy); + this.responseBytesRead += bytesToCopy; + offset += bytesToCopy; + + if (this.responseBytesRead === this.responseLength) { + const callback = this.pendingCallbacks.shift(); + if (callback) { + callback.resolve(new Uint8Array(this.responseBuffer!)); + } + + // Reset state for next message + this.readingLength = true; + this.lengthBytesRead = 0; + this.responseBuffer = null; + } + } + } + } + + // ——— Unified call/destroy ——— + + async call(inputBuffer: Uint8Array): Promise { + await this.connectionPromise; + + if (this.useShm) { + return new Promise((resolve, reject) => { + if (this.pendingCallbacks.length === 0) { + this.shmClient.acquire(); + } + this.pendingCallbacks.push({ resolve, reject }); + try { + this.shmClient.call(Buffer.from(inputBuffer)); + } catch (err: any) { + this.pendingCallbacks.pop(); + if (this.pendingCallbacks.length === 0) { + this.shmClient.release(); + } + reject(new Error(`SHM call failed: ${err.message}`)); + } + }); + } + + // UDS mode + return new Promise((resolve, reject) => { + this.pendingCallbacks.push({ resolve, reject }); + + const lengthBuf = Buffer.alloc(4); + lengthBuf.writeUInt32LE(inputBuffer.length, 0); + + this.socket!.write(lengthBuf); + this.socket!.write(Buffer.from(inputBuffer)); + }); + } + + async destroy(): Promise { + // Suppress any pending connection promise rejection to avoid unhandled rejections + // when destroying before the IPC connection is established. + this.connectionPromise?.catch(() => {}); + + if (this.connectionTimeout) { + clearTimeout(this.connectionTimeout); + this.connectionTimeout = null; + } + + if (this.socket) { + this.socket.destroy(); + this.socket = null; + } + + if (this.process && this.process.exitCode === null) { + this.process.kill('SIGTERM'); + } + await this.processExitPromise; + + // Clean up stdio streams and remove all listeners to allow the event loop to exit. + if (this.process) { + this.process.stdout?.destroy(); + this.process.stderr?.destroy(); + this.process.removeAllListeners(); + } + + // Clean up socket/shm files + try { + if (!this.useShm && fs.existsSync(this.inputPath)) { + fs.unlinkSync(this.inputPath); + } + if (this.useShm) { + const shmName = this.inputPath.replace(/\.shm$/, ''); + for (const suffix of ['_request', '_response']) { + const shmPath = `/dev/shm/${shmName}${suffix}`; + if (fs.existsSync(shmPath)) { + fs.unlinkSync(shmPath); + } + } + } + } catch { + // Ignore cleanup errors + } + } +} diff --git a/barretenberg/ts/src/cbind/cpp_codegen.ts b/barretenberg/ts/src/cbind/cpp_codegen.ts new file mode 100644 index 000000000000..f13f524da171 --- /dev/null +++ b/barretenberg/ts/src/cbind/cpp_codegen.ts @@ -0,0 +1,241 @@ +/** + * C++ IPC Client Code Generator + * + * Generates a C++ IPC client from a CompiledSchema. The generated client: + * - Connects to a server over Unix Domain Socket via ipc::IpcClient + * - Wraps each command in a NamedUnion, serializes with msgpack, sends, receives, deserializes + * - Has one method per command, returning the typed response + * + * Usage: + * const gen = new CppCodegen({ namespace: 'bb::cdb', prefix: 'Cdb' }); + * const header = gen.generateHeader(schema); + * const impl = gen.generateImpl(schema); + */ + +import type { CompiledSchema, Type, Struct, Field, Command } from './schema_visitor.js'; +import { toSnakeCase } from './naming.js'; + +export interface CppCodegenOptions { + /** C++ namespace for generated code, e.g. 'bb::cdb' */ + namespace: string; + /** Prefix for command/response types, e.g. 'Cdb' */ + prefix: string; + /** Header path for the *_execute.hpp file that defines Command/CommandResponse NamedUnions */ + executeHeader: string; + /** Header path for the *_commands.hpp file that defines the command structs */ + commandsHeader: string; +} + +export class CppCodegen { + constructor(private opts: CppCodegenOptions) {} + + /** Convert a command name to a C++ method name (snake_case without prefix) */ + private methodName(commandName: string): string { + // Strip prefix: "CdbGetContractInstance" -> "GetContractInstance" -> "get_contract_instance" + const withoutPrefix = commandName.startsWith(this.opts.prefix) + ? commandName.slice(this.opts.prefix.length) + : commandName; + return toSnakeCase(withoutPrefix); + } + + /** Check if the response has fields (non-void return) */ + private hasResponseFields(command: Command, schema: CompiledSchema): boolean { + const resp = schema.responses.get(command.responseType); + return !!resp && resp.fields.length > 0; + } + + /** Generate the method signature using command struct types directly */ + private generateMethodSignature(command: Command, schema: CompiledSchema, className?: string): string { + const method = this.methodName(command.name); + const hasFields = this.hasResponseFields(command, schema); + const retType = hasFields ? `${command.name}::Response` : 'void'; + + // If the command has fields, take the whole command struct by value + const params = command.fields.length > 0 ? `${command.name} cmd` : ''; + + const prefix = className ? `${className}::` : ''; + const constSuffix = !this.isWriteCommand(command) ? ' const' : ''; + + return `${retType} ${prefix}${method}(${params})${constSuffix}`; + } + + /** Check if a command modifies state (non-const) */ + private isWriteCommand(command: Command): boolean { + const name = command.name.toLowerCase(); + return name.includes('add') || name.includes('create') || + name.includes('commit') || name.includes('revert') || + name.includes('register') || name.includes('shutdown') || + name.includes('delete') || name.includes('sync') || + name.includes('rollback') || name.includes('unwind'); + } + + /** Generate the header file */ + generateHeader(schema: CompiledSchema): string { + const { namespace: ns, prefix } = this.opts; + const className = `${prefix}IpcClient`; + const guardName = `${ns.replace(/::/g, '_').toUpperCase()}_${prefix.toUpperCase()}_IPC_CLIENT_GENERATED_HPP`; + + const methods = schema.commands.map(cmd => { + const sig = this.generateMethodSignature(cmd, schema); + return ` ${sig};`; + }).join('\n'); + + return `// AUTOGENERATED FILE - DO NOT EDIT +#pragma once + +#include "barretenberg/common/try_catch_shim.hpp" +#include "${this.opts.executeHeader}" +#include "barretenberg/ipc/ipc_client.hpp" + +#include +#include + +namespace ${ns} { + +/** + * @brief Auto-generated IPC client. + * + * Each method sends a msgpack-serialized command to the server over UDS + * and returns the typed response. All methods block until the response arrives. + */ +class ${className} { + public: + explicit ${className}(const std::string& socket_path); + ~${className}(); + + ${className}(const ${className}&) = delete; + ${className}& operator=(const ${className}&) = delete; + +${methods} + + private: + template + typename Cmd::Response send(Cmd&& cmd) const; + + mutable std::unique_ptr client_; +}; + +} // namespace ${ns} +`; + } + + /** Generate the implementation file */ + generateImpl(schema: CompiledSchema): string { + const { namespace: ns, prefix } = this.opts; + const className = `${prefix}IpcClient`; + const commandType = `${prefix}Command`; + const responseType = `${prefix}CommandResponse`; + const errorType = `${prefix}ErrorResponse`; + + const methods = schema.commands.map(cmd => { + return this.generateMethodImpl(cmd, schema, className); + }).join('\n'); + + return `// AUTOGENERATED FILE - DO NOT EDIT + +#include "${this.headerIncludePath()}" +#include "${this.opts.executeHeader}" +#include "barretenberg/serialize/msgpack.hpp" +#include "barretenberg/serialize/msgpack_impl.hpp" + +#include +#include + +namespace ${ns} { + +${className}::${className}(const std::string& socket_path) + : client_(ipc::IpcClient::create_socket(socket_path)) +{ + if (!client_->connect()) { + throw std::runtime_error("Failed to connect to server at " + socket_path); + } +} + +${className}::~${className}() +{ + if (client_) { + client_->close(); + } +} + +template +typename Cmd::Response ${className}::send(Cmd&& cmd) const +{ + // Wrap command in ${commandType} NamedUnion, then in a 1-element tuple (matches server expectations) + ${commandType} command = std::forward(cmd); + auto wrapped = std::make_tuple(std::move(command)); + + // Serialize to msgpack + msgpack::sbuffer send_buffer; + msgpack::pack(send_buffer, wrapped); + + // Send to server + constexpr uint64_t timeout_ns = 30'000'000'000ULL; // 30 seconds + if (!client_->send(send_buffer.data(), send_buffer.size(), timeout_ns)) { + throw std::runtime_error("Failed to send command to server"); + } + + // Receive response + auto response_span = client_->receive(timeout_ns); + if (response_span.empty()) { + throw std::runtime_error("Empty response from server"); + } + + // Deserialize response + auto unpacked = msgpack::unpack(reinterpret_cast(response_span.data()), response_span.size()); + auto response_obj = unpacked.get(); + + ${responseType} response; + response_obj.convert(response); + + // Release the receive buffer + client_->release(response_span.size()); + + // Check for error response + return std::move(response).visit([](auto&& resp) -> typename Cmd::Response { + using RespType = std::decay_t; + + if constexpr (std::is_same_v) { + throw std::runtime_error("Server error: " + resp.message); + } else if constexpr (std::is_same_v) { + return std::forward(resp); + } else { + throw std::runtime_error("Unexpected response type from server"); + } + }); +} + +${methods} +} // namespace ${ns} +`; + } + + /** Generate a single method implementation */ + private generateMethodImpl(command: Command, schema: CompiledSchema, className: string): string { + const sig = this.generateMethodSignature(command, schema, className); + const hasFields = this.hasResponseFields(command, schema); + + const cmdExpr = command.fields.length > 0 ? 'std::move(cmd)' : `${command.name}{}`; + + if (!hasFields) { + return `${sig} +{ + send(${cmdExpr}); +} +`; + } + + return `${sig} +{ + return send(${cmdExpr}); +} +`; + } + + /** Compute the include path for the generated header */ + private headerIncludePath(): string { + // Derive from the executeHeader path: replace _execute.hpp with _ipc_client_generated.hpp + const dir = this.opts.executeHeader.substring(0, this.opts.executeHeader.lastIndexOf('/')); + return `${dir}/${toSnakeCase(this.opts.prefix)}_ipc_client_generated.hpp`; + } +} diff --git a/barretenberg/ts/src/cbind/rust_codegen.ts b/barretenberg/ts/src/cbind/rust_codegen.ts index 0e0a316d4ca4..572249056732 100644 --- a/barretenberg/ts/src/cbind/rust_codegen.ts +++ b/barretenberg/ts/src/cbind/rust_codegen.ts @@ -12,6 +12,8 @@ import type { CompiledSchema, Type, Struct, Field } from './schema_visitor.js'; import { toSnakeCase, toPascalCase } from './naming.js'; export class RustCodegen { + private errorTypeName: string = 'ErrorResponse'; + // Type mapping: Schema type -> Rust type private mapType(type: Type): string { switch (type.kind) { @@ -223,8 +225,9 @@ ${deserializeCases} private generateResponseEnum(schema: CompiledSchema): string { // Include all response types from commands plus ErrorResponse if it exists const commandResponseTypes = Array.from(new Set(schema.commands.map(c => c.responseType))); - const responseTypes = schema.responses.has('ErrorResponse') - ? [...commandResponseTypes, 'ErrorResponse'] + const errorName = schema.errorTypeName || 'ErrorResponse'; + const responseTypes = schema.responses.has(errorName) + ? [...commandResponseTypes, errorName] : commandResponseTypes; const variants = responseTypes .map(name => { @@ -426,6 +429,7 @@ mod serde_array4_bytes { // Generate types file generateTypes(schema: CompiledSchema): string { + this.errorTypeName = schema.errorTypeName || 'ErrorResponse'; // Create set of top-level command struct names (only these need __typename) const commandNames = new Set(schema.commands.map(c => c.name)); @@ -483,7 +487,7 @@ ${this.generateResponseEnum(schema)} let cmd = Command::${cmdRustName}(${cmdRustName}::new(${paramConversions})); match self.execute(cmd)? { Response::${respRustName}(resp) => Ok(resp), - Response::ErrorResponse(err) => Err(BarretenbergError::Backend( + Response::${toPascalCase(this.errorTypeName)}(err) => Err(BarretenbergError::Backend( err.message )), _ => Err(BarretenbergError::InvalidResponse( @@ -495,6 +499,7 @@ ${this.generateResponseEnum(schema)} // Generate API file generateApi(schema: CompiledSchema): string { + this.errorTypeName = schema.errorTypeName || 'ErrorResponse'; const apiMethods = schema.commands .filter(c => c.name !== 'Shutdown') .map(c => this.generateApiMethod(c)) diff --git a/barretenberg/ts/src/cbind/schema_visitor.ts b/barretenberg/ts/src/cbind/schema_visitor.ts index ed4f83db7ccc..e182c1f1abd4 100644 --- a/barretenberg/ts/src/cbind/schema_visitor.ts +++ b/barretenberg/ts/src/cbind/schema_visitor.ts @@ -8,7 +8,7 @@ * - Output is "compiled schema" with resolved types */ -export type PrimitiveType = 'bool' | 'u8' | 'u16' | 'u32' | 'u64' | 'f64' | 'string' | 'bytes' | 'field2'; +export type PrimitiveType = 'bool' | 'u8' | 'u16' | 'u32' | 'u64' | 'f64' | 'string' | 'bytes' | 'field2' | 'enum_u32' | 'map_u32_pair'; export interface Type { kind: 'primitive' | 'vector' | 'array' | 'optional' | 'struct'; @@ -43,6 +43,9 @@ export interface CompiledSchema { // Response types responses: Map; + + // Error response type name (e.g. 'WsdbErrorResponse') + errorTypeName?: string; } /** @@ -71,11 +74,14 @@ export class SchemaVisitor { } } + // Find the error response type name (e.g. 'WsdbErrorResponse') + const errorResponses = responsePairs.filter(([name]: [string, any]) => name.endsWith('ErrorResponse')); + const errorTypeName = errorResponses.length > 0 ? errorResponses[0][0] : undefined; + // Visit all commands and pair with responses + const normalResponses = responsePairs.filter(([name]: [string, any]) => !name.endsWith('ErrorResponse')); for (let i = 0; i < commandPairs.length; i++) { const [cmdName, cmdSchema] = commandPairs[i]; - // Find matching response (skip ErrorResponse which is always last) - const normalResponses = responsePairs.filter(([name]: [string, any]) => name !== 'ErrorResponse'); const [respName] = normalResponses[i]; // Discover command structure @@ -94,6 +100,7 @@ export class SchemaVisitor { structs: this.structs, commands, responses: this.responses, + errorTypeName, }; } @@ -203,6 +210,8 @@ export class SchemaVisitor { 'string': 'string', 'bin32': 'bytes', 'field2': 'field2', // Extension field (Fq2) - pair of field elements + 'MerkleTreeId': 'enum_u32', // C++ enum serialized as uint32 + 'unordered_map': 'map_u32_pair', // StateReference: map> }; const primitive = primitiveMap[name]; diff --git a/barretenberg/ts/src/cbind/typescript_codegen.ts b/barretenberg/ts/src/cbind/typescript_codegen.ts index 649e415cb2f8..b711efcb0246 100644 --- a/barretenberg/ts/src/cbind/typescript_codegen.ts +++ b/barretenberg/ts/src/cbind/typescript_codegen.ts @@ -17,6 +17,8 @@ function toCamelCase(name: string): string { } export class TypeScriptCodegen { + private errorTypeName: string = 'ErrorResponse'; + // Type mapping: Schema type -> TypeScript type private mapType(type: Type): string { switch (type.kind) { @@ -31,14 +33,21 @@ export class TypeScriptCodegen { case 'string': return 'string'; case 'bytes': return 'Uint8Array'; case 'field2': return '[Uint8Array, Uint8Array]'; // Extension field (Fq2) + case 'enum_u32': return 'number'; // C++ enum as integer + case 'map_u32_pair': return 'Record'; // map> } break; - case 'vector': - return `${this.mapType(type.element!)}[]`; + case 'vector': { + const inner = this.mapType(type.element!); + // Wrap union types in parens to avoid precedence issues: (Foo | undefined)[] + return type.element!.kind === 'optional' ? `(${inner})[]` : `${inner}[]`; + } - case 'array': - return `${this.mapType(type.element!)}[]`; + case 'array': { + const inner = this.mapType(type.element!); + return type.element!.kind === 'optional' ? `(${inner})[]` : `${inner}[]`; + } case 'optional': return `${this.mapType(type.element!)} | undefined`; @@ -64,14 +73,20 @@ export class TypeScriptCodegen { case 'string': return 'string'; case 'bytes': return 'Uint8Array'; case 'field2': return '[Uint8Array, Uint8Array]'; + case 'enum_u32': return 'number'; + case 'map_u32_pair': return 'Record'; } break; - case 'vector': - return `${this.mapMsgpackType(type.element!)}[]`; + case 'vector': { + const inner = this.mapMsgpackType(type.element!); + return type.element!.kind === 'optional' ? `(${inner})[]` : `${inner}[]`; + } - case 'array': - return `${this.mapMsgpackType(type.element!)}[]`; + case 'array': { + const inner = this.mapMsgpackType(type.element!); + return type.element!.kind === 'optional' ? `(${inner})[]` : `${inner}[]`; + } case 'optional': return `${this.mapMsgpackType(type.element!)} | undefined`; @@ -209,7 +224,7 @@ ${conversions} return value; case 'optional': if (this.needsConversion(type.element!)) { - return `${value} !== undefined ? ${this.generateToConverter(type.element!, value)} : undefined`; + return `${value} != null ? ${this.generateToConverter(type.element!, value)} : undefined`; } return value; case 'struct': @@ -233,7 +248,7 @@ ${conversions} return value; case 'optional': if (this.needsConversion(type.element!)) { - return `${value} !== undefined ? ${this.generateFromConverter(type.element!, value)} : undefined`; + return `${value} != null ? ${this.generateFromConverter(type.element!, value)} : undefined`; } return value; case 'struct': @@ -303,7 +318,7 @@ ${apiMethods} return ` ${methodName}(command: ${cmdType}): Promise<${respType}> { const msgpackCommand = from${cmdType}(command); return msgpackCall(this.backend, [["${command.name}", msgpackCommand]]).then(([variantName, result]: [string, any]) => { - if (variantName === 'ErrorResponse') { + if (variantName === '${this.errorTypeName}') { throw new BBApiException(result.message || 'Unknown error from barretenberg'); } if (variantName !== '${command.responseType}') { @@ -334,6 +349,7 @@ ${apiMethods} // Generate async API file generateAsyncApi(schema: CompiledSchema): string { + this.errorTypeName = schema.errorTypeName || 'ErrorResponse'; const imports = this.generateApiImports(schema); const methods = schema.commands .map(c => this.generateAsyncApiMethod(c)) @@ -366,6 +382,7 @@ ${methods} // Generate sync API file generateSyncApi(schema: CompiledSchema): string { + this.errorTypeName = schema.errorTypeName || 'ErrorResponse'; const imports = this.generateApiImports(schema); const methods = schema.commands .map(c => this.generateSyncApiMethod(c)) diff --git a/yarn-project/pxe/tsconfig.json b/yarn-project/pxe/tsconfig.json index e26348ab59f4..539e38403082 100644 --- a/yarn-project/pxe/tsconfig.json +++ b/yarn-project/pxe/tsconfig.json @@ -40,10 +40,10 @@ "path": "../stdlib" }, { - "path": "../world-state" + "path": "../noir-test-contracts.js" }, { - "path": "../noir-test-contracts.js" + "path": "../world-state" } ], "include": ["src"] diff --git a/yarn-project/world-state/package.json b/yarn-project/world-state/package.json index 18d23db27ea1..1e095490f2c9 100644 --- a/yarn-project/world-state/package.json +++ b/yarn-project/world-state/package.json @@ -64,6 +64,7 @@ ] }, "dependencies": { + "@aztec/bb.js": "workspace:^", "@aztec/constants": "workspace:^", "@aztec/foundation": "workspace:^", "@aztec/kv-store": "workspace:^", @@ -71,6 +72,7 @@ "@aztec/protocol-contracts": "workspace:^", "@aztec/stdlib": "workspace:^", "@aztec/telemetry-client": "workspace:^", + "msgpackr": "^1.11.2", "tslib": "^2.4.0", "zod": "^3.23.8" }, diff --git a/yarn-project/world-state/src/native/ipc_world_state_instance.ts b/yarn-project/world-state/src/native/ipc_world_state_instance.ts new file mode 100644 index 000000000000..5489c80c37ce --- /dev/null +++ b/yarn-project/world-state/src/native/ipc_world_state_instance.ts @@ -0,0 +1,712 @@ +import { AsyncApi } from '@aztec/bb.js/aztec-wsdb'; +import type { + WorldStateDBStats as WsdbDBStats, + DBStats as WsdbDBStatsInner, + WorldStateMeta as WsdbMeta, + SiblingPathAndIndex as WsdbSiblingPathAndIndex, + WorldStateStatusFull as WsdbStatusFull, + WorldStateStatusSummary as WsdbStatusSummary, + TreeDBStats as WsdbTreeDBStats, + TreeMeta as WsdbTreeMeta, +} from '@aztec/bb.js/aztec-wsdb'; +import { + ARCHIVE_HEIGHT, + DomainSeparator, + L1_TO_L2_MSG_TREE_HEIGHT, + MAX_NULLIFIERS_PER_TX, + MAX_TOTAL_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX, + NOTE_HASH_TREE_HEIGHT, + NULLIFIER_TREE_HEIGHT, + PUBLIC_DATA_TREE_HEIGHT, +} from '@aztec/constants'; +import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; +import { MerkleTreeId } from '@aztec/stdlib/trees'; +import type { WorldStateRevision } from '@aztec/stdlib/world-state'; + +import assert from 'assert'; +import { Decoder, Encoder } from 'msgpackr'; + +import type { WorldStateInstrumentation } from '../instrumentation/instrumentation.js'; +import type { WorldStateTreeMapSizes } from '../synchronizer/factory.js'; +import { + type DBStats, + type SerializedIndexedLeaf, + type SerializedLeafValue, + type TreeDBStats, + type TreeMeta, + type WorldStateDBStats, + WorldStateMessageType, + type WorldStateMeta, + type WorldStateRequest, + type WorldStateRequestCategories, + type WorldStateResponse, + type WorldStateStatusFull, + type WorldStateStatusSummary, + isWithCanonical, + isWithForkId, + isWithRevision, +} from './message.js'; +import type { NativeWorldStateInstance } from './native_world_state_instance.js'; +import { WorldStateOpsQueue } from './world_state_ops_queue.js'; + +// ————— Msgpack helpers ————— + +const msgpackEncoder = new Encoder({ useRecords: false }); +const msgpackDecoder = new Decoder({ useRecords: false }); + +/** Msgpack-encode a SerializedLeafValue into bytes for IPC transport. */ +function serializeLeafToBytes(leaf: SerializedLeafValue): Uint8Array { + return Buffer.from(msgpackEncoder.pack(leaf)); +} + +// ————— Request conversion helpers ————— + +function toWsdbRevision(rev: WorldStateRevision): { forkid: number; blocknumber: number; includeuncommitted: boolean } { + return { + forkid: rev.forkId, + blocknumber: Number(rev.blockNumber), + includeuncommitted: rev.includeUncommitted, + }; +} + +function blockStateRefToMap(ref: Map): Map { + const result = new Map(); + for (const [treeId, [root, size]] of ref.entries()) { + result.set(treeId, [new Uint8Array(root), Number(size)]); + } + return result; +} + +// ————— Response conversion helpers ————— + +/** Convert Uint8Array fields to Buffer recursively (for opaque blob responses). */ +function convertUint8ArraysToBuffers(obj: unknown): unknown { + if (obj instanceof Uint8Array) { + return Buffer.from(obj); + } + if (Array.isArray(obj)) { + return obj.map(convertUint8ArraysToBuffers); + } + if (obj !== null && typeof obj === 'object') { + const result: Record = {}; + for (const [key, value] of Object.entries(obj)) { + result[key] = convertUint8ArraysToBuffers(value); + } + return result; + } + return obj; +} + +/** Decode a msgpack-encoded leaf value blob and convert Uint8Arrays to Buffers. */ +function decodeLeafValue(encoded: Uint8Array): SerializedLeafValue { + const decoded = msgpackDecoder.unpack(Buffer.from(encoded)); + return convertUint8ArraysToBuffers(decoded) as SerializedLeafValue; +} + +/** Decode a msgpack-encoded indexed leaf preimage blob. */ +function decodeLeafPreimage(encoded: Uint8Array): SerializedIndexedLeaf { + const decoded = msgpackDecoder.unpack(Buffer.from(encoded)); + return convertUint8ArraysToBuffers(decoded) as SerializedIndexedLeaf; +} + +/** Convert Wsdb state reference (Record) to NAPI format. */ +function convertStateRef( + state: Record, +): Record { + const result: Record = {}; + for (const [key, [root, size]] of Object.entries(state)) { + result[Number(key)] = [Buffer.from(root), BigInt(size)] as const; + } + return result; +} + +/** Convert Wsdb WorldStateStatusSummary (lowercase) to NAPI format (camelCase). */ +function convertStatusSummary(s: WsdbStatusSummary): WorldStateStatusSummary { + return { + unfinalizedBlockNumber: s.unfinalizedblocknumber, + finalizedBlockNumber: s.finalizedblocknumber, + oldestHistoricalBlock: s.oldesthistoricalblock, + treesAreSynched: s.treesaresynched, + } as unknown as WorldStateStatusSummary; +} + +function convertDBStats(s: WsdbDBStatsInner): DBStats { + return { + name: s.name, + numDataItems: s.numdataitems, + totalUsedSize: s.totalusedsize, + } as unknown as DBStats; +} + +function convertTreeDBStats(s: WsdbTreeDBStats): TreeDBStats { + return { + mapSize: s.mapsize, + physicalFileSize: s.physicalfilesize, + blocksDBStats: convertDBStats(s.blocksdbstats), + nodesDBStats: convertDBStats(s.nodesdbstats), + leafPreimagesDBStats: convertDBStats(s.leafpreimagesdbstats), + leafIndicesDBStats: convertDBStats(s.leafindicesdbstats), + blockIndicesDBStats: convertDBStats(s.blockindicesdbstats), + } as unknown as TreeDBStats; +} + +function convertWorldStateDBStats(s: WsdbDBStats): WorldStateDBStats { + return { + noteHashTreeStats: convertTreeDBStats(s.notehashtreestats), + messageTreeStats: convertTreeDBStats(s.messagetreestats), + archiveTreeStats: convertTreeDBStats(s.archivetreestats), + publicDataTreeStats: convertTreeDBStats(s.publicdatatreestats), + nullifierTreeStats: convertTreeDBStats(s.nullifiertreestats), + } as unknown as WorldStateDBStats; +} + +function convertTreeMeta(m: WsdbTreeMeta): TreeMeta { + return { + name: m.name, + depth: m.depth, + size: m.size, + committedSize: m.committedsize, + root: m.root, + initialSize: m.initialsize, + initialRoot: m.initialroot, + oldestHistoricBlock: m.oldesthistoricblock, + unfinalizedBlockHeight: m.unfinalizedblockheight, + finalizedBlockHeight: m.finalizedblockheight, + } as unknown as TreeMeta; +} + +function convertWorldStateMeta(m: WsdbMeta): WorldStateMeta { + return { + noteHashTreeMeta: convertTreeMeta(m.notehashtreemeta), + messageTreeMeta: convertTreeMeta(m.messagetreemeta), + archiveTreeMeta: convertTreeMeta(m.archivetreemeta), + publicDataTreeMeta: convertTreeMeta(m.publicdatatreemeta), + nullifierTreeMeta: convertTreeMeta(m.nullifiertreemeta), + } as unknown as WorldStateMeta; +} + +function convertStatusFull(s: WsdbStatusFull): WorldStateStatusFull { + return { + summary: convertStatusSummary(s.summary), + dbStats: convertWorldStateDBStats(s.dbstats), + meta: convertWorldStateMeta(s.meta), + } as unknown as WorldStateStatusFull; +} + +/** Convert Wsdb SiblingPathAndIndex to NAPI format. */ +function convertSiblingPathAndIndex( + s: WsdbSiblingPathAndIndex | undefined, +): { index: bigint; path: Buffer[] } | undefined { + if (!s) { + return undefined; + } + return { + index: BigInt(s.index), + path: s.path.map(p => Buffer.from(p)), + }; +} + +// ————— Public API ————— + +/** Backend interface matching WsdbBackend from bb.js. */ +export interface WsdbIpcBackend { + call(inputBuffer: Uint8Array): Promise; + getSocketPath(): string; + destroy?(): Promise; +} + +/** + * IPC-backed world state instance. + * Uses WsdbBackend (spawns aztec-wsdb binary) and the generated AsyncApi + * to communicate via the NamedUnion IPC protocol. + */ +export class IpcWorldState implements NativeWorldStateInstance { + private open = true; + private queues = new Map(); + private api: AsyncApi; + /** Tracks checkpoint depth per fork (WSDB IPC doesn't return depth in response). */ + private checkpointDepths = new Map(); + + constructor( + private readonly wsdbBackend: WsdbIpcBackend, + private readonly instrumentation: WorldStateInstrumentation, + bindings?: LoggerBindings, + private readonly log: Logger = createLogger('world-state:ipc-database', bindings), + ) { + this.api = new AsyncApi(wsdbBackend as any); + this.queues.set(0, new WorldStateOpsQueue()); + this.log.info('Created IPC-backed world state instance'); + } + + /** Returns the socket path of the underlying wsdb server. */ + getSocketPath(): string { + return this.wsdbBackend.getSocketPath(); + } + + /** + * Required by `NativeWorldStateInstance` for compatibility with the in-process + * NAPI path. The IPC backend does not expose an in-process pointer; callers that + * need to reach the WSDB process must use {@link getSocketPath} instead. + */ + getHandle(): any { + throw new Error('IpcWorldState has no in-process handle; use getSocketPath() instead'); + } + + async call( + messageType: T, + body: WorldStateRequest[T] & WorldStateRequestCategories, + responseHandler = (response: WorldStateResponse[T]): WorldStateResponse[T] => response, + errorHandler = (_: string) => {}, + ): Promise { + let forkId = -1; + let committedOnly = false; + + if (isWithCanonical(body)) { + forkId = 0; + } else if (isWithForkId(body)) { + forkId = body.forkId; + } else if (isWithRevision(body)) { + forkId = body.revision.forkId; + committedOnly = body.revision.includeUncommitted === false; + } else { + const _: never = body; + throw new Error(`Unable to determine forkId for message=${WorldStateMessageType[messageType]}`); + } + + let requestQueue = this.queues.get(forkId); + if (requestQueue === undefined) { + requestQueue = new WorldStateOpsQueue(); + this.queues.set(forkId, requestQueue); + } + + const response = await requestQueue.execute( + async () => { + assert.notEqual(messageType, WorldStateMessageType.CLOSE, 'Use close() to close the IPC instance'); + assert.equal(this.open, true, 'IPC instance is closed'); + let response: WorldStateResponse[T]; + try { + response = await this._sendMessage(messageType, body); + } catch (error: any) { + errorHandler(error.message); + throw error; + } + return responseHandler(response); + }, + messageType, + committedOnly, + ); + + if (messageType === WorldStateMessageType.DELETE_FORK) { + await requestQueue.stop(); + this.queues.delete(forkId); + } + return response; + } + + async close(): Promise { + if (!this.open) { + return; + } + this.open = false; + const queue = this.queues.get(0)!; + + // Send shutdown command. Under normal operation, the WSDB process sends its + // response before exiting (via ShutdownRequested in ipc_server.hpp). The + // try/catch is defensive: if the process is killed externally (SIGKILL, OOM) + // before responding, the pending IPC callback would be rejected by the socket + // close handler. We proceed to destroy the backend regardless. + try { + await queue.execute( + async () => { + await this.api.wsdbShutdown({}); + }, + WorldStateMessageType.CLOSE, + false, + ); + } catch (err: any) { + this.log.debug(`wsdbShutdown completed with error: ${err.message}`); + } + await queue.stop(); + + if (this.wsdbBackend.destroy) { + await this.wsdbBackend.destroy(); + } + } + + private async _sendMessage( + messageType: T, + body: WorldStateRequest[T] & WorldStateRequestCategories, + ): Promise { + const start = performance.now(); + try { + const response = await this.dispatch(messageType, body); + const durationMs = performance.now() - start; + this.log.trace(`Call ${WorldStateMessageType[messageType]} took (ms)`, { duration: durationMs }); + this.instrumentation.recordRoundTrip(durationMs * 1000, messageType); + return response; + } catch (error) { + this.log.error(`Call ${WorldStateMessageType[messageType]} failed: ${error}`, error); + throw error; + } + } + + private async dispatch( + messageType: T, + body: WorldStateRequest[T] & WorldStateRequestCategories, + ): Promise { + switch (messageType) { + // ——— Tree info & state reference ——— + + case WorldStateMessageType.GET_TREE_INFO: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_TREE_INFO]; + const resp = await this.api.wsdbGetTreeInfo({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + }); + return { + treeId: resp.treeid, + root: Buffer.from(resp.root), + size: resp.size, + depth: resp.depth, + } as WorldStateResponse[T]; + } + + case WorldStateMessageType.GET_STATE_REFERENCE: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_STATE_REFERENCE]; + const resp = await this.api.wsdbGetStateReference({ + revision: toWsdbRevision(b.revision), + }); + return { state: convertStateRef(resp.state) } as WorldStateResponse[T]; + } + + case WorldStateMessageType.GET_INITIAL_STATE_REFERENCE: { + const resp = await this.api.wsdbGetInitialStateReference({}); + return { state: convertStateRef(resp.state) } as WorldStateResponse[T]; + } + + // ——— Leaf queries ——— + + case WorldStateMessageType.GET_LEAF_VALUE: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_LEAF_VALUE]; + const resp = await this.api.wsdbGetLeafValue({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leafindex: Number(b.leafIndex), + }); + if (!resp.value) { + return undefined as WorldStateResponse[T]; + } + return decodeLeafValue(resp.value) as WorldStateResponse[T]; + } + + case WorldStateMessageType.GET_LEAF_PREIMAGE: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_LEAF_PREIMAGE]; + const resp = await this.api.wsdbGetLeafPreimage({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leafindex: Number(b.leafIndex), + }); + if (!resp.preimage) { + return undefined as WorldStateResponse[T]; + } + return decodeLeafPreimage(resp.preimage) as WorldStateResponse[T]; + } + + case WorldStateMessageType.GET_SIBLING_PATH: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_SIBLING_PATH]; + const resp = await this.api.wsdbGetSiblingPath({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leafindex: Number(b.leafIndex), + }); + return resp.path.map(p => Buffer.from(p)) as WorldStateResponse[T]; + } + + case WorldStateMessageType.GET_BLOCK_NUMBERS_FOR_LEAF_INDICES: { + const b = body as WorldStateRequest[WorldStateMessageType.GET_BLOCK_NUMBERS_FOR_LEAF_INDICES]; + const resp = await this.api.wsdbGetBlockNumbersForLeafIndices({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leafindices: b.leafIndices.map(Number), + }); + return { + blockNumbers: resp.blocknumbers.map(n => (n != null ? BigInt(n) : undefined)), + } as WorldStateResponse[T]; + } + + // ——— Find operations ——— + + case WorldStateMessageType.FIND_LEAF_INDICES: { + const b = body as WorldStateRequest[WorldStateMessageType.FIND_LEAF_INDICES]; + const resp = await this.api.wsdbFindLeafIndices({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leaves: b.leaves.map(serializeLeafToBytes), + startindex: Number(b.startIndex), + }); + return { + indices: resp.indices.map(n => (n != null ? BigInt(n) : undefined)), + } as WorldStateResponse[T]; + } + + case WorldStateMessageType.FIND_LOW_LEAF: { + const b = body as WorldStateRequest[WorldStateMessageType.FIND_LOW_LEAF]; + const resp = await this.api.wsdbFindLowLeaf({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + key: new Uint8Array(b.key.toBuffer()), + }); + return { + alreadyPresent: resp.alreadypresent, + index: BigInt(resp.index), + } as WorldStateResponse[T]; + } + + case WorldStateMessageType.FIND_SIBLING_PATHS: { + const b = body as WorldStateRequest[WorldStateMessageType.FIND_SIBLING_PATHS]; + const resp = await this.api.wsdbFindSiblingPaths({ + treeid: b.treeId, + revision: toWsdbRevision(b.revision), + leaves: b.leaves.map(serializeLeafToBytes), + }); + return { + paths: resp.paths.map(convertSiblingPathAndIndex), + } as WorldStateResponse[T]; + } + + // ——— Mutations ——— + + case WorldStateMessageType.APPEND_LEAVES: { + const b = body as WorldStateRequest[WorldStateMessageType.APPEND_LEAVES]; + await this.api.wsdbAppendLeaves({ + treeid: b.treeId, + leaves: b.leaves.map(serializeLeafToBytes), + forkid: b.forkId, + }); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.BATCH_INSERT: { + const b = body as WorldStateRequest[WorldStateMessageType.BATCH_INSERT]; + const resp = await this.api.wsdbBatchInsert({ + treeid: b.treeId, + leaves: b.leaves.map(serializeLeafToBytes), + subtreedepth: b.subtreeDepth, + forkid: b.forkId, + }); + const decoded = msgpackDecoder.unpack(Buffer.from(resp.result)); + return convertUint8ArraysToBuffers(decoded) as WorldStateResponse[T]; + } + + case WorldStateMessageType.SEQUENTIAL_INSERT: { + const b = body as WorldStateRequest[WorldStateMessageType.SEQUENTIAL_INSERT]; + const resp = await this.api.wsdbSequentialInsert({ + treeid: b.treeId, + leaves: b.leaves.map(serializeLeafToBytes), + forkid: b.forkId, + }); + const decoded = msgpackDecoder.unpack(Buffer.from(resp.result)); + return convertUint8ArraysToBuffers(decoded) as WorldStateResponse[T]; + } + + case WorldStateMessageType.UPDATE_ARCHIVE: { + const b = body as WorldStateRequest[WorldStateMessageType.UPDATE_ARCHIVE]; + await this.api.wsdbUpdateArchive({ + blockstateref: blockStateRefToMap(b.blockStateRef as Map) as any, + blockheaderhash: new Uint8Array(b.blockHeaderHash), + forkid: b.forkId, + }); + return undefined as WorldStateResponse[T]; + } + + // ——— Commit / Rollback ——— + + case WorldStateMessageType.COMMIT: { + await this.api.wsdbCommit({}); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.ROLLBACK: { + await this.api.wsdbRollback({}); + return undefined as WorldStateResponse[T]; + } + + // ——— Block sync ——— + + case WorldStateMessageType.SYNC_BLOCK: { + const b = body as WorldStateRequest[WorldStateMessageType.SYNC_BLOCK]; + const resp = await this.api.wsdbSyncBlock({ + blocknumber: Number(b.blockNumber), + blockstateref: blockStateRefToMap(b.blockStateRef as Map) as any, + blockheaderhash: new Uint8Array(b.blockHeaderHash), + paddednotehashes: b.paddedNoteHashes.map(l => new Uint8Array(l as Buffer)), + paddedl1tol2messages: b.paddedL1ToL2Messages.map(l => new Uint8Array(l as Buffer)), + paddednullifiers: b.paddedNullifiers.map(l => ({ + nullifier: new Uint8Array((l as { nullifier: Buffer }).nullifier), + })), + publicdatawrites: b.publicDataWrites.map(l => ({ + slot: new Uint8Array((l as { slot: Buffer; value: Buffer }).slot), + value: new Uint8Array((l as { slot: Buffer; value: Buffer }).value), + })), + }); + return convertStatusFull(resp.status) as WorldStateResponse[T]; + } + + // ——— Fork management ——— + + case WorldStateMessageType.CREATE_FORK: { + const b = body as WorldStateRequest[WorldStateMessageType.CREATE_FORK]; + const resp = await this.api.wsdbCreateFork({ + latest: b.latest, + blocknumber: Number(b.blockNumber), + }); + return { forkId: resp.forkid } as WorldStateResponse[T]; + } + + case WorldStateMessageType.DELETE_FORK: { + const b = body as WorldStateRequest[WorldStateMessageType.DELETE_FORK]; + await this.api.wsdbDeleteFork({ forkid: b.forkId }); + return undefined as WorldStateResponse[T]; + } + + // ——— Block finalization ——— + + case WorldStateMessageType.FINALIZE_BLOCKS: { + const b = body as WorldStateRequest[WorldStateMessageType.FINALIZE_BLOCKS]; + const resp = await this.api.wsdbFinalizeBlocks({ toblocknumber: Number(b.toBlockNumber) }); + return convertStatusSummary(resp.status) as WorldStateResponse[T]; + } + + case WorldStateMessageType.UNWIND_BLOCKS: { + const b = body as WorldStateRequest[WorldStateMessageType.UNWIND_BLOCKS]; + const resp = await this.api.wsdbUnwindBlocks({ toblocknumber: Number(b.toBlockNumber) }); + return convertStatusFull(resp.status) as WorldStateResponse[T]; + } + + case WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS: { + const b = body as WorldStateRequest[WorldStateMessageType.REMOVE_HISTORICAL_BLOCKS]; + const resp = await this.api.wsdbRemoveHistoricalBlocks({ toblocknumber: Number(b.toBlockNumber) }); + return convertStatusFull(resp.status) as WorldStateResponse[T]; + } + + // ——— Status ——— + + case WorldStateMessageType.GET_STATUS: { + const resp = await this.api.wsdbGetStatus({}); + return convertStatusSummary(resp.status) as WorldStateResponse[T]; + } + + // ——— Checkpoints ——— + + case WorldStateMessageType.CREATE_CHECKPOINT: { + const b = body as WorldStateRequest[WorldStateMessageType.CREATE_CHECKPOINT]; + await this.api.wsdbCreateCheckpoint({ forkid: b.forkId }); + const depth = (this.checkpointDepths.get(b.forkId) ?? 0) + 1; + this.checkpointDepths.set(b.forkId, depth); + return { depth } as WorldStateResponse[T]; + } + + case WorldStateMessageType.COMMIT_CHECKPOINT: { + const b = body as WorldStateRequest[WorldStateMessageType.COMMIT_CHECKPOINT]; + await this.api.wsdbCommitCheckpoint({ forkid: b.forkId }); + const depth = Math.max(0, (this.checkpointDepths.get(b.forkId) ?? 0) - 1); + this.checkpointDepths.set(b.forkId, depth); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.REVERT_CHECKPOINT: { + const b = body as WorldStateRequest[WorldStateMessageType.REVERT_CHECKPOINT]; + await this.api.wsdbRevertCheckpoint({ forkid: b.forkId }); + const depth = Math.max(0, (this.checkpointDepths.get(b.forkId) ?? 0) - 1); + this.checkpointDepths.set(b.forkId, depth); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.COMMIT_ALL_CHECKPOINTS: { + const b = body as WorldStateRequest[WorldStateMessageType.COMMIT_ALL_CHECKPOINTS]; + const targetDepth = b.depth ?? 0; + const currentDepth = this.checkpointDepths.get(b.forkId) ?? 0; + if (targetDepth === 0) { + // Commit everything — use the bulk operation + await this.api.wsdbCommitAllCheckpoints({ forkid: b.forkId }); + } else { + // Commit one level at a time down to target depth + for (let d = currentDepth; d > targetDepth; d--) { + await this.api.wsdbCommitCheckpoint({ forkid: b.forkId }); + } + } + this.checkpointDepths.set(b.forkId, targetDepth); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.REVERT_ALL_CHECKPOINTS: { + const b = body as WorldStateRequest[WorldStateMessageType.REVERT_ALL_CHECKPOINTS]; + const targetDepth = b.depth ?? 0; + const currentDepth = this.checkpointDepths.get(b.forkId) ?? 0; + if (targetDepth === 0) { + // Revert everything — use the bulk operation + await this.api.wsdbRevertAllCheckpoints({ forkid: b.forkId }); + } else { + // Revert one level at a time down to target depth + for (let d = currentDepth; d > targetDepth; d--) { + await this.api.wsdbRevertCheckpoint({ forkid: b.forkId }); + } + } + this.checkpointDepths.set(b.forkId, targetDepth); + return undefined as WorldStateResponse[T]; + } + + // ——— Misc ——— + + case WorldStateMessageType.COPY_STORES: { + const b = body as WorldStateRequest[WorldStateMessageType.COPY_STORES]; + await this.api.wsdbCopyStores({ dstpath: b.dstPath, compact: b.compact }); + return undefined as WorldStateResponse[T]; + } + + case WorldStateMessageType.CLOSE: { + await this.api.wsdbShutdown({}); + return undefined as WorldStateResponse[T]; + } + + default: + throw new Error(`Unknown message type: ${messageType}`); + } + } +} + +/** + * Helper to create WsdbOptions from standard world state config. + * Returns the options needed to construct a WsdbBackend. + */ +export function getWsdbOptions( + dataDir: string, + wsTreeMapSizes: WorldStateTreeMapSizes, +): { + treeHeights: Record; + treePrefill: Record; + mapSizes: Record; + initialHeaderGeneratorPoint: number; +} { + return { + treeHeights: { + [MerkleTreeId.NULLIFIER_TREE]: NULLIFIER_TREE_HEIGHT, + [MerkleTreeId.NOTE_HASH_TREE]: NOTE_HASH_TREE_HEIGHT, + [MerkleTreeId.PUBLIC_DATA_TREE]: PUBLIC_DATA_TREE_HEIGHT, + [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: L1_TO_L2_MSG_TREE_HEIGHT, + [MerkleTreeId.ARCHIVE]: ARCHIVE_HEIGHT, + }, + treePrefill: { + [MerkleTreeId.NULLIFIER_TREE]: 2 * MAX_NULLIFIERS_PER_TX, + [MerkleTreeId.PUBLIC_DATA_TREE]: 2 * MAX_TOTAL_PUBLIC_DATA_UPDATE_REQUESTS_PER_TX, + }, + mapSizes: { + [MerkleTreeId.NULLIFIER_TREE]: wsTreeMapSizes.nullifierTreeMapSizeKb, + [MerkleTreeId.NOTE_HASH_TREE]: wsTreeMapSizes.noteHashTreeMapSizeKb, + [MerkleTreeId.PUBLIC_DATA_TREE]: wsTreeMapSizes.publicDataTreeMapSizeKb, + [MerkleTreeId.L1_TO_L2_MESSAGE_TREE]: wsTreeMapSizes.messageTreeMapSizeKb, + [MerkleTreeId.ARCHIVE]: wsTreeMapSizes.archiveTreeMapSizeKb, + }, + initialHeaderGeneratorPoint: DomainSeparator.BLOCK_HEADER_HASH, + }; +} diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 8ebb1f9b47db..9e6d581588d4 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -2299,6 +2299,7 @@ __metadata: resolution: "@aztec/world-state@workspace:world-state" dependencies: "@aztec/archiver": "workspace:^" + "@aztec/bb.js": "workspace:^" "@aztec/constants": "workspace:^" "@aztec/foundation": "workspace:^" "@aztec/kv-store": "workspace:^" @@ -2312,6 +2313,7 @@ __metadata: "@typescript/native-preview": "npm:7.0.0-dev.20260113.1" jest: "npm:^30.0.0" jest-mock-extended: "npm:^4.0.0" + msgpackr: "npm:^1.11.2" ts-node: "npm:^10.9.1" tslib: "npm:^2.4.0" typescript: "npm:^5.3.3"