diff --git a/crates/hotblocks/src/api.rs b/crates/hotblocks/src/api.rs index c5cb4fe..03c32cd 100644 --- a/crates/hotblocks/src/api.rs +++ b/crates/hotblocks/src/api.rs @@ -60,6 +60,7 @@ pub fn build_api(app: App) -> Router { .route("/datasets/{id}/finalized-stream", post(finalized_stream)) .route("/datasets/{id}/head", get(get_head)) .route("/datasets/{id}/finalized-head", get(get_finalized_head)) + .route("/datasets/{id}/hashes/{hash}/block", get(get_block_by_hash)) .route("/datasets/{id}/retention", get(get_retention).post(set_retention)) .route("/datasets/{id}/status", get(get_status)) .route("/datasets/{id}/metadata", get(get_metadata)) @@ -362,6 +363,55 @@ async fn get_head( }) } +async fn get_block_by_hash( + Extension(app): Extension, + Extension(client_id): Extension, + Path((dataset_id, hash)): Path<(DatasetId, String)> +) -> impl IntoResponse { + // `get_dataset!` returns from the enclosing fn, which doesn't work inside the + // synchronous `with_response` closure, and the lookup must be `.await`ed + // first - so existence/validation happen up front and the awaited result is + // handed to `with_response`. + + // Reject obviously-invalid lengths before touching the DB. Crypto hashes are + // ~64-66 chars (hex, EVM/Bitcoin/Tron) up to ~88 (base58, Solana); 256 is a + // generous ceiling that still cuts off megabyte-sized URLs. + if hash.is_empty() || hash.len() > 256 { + return ResponseWithMetadata::new() + .with_client_id(&client_id) + .with_dataset_id(dataset_id) + .with_endpoint("/hashes/{hash}/block") + .with_response(|| text!(StatusCode::BAD_REQUEST, "invalid hash length")); + } + + let dataset = match app.data_service.get_dataset(dataset_id) { + Ok(ds) => ds, + Err(err) => { + return ResponseWithMetadata::new() + .with_client_id(&client_id) + .with_dataset_id(dataset_id) + .with_endpoint("/hashes/{hash}/block") + .with_response(|| text!(StatusCode::NOT_FOUND, "{}", err)); + } + }; + + let response = match dataset.get_block_by_hash(hash).await { + Ok(Some(block_ref)) => json_ok!(block_ref), + Ok(None) => text!(StatusCode::NOT_FOUND, "block not found"), + Err(err) => { + // Terse body; the full chain (and any backtrace from `{:?}`) goes to the log. + error!(error = ?err, dataset_id = %dataset_id, "get_block_by_hash failed"); + text!(StatusCode::INTERNAL_SERVER_ERROR, "internal error") + } + }; + + ResponseWithMetadata::new() + .with_client_id(&client_id) + .with_dataset_id(dataset_id) + .with_endpoint("/hashes/{hash}/block") + .with_response(|| response) +} + async fn get_retention( Extension(app): Extension, Extension(client_id): Extension, diff --git a/crates/hotblocks/src/dataset_controller/dataset_controller.rs b/crates/hotblocks/src/dataset_controller/dataset_controller.rs index 8568b79..cf206bd 100644 --- a/crates/hotblocks/src/dataset_controller/dataset_controller.rs +++ b/crates/hotblocks/src/dataset_controller/dataset_controller.rs @@ -18,6 +18,7 @@ use crate::{ }; pub struct DatasetController { + db: DBRef, dataset_id: DatasetId, dataset_kind: DatasetKind, retention_sender: tokio::sync::watch::Sender, @@ -71,9 +72,10 @@ impl DatasetController { let task = tokio::spawn(ctl.run(write).in_current_span()); let compaction_task = - tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span()); + tokio::spawn(compaction_loop(db.clone(), dataset_id, compaction_enabled_receiver).in_current_span()); Ok(Self { + db, dataset_id, dataset_kind, retention_sender, @@ -105,6 +107,18 @@ impl DatasetController { self.head_receiver.borrow().as_ref().map(|h| h.number) } + /// Resolves a block hash to its `BlockRef` via the storage index. + /// + /// A point lookup against RocksDB, run on the blocking pool (same pattern as + /// `Ctl::new_write_ctx`). `Ok(None)` means the hash is not in the index. + pub async fn get_block_by_hash(&self, hash: String) -> anyhow::Result> { + let db = self.db.clone(); + let dataset_id = self.dataset_id; + tokio::task::spawn_blocking(move || db.snapshot().find_block_by_hash(dataset_id, &hash)) + .await + .context("get_block_by_hash task panicked")? + } + pub fn enable_compaction(&self, yes: bool) { let _ = self.compaction_enabled_sender.send(yes); } diff --git a/crates/storage/src/db/data.rs b/crates/storage/src/db/data.rs index a4fda75..46937fb 100644 --- a/crates/storage/src/db/data.rs +++ b/crates/storage/src/db/data.rs @@ -99,6 +99,57 @@ impl Display for ChunkId { } } +/// Key for the `hash -> block number` index stored in `CF_BLOCK_HASHES`. +/// +/// Layout: `dataset_id (48 bytes) || hash UTF-8 bytes`. The hash is stored +/// exactly as it appears in the Arrow `hash` column (no normalization), so the +/// encoding stays chain-agnostic. +pub(crate) struct BlockHashIndexKey { + bytes: Vec +} + +impl BlockHashIndexKey { + pub fn new(dataset_id: DatasetId, hash: &str) -> Self { + let mut bytes = Vec::with_capacity(48 + hash.len()); + bytes.extend_from_slice(dataset_id.as_ref()); + bytes.extend_from_slice(hash.as_bytes()); + Self { bytes } + } + + /// The `[start, end)` key range covering every entry of `dataset_id`. Lets a + /// whole dataset's index be dropped with a single `delete_range_cf` instead + /// of one `delete_cf` per block. Relies on the `dataset_id` being a + /// fixed-length (48-byte) prefix, so no other dataset's keys fall inside. + pub fn dataset_range(dataset_id: DatasetId) -> (Vec, Vec) { + let start = dataset_id.as_ref().to_vec(); + let end = prefix_upper_bound(&start); + (start, end) + } +} + +impl AsRef<[u8]> for BlockHashIndexKey { + fn as_ref(&self) -> &[u8] { + &self.bytes + } +} + +/// Smallest byte string strictly greater than every key beginning with +/// `prefix` - the exclusive upper bound of the prefix's key range. Increments +/// the last non-`0xFF` byte, dropping trailing `0xFF`s. A 48-byte `DatasetId` +/// is ASCII/zero-padded, never all-`0xFF`, so this always yields a non-empty +/// bound in practice. +fn prefix_upper_bound(prefix: &[u8]) -> Vec { + let mut end = prefix.to_vec(); + while let Some(last) = end.last_mut() { + if *last < u8::MAX { + *last += 1; + return end; + } + end.pop(); + } + end +} + #[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)] pub enum Chunk { V0 { diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs index 5327185..52920d8 100644 --- a/crates/storage/src/db/db.rs +++ b/crates/storage/src/db/db.rs @@ -6,14 +6,14 @@ use rocksdb::{ColumnFamilyDescriptor, Options as RocksOptions}; use sqd_primitives::Name; use super::{ - data::{Dataset, DatasetId, DatasetKind, DatasetLabel}, - read::snapshot::ReadSnapshot + data::{BlockHashIndexKey, Dataset, DatasetId, DatasetKind, DatasetLabel}, + read::snapshot::ReadSnapshot, }; use crate::db::{ ops::{perform_dataset_compaction, CompactionStatus}, read::datasets::list_all_datasets, write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx}, - Chunk, DatasetUpdate + Chunk, DatasetUpdate, }; pub(super) const CF_DATASETS: Name = "DATASETS"; @@ -21,6 +21,7 @@ pub(super) const CF_CHUNKS: Name = "CHUNKS"; pub(super) const CF_TABLES: Name = "TABLES"; pub(super) const CF_DIRTY_TABLES: Name = "DIRTY_TABLES"; pub(super) const CF_DELETED_TABLES: Name = "DELETED_TABLES"; +pub(super) const CF_BLOCK_HASHES: Name = "BLOCK_HASHES"; pub(super) type RocksDB = rocksdb::OptimisticTransactionDB; pub(super) type RocksTransaction<'a> = rocksdb::Transaction<'a, RocksDB>; @@ -34,7 +35,7 @@ pub struct DatabaseSettings { data_cache_size: usize, with_rocksdb_stats: bool, direct_io: bool, - cache_index_and_filter_blocks: bool + cache_index_and_filter_blocks: bool, } impl Default for DatabaseSettings { @@ -44,7 +45,7 @@ impl Default for DatabaseSettings { data_cache_size: 256, with_rocksdb_stats: false, direct_io: false, - cache_index_and_filter_blocks: false + cache_index_and_filter_blocks: false, } } } @@ -144,8 +145,9 @@ impl DatabaseSettings { ColumnFamilyDescriptor::new(CF_CHUNKS, self.chunks_cf_options()), ColumnFamilyDescriptor::new(CF_TABLES, self.tables_cf_options()), ColumnFamilyDescriptor::new(CF_DIRTY_TABLES, self.cf_default_options()), - ColumnFamilyDescriptor::new(CF_DELETED_TABLES, self.cf_default_options()) - ] + ColumnFamilyDescriptor::new(CF_DELETED_TABLES, self.cf_default_options()), + ColumnFamilyDescriptor::new(CF_BLOCK_HASHES, self.cf_default_options()), + ], )?; Ok(Database { db, options }) @@ -154,7 +156,7 @@ impl DatabaseSettings { pub struct Database { db: RocksDB, - options: RocksOptions + options: RocksOptions, } impl Database { @@ -167,8 +169,8 @@ impl Database { &DatasetLabel::V0 { kind, version: 0, - finalized_head: None - } + finalized_head: None, + }, ) }) } @@ -190,8 +192,8 @@ impl Database { &DatasetLabel::V0 { kind, version: 0, - finalized_head: None - } + finalized_head: None, + }, ) } }) @@ -211,7 +213,7 @@ impl Database { pub fn update_dataset(&self, dataset_id: DatasetId, mut cb: F) -> anyhow::Result where - F: FnMut(&mut DatasetUpdate<'_>) -> anyhow::Result + F: FnMut(&mut DatasetUpdate<'_>) -> anyhow::Result, { Tx::new(&self.db).run(|tx| { let mut upd = DatasetUpdate::new(tx, dataset_id)?; @@ -235,33 +237,38 @@ impl Database { dataset_id: DatasetId, max_chunk_size: Option, write_amplification_limit: Option, - compaction_len_limit: Option + compaction_len_limit: Option, ) -> anyhow::Result { perform_dataset_compaction( &self.db, dataset_id, max_chunk_size, write_amplification_limit, - compaction_len_limit + compaction_len_limit, ) } pub fn delete_dataset(&self, dataset_id: DatasetId) -> anyhow::Result<()> { + // Drop the whole index for this dataset with one range tombstone over + // its `dataset_id` prefix. Kept out of the transaction below because + // `delete_range` isn't allowed inside a RocksDB tx; a crash in between + // leaves chunks without index entries (hashes 404 until re-indexed), + // not corruption, and the next startup retries. + let (start, end) = BlockHashIndexKey::dataset_range(dataset_id); + self.db + .delete_range_cf(self.db.cf_handle(CF_BLOCK_HASHES).unwrap(), start, end)?; + + // Metadata is removed atomically in one transaction, so the dataset is + // never observed half-deleted. `find_label_for_update` takes the lock. Tx::new(&self.db).run(|tx| { - let label = tx.find_label_for_update(dataset_id)?; - if label.is_none() { + if tx.find_label_for_update(dataset_id)?.is_none() { return Ok(()); } - - let chunks = tx.list_chunks(dataset_id, 0, None); - for chunk_result in chunks { + for chunk_result in tx.list_chunks(dataset_id, 0, None) { let chunk = chunk_result?; tx.delete_chunk(dataset_id, &chunk)?; } - - tx.delete_label(dataset_id)?; - - Ok(()) + tx.delete_label(dataset_id) })?; self.cleanup()?; diff --git a/crates/storage/src/db/read/blocks_table.rs b/crates/storage/src/db/read/blocks_table.rs index c9a947c..409662d 100644 --- a/crates/storage/src/db/read/blocks_table.rs +++ b/crates/storage/src/db/read/blocks_table.rs @@ -54,3 +54,76 @@ fn find_block_row(numbers: &[BN], block: BN) -> Option { .min_by_key(|e| e.1) .map(|e| e.0) } + +/// Streams all `(block_number, hash)` pairs of a `blocks` table. +/// +/// The `number` and `hash` columns are read in batches of [`BLOCK_HASH_BATCH_SIZE`] +/// rows so that peak memory stays `O(batch)` rather than `O(num_blocks)` even for +/// large compacted chunks. `visit` is called once per row. +/// +/// Schema contract (mirrors [`get_parent_block_hash`]): `number` must be `UInt32` +/// or `UInt64` and `hash` must be `Utf8`; anything else is a hard error so that a +/// future schema change surfaces loudly instead of indexing garbage. +pub fn for_each_block_hash( + blocks_table: &TableReader, + mut visit: impl FnMut(BlockNumber, &str) -> anyhow::Result<()> +) -> anyhow::Result<()> { + const BLOCK_HASH_BATCH_SIZE: usize = 4096; + + let schema = blocks_table.schema(); + + let number_idx = schema.index_of("number")?; + let number_type = schema.field(number_idx).data_type().clone(); + match number_type { + DataType::UInt32 | DataType::UInt64 => {} + ref ty => bail!("'number' column has unexpected data type - {}", ty) + } + + let hash_idx = schema.index_of("hash")?; + let hash_type = schema.field(hash_idx).data_type().clone(); + if hash_type != DataType::Utf8 { + bail!("'hash' column has unexpected data type - {}", hash_type) + } + + let num_rows = blocks_table.num_rows(); + let mut number_reader = blocks_table.create_column_reader(number_idx)?; + let mut hash_reader = blocks_table.create_column_reader(hash_idx)?; + + let mut offset = 0; + while offset < num_rows { + let len = std::cmp::min(BLOCK_HASH_BATCH_SIZE, num_rows - offset); + + let numbers = { + let mut builder = AnyBuilder::new(&number_type); + number_reader.read_slice(&mut builder, offset, len)?; + builder.finish() + }; + + let hashes = { + let mut builder = AnyBuilder::new(&hash_type); + hash_reader.read_slice(&mut builder, offset, len)?; + builder.finish() + }; + let hashes = hashes.as_string::(); + + match numbers.data_type() { + DataType::UInt32 => { + let numbers = numbers.as_primitive::().values(); + for i in 0..len { + visit(numbers[i] as BlockNumber, hashes.value(i))?; + } + } + DataType::UInt64 => { + let numbers = numbers.as_primitive::().values(); + for i in 0..len { + visit(numbers[i], hashes.value(i))?; + } + } + _ => unreachable!("'number' column type was validated above") + } + + offset += len; + } + + Ok(()) +} diff --git a/crates/storage/src/db/read/snapshot.rs b/crates/storage/src/db/read/snapshot.rs index 3165939..52d092c 100644 --- a/crates/storage/src/db/read/snapshot.rs +++ b/crates/storage/src/db/read/snapshot.rs @@ -1,14 +1,14 @@ use std::{collections::BTreeMap, ops::Deref, sync::Arc}; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use parking_lot::Mutex; use rocksdb::{ColumnFamily, ReadOptions}; -use sqd_primitives::{BlockNumber, Name}; +use sqd_primitives::{BlockNumber, BlockRef, Name}; use crate::{ db::{ - data::{Chunk, DatasetId}, - db::{RocksDB, RocksIterator, RocksSnapshot, CF_CHUNKS, CF_DATASETS, CF_TABLES}, + data::{BlockHashIndexKey, Chunk, DatasetId}, + db::{RocksDB, RocksIterator, RocksSnapshot, CF_BLOCK_HASHES, CF_CHUNKS, CF_DATASETS, CF_TABLES}, read::chunk::ChunkIterator, table_id::TableId, DatasetLabel @@ -75,6 +75,31 @@ impl<'a> ReadSnapshot<'a> { self.list_chunks(dataset_id, 0, None).into_reversed().next().transpose() } + /// Resolves a block hash to its `BlockRef` via the `CF_BLOCK_HASHES` index. + /// + /// `Ok(None)` means the hash is not indexed (unknown, or from a chunk that + /// predates the index / a non-indexed dataset kind). + pub fn find_block_by_hash(&self, dataset_id: DatasetId, hash: &str) -> anyhow::Result> { + let key = BlockHashIndexKey::new(dataset_id, hash); + let Some(bytes) = self + .db + .get_pinned_cf_opt(self.cf_handle(CF_BLOCK_HASHES), &key, &self.new_options())? + else { + return Ok(None); + }; + // Defensive on the storage boundary: a wrong length means corruption + // (bit rot, a write-path bug, a downgrade). Returning an error keeps the + // process alive (HTTP 500) instead of panicking into a crash loop. + let arr: [u8; 8] = bytes + .as_ref() + .try_into() + .context("CF_BLOCK_HASHES value has unexpected length, expected 8 bytes")?; + Ok(Some(BlockRef { + number: BlockNumber::from_be_bytes(arr), + hash: hash.to_string() + })) + } + fn new_options(&self) -> ReadOptions { let mut options = ReadOptions::default(); options.set_snapshot(&self.snapshot); diff --git a/crates/storage/src/db/write/dataset_update.rs b/crates/storage/src/db/write/dataset_update.rs index 34006f8..f2e04f8 100644 --- a/crates/storage/src/db/write/dataset_update.rs +++ b/crates/storage/src/db/write/dataset_update.rs @@ -29,7 +29,8 @@ impl<'a> DatasetUpdate<'a> { pub fn insert_chunk(&self, chunk: &Chunk) -> anyhow::Result<()> { self.tx.validate_chunk_insertion(self.dataset_id, chunk)?; - self.tx.write_chunk(self.dataset_id, chunk) + self.tx.write_chunk(self.dataset_id, chunk)?; + self.tx.index_block_hashes(self.dataset_id, chunk) } pub fn insert_fork(&self, chunk: &Chunk) -> anyhow::Result<()> { @@ -47,6 +48,7 @@ impl<'a> DatasetUpdate<'a> { } pub fn delete_chunk(&self, chunk: &Chunk) -> anyhow::Result<()> { + self.tx.unindex_block_hashes(self.dataset_id, chunk)?; self.tx.delete_chunk(self.dataset_id, chunk) } diff --git a/crates/storage/src/db/write/tx.rs b/crates/storage/src/db/write/tx.rs index b7ba820..21f5c0b 100644 --- a/crates/storage/src/db/write/tx.rs +++ b/crates/storage/src/db/write/tx.rs @@ -9,14 +9,17 @@ use rocksdb::ColumnFamily; use sqd_primitives::BlockNumber; use crate::db::{ - data::ChunkId, + data::{BlockHashIndexKey, ChunkId}, db::{ - RocksDB, RocksIterator, RocksTransaction, RocksTransactionOptions, CF_CHUNKS, CF_DATASETS, CF_DELETED_TABLES, - CF_DIRTY_TABLES + RocksDB, RocksIterator, RocksTransaction, RocksTransactionOptions, CF_BLOCK_HASHES, CF_CHUNKS, CF_DATASETS, + CF_DELETED_TABLES, CF_DIRTY_TABLES + }, + read::{ + blocks_table::{for_each_block_hash, get_parent_block_hash}, + chunk::ChunkIterator }, - read::{blocks_table::get_parent_block_hash, chunk::ChunkIterator}, table_id::TableId, - Chunk, DatasetId, DatasetLabel, ReadSnapshot + Chunk, DatasetId, DatasetKind, DatasetLabel, ReadSnapshot }; static GLOBAL_RESTARTS: AtomicU64 = AtomicU64::new(0); @@ -38,6 +41,15 @@ fn record_restart() { LOCAL_RESTARTS.with_borrow_mut(|val| *val = val.wrapping_add(1)) } +/// Whether a dataset of the given kind gets its block hashes indexed in +/// `CF_BLOCK_HASHES`. Currently EVM-only; extend this whitelist (e.g. Bitcoin, +/// Tron) when those chains need hash lookups. Hyperliquid is intentionally +/// excluded - its `hash` is an arbitrary string, not a crypto hash, so it can +/// collide and silently overwrite index entries. +fn is_indexed_kind(kind: DatasetKind) -> bool { + kind == DatasetKind::from_str("evm") +} + pub struct Tx<'a> { db: &'a RocksDB, transaction: RocksTransaction<'a> @@ -131,12 +143,73 @@ impl<'a> Tx<'a> { Ok(()) } + /// Adds every `(hash -> block_number)` pair of `chunk`'s `blocks` table to + /// `CF_BLOCK_HASHES`. Called one level above `write_chunk` (which stays a + /// pure metadata op) whenever a chunk enters a dataset: ingest and fork. + /// + /// No-op unless the dataset kind is whitelisted in [`is_indexed_kind`]. + /// Reads the table through a fresh `ReadSnapshot` (the same pattern as + /// `validate_parent_block_hash`): tables are immutable once `finish()`ed, so + /// this is safe, while the index writes go through `self.transaction` and are + /// thus atomic with the chunk metadata. + pub fn index_block_hashes(&self, dataset_id: DatasetId, chunk: &Chunk) -> anyhow::Result<()> { + let Some(label) = self.find_label_for_update(dataset_id)? else { + return Ok(()); // dataset does not exist - nothing to index + }; + if !is_indexed_kind(label.kind()) { + return Ok(()); + } + + let Some(blocks_table_id) = chunk.tables().get("blocks").copied() else { + return Ok(()); // defensively skip chunks without a blocks table + }; + + let snapshot = ReadSnapshot::new(self.db); + let reader = snapshot.create_table_reader(blocks_table_id)?; + let cf = self.cf_handle(CF_BLOCK_HASHES); + for_each_block_hash(&reader, |number, hash| { + self.transaction + .put_cf(cf, BlockHashIndexKey::new(dataset_id, hash), number.to_be_bytes())?; + Ok(()) + }) + } + + /// Removes every `(hash -> block_number)` pair of `chunk`'s `blocks` table + /// from `CF_BLOCK_HASHES`. Called one level above `delete_chunk` whenever a + /// chunk leaves a dataset: fork overwrite, retention, dataset deletion. + /// + /// Idempotent: `delete_cf` on a missing key is a no-op in RocksDB, so it is + /// safe over chunks that were never indexed (e.g. pre-upgrade chunks, or + /// non-EVM datasets which short-circuit on the kind check). + pub fn unindex_block_hashes(&self, dataset_id: DatasetId, chunk: &Chunk) -> anyhow::Result<()> { + let Some(label) = self.find_label_for_update(dataset_id)? else { + return Ok(()); + }; + if !is_indexed_kind(label.kind()) { + return Ok(()); + } + + let Some(blocks_table_id) = chunk.tables().get("blocks").copied() else { + return Ok(()); + }; + + let snapshot = ReadSnapshot::new(self.db); + let reader = snapshot.create_table_reader(blocks_table_id)?; + let cf = self.cf_handle(CF_BLOCK_HASHES); + for_each_block_hash(&reader, |_number, hash| { + self.transaction + .delete_cf(cf, BlockHashIndexKey::new(dataset_id, hash))?; + Ok(()) + }) + } + pub fn insert_fork(&self, dataset_id: DatasetId, chunk: &Chunk) -> anyhow::Result<()> { let existing = self.list_chunks(dataset_id, 0, None).into_reversed(); for head_result in existing { let head = head_result?; if chunk.first_block() <= head.first_block() { + self.unindex_block_hashes(dataset_id, &head)?; self.delete_chunk(dataset_id, &head)?; } else if head.last_block() + 1 == chunk.first_block() { ensure!( @@ -159,6 +232,7 @@ impl<'a> Tx<'a> { } self.write_chunk(dataset_id, chunk)?; + self.index_block_hashes(dataset_id, chunk)?; Ok(()) } diff --git a/crates/storage/tests/block_hash_index.rs b/crates/storage/tests/block_hash_index.rs new file mode 100644 index 0000000..ca89475 --- /dev/null +++ b/crates/storage/tests/block_hash_index.rs @@ -0,0 +1,249 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use arrow::{ + array::{ArrayRef, RecordBatch, StringArray, UInt64Array}, + datatypes::{DataType, Field, Schema} +}; +use sqd_primitives::BlockRef; +use sqd_storage::{ + db::{Chunk, CompactionStatus, Database, DatabaseSettings, DatasetId, DatasetKind}, + table::write::use_small_buffers +}; +use tempfile::TempDir; + +fn open_db(kind: &str) -> (TempDir, Database, DatasetId) { + // The TempDir guard is returned and kept alive for the whole test so the + // on-disk database isn't removed out from under RocksDB. + let db_dir = tempfile::tempdir().unwrap(); + let db = DatabaseSettings::default().open(db_dir.path()).unwrap(); + let dataset_id = DatasetId::from_str("test-dataset"); + db.create_dataset(dataset_id, DatasetKind::from_str(kind)).unwrap(); + (db_dir, db, dataset_id) +} + +fn setup_evm_db() -> (TempDir, Database, DatasetId) { + open_db("evm") +} + +/// Canonical hash for a block number. +fn block_hash(n: u64) -> String { + format!("0x{:064x}", n) +} + +/// Builds an EVM-shaped chunk: a `blocks` table with `number` (UInt64) and +/// `hash` (Utf8) columns covering `first..=last`, hashes derived via `hash_fn`. +fn make_evm_chunk_with( + db: &Database, + first: u64, + last: u64, + parent_hash: &str, + hash_fn: impl Fn(u64) -> String +) -> Chunk { + let schema = Arc::new(Schema::new(vec![ + Field::new("number", DataType::UInt64, false), + Field::new("hash", DataType::Utf8, false), + ])); + + let numbers: Vec = (first..=last).collect(); + let hashes: Vec = numbers.iter().map(|n| hash_fn(*n)).collect(); + + let number_arr = Arc::new(UInt64Array::from(numbers)) as ArrayRef; + let hash_arr = Arc::new(StringArray::from( + hashes.iter().map(String::as_str).collect::>() + )) as ArrayRef; + + let mut builder = db.new_table_builder(schema.clone()); + let batch = RecordBatch::try_new(schema, vec![number_arr, hash_arr]).unwrap(); + builder.write_record_batch(&batch).unwrap(); + + let mut tables = BTreeMap::new(); + tables.insert("blocks".to_owned(), builder.finish().unwrap()); + + Chunk::V1 { + first_block: first, + last_block: last, + last_block_hash: hash_fn(last), + parent_block_hash: parent_hash.to_owned(), + first_block_time: None, + last_block_time: None, + tables + } +} + +fn make_evm_chunk(db: &Database, first: u64, last: u64, parent_hash: &str) -> Chunk { + make_evm_chunk_with(db, first, last, parent_hash, block_hash) +} + +fn lookup(db: &Database, dataset_id: DatasetId, hash: &str) -> Option { + db.snapshot().find_block_by_hash(dataset_id, hash).unwrap() +} + +fn assert_resolves(db: &Database, dataset_id: DatasetId, n: u64) { + assert_eq!( + lookup(db, dataset_id, &block_hash(n)), + Some(BlockRef { + number: n, + hash: block_hash(n) + }), + "block {} should resolve via its canonical hash", + n + ); +} + +fn assert_absent(db: &Database, dataset_id: DatasetId, hash: &str) { + assert_eq!(lookup(db, dataset_id, hash), None, "hash {} should not resolve", hash); +} + +#[test] +fn index_ingest_and_lookup() { + let (_dir, db, dataset_id) = setup_evm_db(); + + let chunk = make_evm_chunk(&db, 0, 9, "base"); + db.insert_chunk(dataset_id, &chunk).unwrap(); + + for n in 0..=9 { + assert_resolves(&db, dataset_id, n); + } + assert_absent(&db, dataset_id, "0xdeadbeef"); + assert_absent(&db, dataset_id, &block_hash(10)); +} + +#[test] +fn index_large_chunk_spans_multiple_read_batches() { + // > 4096 rows forces `for_each_block_hash` through more than one batch, + // exercising the offset advancement across the batch boundary. + let (_dir, db, dataset_id) = setup_evm_db(); + + let last = 5000; + let chunk = make_evm_chunk(&db, 0, last, "base"); + db.insert_chunk(dataset_id, &chunk).unwrap(); + + for n in [0, 1, 4095, 4096, 4097, last] { + assert_resolves(&db, dataset_id, n); + } +} + +#[test] +fn index_fork_replaces_hashes() { + let (_dir, db, dataset_id) = setup_evm_db(); + + let chunk1 = make_evm_chunk(&db, 0, 9, "base"); + let chunk2 = make_evm_chunk(&db, 10, 19, &block_hash(9)); + db.insert_chunk(dataset_id, &chunk1).unwrap(); + db.insert_chunk(dataset_id, &chunk2).unwrap(); + + // Fork rewrites blocks 10..=19 with different hashes. + let fork = make_evm_chunk_with(&db, 10, 19, &block_hash(9), |n| format!("fork_{}", n)); + db.insert_fork(dataset_id, &fork).unwrap(); + + // chunk1's hashes are untouched. + for n in 0..=9 { + assert_resolves(&db, dataset_id, n); + } + // old canonical hashes of the forked range are gone, forked ones resolve. + for n in 10..=19 { + assert_absent(&db, dataset_id, &block_hash(n)); + assert_eq!( + lookup(&db, dataset_id, &format!("fork_{}", n)), + Some(BlockRef { + number: n, + hash: format!("fork_{}", n) + }) + ); + } +} + +#[test] +fn index_delete_chunk_removes_hashes() { + // Models the retention path (DatasetUpdate::delete_chunk). + let (_dir, db, dataset_id) = setup_evm_db(); + + let chunk1 = make_evm_chunk(&db, 0, 9, "base"); + let chunk2 = make_evm_chunk(&db, 10, 19, &block_hash(9)); + db.insert_chunk(dataset_id, &chunk1).unwrap(); + db.insert_chunk(dataset_id, &chunk2).unwrap(); + + db.update_dataset(dataset_id, |tx| tx.delete_chunk(&chunk1)).unwrap(); + + for n in 0..=9 { + assert_absent(&db, dataset_id, &block_hash(n)); + } + for n in 10..=19 { + assert_resolves(&db, dataset_id, n); + } +} + +#[test] +fn index_delete_dataset_removes_all_hashes() { + let (_dir, db, dataset_id) = setup_evm_db(); + + let chunk1 = make_evm_chunk(&db, 0, 9, "base"); + let chunk2 = make_evm_chunk(&db, 10, 19, &block_hash(9)); + db.insert_chunk(dataset_id, &chunk1).unwrap(); + db.insert_chunk(dataset_id, &chunk2).unwrap(); + + db.delete_dataset(dataset_id).unwrap(); + + assert!(db.get_all_datasets().unwrap().is_empty()); + for n in 0..=19 { + assert_absent(&db, dataset_id, &block_hash(n)); + } +} + +#[test] +fn index_survives_compaction() { + // Regression guard for the "compaction must not touch the index" decision. + // Many small chunks (>= 50) ensure real merging is triggered. + let (_dir, db, dataset_id) = setup_evm_db(); + let _sb = use_small_buffers(); + + let n_chunks = 60u64; + let blocks_per_chunk = 4u64; + let mut parent = "base".to_owned(); + for c in 0..n_chunks { + let first = c * blocks_per_chunk; + let last = first + blocks_per_chunk - 1; + let chunk = make_evm_chunk(&db, first, last, &parent); + db.insert_chunk(dataset_id, &chunk).unwrap(); + parent = block_hash(last); + } + let total_blocks = n_chunks * blocks_per_chunk; + + for n in 0..total_blocks { + assert_resolves(&db, dataset_id, n); + } + + let mut merged = false; + loop { + match db + .perform_dataset_compaction(dataset_id, Some(100), Some(1.25), None) + .unwrap() + { + CompactionStatus::Ok(_) => merged = true, + _ => break + } + } + assert!(merged, "expected compaction to merge at least once"); + + // Sanity: chunks really were merged (fewer than we inserted). + let chunk_count = db.snapshot().list_chunks(dataset_id, 0, None).count(); + assert!(chunk_count < n_chunks as usize, "compaction should reduce chunk count"); + + // The index is untouched: every hash still resolves to the same number. + for n in 0..total_blocks { + assert_resolves(&db, dataset_id, n); + } +} + +#[test] +fn non_evm_dataset_is_not_indexed() { + // Same EVM-shaped blocks table, but a solana dataset -> nothing is indexed. + let (_dir, db, dataset_id) = open_db("solana"); + + let chunk = make_evm_chunk(&db, 0, 9, "base"); + db.insert_chunk(dataset_id, &chunk).unwrap(); + + for n in 0..=9 { + assert_absent(&db, dataset_id, &block_hash(n)); + } +}