use crate::blockstore_meta;
use bincode::{deserialize, serialize};
use byteorder::{BigEndian, ByteOrder};
use columns::{EvmBlockHeader, EvmHeaderIndexByHash, EvmTransactionReceipts};
use evm_state::H256;
use log::*;
use prost::Message;
pub use rocksdb::Direction as IteratorDirection;
use rocksdb::{
self, ColumnFamily, ColumnFamilyDescriptor, DBIterator, DBRawIterator, DBRecoveryMode,
IteratorMode as RocksIteratorMode, Options, WriteBatch as RWriteBatch, DB,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use solana_runtime::hardened_unpack::UnpackError;
use solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
signature::Signature,
};
use solana_storage_proto::convert::{generated, generated_evm};
use std::{collections::HashMap, fs, marker::PhantomData, path::Path, sync::Arc};
use thiserror::Error;
// Maximum size of a single rocksdb memtable (write buffer), in bytes: 256 MiB.
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024;
// Column-family names. These strings are persisted inside the rocksdb
// directory; changing any of them would orphan existing data, so they must
// remain stable across releases.
const META_CF: &str = "meta";
const DEAD_SLOTS_CF: &str = "dead_slots";
const DUPLICATE_SLOTS_CF: &str = "duplicate_slots";
const ERASURE_META_CF: &str = "erasure_meta";
const ORPHANS_CF: &str = "orphans";
const ROOT_CF: &str = "root";
const INDEX_CF: &str = "index";
const DATA_SHRED_CF: &str = "data_shred";
const CODE_SHRED_CF: &str = "code_shred";
const TRANSACTION_STATUS_CF: &str = "transaction_status";
const ADDRESS_SIGNATURES_CF: &str = "address_signatures";
const TRANSACTION_STATUS_INDEX_CF: &str = "transaction_status_index";
const REWARDS_CF: &str = "rewards";
const BLOCKTIME_CF: &str = "blocktime";
const PERF_SAMPLES_CF: &str = "perf_samples";
// EVM-specific column families.
const EVM_HEADERS: &str = "evm_headers";
const EVM_BLOCK_BY_HASH: &str = "evm_block_by_hash";
const EVM_TRANSACTIONS: &str = "evm_transactions";
/// Failure modes surfaced by blockstore reads and writes.
///
/// The variants intentionally carry no `#[error(...)]` attributes: `Display`
/// is implemented manually below, while `thiserror` still wires up
/// `source()`/`From` conversions via the `#[from]` attributes.
#[derive(Error, Debug)]
pub enum BlockstoreError {
    ShredForIndexExists,
    InvalidShredData(Box<bincode::ErrorKind>),
    RocksDb(#[from] rocksdb::Error),
    SlotNotRooted,
    DeadSlot,
    Io(#[from] std::io::Error),
    Serialize(#[from] Box<bincode::ErrorKind>),
    FsExtraError(#[from] fs_extra::error::Error),
    SlotCleanedUp,
    UnpackError(#[from] UnpackError),
    UnableToSetOpenFileDescriptorLimit,
    TransactionStatusSlotMismatch,
    EmptyEpochStakes,
    NoVoteTimestampsInRange,
    ProtobufEncodeError(#[from] prost::EncodeError),
    ProtobufDecodeError(#[from] prost::DecodeError),
    ParentEntriesUnavailable,
}
/// Convenience alias: all blockstore operations report `BlockstoreError`.
pub type Result<T> = std::result::Result<T, BlockstoreError>;
impl std::fmt::Display for BlockstoreError {
    /// Human-readable rendering of the error.
    ///
    /// Includes the `Debug` form of the variant so that code formatting the
    /// error with `{}` (log lines, error chains) does not lose the underlying
    /// cause; previously every variant rendered as the bare string
    /// "blockstore error", which made failures indistinguishable.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "blockstore error: {:?}", self)
    }
}
/// Typed iteration start position; translated into `rocksdb::IteratorMode`
/// (after serializing the typed index into a raw key) in `Rocks::iterator_cf`.
pub enum IteratorMode<Index> {
    Start,
    End,
    From(Index, IteratorDirection),
}
pub mod columns {
    //! Zero-sized marker types, one per rocksdb column family.
    //!
    //! The `ColumnName`, `Column`, `TypedColumn` and `ProtobufColumn` impls
    //! elsewhere in this file attach the persisted name, key encoding, and
    //! value type to each marker.

    /// Per-slot metadata ("meta" CF).
    #[derive(Debug)]
    pub struct SlotMeta;
    /// Orphaned-slot flags ("orphans" CF).
    #[derive(Debug)]
    pub struct Orphans;
    /// Dead-slot flags ("dead_slots" CF).
    #[derive(Debug)]
    pub struct DeadSlots;
    /// Duplicate-slot proofs ("duplicate_slots" CF).
    #[derive(Debug)]
    pub struct DuplicateSlots;
    /// Erasure-coding metadata ("erasure_meta" CF).
    #[derive(Debug)]
    pub struct ErasureMeta;
    /// Rooted-slot flags ("root" CF).
    #[derive(Debug)]
    pub struct Root;
    /// Shred index bookkeeping ("index" CF).
    #[derive(Debug)]
    pub struct Index;
    /// Data shred payloads ("data_shred" CF).
    #[derive(Debug)]
    pub struct ShredData;
    /// Coding shred payloads ("code_shred" CF).
    #[derive(Debug)]
    pub struct ShredCode;
    /// Transaction status metadata ("transaction_status" CF).
    #[derive(Debug)]
    pub struct TransactionStatus;
    /// Address -> signature index ("address_signatures" CF).
    #[derive(Debug)]
    pub struct AddressSignatures;
    /// Metadata for the transaction-status index buckets
    /// ("transaction_status_index" CF).
    #[derive(Debug)]
    pub struct TransactionStatusIndex;
    /// Per-slot rewards ("rewards" CF).
    #[derive(Debug)]
    pub struct Rewards;
    /// Per-slot block time ("blocktime" CF).
    #[derive(Debug)]
    pub struct Blocktime;
    /// Performance samples ("perf_samples" CF).
    #[derive(Debug)]
    pub struct PerfSamples;
    /// EVM block headers ("evm_headers" CF).
    #[derive(Debug)]
    pub struct EvmBlockHeader;
    /// EVM block-hash -> block-number index ("evm_block_by_hash" CF).
    #[derive(Debug)]
    pub struct EvmHeaderIndexByHash;
    /// EVM transaction receipts ("evm_transactions" CF).
    #[derive(Debug)]
    pub struct EvmTransactionReceipts;
}
/// Composite key for the EVM transaction-receipts column.
///
/// The derived `PartialOrd`/`Ord` follow field order: `index` first, then
/// `hash`, `block_num`, and the optional native `slot`.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct EvmTransactionReceiptsIndex {
    // Primary component; see `Column::primary_index` for this type.
    pub index: u64,
    // EVM transaction hash.
    pub hash: H256,
    // EVM block number the receipt belongs to.
    pub block_num: evm_state::BlockNum,
    // Native chain slot, when known (keys may be 48 or 56 bytes; see `key`).
    pub slot: Option<Slot>,
}
/// Requested access mode when opening the blockstore.
pub enum AccessType {
    /// Full read-write access.
    PrimaryOnly,
    /// Read-write access with auto-compaction disabled (bulk maintenance).
    PrimaryOnlyForMaintenance,
    /// Try read-write; fall back to a read-only secondary instance on failure.
    TryPrimaryThenSecondary,
}
/// The access mode that was actually obtained by `Rocks::open`.
#[derive(Debug, PartialEq)]
pub enum ActualAccessType {
    Primary,
    Secondary,
}
/// WAL recovery modes, mirroring `rocksdb::DBRecoveryMode` one-to-one (see
/// the `From` conversions below).
#[derive(Debug, Clone)]
pub enum BlockstoreRecoveryMode {
    TolerateCorruptedTailRecords,
    AbsoluteConsistency,
    PointInTime,
    SkipAnyCorruptedRecord,
}
impl From<&str> for BlockstoreRecoveryMode {
fn from(string: &str) -> Self {
match string {
"tolerate_corrupted_tail_records" => {
BlockstoreRecoveryMode::TolerateCorruptedTailRecords
}
"absolute_consistency" => BlockstoreRecoveryMode::AbsoluteConsistency,
"point_in_time" => BlockstoreRecoveryMode::PointInTime,
"skip_any_corrupted_record" => BlockstoreRecoveryMode::SkipAnyCorruptedRecord,
bad_mode => panic!("Invalid recovery mode: {}", bad_mode),
}
}
}
impl From<BlockstoreRecoveryMode> for DBRecoveryMode {
    /// Maps our recovery mode onto the equivalent rocksdb WAL recovery mode.
    fn from(brm: BlockstoreRecoveryMode) -> Self {
        match brm {
            BlockstoreRecoveryMode::TolerateCorruptedTailRecords => {
                Self::TolerateCorruptedTailRecords
            }
            BlockstoreRecoveryMode::AbsoluteConsistency => Self::AbsoluteConsistency,
            BlockstoreRecoveryMode::PointInTime => Self::PointInTime,
            BlockstoreRecoveryMode::SkipAnyCorruptedRecord => Self::SkipAnyCorruptedRecord,
        }
    }
}
/// Raw rocksdb handle paired with the access mode actually obtained
/// (primary vs. secondary) when the database was opened.
#[derive(Debug)]
struct Rocks(rocksdb::DB, ActualAccessType);
impl Rocks {
    /// Canonical, ordered list of every column-family name.
    ///
    /// Single source of truth shared by `open` (which builds descriptors from
    /// it) and `columns`. Previously each method kept its own hand-written
    /// copy of the 18-entry list, which could silently drift apart.
    fn cf_names() -> Vec<&'static str> {
        use columns::{
            AddressSignatures, Blocktime, DeadSlots, DuplicateSlots, ErasureMeta, Index, Orphans,
            PerfSamples, Rewards, Root, ShredCode, ShredData, SlotMeta, TransactionStatus,
            TransactionStatusIndex,
        };
        vec![
            SlotMeta::NAME,
            DeadSlots::NAME,
            DuplicateSlots::NAME,
            ErasureMeta::NAME,
            Orphans::NAME,
            Root::NAME,
            Index::NAME,
            ShredData::NAME,
            ShredCode::NAME,
            TransactionStatus::NAME,
            AddressSignatures::NAME,
            TransactionStatusIndex::NAME,
            Rewards::NAME,
            Blocktime::NAME,
            PerfSamples::NAME,
            EvmBlockHeader::NAME,
            EvmTransactionReceipts::NAME,
            EvmHeaderIndexByHash::NAME,
        ]
    }
    /// Opens (or creates) the rocksdb database at `path`.
    ///
    /// With `TryPrimaryThenSecondary`, a failed primary open falls back to a
    /// read-only secondary instance rooted under `path/solana-secondary`.
    fn open(
        path: &Path,
        access_type: AccessType,
        recovery_mode: Option<BlockstoreRecoveryMode>,
    ) -> Result<Rocks> {
        fs::create_dir_all(&path)?;
        if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
            warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
        }
        let mut db_options = get_db_options(&access_type);
        if let Some(recovery_mode) = recovery_mode {
            db_options.set_wal_recovery_mode(recovery_mode.into());
        }
        let cf_names = Self::cf_names();
        // Descriptors are consumed by `open_cf_descriptors`, so build a fresh
        // set for each open attempt.
        let cf_descriptors = || {
            cf_names
                .iter()
                .map(|name| ColumnFamilyDescriptor::new(*name, get_cf_options(&access_type)))
        };
        let db = match &access_type {
            AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
                DB::open_cf_descriptors(&db_options, path, cf_descriptors())?,
                ActualAccessType::Primary,
            ),
            AccessType::TryPrimaryThenSecondary => {
                match DB::open_cf_descriptors(&db_options, path, cf_descriptors()) {
                    Ok(db) => Rocks(db, ActualAccessType::Primary),
                    Err(err) => {
                        let secondary_path = path.join("solana-secondary");
                        warn!("Error when opening as primary: {}", err);
                        warn!("Trying as secondary at : {:?}", secondary_path);
                        warn!("This active secondary db use may temporarily cause the performance of another db use (like by validator) to degrade");
                        // A secondary instance must be able to keep every SST
                        // file open to follow the primary.
                        db_options.set_max_open_files(-1);
                        Rocks(
                            DB::open_cf_as_secondary(
                                &db_options,
                                path,
                                &secondary_path,
                                cf_names.clone(),
                            )?,
                            ActualAccessType::Secondary,
                        )
                    }
                }
            }
        };
        Ok(db)
    }
    /// Names of every column family managed by this database.
    fn columns(&self) -> Vec<&'static str> {
        Self::cf_names()
    }
    /// Deletes the entire database directory at `path`.
    fn destroy(path: &Path) -> Result<()> {
        DB::destroy(&Options::default(), path)?;
        Ok(())
    }
    /// Looks up a column-family handle. Panics on unknown names, which is an
    /// invariant violation: every column family is created at open time.
    fn cf_handle(&self, cf: &str) -> &ColumnFamily {
        self.0
            .cf_handle(cf)
            .expect("should never get an unknown column")
    }
    /// Point lookup returning the raw value bytes, if present.
    fn get_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
        let opt = self.0.get_cf(cf, key)?.map(|db_vec| db_vec.to_vec());
        Ok(opt)
    }
    /// Unconditional point write of raw bytes.
    fn put_cf(&self, cf: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
        self.0.put_cf(cf, key, value)?;
        Ok(())
    }
    /// Creates a rocksdb iterator over `cf`, translating the typed
    /// `IteratorMode` (and its typed start index) into the raw rocksdb mode.
    fn iterator_cf<C>(&self, cf: &ColumnFamily, iterator_mode: IteratorMode<C::Index>) -> DBIterator
    where
        C: Column,
    {
        // The encoded start key must outlive the rocksdb mode that borrows it.
        let start_key;
        let iterator_mode = match iterator_mode {
            IteratorMode::From(start_from, direction) => {
                start_key = C::key(start_from);
                RocksIteratorMode::From(&start_key, direction)
            }
            IteratorMode::Start => RocksIteratorMode::Start,
            IteratorMode::End => RocksIteratorMode::End,
        };
        self.0.iterator_cf(cf, iterator_mode)
    }
    fn raw_iterator_cf(&self, cf: &ColumnFamily) -> DBRawIterator {
        self.0.raw_iterator_cf(cf)
    }
    /// Starts an empty write batch; applied atomically via `write`.
    fn batch(&self) -> RWriteBatch {
        RWriteBatch::default()
    }
    /// Atomically applies a previously assembled write batch.
    fn write(&self, batch: RWriteBatch) -> Result<()> {
        self.0.write(batch)?;
        Ok(())
    }
    /// True when this handle was opened with read-write (primary) access.
    fn is_primary_access(&self) -> bool {
        self.1 == ActualAccessType::Primary
    }
}
/// Key codec shared by all column families: converts between a typed index
/// and the raw byte key stored in rocksdb.
pub trait Column {
    /// Typed key for this column.
    type Index;
    /// In-memory size of `Self::Index`.
    ///
    /// NOTE(review): `size_of::<Index>()` is not necessarily the length of the
    /// serialized key produced by `key()` (tuples pad, `Option` adds a
    /// discriminant) — confirm before using this for key-buffer sizing.
    fn key_size() -> usize {
        std::mem::size_of::<Self::Index>()
    }
    /// Serializes `index` into raw key bytes.
    fn key(index: Self::Index) -> Vec<u8>;
    /// Decodes raw key bytes back into a typed index.
    fn index(key: &[u8]) -> Self::Index;
    /// Extracts the primary (slot-like) component, used for range purging.
    fn primary_index(index: Self::Index) -> Slot;
    /// Lowest index whose primary component equals `slot` (range boundary).
    #[allow(clippy::wrong_self_convention)]
    fn as_index(slot: Slot) -> Self::Index;
}
/// Associates a column marker with its persisted column-family name.
pub trait ColumnName {
    const NAME: &'static str;
}
/// Columns whose values are bincode-serialized.
pub trait TypedColumn: Column {
    type Type: Serialize + DeserializeOwned;
}
// Value type stored in the address-signatures column.
impl TypedColumn for columns::AddressSignatures {
    type Type = blockstore_meta::AddressSignatureMeta;
}
// Value type stored in the transaction-status-index column.
impl TypedColumn for columns::TransactionStatusIndex {
    type Type = blockstore_meta::TransactionStatusIndexMeta;
}
/// Columns whose values are protobuf-encoded via `prost`.
pub trait ProtobufColumn: Column {
    type Type: prost::Message + Default;
}
/// Marker for columns keyed purely by `Slot`; the blanket impl below supplies
/// the standard 8-byte big-endian key codec for every such column.
pub trait SlotColumn<Index = u64> {}
impl<T: SlotColumn> Column for T {
    type Index = u64;
    /// Big-endian encoding keeps numeric slot order identical to rocksdb's
    /// lexicographic key order.
    fn key(slot: u64) -> Vec<u8> {
        let mut key = vec![0; 8];
        BigEndian::write_u64(&mut key[..], slot);
        key
    }
    /// Inverse of `key` (reads the first 8 bytes).
    fn index(key: &[u8]) -> u64 {
        BigEndian::read_u64(&key[..8])
    }
    fn primary_index(index: u64) -> Slot {
        index
    }
    fn as_index(slot: Slot) -> u64 {
        slot
    }
}
impl Column for columns::TransactionStatus {
    type Index = (u64, Signature, Slot);
    /// Key layout: `[index: u64 BE][signature: 64 bytes][slot: u64 BE]`
    /// = 80 bytes total.
    fn key((index, signature, slot): (u64, Signature, Slot)) -> Vec<u8> {
        let mut key = vec![0; 8 + 64 + 8];
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..72].clone_from_slice(&signature.as_ref()[0..64]);
        BigEndian::write_u64(&mut key[72..80], slot);
        key
    }
    /// Inverse of `key`; malformed (non-80-byte) keys decode to the zero index.
    fn index(key: &[u8]) -> (u64, Signature, Slot) {
        if key.len() != 80 {
            Self::as_index(0)
        } else {
            let index = BigEndian::read_u64(&key[0..8]);
            let signature = Signature::new(&key[8..72]);
            let slot = BigEndian::read_u64(&key[72..80]);
            (index, signature, slot)
        }
    }
    fn primary_index(index: Self::Index) -> u64 {
        index.0
    }
    /// Smallest index for a given primary index (zeroed signature and slot).
    fn as_index(index: u64) -> Self::Index {
        (index, Signature::default(), 0)
    }
}
impl ColumnName for columns::TransactionStatus {
    const NAME: &'static str = TRANSACTION_STATUS_CF;
}
impl ProtobufColumn for columns::TransactionStatus {
    type Type = generated::TransactionStatusMeta;
}
impl Column for columns::AddressSignatures {
    type Index = (u64, Pubkey, Slot, Signature);
    /// Key layout: `[index: u64 BE][pubkey: 32][slot: u64 BE][signature: 64]`
    /// = 112 bytes total.
    fn key((index, pubkey, slot, signature): (u64, Pubkey, Slot, Signature)) -> Vec<u8> {
        let mut key = vec![0; 8 + 32 + 8 + 64];
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..40].clone_from_slice(&pubkey.as_ref()[0..32]);
        BigEndian::write_u64(&mut key[40..48], slot);
        key[48..112].clone_from_slice(&signature.as_ref()[0..64]);
        key
    }
    /// Inverse of `key`; panics on keys shorter than 112 bytes (slicing).
    fn index(key: &[u8]) -> (u64, Pubkey, Slot, Signature) {
        let index = BigEndian::read_u64(&key[0..8]);
        let pubkey = Pubkey::new(&key[8..40]);
        let slot = BigEndian::read_u64(&key[40..48]);
        let signature = Signature::new(&key[48..112]);
        (index, pubkey, slot, signature)
    }
    fn primary_index(index: Self::Index) -> u64 {
        index.0
    }
    /// Smallest index for a given primary index (zeroed remaining fields).
    fn as_index(index: u64) -> Self::Index {
        (index, Pubkey::default(), 0, Signature::default())
    }
}
impl ColumnName for columns::AddressSignatures {
    const NAME: &'static str = ADDRESS_SIGNATURES_CF;
}
impl Column for columns::TransactionStatusIndex {
    type Index = u64;
    /// Key layout: 8 big-endian bytes of the index.
    fn key(index: u64) -> Vec<u8> {
        let mut key = vec![0; 8];
        BigEndian::write_u64(&mut key[..], index);
        key
    }
    fn index(key: &[u8]) -> u64 {
        BigEndian::read_u64(&key[..8])
    }
    fn primary_index(index: u64) -> u64 {
        index
    }
    fn as_index(slot: u64) -> u64 {
        slot
    }
}
impl ColumnName for columns::TransactionStatusIndex {
    const NAME: &'static str = TRANSACTION_STATUS_INDEX_CF;
}
// Rewards: slot-keyed, protobuf-encoded values.
impl SlotColumn for columns::Rewards {}
impl ColumnName for columns::Rewards {
    const NAME: &'static str = REWARDS_CF;
}
impl ProtobufColumn for columns::Rewards {
    type Type = generated::Rewards;
}
// Blocktime: slot-keyed, bincode-serialized unix timestamp.
impl SlotColumn for columns::Blocktime {}
impl ColumnName for columns::Blocktime {
    const NAME: &'static str = BLOCKTIME_CF;
}
impl TypedColumn for columns::Blocktime {
    type Type = UnixTimestamp;
}
// PerfSamples: slot-keyed, bincode-serialized sample.
impl SlotColumn for columns::PerfSamples {}
impl ColumnName for columns::PerfSamples {
    const NAME: &'static str = PERF_SAMPLES_CF;
}
impl TypedColumn for columns::PerfSamples {
    type Type = blockstore_meta::PerfSample;
}
impl Column for columns::ShredCode {
    type Index = (u64, u64);
    /// Coding shreds reuse the data-shred key codec so both columns share an
    /// identical `(slot, shred_index)` layout.
    fn key(index: (u64, u64)) -> Vec<u8> {
        columns::ShredData::key(index)
    }
    fn index(key: &[u8]) -> (u64, u64) {
        columns::ShredData::index(key)
    }
    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }
    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}
