Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
26 changes: 26 additions & 0 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,29 @@ jobs:
- uses: dsherret/rust-toolchain-file@v1
- uses: Swatinem/rust-cache@v2
- run: cargo clippy --all-targets --all-features -- -D clippy::correctness

# Formatting gate: fails the workflow when `cargo fmt --check` reports differences.
fmt:
name: rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# rustfmt.toml uses nightly-only options (imports_granularity), so check with nightly.
- uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt
# `+nightly` selects the nightly toolchain installed by the step above.
- run: cargo +nightly fmt --all -- --check

# Test job: runs `cargo test` with all features enabled.
test:
name: Test
runs-on: ubuntu-latest
# Run on every push; for pull requests, skip the job while the PR is still a draft.
if: github.event_name == 'push' || !github.event.pull_request.draft
steps:
- uses: actions/checkout@v4
with:
lfs: true
- uses: dsherret/rust-toolchain-file@v1
- uses: Swatinem/rust-cache@v2
# Explicitly fetch and check out Git LFS objects before the tests run.
- name: Fetch LFS files
run: git lfs fetch --all && git lfs checkout
# `--jobs 2` caps the number of parallel cargo build jobs at two.
- name: Run tests
run: cargo test --all-features --jobs 2
57 changes: 28 additions & 29 deletions crates/archive/src/archive.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use crate::cli::{Cli, NetworkKind};
use crate::fs::create_fs;
use crate::ingest::ingest_from_service;
use crate::layout::Layout;
use crate::metrics;
use crate::proc::Proc;
use crate::server::run_server;
use crate::writer::Writer;
use std::time::Duration;

use anyhow::{ensure, Context};
use prometheus_client::registry::Registry;
use sqd_data::bitcoin::tables::BitcoinChunkBuilder;
use sqd_data::evm::tables::EvmChunkBuilder;
use sqd_data::hyperliquid_fills::tables::HyperliquidFillsChunkBuilder;
use sqd_data::hyperliquid_replica_cmds::tables::HyperliquidReplicaCmdsChunkBuilder;
use sqd_data::solana::tables::SolanaChunkBuilder;
use sqd_data::tron::tables::TronChunkBuilder;
use sqd_data::{
bitcoin::tables::BitcoinChunkBuilder, evm::tables::EvmChunkBuilder,
hyperliquid_fills::tables::HyperliquidFillsChunkBuilder,
hyperliquid_replica_cmds::tables::HyperliquidReplicaCmdsChunkBuilder, solana::tables::SolanaChunkBuilder,
tron::tables::TronChunkBuilder
};
use sqd_primitives::BlockNumber;
use std::time::Duration;

use crate::{
cli::{Cli, NetworkKind},
fs::create_fs,
ingest::ingest_from_service,
layout::Layout,
metrics,
proc::Proc,
server::run_server,
writer::Writer
};

