Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions crates/hotblocks/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn build_api(app: App) -> Router {
.route("/datasets/{id}/finalized-stream", post(finalized_stream))
.route("/datasets/{id}/head", get(get_head))
.route("/datasets/{id}/finalized-head", get(get_finalized_head))
.route("/datasets/{id}/hashes/{hash}/block", get(get_block_by_hash))
.route("/datasets/{id}/retention", get(get_retention).post(set_retention))
.route("/datasets/{id}/status", get(get_status))
.route("/datasets/{id}/metadata", get(get_metadata))
Expand Down Expand Up @@ -362,6 +363,55 @@ async fn get_head(
})
}

async fn get_block_by_hash(
Extension(app): Extension<AppRef>,
Extension(client_id): Extension<ClientId>,
Path((dataset_id, hash)): Path<(DatasetId, String)>
) -> impl IntoResponse {
// `get_dataset!` returns from the enclosing fn, which doesn't work inside the
// synchronous `with_response` closure, and the lookup must be `.await`ed
// first - so existence/validation happen up front and the awaited result is
// handed to `with_response`.

// Reject obviously-invalid lengths before touching the DB. Crypto hashes are
// ~64-66 chars (hex, EVM/Bitcoin/Tron) up to ~88 (base58, Solana); 256 is a
// generous ceiling that still cuts off megabyte-sized URLs.
if hash.is_empty() || hash.len() > 256 {
return ResponseWithMetadata::new()
.with_client_id(&client_id)
.with_dataset_id(dataset_id)
.with_endpoint("/hashes/{hash}/block")
.with_response(|| text!(StatusCode::BAD_REQUEST, "invalid hash length"));
}

let dataset = match app.data_service.get_dataset(dataset_id) {
Ok(ds) => ds,
Err(err) => {
return ResponseWithMetadata::new()
.with_client_id(&client_id)
.with_dataset_id(dataset_id)
.with_endpoint("/hashes/{hash}/block")
.with_response(|| text!(StatusCode::NOT_FOUND, "{}", err));
}
};

let response = match dataset.get_block_by_hash(hash).await {
Ok(Some(block_ref)) => json_ok!(block_ref),
Ok(None) => text!(StatusCode::NOT_FOUND, "block not found"),
Err(err) => {
// Terse body; the full chain (and any backtrace from `{:?}`) goes to the log.
error!(error = ?err, dataset_id = %dataset_id, "get_block_by_hash failed");
text!(StatusCode::INTERNAL_SERVER_ERROR, "internal error")
}
};

ResponseWithMetadata::new()
.with_client_id(&client_id)
.with_dataset_id(dataset_id)
.with_endpoint("/hashes/{hash}/block")
.with_response(|| response)
}

async fn get_retention(
Extension(app): Extension<AppRef>,
Extension(client_id): Extension<ClientId>,
Expand Down
16 changes: 15 additions & 1 deletion crates/hotblocks/src/dataset_controller/dataset_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
};