impl ColumnName for columns::ShredCode {
    const NAME: &'static str = CODE_SHRED_CF;
}
impl Column for columns::ShredData {
    type Index = (u64, u64);
    /// Key layout: `[slot: u64 BE][shred index: u64 BE]` = 16 bytes.
    fn key((slot, index): (u64, u64)) -> Vec<u8> {
        let mut key = vec![0; 16];
        BigEndian::write_u64(&mut key[..8], slot);
        BigEndian::write_u64(&mut key[8..16], index);
        key
    }
    fn index(key: &[u8]) -> (u64, u64) {
        let slot = BigEndian::read_u64(&key[..8]);
        let index = BigEndian::read_u64(&key[8..16]);
        (slot, index)
    }
    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }
    /// Lowest key in `slot`: shred index 0.
    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}
impl ColumnName for columns::ShredData {
    const NAME: &'static str = DATA_SHRED_CF;
}
// The remaining slot-keyed columns use the blanket `SlotColumn` key codec;
// each impl trio pairs the persisted CF name with a bincode value type.
impl SlotColumn for columns::Index {}
impl ColumnName for columns::Index {
    const NAME: &'static str = INDEX_CF;
}
impl TypedColumn for columns::Index {
    type Type = blockstore_meta::Index;
}
impl SlotColumn for columns::DeadSlots {}
impl ColumnName for columns::DeadSlots {
    const NAME: &'static str = DEAD_SLOTS_CF;
}
impl TypedColumn for columns::DeadSlots {
    type Type = bool;
}
impl SlotColumn for columns::DuplicateSlots {}
impl ColumnName for columns::DuplicateSlots {
    const NAME: &'static str = DUPLICATE_SLOTS_CF;
}
impl TypedColumn for columns::DuplicateSlots {
    type Type = blockstore_meta::DuplicateSlotProof;
}
impl SlotColumn for columns::Orphans {}
impl ColumnName for columns::Orphans {
    const NAME: &'static str = ORPHANS_CF;
}
impl TypedColumn for columns::Orphans {
    type Type = bool;
}
impl SlotColumn for columns::Root {}
impl ColumnName for columns::Root {
    const NAME: &'static str = ROOT_CF;
}
impl TypedColumn for columns::Root {
    type Type = bool;
}
impl SlotColumn for columns::SlotMeta {}
impl ColumnName for columns::SlotMeta {
    const NAME: &'static str = META_CF;
}
impl TypedColumn for columns::SlotMeta {
    type Type = blockstore_meta::SlotMeta;
}
impl Column for columns::ErasureMeta {
    type Index = (u64, u64);
    /// Inverse of `key`; reads the first 8 bytes after the slot as the
    /// erasure set index.
    fn index(key: &[u8]) -> (u64, u64) {
        let slot = BigEndian::read_u64(&key[..8]);
        let set_index = BigEndian::read_u64(&key[8..]);
        (slot, set_index)
    }
    /// Key layout: `[slot: u64 BE][set_index: u64 BE]` = 16 bytes.
    fn key((slot, set_index): (u64, u64)) -> Vec<u8> {
        let mut key = vec![0; 16];
        BigEndian::write_u64(&mut key[..8], slot);
        BigEndian::write_u64(&mut key[8..], set_index);
        key
    }
    fn primary_index(index: Self::Index) -> Slot {
        index.0
    }
    /// Lowest key in `slot`: set index 0.
    fn as_index(slot: Slot) -> Self::Index {
        (slot, 0)
    }
}
impl ColumnName for columns::ErasureMeta {
    const NAME: &'static str = ERASURE_META_CF;
}
impl TypedColumn for columns::ErasureMeta {
    type Type = blockstore_meta::ErasureMeta;
}
impl Column for columns::EvmBlockHeader {
    type Index = (evm_state::BlockNum, Option<Slot>);
    /// Key layout: 8 big-endian bytes of EVM block number, optionally
    /// followed by 8 big-endian bytes of the native slot (16 bytes total
    /// when the slot is known, 8 otherwise).
    fn key((block, slot): (evm_state::BlockNum, Option<Slot>)) -> Vec<u8> {
        let mut key = vec![0; if slot.is_some() { 16 } else { 8 }];
        BigEndian::write_u64(&mut key[..8], block);
        if let Some(slot) = slot {
            BigEndian::write_u64(&mut key[8..], slot);
        }
        key
    }
    /// Inverse of `key`; short keys decode to defaults rather than panicking.
    fn index(key: &[u8]) -> (evm_state::BlockNum, Option<Slot>) {
        if key.len() < 8 {
            return (0, None);
        }
        let block = BigEndian::read_u64(&key[..8]);
        let slot = if key.len() < 16 {
            None
        } else {
            Some(BigEndian::read_u64(&key[8..]))
        };
        (block, slot)
    }
    fn primary_index((block, _): (evm_state::BlockNum, Option<Slot>)) -> u64 {
        block
    }
    /// Lowest key for `block`: no slot suffix.
    fn as_index(block: u64) -> (evm_state::BlockNum, Option<Slot>) {
        (block, None)
    }
}
impl ColumnName for columns::EvmBlockHeader {
    const NAME: &'static str = EVM_HEADERS;
}
// Values are protobuf-encoded EVM block headers.
impl ProtobufColumn for columns::EvmBlockHeader {
    type Type = generated_evm::EvmBlockHeader;
}
impl Column for columns::EvmHeaderIndexByHash {
    type Index = (u64, H256);
    /// Key layout: `[index: u64 BE][block hash: 32 bytes]` = 40 bytes.
    fn key((index, hash): (u64, H256)) -> Vec<u8> {
        let mut key = vec![0; 8 + 32];
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..40].clone_from_slice(&hash.as_bytes()[0..32]);
        key
    }
    /// Inverse of `key`; malformed (non-40-byte) keys decode to the zero index.
    fn index(key: &[u8]) -> (u64, H256) {
        if key.len() != 40 {
            Self::as_index(0)
        } else {
            let index = BigEndian::read_u64(&key[0..8]);
            let hash = H256::from_slice(&key[8..40]);
            (index, hash)
        }
    }
    fn primary_index(index: Self::Index) -> u64 {
        index.0
    }
    /// Smallest index for a given primary index (zero hash).
    fn as_index(index: u64) -> Self::Index {
        (index, H256::default())
    }
}
impl ColumnName for columns::EvmHeaderIndexByHash {
    const NAME: &'static str = EVM_BLOCK_BY_HASH;
}
// Values map a block hash back to its block number.
impl ProtobufColumn for columns::EvmHeaderIndexByHash {
    type Type = evm_state::BlockNum;
}
impl Column for columns::EvmTransactionReceipts {
    type Index = EvmTransactionReceiptsIndex;
    /// Key layout: `[index: u64 BE][tx hash: 32][block_num: u64 BE]`
    /// optionally followed by `[slot: u64 BE]` — 48 bytes without a slot,
    /// 56 with one.
    fn key(
        EvmTransactionReceiptsIndex {
            index,
            hash,
            block_num,
            slot,
        }: EvmTransactionReceiptsIndex,
    ) -> Vec<u8> {
        // Allocate 56 bytes when a slot is present, 48 otherwise, and write
        // the slot suffix up front; the common prefix is filled in below.
        let mut key = if let Some(slot) = slot {
            let mut key = vec![0; 8 + 32 + 8 + 8];
            BigEndian::write_u64(&mut key[48..56], slot);
            key
        } else {
            let key = vec![0; 8 + 32 + 8];
            key
        };
        BigEndian::write_u64(&mut key[0..8], index);
        key[8..40].clone_from_slice(&hash.as_bytes()[0..32]);
        BigEndian::write_u64(&mut key[40..48], block_num);
        key
    }
    /// Inverse of `key`; keys shorter than the 48-byte prefix decode to the
    /// zero index, and a missing slot suffix decodes to `slot: None`.
    fn index(key: &[u8]) -> EvmTransactionReceiptsIndex {
        if key.len() < 48 {
            return Self::as_index(0);
        }
        let index = BigEndian::read_u64(&key[0..8]);
        let hash = H256::from_slice(&key[8..40]);
        let block_num = BigEndian::read_u64(&key[40..48]);
        if key.len() < 56 {
            return EvmTransactionReceiptsIndex {
                index,
                hash,
                block_num,
                slot: None,
            };
        }
        let slot = BigEndian::read_u64(&key[48..56]);
        EvmTransactionReceiptsIndex {
            index,
            hash,
            block_num,
            slot: Some(slot),
        }
    }
    fn primary_index(index: Self::Index) -> u64 {
        index.index
    }
    /// Smallest index for a given primary index (zeroed remaining fields).
    fn as_index(index: u64) -> Self::Index {
        EvmTransactionReceiptsIndex {
            index,
            hash: H256::default(),
            block_num: 0,
            slot: None,
        }
    }
}
impl ColumnName for columns::EvmTransactionReceipts {
    const NAME: &'static str = EVM_TRANSACTIONS;
}
// Values are protobuf-encoded transaction receipts.
impl ProtobufColumn for columns::EvmTransactionReceipts {
    type Type = generated_evm::TransactionReceipt;
}
/// Typed facade over the rocksdb instance; cheap to clone (shared `Arc`s).
#[derive(Debug, Clone)]
pub struct Database {
    backend: Arc<Rocks>,
    // Kept for on-disk size reporting via `storage_size`.
    path: Arc<Path>,
}
/// Handle to a single column family, typed by its marker `C`.
#[derive(Debug, Clone)]
pub struct LedgerColumn<C>
where
    C: Column,
{
    backend: Arc<Rocks>,
    column: PhantomData<C>,
}
/// Accumulates writes and deletes for one atomic commit (`Database::write`).
pub struct WriteBatch<'a> {
    write_batch: RWriteBatch,
    // Column-family name -> handle, captured when the batch is created.
    map: HashMap<&'static str, &'a ColumnFamily>,
}
impl Database {
    /// Opens the rocksdb database at `path` and wraps it for typed access.
    pub fn open(
        path: &Path,
        access_type: AccessType,
        recovery_mode: Option<BlockstoreRecoveryMode>,
    ) -> Result<Self> {
        let backend = Arc::new(Rocks::open(path, access_type, recovery_mode)?);
        Ok(Database {
            backend,
            path: Arc::from(path),
        })
    }
    /// Deletes the database directory at `path`.
    pub fn destroy(path: &Path) -> Result<()> {
        Rocks::destroy(path)?;
        Ok(())
    }
    /// Typed point lookup: fetches and bincode-deserializes the value at `key`.
    pub fn get<C>(&self, key: C::Index) -> Result<Option<C::Type>>
    where
        C: TypedColumn + ColumnName,
    {
        if let Some(serialized_value) = self.backend.get_cf(self.cf_handle::<C>(), &C::key(key))? {
            let value = deserialize(&serialized_value)?;
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }
    /// Iterates column `C`, yielding decoded indexes with raw value bytes.
    pub fn iter<C>(
        &self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_>
    where
        C: Column + ColumnName,
    {
        let cf = self.cf_handle::<C>();
        let iter = self.backend.iterator_cf::<C>(cf, iterator_mode);
        Ok(iter.map(|(key, value)| (C::index(&key), value)))
    }
    /// Raw handle for `C`'s column family.
    ///
    /// (The `ColumnName` bound previously appeared both on the generic
    /// parameter and in the `where` clause; the duplicate has been removed.)
    #[inline]
    pub fn cf_handle<C>(&self) -> &ColumnFamily
    where
        C: Column + ColumnName,
    {
        self.backend.cf_handle(C::NAME)
    }
    /// Typed accessor for a single column family.
    pub fn column<C>(&self) -> LedgerColumn<C>
    where
        C: Column + ColumnName,
    {
        LedgerColumn {
            backend: Arc::clone(&self.backend),
            column: PhantomData,
        }
    }
    #[inline]
    pub fn raw_iterator_cf(&self, cf: &ColumnFamily) -> Result<DBRawIterator> {
        Ok(self.backend.raw_iterator_cf(cf))
    }
    /// Starts an empty write batch, pre-resolving every column-family handle.
    pub fn batch(&self) -> Result<WriteBatch> {
        let write_batch = self.backend.batch();
        let map = self
            .backend
            .columns()
            .into_iter()
            .map(|desc| (desc, self.backend.cf_handle(desc)))
            .collect();
        Ok(WriteBatch { write_batch, map })
    }
    /// Atomically applies a previously assembled batch.
    pub fn write(&self, batch: WriteBatch) -> Result<()> {
        self.backend.write(batch.write_batch)
    }
    /// Total on-disk size of the database directory, in bytes.
    pub fn storage_size(&self) -> Result<u64> {
        Ok(fs_extra::dir::get_size(&self.path)?)
    }
    /// Queues a range delete for `C` covering slots `[from, to)` onto `batch`.
    pub fn delete_range_cf<C>(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()>
    where
        C: Column + ColumnName,
    {
        let cf = self.cf_handle::<C>();
        let from_index = C::as_index(from);
        let to_index = C::as_index(to);
        batch.delete_range_cf::<C>(cf, from_index, to_index)
    }
    /// True when the database was opened with read-write (primary) access.
    pub fn is_primary_access(&self) -> bool {
        self.backend.is_primary_access()
    }
}
impl<C> LedgerColumn<C>
where
    C: Column + ColumnName,
{
    /// Raw point lookup returning the value bytes, if any.
    pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
        self.backend.get_cf(self.handle(), &C::key(key))
    }
    /// Iterates this column, yielding typed indexes with raw value bytes.
    pub fn iter(
        &self,
        iterator_mode: IteratorMode<C::Index>,
    ) -> Result<impl Iterator<Item = (C::Index, Box<[u8]>)> + '_> {
        let cf = self.handle();
        let iter = self.backend.iterator_cf::<C>(cf, iterator_mode);
        Ok(iter.map(|(key, value)| (C::index(&key), value)))
    }
    /// Scans from `from` (or the column start) and queues a delete for every
    /// entry whose primary index is <= `to` (or all entries when `to` is
    /// `None`) onto `batch`.
    ///
    /// Returns `true` when the scan reached the end of the column, `false`
    /// when it stopped because `to` was exceeded. Individual delete failures
    /// are logged and skipped rather than aborting the scan.
    ///
    /// NOTE(review): the spurious `C::Index: ColumnName` bound was dropped —
    /// nothing in the body needs it (`C::NAME` is supplied by the impl's own
    /// `C: ColumnName`), and no index type in this file implements
    /// `ColumnName`, so the old bound appeared to make this uncallable.
    pub fn delete_slot(
        &self,
        batch: &mut WriteBatch,
        from: Option<Slot>,
        to: Option<Slot>,
    ) -> Result<bool>
    where
        C::Index: PartialOrd + Copy,
    {
        let mut end = true;
        let iter_config = match from {
            Some(s) => IteratorMode::From(C::as_index(s), IteratorDirection::Forward),
            None => IteratorMode::Start,
        };
        let iter = self.iter(iter_config)?;
        for (index, _) in iter {
            if let Some(to) = to {
                if C::primary_index(index) > to {
                    end = false;
                    break;
                }
            }
            if let Err(e) = batch.delete::<C>(index) {
                error!(
                    "Error: {:?} while adding delete from_slot {:?} to batch {:?}",
                    e,
                    from,
                    C::NAME
                )
            }
        }
        Ok(end)
    }
    /// Requests a manual compaction of the key range `[from, to]`.
    pub fn compact_range(&self, from: Slot, to: Slot) -> Result<bool>
    where
        C::Index: PartialOrd + Copy,
    {
        let cf = self.handle();
        let from = Some(C::key(C::as_index(from)));
        let to = Some(C::key(C::as_index(to)));
        self.backend.0.compact_range_cf(cf, from, to);
        Ok(true)
    }
    /// Column-family handle for `C`.
    #[inline]
    pub fn handle(&self) -> &ColumnFamily {
        self.backend.cf_handle(C::NAME)
    }
    #[cfg(test)]
    pub fn is_empty(&self) -> Result<bool> {
        let mut iter = self.backend.raw_iterator_cf(self.handle());
        iter.seek_to_first();
        Ok(!iter.valid())
    }
    /// Raw point write.
    pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
        self.backend.put_cf(self.handle(), &C::key(key), value)
    }
}
impl<C> LedgerColumn<C>
where
    C: TypedColumn + ColumnName,
{
    /// Fetches and bincode-deserializes the value stored under `key`.
    pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
        let raw = self.backend.get_cf(self.handle(), &C::key(key))?;
        match raw {
            Some(bytes) => Ok(Some(deserialize(&bytes)?)),
            None => Ok(None),
        }
    }
    /// Bincode-serializes `value` and writes it under `key`.
    pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> {
        let bytes = serialize(value)?;
        self.backend.put_cf(self.handle(), &C::key(key), &bytes)
    }
}
impl<C> LedgerColumn<C>
where
    C: ProtobufColumn + ColumnName,
{
    /// Reads a value that may predate the protobuf migration: tries protobuf
    /// decoding first, then falls back to bincode-deserializing as `T`.
    pub fn get_protobuf_or_bincode<T: DeserializeOwned + Into<C::Type>>(
        &self,
        key: C::Index,
    ) -> Result<Option<C::Type>> {
        if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
            Ok(Some(
                self.deserialize_protobuf_or_bincode::<T>(&serialized_value)?,
            ))
        } else {
            Ok(None)
        }
    }
    /// Fetches and protobuf-decodes the value stored under `key`.
    pub fn get_protobuf(&self, key: C::Index) -> Result<Option<C::Type>> {
        if let Some(serialized_value) = self.backend.get_cf(self.handle(), &C::key(key))? {
            Ok(Some(C::Type::decode(&serialized_value[..])?))
        } else {
            Ok(None)
        }
    }
    /// Protobuf-encodes `value` and writes it under `key`.
    pub fn put_protobuf(&self, key: C::Index, value: &C::Type) -> Result<()> {
        let mut buf = Vec::with_capacity(value.encoded_len());
        value.encode(&mut buf)?;
        self.backend.put_cf(self.handle(), &C::key(key), &buf)
    }
    /// Decodes `serialized_value` as protobuf, falling back to bincode (`T`)
    /// for values written before the protobuf migration.
    pub fn deserialize_protobuf_or_bincode<T>(&self, serialized_value: &[u8]) -> Result<C::Type>
    where
        T: Into<C::Type> + DeserializeOwned,
    {
        let value = match C::Type::decode(serialized_value) {
            Ok(value) => value,
            Err(_) => deserialize::<T>(serialized_value)?.into(),
        };
        Ok(value)
    }
}
impl<'a> WriteBatch<'a> {
    /// Queues a raw put for column `C`.
    pub fn put_bytes<C: Column + ColumnName>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> {
        let cf = self.get_cf::<C>();
        let raw_key = C::key(key);
        self.write_batch.put_cf(cf, &raw_key, bytes);
        Ok(())
    }
    /// Queues a point delete for column `C`.
    pub fn delete<C: Column + ColumnName>(&mut self, key: C::Index) -> Result<()> {
        let cf = self.get_cf::<C>();
        let raw_key = C::key(key);
        self.write_batch.delete_cf(cf, &raw_key);
        Ok(())
    }
    /// Bincode-serializes `value` and queues a put for column `C`.
    pub fn put<C: TypedColumn + ColumnName>(
        &mut self,
        key: C::Index,
        value: &C::Type,
    ) -> Result<()> {
        let serialized_value = serialize(&value)?;
        let cf = self.get_cf::<C>();
        self.write_batch.put_cf(cf, &C::key(key), &serialized_value);
        Ok(())
    }
    /// Pre-resolved handle for `C`'s column family; panics if the column is
    /// missing from the map built in `Database::batch`.
    #[inline]
    fn get_cf<C: Column + ColumnName>(&self) -> &'a ColumnFamily {
        self.map[C::NAME]
    }
    /// Queues a range delete over `[from, to)` for the given column family.
    pub fn delete_range_cf<C: Column>(
        &mut self,
        cf: &ColumnFamily,
        from: C::Index,
        to: C::Index,
    ) -> Result<()> {
        let (from_key, to_key) = (C::key(from), C::key(to));
        self.write_batch.delete_range_cf(cf, from_key, to_key);
        Ok(())
    }
}
/// Per-column-family rocksdb options.
fn get_cf_options(access_type: &AccessType) -> Options {
    // Compaction fires once L0 accumulates this many files.
    const FILE_NUM_COMPACTION_TRIGGER: u64 = 4;
    let mut options = Options::default();
    options.set_max_write_buffer_number(8);
    options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE as usize);
    // Size L1 to hold one full set of L0 files, split across ten SST files.
    let total_size_base = MAX_WRITE_BUFFER_SIZE * FILE_NUM_COMPACTION_TRIGGER;
    let file_size_base = total_size_base / 10;
    options.set_level_zero_file_num_compaction_trigger(FILE_NUM_COMPACTION_TRIGGER as i32);
    options.set_max_bytes_for_level_base(total_size_base);
    options.set_target_file_size_base(file_size_base);
    // Bulk-maintenance opens defer all compaction work.
    if let AccessType::PrimaryOnlyForMaintenance = access_type {
        options.set_disable_auto_compactions(true);
    }
    options
}
/// Database-wide rocksdb options.
fn get_db_options(access_type: &AccessType) -> Options {
    let mut options = Options::default();
    options.create_if_missing(true);
    options.create_missing_column_families(true);
    options.increase_parallelism(num_cpus::get() as i32);
    // Reserve dedicated high-priority background threads for flushes.
    let mut env = rocksdb::Env::default().unwrap();
    env.set_high_priority_background_threads(4);
    options.set_env(&env);
    // Cap the write-ahead log at 4 GiB.
    options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
    // Bulk-maintenance opens defer all compaction work.
    if let AccessType::PrimaryOnlyForMaintenance = access_type {
        options.set_disable_auto_compactions(true);
    }
    options
}