pub async fn run(args: Cli) -> anyhow::Result<()> {
ensure!(
Expand All @@ -27,12 +30,9 @@ pub async fn run(args: Cli) -> anyhow::Result<()> {
let fs = create_fs(&args.dest).await?;
let layout = Layout::new(fs.clone());

let chunk_tracker = layout.create_chunk_tracker(
&chunk_check,
args.top_dir_size,
args.first_block,
args.last_block
).await?;
let chunk_tracker = layout
.create_chunk_tracker(&chunk_check, args.top_dir_size, args.first_block, args.last_block)
.await?;

if let Some(last_block) = args.last_block {
if chunk_tracker.next_block() > last_block {
Expand Down Expand Up @@ -78,32 +78,31 @@ pub async fn run(args: Cli) -> anyhow::Result<()> {
let attach_idx_field = args.attach_idx_field;
let write_task = tokio::spawn(async move {
let mut writer = Writer::new(fs, chunk_receiver, attach_idx_field);
writer.start().await
writer.start().await
});

match write_task.await.context("write task panicked") {
Ok(Ok(_)) => {
proc_task.await.context("processing task panicked")??;
},
}
Ok(Err(err)) => {
proc_task.abort();
return Err(err)
},
return Err(err);
}
Err(err) => {
proc_task.abort();
return Err(err)
return Err(err);
}
}

Ok(())
}


/// Reports whether `filelist` contains at least one entry whose name begins
/// with `blocks.parquet`.
///
/// Returns `false` for an empty list.
fn chunk_check(filelist: &[String]) -> bool {
    filelist
        .iter()
        .any(|name| name.starts_with("blocks.parquet"))
}
}
8 changes: 3 additions & 5 deletions crates/archive/src/chunk_writer.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use sqd_data_core::{BlockChunkBuilder, ChunkProcessor, PreparedChunk};
use sqd_dataset::DatasetDescriptionRef;


pub struct ChunkWriter<B> {
chunk_builder: B,
processor: ChunkProcessor,
memory_threshold: usize,
memory_threshold: usize
}


impl<B: BlockChunkBuilder> ChunkWriter<B> {
pub fn new(chunk_builder: B) -> anyhow::Result<Self> {
Ok(Self {
processor: chunk_builder.new_chunk_processor()?,
chunk_builder,
memory_threshold: 40 * 1024 * 1024,
memory_threshold: 40 * 1024 * 1024
})
}

Expand Down Expand Up @@ -52,4 +50,4 @@ impl<B: BlockChunkBuilder> ChunkWriter<B> {
let new_processor = self.chunk_builder.new_chunk_processor()?;
std::mem::replace(&mut self.processor, new_processor).finish()
}
}
}
4 changes: 1 addition & 3 deletions crates/archive/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use clap::{value_parser, Parser, ValueEnum};
use sqd_primitives::BlockNumber;
use url::Url;


#[derive(ValueEnum, Clone, Debug)]
pub enum NetworkKind {
Bitcoin,
Expand All @@ -13,7 +12,6 @@ pub enum NetworkKind {
Tron
}


#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct Cli {
Expand Down Expand Up @@ -67,5 +65,5 @@ pub struct Cli {

/// Whether to attach an index field to each record
#[arg(long)]
pub attach_idx_field: bool,
pub attach_idx_field: bool
}
19 changes: 9 additions & 10 deletions crates/archive/src/fs/local.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
use crate::fs::{FSRef, Fs};
use std::{
path::{Path, PathBuf},
sync::Arc
};

use async_trait::async_trait;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use crate::fs::{FSRef, Fs};

pub struct LocalFs {
root: PathBuf,
root: PathBuf
}


impl LocalFs {
pub fn new(root: impl Into<PathBuf>) -> LocalFs {
Self { root: root.into() }
}
}


#[async_trait]
impl Fs for LocalFs {
fn cd(&self, path: &str) -> FSRef {
Arc::new(Self::new(
self.root.join(path)
))
Arc::new(Self::new(self.root.join(path)))
}

async fn ls(&self) -> anyhow::Result<Vec<String>> {
Expand Down Expand Up @@ -57,4 +56,4 @@ impl Fs for LocalFs {
}
Ok(())
}
}
}
20 changes: 7 additions & 13 deletions crates/archive/src/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,42 @@
use crate::fs::local::LocalFs;
use crate::fs::s3::S3Fs;
use std::{path::Path, sync::Arc};

use anyhow::{anyhow, bail, ensure};
use async_trait::async_trait;
use std::path::Path;
use std::sync::Arc;
use url::Url;

use crate::fs::{local::LocalFs, s3::S3Fs};

pub mod local;
pub mod s3;


pub type FSRef = Arc<dyn Fs + Sync + Send>;


/// Abstraction over a storage location, shared between callers as an `FSRef`.
///
/// NOTE(review): the per-method descriptions below are inferred from the
/// signatures; confirm the exact semantics against the implementations.
#[async_trait]
pub trait Fs {
/// Returns a handle for `path`, presumably resolved relative to this
/// handle's current location.
fn cd(&self, path: &str) -> FSRef;

/// Lists this location, returning one `String` per entry
/// (presumably entry names — TODO confirm).
async fn ls(&self) -> anyhow::Result<Vec<String>>;

/// Moves the local file or directory at `local_src` to `dest` within this
/// filesystem (presumably; verify against the implementations).
async fn move_local(&self, local_src: &Path, dest: &str) -> anyhow::Result<()>;

/// Deletes `path` from this filesystem.
async fn delete(&self, path: &str) -> anyhow::Result<()>;
}


pub async fn create_fs(url: &str) -> anyhow::Result<FSRef> {
match Url::parse(url) {
Ok(u) => {
if u.scheme() == "s3" {
ensure!(!u.cannot_be_a_base(), "invalid s3 url - {}", url);

let bucket = u.host_str().ok_or_else(|| {
anyhow!("bucket is missing in {}", url)
})?;
let bucket = u.host_str().ok_or_else(|| anyhow!("bucket is missing in {}", url))?;

let mut config_loader = aws_config::from_env();
if let Ok(s3_endpoint) = std::env::var("AWS_S3_ENDPOINT") {
config_loader = config_loader.endpoint_url(s3_endpoint);
}
let config = config_loader.load().await;
let s3_client = aws_sdk_s3::Client::new(&config);

let fs = S3Fs::new(s3_client, bucket.to_string()).cd(u.path());
Ok(fs)
} else {
Expand Down
Loading
Loading