pub struct DatasetController {
db: DBRef,
dataset_id: DatasetId,
dataset_kind: DatasetKind,
retention_sender: tokio::sync::watch::Sender<RetentionStrategy>,
Expand Down Expand Up @@ -71,9 +72,10 @@ impl DatasetController {
let task = tokio::spawn(ctl.run(write).in_current_span());

let compaction_task =
tokio::spawn(compaction_loop(db, dataset_id, compaction_enabled_receiver).in_current_span());
tokio::spawn(compaction_loop(db.clone(), dataset_id, compaction_enabled_receiver).in_current_span());

Ok(Self {
db,
dataset_id,
dataset_kind,
retention_sender,
Expand Down Expand Up @@ -105,6 +107,18 @@ impl DatasetController {
self.head_receiver.borrow().as_ref().map(|h| h.number)
}

/// Looks up the `BlockRef` recorded for `hash` in this dataset's hash index.
///
/// The lookup runs against a fresh database snapshot inside
/// `tokio::task::spawn_blocking`, so the storage read never runs on an async
/// worker thread (the original author notes this is the same pattern as
/// `Ctl::new_write_ctx`).
///
/// Returns `Ok(None)` when the hash is not in the index. Returns an error if
/// the storage lookup fails or if the blocking task could not be joined.
pub async fn get_block_by_hash(&self, hash: String) -> anyhow::Result<Option<BlockRef>> {
    // The blocking closure must be `'static`, so it owns its own handle to
    // the database and a copy of the dataset id.
    let dataset_id = self.dataset_id;
    let db = self.db.clone();

    let lookup = tokio::task::spawn_blocking(move || {
        db.snapshot().find_block_by_hash(dataset_id, &hash)
    });

    // Outer layer: join failure. Inner layer: the lookup's own result,
    // returned to the caller untouched.
    lookup.await.context("get_block_by_hash task panicked")?
}

pub fn enable_compaction(&self, yes: bool) {
let _ = self.compaction_enabled_sender.send(yes);
}
Expand Down
51 changes: 51 additions & 0 deletions crates/storage/src/db/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,57 @@ impl Display for ChunkId {
}
}

/// Key of one entry in the `hash -> block number` index kept in
/// `CF_BLOCK_HASHES`.
///
/// Byte layout: the `dataset_id` bytes (documented as a fixed 48-byte prefix)
/// immediately followed by the hash's UTF-8 bytes. The hash is stored exactly
/// as it appears in the Arrow `hash` column, without normalization, which
/// keeps the encoding chain-agnostic.
pub(crate) struct BlockHashIndexKey {
    bytes: Vec<u8>
}

impl BlockHashIndexKey {
    /// Builds the key for `hash` within `dataset_id`.
    pub fn new(dataset_id: DatasetId, hash: &str) -> Self {
        let prefix: &[u8] = dataset_id.as_ref();
        // `concat` allocates once, sized for both parts.
        Self {
            bytes: [prefix, hash.as_bytes()].concat()
        }
    }

    /// Returns the `[start, end)` key range that covers every index entry of
    /// `dataset_id`.
    ///
    /// This lets a whole dataset's index be dropped with one
    /// `delete_range_cf` rather than one `delete_cf` per block. It relies on
    /// the `dataset_id` being a fixed-length prefix, so that no other
    /// dataset's keys can fall inside the range. `end` is whatever
    /// `prefix_upper_bound` yields for the prefix.
    pub fn dataset_range(dataset_id: DatasetId) -> (Vec<u8>, Vec<u8>) {
        let prefix: &[u8] = dataset_id.as_ref();
        (prefix.to_vec(), prefix_upper_bound(prefix))
    }
}

impl AsRef<[u8]> for BlockHashIndexKey {
    /// Exposes the encoded key bytes.
    fn as_ref(&self) -> &[u8] {
        self.bytes.as_slice()
    }
}

/// Computes the exclusive upper bound of the key range that starts with
/// `prefix`: the smallest byte string strictly greater than every key
/// beginning with `prefix`.
///
/// The bound is `prefix` cut off after its last byte that is not `0xFF`, with
/// that byte incremented by one. If `prefix` is empty or consists solely of
/// `0xFF` bytes there is no such byte and the result is empty. The original
/// note states that a 48-byte `DatasetId` is ASCII/zero-padded and never
/// all-`0xFF`, so the empty case is not expected for dataset prefixes.
fn prefix_upper_bound(prefix: &[u8]) -> Vec<u8> {
    // Position of the right-most byte that can still be incremented.
    let Some(pivot) = prefix.iter().rposition(|&byte| byte != u8::MAX) else {
        return Vec::new();
    };

    let mut bound = prefix[..=pivot].to_vec();
    // Cannot overflow: the byte at `pivot` is not `u8::MAX`.
    bound[pivot] += 1;
    bound
}

#[derive(BorshSerialize, BorshDeserialize, Debug, Clone, Eq, PartialEq)]
pub enum Chunk {
V0 {
Expand Down
55 changes: 31 additions & 24 deletions crates/storage/src/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,22 @@ use rocksdb::{ColumnFamilyDescriptor, Options as RocksOptions};
use sqd_primitives::Name;

use super::{
data::{Dataset, DatasetId, DatasetKind, DatasetLabel},
read::snapshot::ReadSnapshot
data::{BlockHashIndexKey, Dataset, DatasetId, DatasetKind, DatasetLabel},
read::snapshot::ReadSnapshot,
};
use crate::db::{
ops::{perform_dataset_compaction, CompactionStatus},
read::datasets::list_all_datasets,
write::{ops::deleted_deleted_tables, table_builder::TableBuilder, tx::Tx},
Chunk, DatasetUpdate
Chunk, DatasetUpdate,
};

pub(super) const CF_DATASETS: Name = "DATASETS";
pub(super) const CF_CHUNKS: Name = "CHUNKS";
pub(super) const CF_TABLES: Name = "TABLES";
pub(super) const CF_DIRTY_TABLES: Name = "DIRTY_TABLES";
pub(super) const CF_DELETED_TABLES: Name = "DELETED_TABLES";
pub(super) const CF_BLOCK_HASHES: Name = "BLOCK_HASHES";

pub(super) type RocksDB = rocksdb::OptimisticTransactionDB;
pub(super) type RocksTransaction<'a> = rocksdb::Transaction<'a, RocksDB>;
Expand All @@ -34,7 +35,7 @@ pub struct DatabaseSettings {
data_cache_size: usize,
with_rocksdb_stats: bool,
direct_io: bool,
cache_index_and_filter_blocks: bool
cache_index_and_filter_blocks: bool,
}

impl Default for DatabaseSettings {
Expand All @@ -44,7 +45,7 @@ impl Default for DatabaseSettings {
data_cache_size: 256,
with_rocksdb_stats: false,
direct_io: false,
cache_index_and_filter_blocks: false
cache_index_and_filter_blocks: false,
}
}
}
Expand Down Expand Up @@ -144,8 +145,9 @@ impl DatabaseSettings {
ColumnFamilyDescriptor::new(CF_CHUNKS, self.chunks_cf_options()),
ColumnFamilyDescriptor::new(CF_TABLES, self.tables_cf_options()),
ColumnFamilyDescriptor::new(CF_DIRTY_TABLES, self.cf_default_options()),
ColumnFamilyDescriptor::new(CF_DELETED_TABLES, self.cf_default_options())
]
ColumnFamilyDescriptor::new(CF_DELETED_TABLES, self.cf_default_options()),
ColumnFamilyDescriptor::new(CF_BLOCK_HASHES, self.cf_default_options()),
],
)?;

Ok(Database { db, options })
Expand All @@ -154,7 +156,7 @@ impl DatabaseSettings {

pub struct Database {
db: RocksDB,
options: RocksOptions
options: RocksOptions,
}

impl Database {
Expand All @@ -167,8 +169,8 @@ impl Database {
&DatasetLabel::V0 {
kind,
version: 0,
finalized_head: None
}
finalized_head: None,
},
)
})
}
Expand All @@ -190,8 +192,8 @@ impl Database {
&DatasetLabel::V0 {
kind,
version: 0,
finalized_head: None
}
finalized_head: None,
},
)
}
})
Expand All @@ -211,7 +213,7 @@ impl Database {

pub fn update_dataset<F, R>(&self, dataset_id: DatasetId, mut cb: F) -> anyhow::Result<R>
where
F: FnMut(&mut DatasetUpdate<'_>) -> anyhow::Result<R>
F: FnMut(&mut DatasetUpdate<'_>) -> anyhow::Result<R>,
{
Tx::new(&self.db).run(|tx| {
let mut upd = DatasetUpdate::new(tx, dataset_id)?;
Expand All @@ -235,33 +237,38 @@ impl Database {
dataset_id: DatasetId,
max_chunk_size: Option<usize>,
write_amplification_limit: Option<f64>,
compaction_len_limit: Option<usize>
compaction_len_limit: Option<usize>,
) -> anyhow::Result<CompactionStatus> {
perform_dataset_compaction(
&self.db,
dataset_id,
max_chunk_size,
write_amplification_limit,
compaction_len_limit
compaction_len_limit,
)
}

pub fn delete_dataset(&self, dataset_id: DatasetId) -> anyhow::Result<()> {
// Drop the whole index for this dataset with one range tombstone over
// its `dataset_id` prefix. Kept out of the transaction below because
// `delete_range` isn't allowed inside a RocksDB tx; a crash in between
// leaves chunks without index entries (hashes 404 until re-indexed),
// not corruption, and the next startup retries.
let (start, end) = BlockHashIndexKey::dataset_range(dataset_id);
self.db
.delete_range_cf(self.db.cf_handle(CF_BLOCK_HASHES).unwrap(), start, end)?;

// Metadata is removed atomically in one transaction, so the dataset is
// never observed half-deleted. `find_label_for_update` takes the lock.
Tx::new(&self.db).run(|tx| {
let label = tx.find_label_for_update(dataset_id)?;
if label.is_none() {
if tx.find_label_for_update(dataset_id)?.is_none() {
return Ok(());
}

let chunks = tx.list_chunks(dataset_id, 0, None);
for chunk_result in chunks {
for chunk_result in tx.list_chunks(dataset_id, 0, None) {
let chunk = chunk_result?;
tx.delete_chunk(dataset_id, &chunk)?;
}

tx.delete_label(dataset_id)?;

Ok(())
tx.delete_label(dataset_id)
})?;

self.cleanup()?;
Expand Down
73 changes: 73 additions & 0 deletions crates/storage/src/db/read/blocks_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,76 @@ fn find_block_row<BN: Copy + Ord>(numbers: &[BN], block: BN) -> Option<usize> {
.min_by_key(|e| e.1)
.map(|e| e.0)
}

/// Visits every `(block_number, hash)` pair stored in a `blocks` table.
///
/// The `number` and `hash` columns are read in slices of at most
/// `BLOCK_HASH_BATCH_SIZE` rows, so only one batch of each column is
/// materialised at a time, even for large compacted chunks. `visit` is called
/// once per row in table order; the first error it returns stops the scan and
/// is propagated.
///
/// Schema contract (the original note says this mirrors
/// `get_parent_block_hash`): `number` must be `UInt32` or `UInt64` and `hash`
/// must be `Utf8`. Any other type is a hard error, so a future schema change
/// surfaces loudly instead of indexing garbage.
///
/// # Errors
///
/// Fails if either column is missing or has an unexpected type, if a column
/// read fails, or if `visit` returns an error.
pub fn for_each_block_hash<S: KvRead + Sync>(
    blocks_table: &TableReader<S>,
    mut visit: impl FnMut(BlockNumber, &str) -> anyhow::Result<()>
) -> anyhow::Result<()> {
    const BLOCK_HASH_BATCH_SIZE: usize = 4096;

    let schema = blocks_table.schema();

    // Validate both columns up front, `number` first, before any data is read.
    let number_idx = schema.index_of("number")?;
    let number_type = schema.field(number_idx).data_type().clone();
    if !matches!(number_type, DataType::UInt32 | DataType::UInt64) {
        bail!("'number' column has unexpected data type - {}", number_type)
    }

    let hash_idx = schema.index_of("hash")?;
    let hash_type = schema.field(hash_idx).data_type().clone();
    if hash_type != DataType::Utf8 {
        bail!("'hash' column has unexpected data type - {}", hash_type)
    }

    let total_rows = blocks_table.num_rows();
    let mut number_reader = blocks_table.create_column_reader(number_idx)?;
    let mut hash_reader = blocks_table.create_column_reader(hash_idx)?;

    for batch_start in (0..total_rows).step_by(BLOCK_HASH_BATCH_SIZE) {
        // Every batch is full-sized except possibly the last one.
        let batch_len = BLOCK_HASH_BATCH_SIZE.min(total_rows - batch_start);

        let number_column = {
            let mut column_builder = AnyBuilder::new(&number_type);
            number_reader.read_slice(&mut column_builder, batch_start, batch_len)?;
            column_builder.finish()
        };

        let hash_column = {
            let mut column_builder = AnyBuilder::new(&hash_type);
            hash_reader.read_slice(&mut column_builder, batch_start, batch_len)?;
            column_builder.finish()
        };
        let hash_values = hash_column.as_string::<i32>();

        match number_column.data_type() {
            DataType::UInt64 => {
                let block_numbers = number_column.as_primitive::<UInt64Type>().values();
                (0..batch_len)
                    .try_for_each(|row| visit(block_numbers[row], hash_values.value(row)))?;
            }
            DataType::UInt32 => {
                let block_numbers = number_column.as_primitive::<UInt32Type>().values();
                (0..batch_len).try_for_each(|row| {
                    visit(block_numbers[row] as BlockNumber, hash_values.value(row))
                })?;
            }
            _ => unreachable!("'number' column type was validated above")
        }
    }

    Ok(())
}
Loading
Loading