diff --git a/crates/node/src/builder.rs b/crates/node/src/builder.rs index 3995070..68610f8 100644 --- a/crates/node/src/builder.rs +++ b/crates/node/src/builder.rs @@ -5,7 +5,11 @@ use eyre::OptionExt; use signet_blobber::CacheHandle; use signet_block_processor::AliasOracleFactory; use signet_cold::BlockData; -use signet_hot::db::{HotDbRead, UnsafeDbWrite}; +use signet_hot::{ + db::{HotDbRead, UnsafeDbWrite}, + model::HotKvWrite, + tables, +}; use signet_node_config::SignetNodeConfig; use signet_node_types::HostNotifier; use signet_rpc::{ServeConfig, StorageRpcConfig}; @@ -177,6 +181,23 @@ where self.notifier.as_ref().ok_or_eyre("Notifier must be set")?; let storage = self.storage.as_ref().ok_or_eyre("Storage must be set")?; + // Ensure all hot storage tables exist. On a fresh MDBX database, + // named tables must be created in a write transaction before + // read-only transactions can open them. + { + let writer = storage.hot().writer()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.queue_create::()?; + writer.raw_commit()?; + } + // Load genesis into hot storage if absent. let reader = storage.reader()?; let has_hot_genesis = reader.has_block(0)?; diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index ea045fc..ff8fc24 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -11,7 +11,7 @@ use signet_rpc::{ ChainNotifier, NewBlockNotification, RemovedBlock, ReorgNotification, RpcServerGuard, ServeConfig, StorageRpcConfig, }; -use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, UnifiedStorage}; +use signet_storage::{DrainedBlock, HistoryRead, HotKv, HotKvRead, StorageError, UnifiedStorage}; use signet_types::{PairedHeights, constants::SignetSystemConstants}; use std::{fmt, sync::Arc}; use tokio::sync::watch; @@ -282,7 +282,15 @@ where ); let executed = processor.process_block(block_extracts).await?; self.notify_new_block(&executed); - self.storage.append_blocks(vec![executed])?; + // Cold backpressure is non-fatal: hot storage is authoritative + // and cold will catch up once the channel drains. + match self.storage.append_blocks(vec![executed]) { + Ok(()) => {} + Err(StorageError::Cold(e)) => { + tracing::debug!(%e, "cold storage backpressure, hot storage is ahead"); + } + Err(e) => return Err(e.into()), + } processed = true; } Ok(processed)