pub use crate::{blockstore_db::BlockstoreError, blockstore_meta::SlotMeta};
use crate::{
blockstore_db::{
columns as cf, AccessType, BlockstoreRecoveryMode, Column, Database,
EvmTransactionReceiptsIndex, IteratorDirection, IteratorMode, LedgerColumn, Result,
WriteBatch,
},
blockstore_meta::*,
entry::{create_ticks, Entry},
erasure::ErasureConfig,
leader_schedule_cache::LeaderScheduleCache,
next_slots_iterator::NextSlotsIterator,
shred::{Result as ShredResult, Shred, Shredder},
};
use bincode::deserialize;
use evm::H256;
use log::*;
use rayon::{
iter::{IntoParallelRefIterator, ParallelIterator},
ThreadPool,
};
use rocksdb::DBRawIterator;
use solana_measure::measure::Measure;
use solana_metrics::{datapoint_debug, datapoint_error, datapoint_info};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::hardened_unpack::{unpack_genesis_archive, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
use solana_sdk::{
clock::{Slot, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, MS_PER_TICK},
genesis_config::GenesisConfig,
hash::Hash,
pubkey::Pubkey,
sanitize::Sanitize,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
};
use solana_storage_proto::{StoredExtendedRewards, StoredTransactionStatusMeta};
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Rewards,
TransactionStatusMeta, TransactionWithStatusMeta,
};
use evm_state as evm;
use std::{
cell::RefCell,
cmp,
collections::{HashMap, HashSet},
convert::TryInto,
fs,
io::{Error as IoError, ErrorKind},
path::{Path, PathBuf},
rc::Rc,
sync::{
mpsc::{sync_channel, Receiver, SyncSender, TrySendError},
Arc, Mutex, RwLock,
},
};
use tempfile::TempDir;
use thiserror::Error;
use trees::{Tree, TreeWalk};
pub mod blockstore_purge;
pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb";
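// Two rayon thread pools for parallel blockstore work: one capped by the
// global thread-count limit, and one allowed to use every available core.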
thread_local!(static PAR_THREAD_POOL: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|ix| format!("blockstore_{}", ix))
.build()
.unwrap()));
thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell<ThreadPool> = RefCell::new(rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
.thread_name(|ix| format!("blockstore_{}", ix))
.build()
.unwrap()));
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100;
pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK;
pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768;
pub type CompletedSlotsReceiver = Receiver<Vec<u64>>;
type CompletedRanges = Vec<(u32, u32)>;
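/// How slot purging should clean up: `Exact` deletes keys individually, while
/// `PrimaryIndex` leaves status-column entries to be reclaimed by the
/// two-primary-index rotation (see `toggle_transaction_status_index`).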
#[derive(Clone, Copy)]
pub enum PurgeType {
Exact,
PrimaryIndex,
}
#[derive(Error, Debug)]
pub enum InsertDataShredError {
Exists,
InvalidShred,
BlockstoreError(#[from] BlockstoreError),
}
impl std::fmt::Display for InsertDataShredError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "insert data shred error")
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompletedDataSetInfo {
pub slot: Slot,
pub start_index: u32,
pub end_index: u32,
}
pub struct BlockstoreSignals {
pub blockstore: Blockstore,
pub ledger_signal_receiver: Receiver<bool>,
pub completed_slots_receiver: CompletedSlotsReceiver,
}
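/// The ledger store: a set of RocksDB column families (slot metadata, shreds,
/// transaction statuses, rewards, EVM block headers and receipts, ...) behind
/// a shared `Database` handle, plus the channels used to signal new shreds
/// and newly completed slots.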
pub struct Blockstore {
ledger_path: PathBuf,
db: Arc<Database>,
meta_cf: LedgerColumn<cf::SlotMeta>,
dead_slots_cf: LedgerColumn<cf::DeadSlots>,
duplicate_slots_cf: LedgerColumn<cf::DuplicateSlots>,
erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
orphans_cf: LedgerColumn<cf::Orphans>,
index_cf: LedgerColumn<cf::Index>,
data_shred_cf: LedgerColumn<cf::ShredData>,
code_shred_cf: LedgerColumn<cf::ShredCode>,
transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
transaction_status_index_cf: LedgerColumn<cf::TransactionStatusIndex>,
active_transaction_status_index: RwLock<u64>,
rewards_cf: LedgerColumn<cf::Rewards>,
blocktime_cf: LedgerColumn<cf::Blocktime>,
perf_samples_cf: LedgerColumn<cf::PerfSamples>,
evm_blocks_cf: LedgerColumn<cf::EvmBlockHeader>,
evm_transactions_cf: LedgerColumn<cf::EvmTransactionReceipts>,
evm_blocks_by_hash_cf: LedgerColumn<cf::EvmHeaderIndexByHash>,
last_root: Arc<RwLock<Slot>>,
insert_shreds_lock: Arc<Mutex<()>>,
pub new_shreds_signals: Vec<SyncSender<bool>>,
pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
pub lowest_cleanup_slot: Arc<RwLock<u64>>,
no_compaction: bool,
}
pub struct IndexMetaWorkingSetEntry {
index: Index,
did_insert_occur: bool,
}
pub struct SlotMetaWorkingSetEntry {
new_slot_meta: Rc<RefCell<SlotMeta>>,
old_slot_meta: Option<SlotMeta>,
did_insert_occur: bool,
}
#[derive(Default)]
pub struct BlockstoreInsertionMetrics {
pub num_shreds: usize,
pub insert_lock_elapsed: u64,
pub insert_shreds_elapsed: u64,
pub shred_recovery_elapsed: u64,
pub chaining_elapsed: u64,
pub commit_working_sets_elapsed: u64,
pub write_batch_elapsed: u64,
pub total_elapsed: u64,
pub num_inserted: u64,
pub num_repair: u64,
pub num_recovered: usize,
pub num_recovered_inserted: usize,
pub num_recovered_failed_sig: usize,
pub num_recovered_failed_invalid: usize,
pub num_recovered_exists: usize,
pub index_meta_time: u64,
}
impl SlotMetaWorkingSetEntry {
fn new(new_slot_meta: Rc<RefCell<SlotMeta>>, old_slot_meta: Option<SlotMeta>) -> Self {
Self {
new_slot_meta,
old_slot_meta,
did_insert_occur: false,
}
}
}
impl BlockstoreInsertionMetrics {
pub fn report_metrics(&self, metric_name: &'static str) {
datapoint_info!(
metric_name,
("num_shreds", self.num_shreds as i64, i64),
("total_elapsed", self.total_elapsed as i64, i64),
("insert_lock_elapsed", self.insert_lock_elapsed as i64, i64),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
(
"shred_recovery_elapsed",
self.shred_recovery_elapsed as i64,
i64
),
("chaining_elapsed", self.chaining_elapsed as i64, i64),
(
"commit_working_sets_elapsed",
self.commit_working_sets_elapsed as i64,
i64
),
("write_batch_elapsed", self.write_batch_elapsed as i64, i64),
("num_inserted", self.num_inserted as i64, i64),
("num_repair", self.num_repair as i64, i64),
("num_recovered", self.num_recovered as i64, i64),
(
"num_recovered_inserted",
self.num_recovered_inserted as i64,
i64
),
(
"num_recovered_failed_sig",
self.num_recovered_failed_sig as i64,
i64
),
(
"num_recovered_failed_invalid",
self.num_recovered_failed_invalid as i64,
i64
),
(
"num_recovered_exists",
self.num_recovered_exists as i64,
i64
),
);
}
}
impl Blockstore {
pub fn db(self) -> Arc<Database> {
self.db
}
pub fn ledger_path(&self) -> &Path {
&self.ledger_path
}
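/// Opens a `Blockstore` in the `rocksdb` subdirectory of `ledger_path` with
/// primary (read-write) access, enforcing the open-file limit.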
pub fn open(ledger_path: &Path) -> Result<Blockstore> {
Self::do_open(ledger_path, AccessType::PrimaryOnly, None, true)
}
pub fn open_with_access_type(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
enforce_ulimit_nofile: bool,
) -> Result<Blockstore> {
Self::do_open(
ledger_path,
access_type,
recovery_mode,
enforce_ulimit_nofile,
)
}
fn do_open(
ledger_path: &Path,
access_type: AccessType,
recovery_mode: Option<BlockstoreRecoveryMode>,
enforce_ulimit_nofile: bool,
) -> Result<Blockstore> {
fs::create_dir_all(&ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
adjust_ulimit_nofile(enforce_ulimit_nofile)?;
let mut measure = Measure::start("open");
info!("Opening database at {:?}", blockstore_path);
let db = Database::open(&blockstore_path, access_type, recovery_mode)?;
let meta_cf = db.column();
let dead_slots_cf = db.column();
let duplicate_slots_cf = db.column();
let erasure_meta_cf = db.column();
let orphans_cf = db.column();
let index_cf = db.column();
let data_shred_cf = db.column();
let code_shred_cf = db.column();
let transaction_status_cf = db.column();
let address_signatures_cf = db.column();
let transaction_status_index_cf = db.column();
let rewards_cf = db.column();
let blocktime_cf = db.column();
let perf_samples_cf = db.column();
let evm_blocks_cf = db.column();
let evm_transactions_cf = db.column();
let evm_blocks_by_hash_cf = db.column();
let db = Arc::new(db);
let max_root = db
.iter::<cf::Root>(IteratorMode::End)?
.next()
.map(|(slot, _)| slot)
.unwrap_or(0);
let last_root = Arc::new(RwLock::new(max_root));
let active_transaction_status_index = db
.iter::<cf::TransactionStatusIndex>(IteratorMode::Start)?
.next();
let initialize_transaction_status_index = active_transaction_status_index.is_none();
let active_transaction_status_index = active_transaction_status_index
.and_then(|(_, data)| {
let index0: TransactionStatusIndexMeta = deserialize(&data).unwrap();
if index0.frozen {
Some(1)
} else {
None
}
})
.unwrap_or(0);
measure.stop();
info!("{:?} {}", blockstore_path, measure);
let blockstore = Blockstore {
ledger_path: ledger_path.to_path_buf(),
db,
meta_cf,
dead_slots_cf,
duplicate_slots_cf,
erasure_meta_cf,
orphans_cf,
index_cf,
data_shred_cf,
code_shred_cf,
transaction_status_cf,
address_signatures_cf,
transaction_status_index_cf,
active_transaction_status_index: RwLock::new(active_transaction_status_index),
rewards_cf,
blocktime_cf,
perf_samples_cf,
evm_blocks_cf,
evm_transactions_cf,
evm_blocks_by_hash_cf,
new_shreds_signals: vec![],
completed_slots_senders: vec![],
insert_shreds_lock: Arc::new(Mutex::new(())),
last_root,
lowest_cleanup_slot: Arc::new(RwLock::new(0)),
no_compaction: false,
};
if initialize_transaction_status_index {
blockstore.initialize_transaction_status_index()?;
}
Ok(blockstore)
}
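/// Opens the blockstore and attaches two signal channels: a unit signal sent
/// on every shred insertion, and a bounded channel (capacity
/// `MAX_COMPLETED_SLOTS_IN_CHANNEL`) carrying newly completed slots.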
pub fn open_with_signal(
ledger_path: &Path,
recovery_mode: Option<BlockstoreRecoveryMode>,
enforce_ulimit_nofile: bool,
) -> Result<BlockstoreSignals> {
let mut blockstore = Self::open_with_access_type(
ledger_path,
AccessType::PrimaryOnly,
recovery_mode,
enforce_ulimit_nofile,
)?;
let (ledger_signal_sender, ledger_signal_receiver) = sync_channel(1);
let (completed_slots_sender, completed_slots_receiver) =
sync_channel(MAX_COMPLETED_SLOTS_IN_CHANNEL);
blockstore.new_shreds_signals = vec![ledger_signal_sender];
blockstore.completed_slots_senders = vec![completed_slots_sender];
Ok(BlockstoreSignals {
blockstore,
ledger_signal_receiver,
completed_slots_receiver,
})
}
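/// Test helper: populates the blockstore with a fork tree by generating
/// tick-only entries for each slot in `forks` and inserting the resulting
/// shreds. When `is_orphan` is set, the root of the tree is not written,
/// leaving its children stored as orphans.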
pub fn add_tree(
&self,
forks: Tree<Slot>,
is_orphan: bool,
is_slot_complete: bool,
num_ticks: u64,
starting_hash: Hash,
) {
let mut walk = TreeWalk::from(forks);
let mut blockhashes = HashMap::new();
while let Some(visit) = walk.get() {
let slot = visit.node().data;
if self.meta(slot).unwrap().is_some() && self.orphan(slot).unwrap().is_none() {
walk.forward();
continue;
}
let parent = walk.get_parent().map(|n| n.data);
if parent.is_some() || !is_orphan {
let parent_hash = parent
.and_then(|parent| blockhashes.get(&parent))
.unwrap_or(&starting_hash);
let mut entries = create_ticks(
num_ticks * (std::cmp::max(1, slot - parent.unwrap_or(slot))),
0,
*parent_hash,
);
blockhashes.insert(slot, entries.last().unwrap().hash);
if !is_slot_complete {
entries.pop().unwrap();
}
let shreds = entries_to_test_shreds(
entries.clone(),
slot,
parent.unwrap_or(slot),
is_slot_complete,
0,
);
self.insert_shreds(shreds, None, false).unwrap();
}
walk.forward();
}
}
pub fn set_no_compaction(&mut self, no_compaction: bool) {
self.no_compaction = no_compaction;
}
pub fn destroy(ledger_path: &Path) -> Result<()> {
fs::create_dir_all(ledger_path)?;
let blockstore_path = ledger_path.join(BLOCKSTORE_DIRECTORY);
Database::destroy(&blockstore_path)
}
pub fn meta(&self, slot: Slot) -> Result<Option<SlotMeta>> {
self.meta_cf.get(slot)
}
pub fn is_full(&self, slot: Slot) -> bool {
if let Ok(Some(meta)) = self.meta_cf.get(slot) {
return meta.is_full();
}
false
}
pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result<Option<ErasureMeta>> {
self.erasure_meta_cf.get((slot, set_index))
}
pub fn orphan(&self, slot: Slot) -> Result<Option<bool>> {
self.orphans_cf.get(slot)
}
pub fn max_root(&self) -> Slot {
self.db
.iter::<cf::Root>(IteratorMode::End)
.expect("Couldn't get rooted iterator for max_root()")
.next()
.map(|(slot, _)| slot)
.unwrap_or(0)
}
pub fn slot_meta_iterator(
&self,
slot: Slot,
) -> Result<impl Iterator<Item = (Slot, SlotMeta)> + '_> {
let meta_iter = self
.db
.iter::<cf::SlotMeta>(IteratorMode::From(slot, IteratorDirection::Forward))?;
Ok(meta_iter.map(|(slot, slot_meta_bytes)| {
(
slot,
deserialize(&slot_meta_bytes).unwrap_or_else(|e| {
panic!("Could not deserialize SlotMeta for slot {}: {:?}", slot, e)
}),
)
}))
}
#[allow(dead_code)]
pub fn live_slots_iterator(&self, root: Slot) -> impl Iterator<Item = (Slot, SlotMeta)> + '_ {
let root_forks = NextSlotsIterator::new(root, self);
let orphans_iter = self.orphans_iterator(root + 1).unwrap();
root_forks.chain(orphans_iter.flat_map(move |orphan| NextSlotsIterator::new(orphan, self)))
}
pub fn slot_data_iterator(
&self,
slot: Slot,
index: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + '_> {
let slot_iterator = self.db.iter::<cf::ShredData>(IteratorMode::From(
(slot, index),
IteratorDirection::Forward,
))?;
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
pub fn slot_coding_iterator(
&self,
slot: Slot,
index: u64,
) -> Result<impl Iterator<Item = ((u64, u64), Box<[u8]>)> + '_> {
let slot_iterator = self.db.iter::<cf::ShredCode>(IteratorMode::From(
(slot, index),
IteratorDirection::Forward,
))?;
Ok(slot_iterator.take_while(move |((shred_slot, _), _)| *shred_slot == slot))
}
pub fn rooted_slot_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = u64> + '_> {
let slot_iterator = self
.db
.iter::<cf::Root>(IteratorMode::From(slot, IteratorDirection::Forward))?;
Ok(slot_iterator.map(move |(rooted_slot, _)| rooted_slot))
}
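// Collects the data shreds of an erasure set that are already available,
// taking each one from the current insert batch if present, otherwise from
// the ShredData column.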
fn get_recovery_data_shreds(
index: &mut Index,
set_index: u64,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
) {
(set_index..set_index + erasure_meta.config.num_data() as u64).for_each(|i| {
if index.data().is_present(i) {
if let Some(shred) = prev_inserted_datas.remove(&(slot, i)).or_else(|| {
let some_data = data_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch data shred");
if let Some(data) = some_data {
Shred::new_from_serialized_shred(data).ok()
} else {
warn!("Data shred deleted while reading for recovery");
None
}
}) {
available_shreds.push(shred);
}
}
});
}
fn get_recovery_coding_shreds(
index: &mut Index,
slot: Slot,
erasure_meta: &ErasureMeta,
available_shreds: &mut Vec<Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
(erasure_meta.first_coding_index
..erasure_meta.first_coding_index + erasure_meta.config.num_coding() as u64)
.for_each(|i| {
if let Some(shred) = prev_inserted_codes
.remove(&(slot, i))
.map(|s| {
index.coding_mut().set_present(i, false);
s
})
.or_else(|| {
if index.coding().is_present(i) {
let some_code = code_cf
.get_bytes((slot, i))
.expect("Database failure, could not fetch code shred");
if let Some(code) = some_code {
Shred::new_from_serialized_shred(code).ok()
} else {
warn!("Code shred deleted while reading for recovery");
None
}
} else {
None
}
})
{
available_shreds.push(shred);
}
});
}
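// Gathers all available data and coding shreds for an erasure set and asks
// `Shredder::try_recovery` to rebuild the missing data shreds, reporting the
// attempt's outcome as a datapoint either way.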
fn recover_shreds(
index: &mut Index,
set_index: u64,
erasure_meta: &ErasureMeta,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
recovered_data_shreds: &mut Vec<Shred>,
data_cf: &LedgerColumn<cf::ShredData>,
code_cf: &LedgerColumn<cf::ShredCode>,
) {
let slot = index.slot;
let mut available_shreds = vec![];
Self::get_recovery_data_shreds(
index,
set_index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_datas,
data_cf,
);
Self::get_recovery_coding_shreds(
index,
slot,
erasure_meta,
&mut available_shreds,
prev_inserted_codes,
code_cf,
);
if let Ok(mut result) = Shredder::try_recovery(
available_shreds,
erasure_meta.config.num_data(),
erasure_meta.config.num_coding(),
set_index as usize,
erasure_meta.first_coding_index as usize,
slot,
) {
Self::submit_metrics(
slot,
set_index,
erasure_meta,
true,
"complete".into(),
result.len(),
);
recovered_data_shreds.append(&mut result);
} else {
Self::submit_metrics(slot, set_index, erasure_meta, true, "incomplete".into(), 0);
}
}
fn submit_metrics(
slot: Slot,
set_index: u64,
erasure_meta: &ErasureMeta,
attempted: bool,
status: String,
recovered: usize,
) {
datapoint_debug!(
"blockstore-erasure",
("slot", slot as i64, i64),
("start_index", set_index as i64, i64),
(
"end_index",
(erasure_meta.set_index + erasure_meta.config.num_data() as u64) as i64,
i64
),
("recovery_attempted", attempted, bool),
("recovery_status", status, String),
("recovered", recovered as i64, i64),
);
}
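// For every erasure set touched by this insert batch: attempt recovery when
// enough shreds are present (`CanRecover`), drop buffered coding shreds once
// the data is already complete (`DataFull`), or just report how many shreds
// are still missing (`StillNeed`).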
fn try_shred_recovery(
db: &Database,
erasure_metas: &HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_datas: &mut HashMap<(u64, u64), Shred>,
prev_inserted_codes: &mut HashMap<(u64, u64), Shred>,
) -> Vec<Shred> {
let data_cf = db.column::<cf::ShredData>();
let code_cf = db.column::<cf::ShredCode>();
let mut recovered_data_shreds = vec![];
for (&(slot, set_index), erasure_meta) in erasure_metas.iter() {
let index_meta_entry = index_working_set.get_mut(&slot).expect("Index");
let index = &mut index_meta_entry.index;
match erasure_meta.status(&index) {
ErasureMetaStatus::CanRecover => {
Self::recover_shreds(
index,
set_index,
erasure_meta,
prev_inserted_datas,
prev_inserted_codes,
&mut recovered_data_shreds,
&data_cf,
&code_cf,
);
}
ErasureMetaStatus::DataFull => {
(set_index..set_index + erasure_meta.config.num_coding() as u64).for_each(
|i| {
if prev_inserted_codes.remove(&(slot, i)).is_some() {
index.coding_mut().set_present(i, false);
}
},
);
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
"complete".into(),
0,
);
}
ErasureMetaStatus::StillNeed(needed) => {
Self::submit_metrics(
slot,
set_index,
erasure_meta,
false,
format!("still need: {}", needed),
0,
);
}
};
}
recovered_data_shreds
}
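/// The main shred-insertion entry point. Under the global insert lock it:
/// 1) inserts data shreds and buffers coding shreds, 2) attempts erasure
/// recovery and re-inserts recovered data shreds after verifying the leader's
/// signature, 3) persists the buffered coding shreds, 4) updates slot
/// chaining, 5) commits every working set in a single write batch, and
/// 6) signals subscribers. Returns the completed data-set ranges and the
/// positions of the input shreds that were inserted.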
pub fn insert_shreds_handle_duplicate<F>(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
handle_duplicate: &F,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)>
where
F: Fn(Shred),
{
let mut total_start = Measure::start("Total elapsed");
let mut start = Measure::start("Blockstore lock");
let _lock = self.insert_shreds_lock.lock().unwrap();
start.stop();
let insert_lock_elapsed = start.as_us();
let db = &*self.db;
let mut write_batch = db.batch()?;
let mut just_inserted_coding_shreds = HashMap::new();
let mut just_inserted_data_shreds = HashMap::new();
let mut erasure_metas = HashMap::new();
let mut slot_meta_working_set = HashMap::new();
let mut index_working_set = HashMap::new();
let num_shreds = shreds.len();
let mut start = Measure::start("Shred insertion");
let mut num_inserted = 0;
let mut index_meta_time = 0;
let mut newly_completed_data_sets: Vec<CompletedDataSetInfo> = vec![];
let mut inserted_indices = Vec::new();
shreds.into_iter().enumerate().for_each(|(i, shred)| {
if shred.is_data() {
let shred_slot = shred.slot();
if let Ok(completed_data_sets) = self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
handle_duplicate,
leader_schedule,
false,
) {
newly_completed_data_sets.extend(completed_data_sets.into_iter().map(
|(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
));
inserted_indices.push(i);
num_inserted += 1;
}
} else if shred.is_code() {
self.check_cache_coding_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_inserted_coding_shreds,
&mut index_meta_time,
handle_duplicate,
is_trusted,
);
} else {
panic!("There should be no other case");
}
});
start.stop();
let insert_shreds_elapsed = start.as_us();
let mut start = Measure::start("Shred recovery");
let mut num_recovered = 0;
let mut num_recovered_inserted = 0;
let mut num_recovered_failed_sig = 0;
let mut num_recovered_failed_invalid = 0;
let mut num_recovered_exists = 0;
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_data = Self::try_shred_recovery(
&db,
&erasure_metas,
&mut index_working_set,
&mut just_inserted_data_shreds,
&mut just_inserted_coding_shreds,
);
num_recovered = recovered_data.len();
recovered_data.into_iter().for_each(|shred| {
if let Some(leader) = leader_schedule_cache.slot_leader_at(shred.slot(), None) {
let shred_slot = shred.slot();
if shred.verify(&leader) {
match self.check_insert_data_shred(
shred,
&mut erasure_metas,
&mut index_working_set,
&mut slot_meta_working_set,
&mut write_batch,
&mut just_inserted_data_shreds,
&mut index_meta_time,
is_trusted,
&handle_duplicate,
leader_schedule,
true,
) {
Err(InsertDataShredError::Exists) => {
num_recovered_exists += 1;
}
Err(InsertDataShredError::InvalidShred) => {
num_recovered_failed_invalid += 1;
}
Err(InsertDataShredError::BlockstoreError(_)) => {}
Ok(completed_data_sets) => {
newly_completed_data_sets.extend(
completed_data_sets.into_iter().map(
|(start_index, end_index)| CompletedDataSetInfo {
slot: shred_slot,
start_index,
end_index,
},
),
);
num_recovered_inserted += 1;
}
}
} else {
num_recovered_failed_sig += 1;
}
}
});
}
start.stop();
let shred_recovery_elapsed = start.as_us();
just_inserted_coding_shreds
.into_iter()
.for_each(|((_, _), shred)| {
self.check_insert_coding_shred(
shred,
&mut index_working_set,
&mut write_batch,
&mut index_meta_time,
);
num_inserted += 1;
});
let mut start = Measure::start("Shred recovery");
handle_chaining(&self.db, &mut write_batch, &mut slot_meta_working_set)?;
start.stop();
let chaining_elapsed = start.as_us();
let mut start = Measure::start("Commit Working Sets");
let (should_signal, newly_completed_slots) = commit_slot_meta_working_set(
&slot_meta_working_set,
&self.completed_slots_senders,
&mut write_batch,
)?;
for ((slot, set_index), erasure_meta) in erasure_metas {
write_batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
}
for (&slot, index_working_set_entry) in index_working_set.iter() {
if index_working_set_entry.did_insert_occur {
write_batch.put::<cf::Index>(slot, &index_working_set_entry.index)?;
}
}
start.stop();
let commit_working_sets_elapsed = start.as_us();
let mut start = Measure::start("Write Batch");
self.db.write(write_batch)?;
start.stop();
let write_batch_elapsed = start.as_us();
send_signals(
&self.new_shreds_signals,
&self.completed_slots_senders,
should_signal,
newly_completed_slots,
);
total_start.stop();
metrics.num_shreds += num_shreds;
metrics.total_elapsed += total_start.as_us();
metrics.insert_lock_elapsed += insert_lock_elapsed;
metrics.insert_shreds_elapsed += insert_shreds_elapsed;
metrics.shred_recovery_elapsed += shred_recovery_elapsed;
metrics.chaining_elapsed += chaining_elapsed;
metrics.commit_working_sets_elapsed += commit_working_sets_elapsed;
metrics.write_batch_elapsed += write_batch_elapsed;
metrics.num_inserted += num_inserted;
metrics.num_recovered += num_recovered;
metrics.num_recovered_inserted += num_recovered_inserted;
metrics.num_recovered_failed_sig += num_recovered_failed_sig;
metrics.num_recovered_failed_invalid += num_recovered_failed_invalid;
metrics.num_recovered_exists += num_recovered_exists;
metrics.index_meta_time += index_meta_time;
Ok((newly_completed_data_sets, inserted_indices))
}
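/// Purges an unconfirmed slot's contents (under the insert lock) while
/// writing back a `SlotMeta` with its unconfirmed state cleared, so the slot
/// can be filled in again later.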
pub fn clear_unconfirmed_slot(&self, slot: Slot) {
let _lock = self.insert_shreds_lock.lock().unwrap();
if let Some(mut slot_meta) = self
.meta(slot)
.expect("Couldn't fetch from SlotMeta column family")
{
self.run_purge(slot, slot, PurgeType::PrimaryIndex)
.expect("Purge database operations failed");
slot_meta.clear_unconfirmed_slot();
self.meta_cf
.put(slot, &slot_meta)
.expect("Couldn't insert into SlotMeta column family");
} else {
error!(
"clear_unconfirmed_slot() called on slot {} with no SlotMeta",
slot
);
}
}
pub fn insert_shreds(
&self,
shreds: Vec<Shred>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_trusted: bool,
) -> Result<(Vec<CompletedDataSetInfo>, Vec<usize>)> {
self.insert_shreds_handle_duplicate(
shreds,
leader_schedule,
is_trusted,
&|_| {},
&mut BlockstoreInsertionMetrics::default(),
)
}
fn check_insert_coding_shred(
&self,
shred: Shred,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
index_meta_time: &mut u64,
) -> bool {
let slot = shred.slot();
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
self.insert_coding_shred(index_meta, &shred, write_batch)
.map(|_| {
index_meta_working_set_entry.did_insert_occur = true;
})
.is_ok()
}
fn erasure_mismatch(shred1: &Shred, shred2: &Shred) -> bool {
shred1.coding_header.num_coding_shreds != shred2.coding_header.num_coding_shreds
|| shred1.coding_header.num_data_shreds != shred2.coding_header.num_data_shreds
}
fn check_cache_coding_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
handle_duplicate: &F,
is_trusted: bool,
) -> bool
where
F: Fn(Shred),
{
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
if !is_trusted {
if index_meta.coding().is_present(shred_index) {
handle_duplicate(shred);
return false;
}
if !Blockstore::should_insert_coding_shred(&shred, &self.last_root) {
return false;
}
}
let set_index = u64::from(shred.common_header.fec_set_index);
let erasure_config = ErasureConfig::new(
shred.coding_header.num_data_shreds as usize,
shred.coding_header.num_coding_shreds as usize,
);
let erasure_meta = erasure_metas.entry((slot, set_index)).or_insert_with(|| {
let first_coding_index =
u64::from(shred.index()) - u64::from(shred.coding_header.position);
self.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
.unwrap_or_else(|| ErasureMeta::new(set_index, first_coding_index, &erasure_config))
});
if erasure_config != erasure_meta.config {
let conflicting_shred = self.find_conflicting_coding_shred(
&shred,
slot,
erasure_meta,
just_received_coding_shreds,
);
if let Some(conflicting_shred) = conflicting_shred {
if self
.store_duplicate_if_not_existing(slot, conflicting_shred, shred.payload.clone())
.is_err()
{
warn!("bad duplicate store..");
}
} else {
datapoint_info!("bad-conflict-shred", ("slot", slot, i64));
}
warn!("Received multiple erasure configs for the same erasure set!!!");
warn!(
"Slot: {}, shred index: {}, set_index: {}, is_duplicate: {}, stored config: {:#?}, new config: {:#?}",
slot, shred.index(), set_index, self.has_duplicate_shreds_in_slot(slot), erasure_meta.config, erasure_config
);
return false;
}
index_meta.coding_mut().set_present(shred_index, true);
just_received_coding_shreds
.entry((slot, shred_index))
.or_insert_with(|| shred);
true
}
fn find_conflicting_coding_shred(
&self,
shred: &Shred,
slot: Slot,
erasure_meta: &ErasureMeta,
just_received_coding_shreds: &mut HashMap<(u64, u64), Shred>,
) -> Option<Vec<u8>> {
let coding_start = erasure_meta.first_coding_index;
let coding_end = coding_start + erasure_meta.config.num_coding() as u64;
let mut conflicting_shred = None;
for coding_index in coding_start..coding_end {
let maybe_shred = self.get_coding_shred(slot, coding_index);
if let Ok(Some(shred_data)) = maybe_shred {
let potential_shred = Shred::new_from_serialized_shred(shred_data).unwrap();
if Self::erasure_mismatch(&potential_shred, &shred) {
conflicting_shred = Some(potential_shred.payload);
}
break;
} else if let Some(potential_shred) =
just_received_coding_shreds.get(&(slot, coding_index))
{
if Self::erasure_mismatch(&potential_shred, &shred) {
conflicting_shred = Some(potential_shred.payload.clone());
}
break;
}
}
conflicting_shred
}
#[allow(clippy::too_many_arguments)]
fn check_insert_data_shred<F>(
&self,
shred: Shred,
erasure_metas: &mut HashMap<(u64, u64), ErasureMeta>,
index_working_set: &mut HashMap<u64, IndexMetaWorkingSetEntry>,
slot_meta_working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
write_batch: &mut WriteBatch,
just_inserted_data_shreds: &mut HashMap<(u64, u64), Shred>,
index_meta_time: &mut u64,
is_trusted: bool,
handle_duplicate: &F,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_recovered: bool,
) -> std::result::Result<Vec<(u32, u32)>, InsertDataShredError>
where
F: Fn(Shred),
{
let slot = shred.slot();
let shred_index = u64::from(shred.index());
let index_meta_working_set_entry =
get_index_meta_entry(&self.db, slot, index_working_set, index_meta_time);
let index_meta = &mut index_meta_working_set_entry.index;
let slot_meta_entry =
get_slot_meta_entry(&self.db, slot_meta_working_set, slot, shred.parent());
let slot_meta = &mut slot_meta_entry.new_slot_meta.borrow_mut();
if !is_trusted {
if Self::is_data_shred_present(&shred, slot_meta, &index_meta.data()) {
handle_duplicate(shred);
return Err(InsertDataShredError::Exists);
} else if !self.should_insert_data_shred(
&shred,
slot_meta,
&self.last_root,
leader_schedule,
is_recovered,
) {
return Err(InsertDataShredError::InvalidShred);
}
}
let set_index = u64::from(shred.common_header.fec_set_index);
let newly_completed_data_sets =
self.insert_data_shred(slot_meta, index_meta.data_mut(), &shred, write_batch)?;
just_inserted_data_shreds.insert((slot, shred_index), shred);
index_meta_working_set_entry.did_insert_occur = true;
slot_meta_entry.did_insert_occur = true;
if !erasure_metas.contains_key(&(slot, set_index)) {
if let Some(meta) = self
.erasure_meta_cf
.get((slot, set_index))
.expect("Expect database get to succeed")
{
erasure_metas.insert((slot, set_index), meta);
}
}
Ok(newly_completed_data_sets)
}
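// Sanity checks for a coding shred: it must actually be a coding shred, its
// index and position must be consistent with its erasure config, and its
// slot must be newer than the latest root.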
fn should_insert_coding_shred(shred: &Shred, last_root: &RwLock<u64>) -> bool {
let slot = shred.slot();
let shred_index = shred.index();
if shred.is_data() || shred_index < u32::from(shred.coding_header.position) {
return false;
}
let set_index = shred.common_header.fec_set_index;
!(shred.coding_header.num_coding_shreds == 0
|| shred.coding_header.position >= shred.coding_header.num_coding_shreds
|| std::u32::MAX - set_index < u32::from(shred.coding_header.num_coding_shreds) - 1
|| slot <= *last_root.read().unwrap()
|| shred.coding_header.num_coding_shreds as u32
> (8 * crate::shred::MAX_DATA_SHREDS_PER_FEC_BLOCK))
}
fn insert_coding_shred(
&self,
index_meta: &mut Index,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<()> {
let slot = shred.slot();
let shred_index = u64::from(shred.index());
assert!(shred.is_code() && shred_index >= u64::from(shred.coding_header.position));
write_batch.put_bytes::<cf::ShredCode>((slot, shred_index), &shred.payload)?;
index_meta.coding_mut().set_present(shred_index, true);
Ok(())
}
fn is_data_shred_present(shred: &Shred, slot_meta: &SlotMeta, data_index: &ShredIndex) -> bool {
let shred_index = u64::from(shred.index());
shred_index < slot_meta.consumed || data_index.is_present(shred_index)
}
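// Validates a data shred against the slot's current state: a shred at or
// past `last_index`, or one marked last-in-slot with an index below what has
// already been received, is stored as duplicate-slot evidence and rejected.
// Finally, the shred's slot and parent must chain correctly above the root.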
fn should_insert_data_shred(
&self,
shred: &Shred,
slot_meta: &SlotMeta,
last_root: &RwLock<u64>,
leader_schedule: Option<&Arc<LeaderScheduleCache>>,
is_recovered: bool,
) -> bool {
let shred_index = u64::from(shred.index());
let slot = shred.slot();
let last_in_slot = if shred.last_in_slot() {
debug!("got last in slot");
true
} else {
false
};
let last_index = slot_meta.last_index;
if shred_index >= last_index {
let leader_pubkey = leader_schedule
.map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
.unwrap_or(None);
let ending_shred = self.get_data_shred(slot, last_index).unwrap().unwrap();
if self
.store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
.is_err()
{
warn!("store duplicate error");
}
datapoint_error!(
"blockstore_error",
(
"error",
format!(
"Leader {:?}, slot {}: received index {} >= slot.last_index {}, is_recovered: {}",
leader_pubkey, slot, shred_index, last_index, is_recovered
),
String
)
);
return false;
}
if last_in_slot && shred_index < slot_meta.received {
let leader_pubkey = leader_schedule
.map(|leader_schedule| leader_schedule.slot_leader_at(slot, None))
.unwrap_or(None);
let ending_shred = self
.get_data_shred(slot, slot_meta.received - 1)
.unwrap()
.unwrap();
if self
.store_duplicate_if_not_existing(slot, ending_shred, shred.payload.clone())
.is_err()
{
warn!("store duplicate error");
}
datapoint_error!(
"blockstore_error",
(
"error",
format!(
"Leader {:?}, slot {}: received shred_index {} < slot.received {}, is_recovered: {}",
leader_pubkey, slot, shred_index, slot_meta.received, is_recovered
),
String
)
);
return false;
}
let last_root = *last_root.read().unwrap();
verify_shred_slots(slot, slot_meta.parent_slot, last_root)
}
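// Writes the data shred into the batch, advances the consumed cursor while
// the next indexes are contiguously present, and updates the slot meta;
// returns the (start, end) index ranges of any newly completed data sets.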
fn insert_data_shred(
&self,
slot_meta: &mut SlotMeta,
data_index: &mut ShredIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
) -> Result<Vec<(u32, u32)>> {
let slot = shred.slot();
let index = u64::from(shred.index());
let last_in_slot = if shred.last_in_slot() {
debug!("got last in slot");
true
} else {
false
};
let last_in_data = if shred.data_complete() {
debug!("got last in data");
true
} else {
false
};
assert!(!is_orphan(slot_meta));
let new_consumed = if slot_meta.consumed == index {
let mut current_index = index + 1;
while data_index.is_present(current_index) {
current_index += 1;
}
current_index
} else {
slot_meta.consumed
};
write_batch.put_bytes::<cf::ShredData>((slot, index), &shred.payload)?;
data_index.set_present(index, true);
let newly_completed_data_sets = update_slot_meta(
last_in_slot,
last_in_data,
slot_meta,
index as u32,
new_consumed,
shred.reference_tick(),
&data_index,
);
if slot_meta.is_full() {
datapoint_info!(
"shred_insert_is_full",
(
"total_time_ms",
solana_sdk::timing::timestamp() - slot_meta.first_shred_timestamp,
i64
),
("slot", slot_meta.slot, i64),
("last_index", slot_meta.last_index, i64),
);
}
trace!("inserted shred into slot {:?} and index {:?}", slot, index);
Ok(newly_completed_data_sets)
}
pub fn get_data_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
self.data_shred_cf.get_bytes((slot, index))
}
pub fn get_data_shreds_for_slot(
&self,
slot: Slot,
start_index: u64,
) -> ShredResult<Vec<Shred>> {
self.slot_data_iterator(slot, start_index)
.expect("blockstore couldn't fetch iterator")
.map(|data| Shred::new_from_serialized_shred(data.1.to_vec()))
.collect()
}
pub fn get_data_shreds(
&self,
slot: Slot,
from_index: u64,
to_index: u64,
buffer: &mut [u8],
) -> Result<(u64, usize)> {
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
let meta_cf = self.db.column::<cf::SlotMeta>();
let mut buffer_offset = 0;
let mut last_index = 0;
if let Some(meta) = meta_cf.get(slot)? {
if !meta.is_full() {
warn!("The slot is not yet full. Will not return any shreds");
return Ok((last_index, buffer_offset));
}
let to_index = cmp::min(to_index, meta.consumed);
for index in from_index..to_index {
if let Some(shred_data) = self.get_data_shred(slot, index)? {
let shred_len = shred_data.len();
if buffer.len().saturating_sub(buffer_offset) >= shred_len {
buffer[buffer_offset..buffer_offset + shred_len]
.copy_from_slice(&shred_data[..shred_len]);
buffer_offset += shred_len;
last_index = index;
if buffer.len().saturating_sub(buffer_offset) < shred_len {
break;
}
} else {
break;
}
}
}
}
Ok((last_index, buffer_offset))
}
pub fn get_coding_shred(&self, slot: Slot, index: u64) -> Result<Option<Vec<u8>>> {
self.code_shred_cf.get_bytes((slot, index))
}
pub fn get_coding_shreds_for_slot(
&self,
slot: Slot,
start_index: u64,
) -> ShredResult<Vec<Shred>> {
self.slot_coding_iterator(slot, start_index)
.expect("blockstore couldn't fetch iterator")
.map(|code| Shred::new_from_serialized_shred(code.1.to_vec()))
.collect()
}
#[allow(clippy::too_many_arguments)]
pub fn write_entries(
&self,
start_slot: Slot,
num_ticks_in_start_slot: u64,
start_index: u32,
ticks_per_slot: u64,
parent: Option<u64>,
is_full_slot: bool,
keypair: &Arc<Keypair>,
entries: Vec<Entry>,
version: u16,
) -> Result<usize> {
let mut parent_slot = parent.unwrap_or(start_slot.saturating_sub(1));
let num_slots = (start_slot - parent_slot).max(1);
assert!(num_ticks_in_start_slot < num_slots * ticks_per_slot);
let mut remaining_ticks_in_slot = num_slots * ticks_per_slot - num_ticks_in_start_slot;
let mut current_slot = start_slot;
let mut shredder =
Shredder::new(current_slot, parent_slot, 0.0, keypair.clone(), 0, version)
.expect("Failed to create entry shredder");
let mut all_shreds = vec![];
let mut slot_entries = vec![];
for entry in entries.into_iter() {
if remaining_ticks_in_slot == 0 {
current_slot += 1;
parent_slot = current_slot - 1;
remaining_ticks_in_slot = ticks_per_slot;
let mut current_entries = vec![];
std::mem::swap(&mut slot_entries, &mut current_entries);
let start_index = {
if all_shreds.is_empty() {
start_index
} else {
0
}
};
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(&current_entries, true, start_index);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
shredder = Shredder::new(
current_slot,
parent_slot,
0.0,
keypair.clone(),
(ticks_per_slot - remaining_ticks_in_slot) as u8,
version,
)
.expect("Failed to create entry shredder");
}
if entry.is_tick() {
remaining_ticks_in_slot -= 1;
}
slot_entries.push(entry);
}
if !slot_entries.is_empty() {
let (mut data_shreds, mut coding_shreds, _) =
shredder.entries_to_shreds(&slot_entries, is_full_slot, 0);
all_shreds.append(&mut data_shreds);
all_shreds.append(&mut coding_shreds);
}
let num_shreds = all_shreds.len();
self.insert_shreds(all_shreds, None, false)?;
Ok(num_shreds)
}
pub fn get_index(&self, slot: Slot) -> Result<Option<Index>> {
self.index_cf.get(slot)
}
pub fn put_meta_bytes(&self, slot: Slot, bytes: &[u8]) -> Result<()> {
self.meta_cf.put_bytes(slot, bytes)
}
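// Walks the column's keys over [start_index, end_index), collecting up to
// `max_missing` absent indexes. Shreds younger than the expected turbine
// propagation delay (compared via each shred's reference tick) are not yet
// reported as missing.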
fn find_missing_indexes<C>(
db_iterator: &mut DBRawIterator,
slot: Slot,
first_timestamp: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64>
where
C: Column<Index = (u64, u64)>,
{
if start_index >= end_index || max_missing == 0 {
return vec![];
}
let mut missing_indexes = vec![];
let ticks_since_first_insert =
DEFAULT_TICKS_PER_SECOND * (timestamp() - first_timestamp) / 1000;
db_iterator.seek(&C::key((slot, start_index)));
let mut prev_index = start_index;
'outer: loop {
if !db_iterator.valid() {
for i in prev_index..end_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break;
}
}
break;
}
let (current_slot, index) = C::index(&db_iterator.key().expect("Expect a valid key"));
let current_index = {
if current_slot > slot {
end_index
} else {
index
}
};
let upper_index = cmp::min(current_index, end_index);
let reference_tick = u64::from(Shred::reference_tick_from_data(
&db_iterator.value().expect("couldn't read value"),
));
if ticks_since_first_insert < reference_tick + MAX_TURBINE_DELAY_IN_TICKS {
break 'outer;
}
for i in prev_index..upper_index {
missing_indexes.push(i);
if missing_indexes.len() == max_missing {
break 'outer;
}
}
if current_slot > slot {
break;
}
if current_index >= end_index {
break;
}
prev_index = current_index + 1;
db_iterator.next();
}
missing_indexes
}
pub fn find_missing_data_indexes(
&self,
slot: Slot,
first_timestamp: u64,
start_index: u64,
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
if let Ok(mut db_iterator) = self
.db
.raw_iterator_cf(self.db.cf_handle::<cf::ShredData>())
{
Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
first_timestamp,
start_index,
end_index,
max_missing,
)
} else {
vec![]
}
}
pub fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_block_time".to_string(), String)
);
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
self.blocktime_cf.get(slot)
}
pub fn cache_block_time(&self, slot: Slot, timestamp: UnixTimestamp) -> Result<()> {
if !self.is_root(slot) {
return Err(BlockstoreError::SlotNotRooted);
}
self.blocktime_cf.put(slot, &timestamp)
}
pub fn get_first_available_block(&self) -> Result<Slot> {
let mut root_iterator = self.rooted_slot_iterator(self.lowest_slot())?;
Ok(root_iterator.next().unwrap_or_default())
}
pub fn evm_blocks_iterator(
&self,
block_num: evm::BlockNum,
) -> Result<impl Iterator<Item = ((evm::BlockNum, Option<Slot>), evm::BlockHeader)> + '_> {
let blocks_headers = self.evm_blocks_cf.iter(IteratorMode::From(
cf::EvmBlockHeader::as_index(block_num),
IteratorDirection::Forward,
))?;
Ok(blocks_headers.map(move |(block_num, block_header)| {
(
block_num,
self.evm_blocks_cf
.deserialize_protobuf_or_bincode::<evm::BlockHeader>(&block_header)
.unwrap_or_else(|e| {
panic!(
"Could not deserialize BlockHeader for block_num {} slot {:?}: {:?}",
block_num.0, block_num.1, e
)
})
.try_into()
.expect("Convertation should always pass"),
)
}))
}
pub fn get_first_available_evm_block(&self) -> Result<evm::BlockNum> {
Ok(self
.evm_blocks_cf
.iter(IteratorMode::Start)?
.map(|((block, _slot), _)| block)
.next()
.unwrap_or(evm::BlockNum::MAX))
}
pub fn get_last_available_evm_block(&self) -> Result<Option<evm::BlockNum>> {
Ok(self
.evm_blocks_cf
.iter(IteratorMode::End)?
.map(|((block, _slot), _)| block)
.next())
}
pub fn get_confirmed_block_hash(&self, slot: Slot) -> Result<String> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_confirmed_block_hash".to_string(), String)
);
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
if self.is_root(slot) {
let slot_entries = self.get_slot_entries(slot, 0)?;
if !slot_entries.is_empty() {
let blockhash = get_last_hash(slot_entries.iter())
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
return Ok(blockhash.to_string());
}
}
Err(BlockstoreError::SlotNotRooted)
}
pub fn get_confirmed_block(
&self,
slot: Slot,
require_previous_blockhash: bool,
) -> Result<ConfirmedBlock> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_confirmed_block".to_string(), String)
);
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot {
return Err(BlockstoreError::SlotCleanedUp);
}
if self.is_root(slot) {
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = match slot_meta_cf.get(slot)? {
Some(slot_meta) => slot_meta,
None => {
info!("SlotMeta not found for rooted slot {}", slot);
return Err(BlockstoreError::SlotCleanedUp);
}
};
let slot_entries = self.get_slot_entries(slot, 0)?;
if !slot_entries.is_empty() {
let slot_transaction_iterator = slot_entries
.iter()
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| {
if let Err(err) = transaction.sanitize() {
warn!(
"Blockstore::get_confirmed_block sanitize failed: {:?}, \
slot: {:?}, \
{:?}",
err, slot, transaction,
);
}
transaction
});
let parent_slot_entries = self
.get_slot_entries(slot_meta.parent_slot, 0)
.unwrap_or_default();
if parent_slot_entries.is_empty() && require_previous_blockhash {
return Err(BlockstoreError::ParentEntriesUnavailable);
}
let previous_blockhash = if !parent_slot_entries.is_empty() {
get_last_hash(parent_slot_entries.iter()).unwrap()
} else {
Hash::default()
};
let blockhash = get_last_hash(slot_entries.iter())
.unwrap_or_else(|| panic!("Rooted slot {:?} must have blockhash", slot));
let rewards = self
.rewards_cf
.get_protobuf_or_bincode::<StoredExtendedRewards>(slot)?
.unwrap_or_default()
.into();
let block_time = self.blocktime_cf.get(slot)?;
let block = ConfirmedBlock {
previous_blockhash: previous_blockhash.to_string(),
blockhash: blockhash.to_string(),
parent_slot: slot_meta.parent_slot,
transactions: self
.map_transactions_to_statuses(slot, slot_transaction_iterator),
rewards,
block_time,
};
return Ok(block);
}
}
Err(BlockstoreError::SlotNotRooted)
}
pub fn get_evm_block(&self, block_number: evm::BlockNum) -> Result<(evm::Block, bool)> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_evm_block".to_string(), String)
);
let mut block_headers = self.read_evm_block_headers(block_number)?;
if block_headers.is_empty() {
return Err(BlockstoreError::SlotCleanedUp);
};
let confirmed_block = block_headers
.iter()
.enumerate()
.find(|(_idx, b)| self.is_root(b.native_chain_slot))
.map(|(idx, _b)| idx);
let block_header = block_headers.remove(confirmed_block.unwrap_or_default());
let mut txs = Vec::new();
for hash in &block_header.transactions {
let tx = self.read_evm_transaction((
*hash,
block_header.block_number,
Some(block_header.native_chain_slot),
))?;
if let Some(tx) = tx {
txs.push((*hash, tx))
} else {
warn!(
"EVM transaction {} was cleaned up while its block still exists",
hash
);
};
}
let confirmed = self.is_root(block_header.native_chain_slot);
Ok((
evm::Block {
header: block_header,
transactions: txs,
},
confirmed,
))
}
fn map_transactions_to_statuses<'a>(
&self,
slot: Slot,
iterator: impl Iterator<Item = Transaction> + 'a,
) -> Vec<TransactionWithStatusMeta> {
iterator
.map(|transaction| {
let signature = transaction.signatures[0];
TransactionWithStatusMeta {
transaction,
meta: self
.read_transaction_status((signature, slot))
.ok()
.flatten(),
}
})
.collect()
}
fn initialize_transaction_status_index(&self) -> Result<()> {
self.transaction_status_index_cf
.put(0, &TransactionStatusIndexMeta::default())?;
self.transaction_status_index_cf
.put(1, &TransactionStatusIndexMeta::default())?;
let default_status = TransactionStatusMeta::default().into();
self.transaction_status_cf
.put_protobuf(cf::TransactionStatus::as_index(2), &default_status)?;
self.address_signatures_cf.put(
cf::AddressSignatures::as_index(2),
&AddressSignatureMeta::default(),
)
}
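// Transaction statuses and address signatures are sharded over two rotating
// primary indexes: one active, one frozen. The first call freezes index 0
// and activates index 1; afterwards, once a frozen index's max_slot falls
// below `to_slot`, the two indexes swap roles and the newly activated
// (emptied) index's number is returned so its stale entries can be purged.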
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<Option<u64>> {
let index0 = self.transaction_status_index_cf.get(0)?;
if index0.is_none() {
return Ok(None);
}
let mut index0 = index0.unwrap();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap();
if !index0.frozen && !index1.frozen {
index0.frozen = true;
*w_active_transaction_status_index = 1;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let result = if index0.frozen && to_slot > index0.max_slot {
debug!("Pruning transaction index 0 at slot {}", index0.max_slot);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
debug!("Pruning transaction index 1 at slot {}", index1.max_slot);
Some(1)
} else {
None
};
if result.is_some() {
*w_active_transaction_status_index = if index0.frozen { 0 } else { 1 };
if index0.frozen {
index0.max_slot = 0
};
index0.frozen = !index0.frozen;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
if index1.frozen {
index1.max_slot = 0
};
index1.frozen = !index1.frozen;
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}
Ok(result)
}
}
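// Returns the active primary index for writes, bumping its recorded
// `max_slot` when this slot is the highest seen so far.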
fn get_primary_index(
&self,
slot: Slot,
w_active_transaction_status_index: &mut u64,
) -> Result<u64> {
let i = *w_active_transaction_status_index;
let mut index_meta = self.transaction_status_index_cf.get(i)?.unwrap();
if slot > index_meta.max_slot {
assert!(!index_meta.frozen);
index_meta.max_slot = slot;
self.transaction_status_index_cf.put(i, &index_meta)?;
}
Ok(i)
}
pub fn read_transaction_status(
&self,
index: (Signature, Slot),
) -> Result<Option<TransactionStatusMeta>> {
let (signature, slot) = index;
let result = self
.transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((0, signature, slot))?;
if result.is_none() {
Ok(self
.transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((1, signature, slot))?
.and_then(|meta| meta.try_into().ok()))
} else {
Ok(result.and_then(|meta| meta.try_into().ok()))
}
}
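/// Stores a transaction status under the active primary index and indexes
/// the signature by every writable and readonly account address.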
pub fn write_transaction_status(
&self,
slot: Slot,
signature: Signature,
writable_keys: Vec<&Pubkey>,
readonly_keys: Vec<&Pubkey>,
status: TransactionStatusMeta,
) -> Result<()> {
let status = status.into();
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
let primary_index = self.get_primary_index(slot, &mut w_active_transaction_status_index)?;
self.transaction_status_cf
.put_protobuf((primary_index, signature, slot), &status)?;
for address in writable_keys {
self.address_signatures_cf.put(
(primary_index, *address, slot, signature),
&AddressSignatureMeta { writeable: true },
)?;
}
for address in readonly_keys {
self.address_signatures_cf.put(
(primary_index, *address, slot, signature),
&AddressSignatureMeta { writeable: false },
)?;
}
Ok(())
}
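// Looks up a signature under both primary indexes, returning the first
// status found in a rooted slot together with the number of index entries
// scanned along the way.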
fn get_transaction_status_with_counter(
&self,
signature: Signature,
) -> Result<(Option<(Slot, TransactionStatusMeta)>, u64)> {
let mut counter = 0;
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.transaction_status_cf.iter(IteratorMode::From(
(transaction_status_cf_primary_index, signature, 0),
IteratorDirection::Forward,
))?;
for ((i, sig, slot), _data) in index_iterator {
counter += 1;
if i != transaction_status_cf_primary_index || sig != signature {
break;
}
if self.is_root(slot) {
let status = self
.transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((i, sig, slot))?
.and_then(|status| status.try_into().ok())
.map(|status| (slot, status));
return Ok((status, counter));
}
}
}
Ok((None, counter))
}
pub fn get_transaction_status(
&self,
signature: Signature,
) -> Result<Option<(Slot, TransactionStatusMeta)>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_transaction_status".to_string(), String)
);
self.get_transaction_status_with_counter(signature)
.map(|(status, _)| status)
}
pub fn get_confirmed_transaction(
&self,
signature: Signature,
) -> Result<Option<ConfirmedTransaction>> {
datapoint_info!(
"blockstore-rpc-api",
("method", "get_confirmed_transaction".to_string(), String)
);
if let Some((slot, status)) = self.get_transaction_status(signature)? {
let transaction = self
.find_transaction_in_slot(slot, signature)?
.ok_or(BlockstoreError::TransactionStatusSlotMismatch)?;
let block_time = self.get_block_time(slot)?;
Ok(Some(ConfirmedTransaction {
slot,
transaction: TransactionWithStatusMeta {
transaction,
meta: Some(status),
},
block_time,
}))
} else {
Ok(None)
}
}
fn find_transaction_in_slot(
&self,
slot: Slot,
signature: Signature,
) -> Result<Option<Transaction>> {
let slot_entries = self.get_slot_entries(slot, 0)?;
Ok(slot_entries
.iter()
.cloned()
.flat_map(|entry| entry.transactions)
.map(|transaction| {
if let Err(err) = transaction.sanitize() {
warn!(
"Blockstore::find_transaction_in_slot sanitize failed: {:?}, \
slot: {:?}, \
{:?}",
err, slot, transaction,
);
}
transaction
})
.find(|transaction| transaction.signatures[0] == signature))
}
fn find_address_signatures(
&self,
pubkey: Pubkey,
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<(Slot, Signature)>> {
let mut signatures: Vec<(Slot, Signature)> = vec![];
for transaction_status_cf_primary_index in 0..=1 {
let index_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(
transaction_status_cf_primary_index,
pubkey,
start_slot,
Signature::default(),
),
IteratorDirection::Forward,
))?;
for ((i, address, slot, signature), _) in index_iterator {
if i != transaction_status_cf_primary_index || slot > end_slot || address != pubkey
{
break;
}
if self.is_root(slot) {
signatures.push((slot, signature));
}
}
}
signatures.sort_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
Ok(signatures)
}
pub fn get_confirmed_signatures_for_address(
&self,
pubkey: Pubkey,
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<Signature>> {
datapoint_info!(
"blockstore-rpc-api",
(
"method",
"get_confirmed_signatures_for_address".to_string(),
String
)
);
self.find_address_signatures(pubkey, start_slot, end_slot)
.map(|signatures| signatures.iter().map(|(_, signature)| *signature).collect())
}
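/// Returns up to `limit` signatures involving `address` in rooted slots,
/// newest first. The optional `before`/`until` signatures act as exclusive
/// cursors: their slots bound the scan, and signatures from those boundary
/// slots are filtered through per-slot exclusion sets.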
pub fn get_confirmed_signatures_for_address2(
&self,
address: Pubkey,
highest_confirmed_root: Slot,
before: Option<Signature>,
until: Option<Signature>,
limit: usize,
) -> Result<Vec<ConfirmedTransactionStatusWithSignature>> {
datapoint_info!(
"blockstore-rpc-api",
(
"method",
"get_confirmed_signatures_for_address2".to_string(),
String
)
);
let mut get_before_slot_timer = Measure::start("get_before_slot_timer");
let (slot, mut before_excluded_signatures) = match before {
None => (highest_confirmed_root, None),
Some(before) => {
let transaction_status = self.get_transaction_status(before)?;
match transaction_status {
None => return Ok(vec![]),
Some((slot, _)) => {
let confirmed_block =
self.get_confirmed_block(slot, false).map_err(|err| {
BlockstoreError::Io(IoError::new(
ErrorKind::Other,
format!("Unable to get confirmed block: {}", err),
))
})?;
let mut slot_signatures: Vec<_> = confirmed_block
.transactions
.into_iter()
.filter_map(|transaction_with_meta| {
transaction_with_meta
.transaction
.signatures
.into_iter()
.next()
})
.collect();
slot_signatures.sort();
slot_signatures.reverse();
if let Some(pos) = slot_signatures.iter().position(|&x| x == before) {
slot_signatures.truncate(pos + 1);
}
(
slot,
Some(slot_signatures.into_iter().collect::<HashSet<_>>()),
)
}
}
}
};
get_before_slot_timer.stop();
let mut get_until_slot_timer = Measure::start("get_until_slot_timer");
let (lowest_slot, until_excluded_signatures) = match until {
None => (0, HashSet::new()),
Some(until) => {
let transaction_status = self.get_transaction_status(until)?;
match transaction_status {
None => (0, HashSet::new()),
Some((slot, _)) => {
let confirmed_block =
self.get_confirmed_block(slot, false).map_err(|err| {
BlockstoreError::Io(IoError::new(
ErrorKind::Other,
format!("Unable to get confirmed block: {}", err),
))
})?;
let mut slot_signatures: Vec<_> = confirmed_block
.transactions
.into_iter()
.filter_map(|transaction_with_meta| {
transaction_with_meta
.transaction
.signatures
.into_iter()
.next()
})
.collect();
slot_signatures.sort();
slot_signatures.reverse();
if let Some(pos) = slot_signatures.iter().position(|&x| x == until) {
slot_signatures = slot_signatures.split_off(pos);
}
(slot, slot_signatures.into_iter().collect::<HashSet<_>>())
}
}
}
};
get_until_slot_timer.stop();
let first_available_block = self.get_first_available_block()?;
let mut address_signatures = vec![];
let mut get_initial_slot_timer = Measure::start("get_initial_slot_timer");
let mut signatures = self.find_address_signatures(address, slot, slot)?;
signatures.reverse();
if let Some(excluded_signatures) = before_excluded_signatures.take() {
address_signatures.extend(
signatures
.into_iter()
.filter(|(_, signature)| !excluded_signatures.contains(&signature)),
)
} else {
address_signatures.append(&mut signatures);
}
get_initial_slot_timer.stop();
let starting_primary_index = *self.active_transaction_status_index.read().unwrap();
let next_primary_index = if starting_primary_index == 0 { 1 } else { 0 };
let next_max_slot = self
.transaction_status_index_cf
.get(next_primary_index)?
.unwrap()
.max_slot;
let mut starting_primary_index_iter_timer = Measure::start("starting_primary_index_iter");
if slot > next_max_slot {
let mut starting_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(starting_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = starting_iterator.next() {
if slot == next_max_slot || slot < lowest_slot {
break;
}
if i == starting_primary_index
&& key_address == address
&& slot >= first_available_block
{
if self.is_root(slot) {
address_signatures.push((slot, signature));
}
continue;
}
}
break;
}
if next_max_slot >= lowest_slot {
let mut signatures =
self.find_address_signatures(address, next_max_slot, next_max_slot)?;
signatures.reverse();
address_signatures.append(&mut signatures);
}
}
starting_primary_index_iter_timer.stop();
let mut next_primary_index_iter_timer = Measure::start("next_primary_index_iter_timer");
let mut next_iterator = self.address_signatures_cf.iter(IteratorMode::From(
(next_primary_index, address, slot, Signature::default()),
IteratorDirection::Reverse,
))?;
while address_signatures.len() < limit {
if let Some(((i, key_address, slot, signature), _)) = next_iterator.next() {
if slot == next_max_slot {
continue;
}
if slot < lowest_slot {
break;
}
if i == next_primary_index
&& key_address == address
&& slot >= first_available_block
{
if self.is_root(slot) {
address_signatures.push((slot, signature));
}
continue;
}
}
break;
}
next_primary_index_iter_timer.stop();
let mut address_signatures: Vec<(Slot, Signature)> = address_signatures
.into_iter()
.filter(|(_, signature)| !until_excluded_signatures.contains(&signature))
.collect();
address_signatures.truncate(limit);
let mut get_status_info_timer = Measure::start("get_status_info_timer");
let mut infos = vec![];
for (slot, signature) in address_signatures.into_iter() {
let transaction_status = self.get_transaction_status(signature)?;
let err = match transaction_status {
None => None,
Some((_slot, status)) => status.status.err(),
};
let block_time = self.get_block_time(slot)?;
infos.push(ConfirmedTransactionStatusWithSignature {
signature,
slot,
err,
memo: None,
block_time,
});
}
get_status_info_timer.stop();
datapoint_info!(
"blockstore-get-conf-sigs-for-addr-2",
(
"get_before_slot_us",
get_before_slot_timer.as_us() as i64,
i64
),
(
"get_initial_slot_us",
get_initial_slot_timer.as_us() as i64,
i64
),
(
"starting_primary_index_iter_us",
starting_primary_index_iter_timer.as_us() as i64,
i64
),
(
"next_primary_index_iter_us",
next_primary_index_iter_timer.as_us() as i64,
i64
),
(
"get_status_info_us",
get_status_info_timer.as_us() as i64,
i64
),
(
"get_until_slot_us",
get_until_slot_timer.as_us() as i64,
i64
)
);
Ok(infos)
}
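/// Returns the rewards recorded for the given slot, decoding either the
/// protobuf or the legacy bincode representation.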
pub fn read_rewards(&self, index: Slot) -> Result<Option<Rewards>> {
self.rewards_cf
.get_protobuf_or_bincode::<Rewards>(index)
.map(|result| result.map(|option| option.into()))
}
pub fn write_rewards(&self, index: Slot, rewards: Rewards) -> Result<()> {
let rewards = rewards.into();
self.rewards_cf.put_protobuf(index, &rewards)
}
pub fn get_recent_perf_samples(&self, num: usize) -> Result<Vec<(Slot, PerfSample)>> {
Ok(self
.db
.iter::<cf::PerfSamples>(IteratorMode::End)?
.take(num)
.map(|(slot, data)| {
let perf_sample = deserialize(&data).unwrap();
(slot, perf_sample)
})
.collect())
}
pub fn write_perf_sample(&self, index: Slot, perf_sample: &PerfSample) -> Result<()> {
self.perf_samples_cf.put(index, perf_sample)
}
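/// Stores an EVM block header keyed by (block number, native chain slot) and
/// additionally indexes the block number by the block's hash.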
pub fn write_evm_block_header(&self, block: &evm::BlockHeader) -> Result<()> {
let proto_block = block.clone().into();
self.evm_blocks_cf.put_protobuf(
(block.block_number, Some(block.native_chain_slot)),
&proto_block,
)?;
self.write_evm_block_id_by_hash(block.native_chain_slot, block.hash(), block.block_number)
}
pub fn read_evm_block_headers(
&self,
block_index: evm::BlockNum,
) -> Result<Vec<evm::BlockHeader>> {
Ok(self
.evm_blocks_iterator(block_index)?
.take_while(|((block_num, _slot), _block)| *block_num == block_index)
.map(|((_block_num, _slot), block)| block)
.collect())
}
pub fn write_evm_block_id_by_hash(
&self,
slot: Slot,
hash: H256,
id: evm_state::BlockNum,
) -> Result<()> {
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
let primary_index = self.get_primary_index(slot, &mut w_active_transaction_status_index)?;
self.evm_blocks_by_hash_cf
.put_protobuf((primary_index, hash), &id)
}
pub fn read_evm_block_id_by_hash(&self, hash: H256) -> Result<Option<evm_state::BlockNum>> {
let result = self
.evm_blocks_by_hash_cf
.get_protobuf_or_bincode::<evm_state::BlockNum>((0, hash))?;
if result.is_none() {
Ok(self
.evm_blocks_by_hash_cf
.get_protobuf_or_bincode::<evm_state::BlockNum>((1, hash))?)
} else {
Ok(result)
}
}
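/// Looks up an EVM transaction receipt by (hash, block number, slot). The
/// lookup is attempted with the provided slot first (when given) and then
/// with no slot, across both primary indexes, returning the first hit.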
pub fn read_evm_transaction(
&self,
index: (H256, evm::BlockNum, Option<Slot>),
) -> Result<Option<evm::TransactionReceipt>> {
let (hash, block_num, slot) = index;
for slot in slot.map(Some).into_iter().chain(Some(None)) {
for primary_index in 0..=1 {
if let Some(tx) = self
.evm_transactions_cf
.get_protobuf_or_bincode::<evm::TransactionReceipt>(
EvmTransactionReceiptsIndex {
index: primary_index,
hash,
block_num,
slot,
},
)?
.and_then(|meta| meta.try_into().ok())
{
return Ok(Some(tx));
}
}
}
Ok(None)
}
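/// Collects logs matching `filter` by scanning block headers from
/// `filter.from_block` to `filter.to_block`; bloom-filter possibilities are
/// used to skip blocks and transactions that cannot match.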
pub fn filter_logs(&self, filter: evm::LogFilter) -> Result<Vec<evm::LogWithLocation>> {
let mut logs = Vec::new();
let masks = filter.bloom_possibilities();
info!("Starting search for logs with filter = {:?}", filter);
for ((block_num, slot), block) in self.evm_blocks_iterator(filter.from_block)? {
trace!("Searching block = {}, slot({:?})", block_num, slot);
if block_num > filter.to_block {
break;
}
if !masks
.iter()
.any(|mask| block.logs_bloom.contains_bloom(mask))
{
trace!(
"Block does not match bloom filter: logs_bloom = {:?}, masks = {:?}",
block.logs_bloom,
masks
);
continue;
}
for (id, hash) in block.transactions.iter().enumerate() {
let tx = self.read_evm_transaction((
*hash,
block.block_number,
Some(block.native_chain_slot),
))?;
let tx = if let Some(tx) = tx {
tx
} else {
warn!(
"EVM transaction = {} was cleaned up while its block still exists",
hash
);
continue;
};
if !masks.iter().any(|mask| tx.logs_bloom.contains_bloom(mask)) {
trace!(
"Transaction does not match bloom filter: logs_bloom = {:?}, masks = {:?}",
tx.logs_bloom,
masks
);
continue;
}
tx.logs.into_iter().for_each(|log| {
if filter.is_log_match(&log) {
trace!("Adding transaction log to result = {:?}", log);
logs.push(evm::LogWithLocation {
transaction_hash: *hash,
transaction_id: id as u64,
block_num: block.block_number,
data: log.data,
topics: log.topics,
address: log.address,
})
}
});
}
}
Ok(logs)
}
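/// Finds a transaction receipt by hash, collecting candidates from both
/// primary indexes. A receipt stored under a rooted slot is preferred, then
/// one referenced by a rooted block, falling back to the first candidate.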
pub fn find_evm_transaction(&self, hash: H256) -> Result<Option<evm::TransactionReceipt>> {
let mut transactions: Vec<_> = self
.evm_transactions_cf
.iter(IteratorMode::From(
EvmTransactionReceiptsIndex {
index: 0,
hash,
block_num: 0,
slot: None,
},
IteratorDirection::Forward,
))?
.take_while(
|(
EvmTransactionReceiptsIndex {
hash: found_hash, ..
},
_data,
)| *found_hash == hash,
)
.collect();
transactions.extend(
self.evm_transactions_cf
.iter(IteratorMode::From(
EvmTransactionReceiptsIndex {
index: 1,
hash,
block_num: 0,
slot: None,
},
IteratorDirection::Forward,
))?
.take_while(
|(
EvmTransactionReceiptsIndex {
hash: found_hash, ..
},
_data,
)| *found_hash == hash,
),
);
let mut confirmed_transaction_data = transactions
.iter()
.find(|(EvmTransactionReceiptsIndex { slot, .. }, _)| {
if let Some(slot) = slot {
if self.is_root(*slot) {
return true;
}
}
false
})
.map(|(_, data)| data);
if confirmed_transaction_data.is_none() {
for (
EvmTransactionReceiptsIndex {
hash, block_num, ..
},
data,
) in &transactions
{
let blocks = if let Ok(b) = self.evm_blocks_iterator(*block_num) {
b
} else {
continue;
};
let confirmed_block_found = blocks
.take_while(|((block_index, _slot), _block)| block_index == block_num)
.any(|(_, header)| {
self.is_root(header.native_chain_slot) && header.transactions.contains(hash)
});
if confirmed_block_found {
confirmed_transaction_data = Some(data);
break;
}
}
}
let transaction_data =
confirmed_transaction_data.or_else(|| transactions.first().map(|(_, data)| data));
Ok(if let Some(data) = transaction_data {
Some(
self.evm_transactions_cf
.deserialize_protobuf_or_bincode::<evm::TransactionReceipt>(&data)?
.try_into()
.map_err(|e| {
BlockstoreError::ProtobufDecodeError(prost::DecodeError::new(e))
})?,
)
} else {
None
})
}
pub fn write_evm_transaction(
&self,
block_num: evm_state::BlockNum,
slot_index: Slot,
hash: H256,
status: evm::TransactionReceipt,
) -> Result<()> {
let mut w_active_transaction_status_index =
self.active_transaction_status_index.write().unwrap();
let index = self.get_primary_index(block_num, &mut w_active_transaction_status_index)?;
let status = status.into();
self.evm_transactions_cf.put_protobuf(
EvmTransactionReceiptsIndex {
index,
hash,
block_num,
slot: Some(slot_index),
},
&status,
)?;
Ok(())
}
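/// Returns the entry vector for the slot starting with `shred_start_index`.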
pub fn get_slot_entries(&self, slot: Slot, shred_start_index: u64) -> Result<Vec<Entry>> {
self.get_slot_entries_with_shred_info(slot, shred_start_index, false)
.map(|x| x.0)
}
pub fn get_slot_entries_with_shred_info(
&self,
slot: Slot,
start_index: u64,
allow_dead_slots: bool,
) -> Result<(Vec<Entry>, u64, bool)> {
if self.is_dead(slot) && !allow_dead_slots {
return Err(BlockstoreError::DeadSlot);
}
let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?;
if completed_ranges.is_empty() {
return Ok((vec![], 0, false));
}
let slot_meta = slot_meta.unwrap();
let num_shreds = completed_ranges
.last()
.map(|(_, end_index)| u64::from(*end_index) - start_index + 1)
.unwrap_or(0);
let entries: Result<Vec<Vec<Entry>>> = PAR_THREAD_POOL.with(|thread_pool| {
thread_pool.borrow().install(|| {
completed_ranges
.par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(
slot,
*start_index,
*end_index,
Some(&slot_meta),
)
})
.collect()
})
});
let entries: Vec<Entry> = entries?.into_iter().flatten().collect();
Ok((entries, num_shreds, slot_meta.is_full()))
}
fn get_completed_ranges(
&self,
slot: Slot,
start_index: u64,
) -> Result<(CompletedRanges, Option<SlotMeta>)> {
let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap();
if *lowest_cleanup_slot > slot {
return Err(BlockstoreError::SlotCleanedUp);
}
let slot_meta_cf = self.db.column::<cf::SlotMeta>();
let slot_meta = slot_meta_cf.get(slot)?;
if slot_meta.is_none() {
return Ok((vec![], slot_meta));
}
let slot_meta = slot_meta.unwrap();
let completed_ranges = Self::get_completed_data_ranges(
start_index as u32,
&slot_meta.completed_data_indexes[..],
slot_meta.consumed as u32,
);
Ok((completed_ranges, Some(slot_meta)))
}
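// Returns the (start, end) index pairs of completed data blocks at or after
// `start_index`. Only blocks whose end index lies below `consumed` (i.e.
// whose shreds have all been received) are included.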
fn get_completed_data_ranges(
mut start_index: u32,
completed_data_end_indexes: &[u32],
consumed: u32,
) -> CompletedRanges {
let mut completed_data_ranges = vec![];
let floor = completed_data_end_indexes
.iter()
.position(|i| *i >= start_index)
.unwrap_or_else(|| completed_data_end_indexes.len());
for i in &completed_data_end_indexes[floor..] {
assert!(*i != consumed);
if *i < consumed {
completed_data_ranges.push((start_index, *i));
start_index = *i + 1;
}
}
completed_data_ranges
}
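/// Fetches the data shreds with indexes in [`start_index`, `end_index`] for
/// `slot`, deshreds them, and deserializes the recovered payload into
/// entries. Panics if a shred in the range is missing while `slot_meta` is
/// provided, since the range implies the shred was received.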
pub fn get_entries_in_data_block(
&self,
slot: Slot,
start_index: u32,
end_index: u32,
slot_meta: Option<&SlotMeta>,
) -> Result<Vec<Entry>> {
let data_shred_cf = self.db.column::<cf::ShredData>();
let data_shreds: Result<Vec<Shred>> = (start_index..=end_index)
.map(|i| {
data_shred_cf
.get_bytes((slot, u64::from(i)))
.and_then(|serialized_shred| {
if serialized_shred.is_none() {
if let Some(slot_meta) = slot_meta {
panic!(
"Shred with slot: {}, index: {}, consumed: {}, completed_indexes: {:?} \
must exist if shred index was included in a range: {} {}",
slot,
i,
slot_meta.consumed,
slot_meta.completed_data_indexes,
start_index,
end_index
);
} else {
return Err(BlockstoreError::InvalidShredData(Box::new(
bincode::ErrorKind::Custom(format!(
"Missing shred for slot {}, index {}",
slot, i
)),
)));
}
}
Shred::new_from_serialized_shred(serialized_shred.unwrap()).map_err(|err| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(
format!(
"Could not reconstruct shred from shred payload: {:?}",
err
),
)))
})
})
})
.collect();
let data_shreds = data_shreds?;
let last_shred = data_shreds.last().unwrap();
assert!(last_shred.data_complete() || last_shred.last_in_slot());
let deshred_payload = Shredder::deshred(&data_shreds).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"Could not reconstruct data block from constituent shreds, error: {:?}",
e
))))
})?;
debug!("{:?} shreds in last FEC set", data_shreds.len(),);
bincode::deserialize::<Vec<Entry>>(&deshred_payload).map_err(|e| {
BlockstoreError::InvalidShredData(Box::new(bincode::ErrorKind::Custom(format!(
"could not reconstruct entries: {:?}",
e
))))
})
}
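// Best-effort variant of `get_slot_entries`: decoding failures are swallowed
// (the affected range yields no entries) and all CPUs are used.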
fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec<Entry> {
let (completed_ranges, slot_meta) = self
.get_completed_ranges(slot, start_index)
.unwrap_or_default();
if completed_ranges.is_empty() {
return vec![];
}
let slot_meta = slot_meta.unwrap();
let entries: Vec<Vec<Entry>> = PAR_THREAD_POOL_ALL_CPUS.with(|thread_pool| {
thread_pool.borrow().install(|| {
completed_ranges
.par_iter()
.map(|(start_index, end_index)| {
self.get_entries_in_data_block(
slot,
*start_index,
*end_index,
Some(&slot_meta),
)
.unwrap_or_default()
})
.collect()
})
});
entries.into_iter().flatten().collect()
}
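/// For each queried slot with stored metadata, returns the list of its next
/// slots that are not marked dead; slots without metadata are omitted.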
pub fn get_slots_since(&self, slots: &[u64]) -> Result<HashMap<u64, Vec<u64>>> {
let slot_metas: Result<Vec<Option<SlotMeta>>> =
slots.iter().map(|slot| self.meta(*slot)).collect();
let slot_metas = slot_metas?;
let result: HashMap<u64, Vec<u64>> = slots
.iter()
.zip(slot_metas)
.filter_map(|(height, meta)| {
meta.map(|meta| {
let valid_next_slots: Vec<u64> = meta
.next_slots
.iter()
.cloned()
.filter(|s| !self.is_dead(*s))
.collect();
(*height, valid_next_slots)
})
})
.collect();
Ok(result)
}
pub fn is_root(&self, slot: Slot) -> bool {
matches!(self.db.get::<cf::Root>(slot), Ok(Some(true)))
}
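/// Returns true if the slot was never rooted but lies strictly between the
/// lowest known root and the max root, i.e. it was skipped by consensus.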
pub fn is_skipped(&self, slot: Slot) -> bool {
let lowest_root = self
.rooted_slot_iterator(0)
.ok()
.and_then(|mut iter| iter.next())
.unwrap_or_default();
match self.db.get::<cf::Root>(slot).ok().flatten() {
Some(_) => false,
None => slot < self.max_root() && slot > lowest_root,
}
}
pub fn set_roots(&self, rooted_slots: &[u64]) -> Result<()> {
let mut write_batch = self.db.batch()?;
for slot in rooted_slots {
write_batch.put::<cf::Root>(*slot, &true)?;
}
self.db.write(write_batch)?;
let mut last_root = self.last_root.write().unwrap();
if *last_root == std::u64::MAX {
*last_root = 0;
}
*last_root = cmp::max(*rooted_slots.iter().max().unwrap(), *last_root);
Ok(())
}
pub fn is_dead(&self, slot: Slot) -> bool {
matches!(
self.db
.get::<cf::DeadSlots>(slot)
.expect("fetch from DeadSlots column family failed"),
Some(true)
)
}
pub fn set_dead_slot(&self, slot: Slot) -> Result<()> {
self.dead_slots_cf.put(slot, &true)
}
pub fn store_duplicate_if_not_existing(
&self,
slot: Slot,
shred1: Vec<u8>,
shred2: Vec<u8>,
) -> Result<()> {
if !self.has_duplicate_shreds_in_slot(slot) {
self.store_duplicate_slot(slot, shred1, shred2)
} else {
Ok(())
}
}
pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
}
pub fn get_duplicate_slot(&self, slot: u64) -> Option<DuplicateSlotProof> {
self.duplicate_slots_cf
.get(slot)
.expect("fetch from DuplicateSlots column family failed")
}
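/// Returns the payload of the shred already stored at (slot, index) if it
/// differs from `new_shred`, i.e. evidence of a duplicate shred.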
pub fn is_shred_duplicate(
&self,
slot: u64,
index: u32,
new_shred: &[u8],
is_data: bool,
) -> Option<Vec<u8>> {
let res = if is_data {
self.get_data_shred(slot, index as u64)
.expect("fetch from ShredData column family failed")
} else {
self.get_coding_shred(slot, index as u64)
.expect("fetch from ShredCode column family failed")
};
res.map(|existing_shred| {
if existing_shred != new_shred {
Some(existing_shred)
} else {
None
}
})
.unwrap_or(None)
}
pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
self.duplicate_slots_cf
.get(slot)
.expect("fetch from DuplicateSlots column family failed")
.is_some()
}
pub fn orphans_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = u64> + '_> {
let orphans_iter = self
.db
.iter::<cf::Orphans>(IteratorMode::From(slot, IteratorDirection::Forward))?;
Ok(orphans_iter.map(|(slot, _)| slot))
}
pub fn dead_slots_iterator(&self, slot: Slot) -> Result<impl Iterator<Item = Slot> + '_> {
let dead_slots_iterator = self
.db
.iter::<cf::DeadSlots>(IteratorMode::From(slot, IteratorDirection::Forward))?;
Ok(dead_slots_iterator.map(|(slot, _)| slot))
}
pub fn last_root(&self) -> Slot {
*self.last_root.read().unwrap()
}
pub fn lowest_slot(&self) -> Slot {
for (slot, meta) in self
.slot_meta_iterator(0)
.expect("unable to iterate over meta")
{
if slot > 0 && meta.received > 0 {
return slot;
}
}
self.last_root()
}
pub fn storage_size(&self) -> Result<u64> {
self.db.storage_size()
}
pub fn is_primary_access(&self) -> bool {
self.db.is_primary_access()
}
}
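// Updates the sorted list of completed-data-block end indexes with the newly
// received shred and returns the (start, end) index ranges of any data
// blocks that are now complete.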
fn update_completed_data_indexes(
is_last_in_data: bool,
new_shred_index: u32,
received_data_shreds: &ShredIndex,
completed_data_indexes: &mut Vec<u32>,
) -> Vec<(u32, u32)> {
let mut first_greater_pos = None;
let mut prev_completed_shred_index = None;
for (i, completed_data_index) in completed_data_indexes.iter().enumerate() {
assert!(
prev_completed_shred_index.is_none()
|| *completed_data_index > prev_completed_shred_index.unwrap()
);
if *completed_data_index > new_shred_index {
first_greater_pos = Some(i);
break;
}
prev_completed_shred_index = Some(*completed_data_index);
}
let mut check_ranges: Vec<u32> = vec![prev_completed_shred_index
.map(|completed_data_shred_index| completed_data_shred_index + 1)
.unwrap_or(0)];
let mut first_greater_data_complete_index =
first_greater_pos.map(|i| completed_data_indexes[i]);
if is_last_in_data {
if first_greater_pos.is_some() {
check_ranges.push(new_shred_index + 1);
}
completed_data_indexes.insert(
first_greater_pos.unwrap_or_else(|| {
first_greater_data_complete_index = Some(new_shred_index);
completed_data_indexes.len()
}),
new_shred_index,
);
}
if first_greater_data_complete_index.is_none() {
return vec![];
}
check_ranges.push(first_greater_data_complete_index.unwrap() + 1);
let mut completed_data_ranges = vec![];
for range in check_ranges.windows(2) {
let mut is_complete = true;
for shred_index in range[0]..range[1] {
if !received_data_shreds.is_present(shred_index as u64) {
is_complete = false;
break;
}
}
if is_complete {
completed_data_ranges.push((range[0], range[1] - 1));
}
}
completed_data_ranges
}
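// Applies a newly inserted data shred to the slot's metadata: bumps
// `received`, records a first-shred timestamp on the first insert, updates
// `consumed` and `last_index`, and returns newly completed data ranges.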
fn update_slot_meta(
is_last_in_slot: bool,
is_last_in_data: bool,
slot_meta: &mut SlotMeta,
index: u32,
new_consumed: u64,
reference_tick: u8,
received_data_shreds: &ShredIndex,
) -> Vec<(u32, u32)> {
let maybe_first_insert = slot_meta.received == 0;
slot_meta.received = cmp::max(u64::from(index) + 1, slot_meta.received);
if maybe_first_insert && slot_meta.received > 0 {
let slot_time_elapsed = u64::from(reference_tick) * 1000 / DEFAULT_TICKS_PER_SECOND;
slot_meta.first_shred_timestamp = timestamp() - slot_time_elapsed;
}
slot_meta.consumed = new_consumed;
slot_meta.last_index = {
if slot_meta.last_index == std::u64::MAX {
if is_last_in_slot {
u64::from(index)
} else {
std::u64::MAX
}
} else {
slot_meta.last_index
}
};
update_completed_data_indexes(
is_last_in_slot || is_last_in_data,
index,
received_data_shreds,
&mut slot_meta.completed_data_indexes,
)
}
fn get_index_meta_entry<'a>(
db: &Database,
slot: Slot,
index_working_set: &'a mut HashMap<u64, IndexMetaWorkingSetEntry>,
index_meta_time: &mut u64,
) -> &'a mut IndexMetaWorkingSetEntry {
let index_cf = db.column::<cf::Index>();
let mut total_start = Measure::start("Total elapsed");
let res = index_working_set.entry(slot).or_insert_with(|| {
let newly_inserted_meta = index_cf
.get(slot)
.unwrap()
.unwrap_or_else(|| Index::new(slot));
IndexMetaWorkingSetEntry {
index: newly_inserted_meta,
did_insert_occur: false,
}
});
total_start.stop();
*index_meta_time += total_start.as_us();
res
}
fn get_slot_meta_entry<'a>(
db: &Database,
slot_meta_working_set: &'a mut HashMap<u64, SlotMetaWorkingSetEntry>,
slot: Slot,
parent_slot: Slot,
) -> &'a mut SlotMetaWorkingSetEntry {
let meta_cf = db.column::<cf::SlotMeta>();
slot_meta_working_set.entry(slot).or_insert_with(|| {
if let Some(mut meta) = meta_cf.get(slot).expect("Expect database get to succeed") {
let backup = Some(meta.clone());
if is_orphan(&meta) {
meta.parent_slot = parent_slot;
}
SlotMetaWorkingSetEntry::new(Rc::new(RefCell::new(meta)), backup)
} else {
SlotMetaWorkingSetEntry::new(
Rc::new(RefCell::new(SlotMeta::new(slot, parent_slot))),
None,
)
}
})
}
fn get_last_hash<'a>(iterator: impl Iterator<Item = &'a Entry> + 'a) -> Option<Hash> {
iterator.last().map(|entry| entry.hash)
}
fn is_valid_write_to_slot_0(slot_to_write: u64, parent_slot: Slot, last_root: u64) -> bool {
slot_to_write == 0 && last_root == 0 && parent_slot == 0
}
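// Notifies listeners: fires the new-shred signals when requested and fans
// the list of newly completed slots out to every completed-slots channel,
// logging an error datapoint if a channel is full.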
fn send_signals(
new_shreds_signals: &[SyncSender<bool>],
completed_slots_senders: &[SyncSender<Vec<u64>>],
should_signal: bool,
newly_completed_slots: Vec<u64>,
) {
if should_signal {
for signal in new_shreds_signals {
let _ = signal.try_send(true);
}
}
if !completed_slots_senders.is_empty() && !newly_completed_slots.is_empty() {
let mut slots: Vec<_> = (0..completed_slots_senders.len() - 1)
.map(|_| newly_completed_slots.clone())
.collect();
slots.push(newly_completed_slots);
for (signal, slots) in completed_slots_senders.iter().zip(slots.into_iter()) {
let res = signal.try_send(slots);
if let Err(TrySendError::Full(_)) = res {
datapoint_error!(
"blockstore_error",
(
"error",
"Unable to send newly completed slot because channel is full".to_string(),
String
),
);
}
}
}
}
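// Writes every changed SlotMeta in the working set into `write_batch`,
// returning whether listeners should be signaled and which slots have newly
// completed.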
fn commit_slot_meta_working_set(
slot_meta_working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
completed_slots_senders: &[SyncSender<Vec<u64>>],
write_batch: &mut WriteBatch,
) -> Result<(bool, Vec<u64>)> {
let mut should_signal = false;
let mut newly_completed_slots = vec![];
for (slot, slot_meta_entry) in slot_meta_working_set.iter() {
assert!(slot_meta_entry.did_insert_occur);
let meta: &SlotMeta = &RefCell::borrow(&*slot_meta_entry.new_slot_meta);
let meta_backup = &slot_meta_entry.old_slot_meta;
if !completed_slots_senders.is_empty() && is_newly_completed_slot(meta, meta_backup) {
newly_completed_slots.push(*slot);
}
if Some(meta) != meta_backup.as_ref() {
should_signal = should_signal || slot_has_updates(meta, meta_backup);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
}
}
Ok((should_signal, newly_completed_slots))
}
fn find_slot_meta_else_create<'a>(
db: &Database,
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_index: u64,
) -> Result<Rc<RefCell<SlotMeta>>> {
let result = find_slot_meta_in_cached_state(working_set, chained_slots, slot_index);
if let Some(slot) = result {
Ok(slot)
} else {
find_slot_meta_in_db_else_create(db, slot_index, chained_slots)
}
}
fn find_slot_meta_in_db_else_create(
db: &Database,
slot: Slot,
insert_map: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
) -> Result<Rc<RefCell<SlotMeta>>> {
if let Some(slot_meta) = db.column::<cf::SlotMeta>().get(slot)? {
insert_map.insert(slot, Rc::new(RefCell::new(slot_meta)));
Ok(insert_map.get(&slot).unwrap().clone())
} else {
insert_map.insert(slot, Rc::new(RefCell::new(SlotMeta::new_orphan(slot))));
Ok(insert_map.get(&slot).unwrap().clone())
}
}
fn find_slot_meta_in_cached_state<'a>(
working_set: &'a HashMap<u64, SlotMetaWorkingSetEntry>,
chained_slots: &'a HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: Slot,
) -> Option<Rc<RefCell<SlotMeta>>> {
working_set
.get(&slot)
.map(|entry| entry.new_slot_meta.clone())
.or_else(|| chained_slots.get(&slot).cloned())
}
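// Chains every slot that saw an insert to its parent slot and, for slots
// that have newly become full and connected, propagates `is_connected` down
// through their descendants.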
fn handle_chaining(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &mut HashMap<u64, SlotMetaWorkingSetEntry>,
) -> Result<()> {
working_set.retain(|_, entry| entry.did_insert_occur);
let mut new_chained_slots = HashMap::new();
let working_set_slots: Vec<_> = working_set.keys().collect();
for slot in working_set_slots {
handle_chaining_for_slot(db, write_batch, working_set, &mut new_chained_slots, *slot)?;
}
for (slot, meta) in new_chained_slots.iter() {
let meta: &SlotMeta = &RefCell::borrow(&*meta);
write_batch.put::<cf::SlotMeta>(*slot, meta)?;
}
Ok(())
}
fn handle_chaining_for_slot(
db: &Database,
write_batch: &mut WriteBatch,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot: Slot,
) -> Result<()> {
let slot_meta_entry = working_set
.get(&slot)
.expect("Slot must exist in the working_set hashmap");
let meta = &slot_meta_entry.new_slot_meta;
let meta_backup = &slot_meta_entry.old_slot_meta;
{
let mut meta_mut = meta.borrow_mut();
let was_orphan_slot = meta_backup.is_some() && is_orphan(meta_backup.as_ref().unwrap());
if slot != 0 {
let prev_slot = meta_mut.parent_slot;
if meta_backup.is_none() || was_orphan_slot {
let prev_slot_meta =
find_slot_meta_else_create(db, working_set, new_chained_slots, prev_slot)?;
chain_new_slot_to_prev_slot(&mut prev_slot_meta.borrow_mut(), slot, &mut meta_mut);
if is_orphan(&RefCell::borrow(&*prev_slot_meta)) {
write_batch.put::<cf::Orphans>(prev_slot, &true)?;
}
}
}
if was_orphan_slot {
write_batch.delete::<cf::Orphans>(slot)?;
}
}
let should_propagate_is_connected =
is_newly_completed_slot(&RefCell::borrow(&*meta), meta_backup)
&& RefCell::borrow(&*meta).is_connected;
if should_propagate_is_connected {
let slot_function = |slot: &mut SlotMeta| {
slot.is_connected = true;
slot.is_full()
};
traverse_children_mut(
db,
slot,
&meta,
working_set,
new_chained_slots,
slot_function,
)?;
}
Ok(())
}
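// Walks the subtree rooted at `slot` with an explicit stack, applying
// `slot_function` to each visited SlotMeta and descending into a slot's
// children only while the function returns true.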
fn traverse_children_mut<F>(
db: &Database,
slot: Slot,
slot_meta: &Rc<RefCell<SlotMeta>>,
working_set: &HashMap<u64, SlotMetaWorkingSetEntry>,
new_chained_slots: &mut HashMap<u64, Rc<RefCell<SlotMeta>>>,
slot_function: F,
) -> Result<()>
where
F: Fn(&mut SlotMeta) -> bool,
{
let mut next_slots: Vec<(u64, Rc<RefCell<SlotMeta>>)> = vec![(slot, slot_meta.clone())];
while !next_slots.is_empty() {
let (_, current_slot) = next_slots.pop().unwrap();
if slot_function(&mut current_slot.borrow_mut()) {
let current_slot = &RefCell::borrow(&*current_slot);
for next_slot_index in current_slot.next_slots.iter() {
let next_slot = find_slot_meta_else_create(
db,
working_set,
new_chained_slots,
*next_slot_index,
)?;
next_slots.push((*next_slot_index, next_slot));
}
}
}
Ok(())
}
fn is_orphan(meta: &SlotMeta) -> bool {
!meta.is_parent_set()
}
fn chain_new_slot_to_prev_slot(
prev_slot_meta: &mut SlotMeta,
current_slot: Slot,
current_slot_meta: &mut SlotMeta,
) {
prev_slot_meta.next_slots.push(current_slot);
current_slot_meta.is_connected = prev_slot_meta.is_connected && prev_slot_meta.is_full();
}
fn is_newly_completed_slot(slot_meta: &SlotMeta, backup_slot_meta: &Option<SlotMeta>) -> bool {
slot_meta.is_full()
&& (backup_slot_meta.is_none()
|| slot_meta.consumed != backup_slot_meta.as_ref().unwrap().consumed)
}
fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option<SlotMeta>) -> bool {
slot_meta.is_connected
&& ((slot_meta_backup.is_none() && slot_meta.consumed != 0)
|| (slot_meta_backup.is_some()
&& slot_meta_backup.as_ref().unwrap().consumed != slot_meta.consumed))
}
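/// Creates a fresh ledger at `ledger_path`: writes the genesis config and
/// EVM genesis state, shreds and roots the slot-0 ticks, then packages
/// genesis.bin, the rocksdb directory, and the EVM genesis into
/// genesis.tar.bz2 and verifies that the archive unpacks within
/// `max_genesis_archive_unpacked_size`.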
pub fn create_new_ledger(
ledger_path: &Path,
evm_state_json: Option<&Path>,
genesis_config: &GenesisConfig,
max_genesis_archive_unpacked_size: u64,
access_type: AccessType,
) -> Result<Hash> {
Blockstore::destroy(ledger_path)?;
genesis_config.generate_evm_state(&ledger_path, evm_state_json)?;
genesis_config.write(&ledger_path)?;
let blockstore = Blockstore::open_with_access_type(ledger_path, access_type, None, false)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
let hashes_per_tick = genesis_config.poh_config.hashes_per_tick.unwrap_or(0);
let entries = create_ticks(ticks_per_slot, hashes_per_tick, genesis_config.hash());
let last_hash = entries.last().unwrap().hash;
let version = solana_sdk::shred_version::version_from_hash(&last_hash);
let shredder = Shredder::new(0, 0, 0.0, Arc::new(Keypair::new()), 0, version)
.expect("Failed to create entry shredder");
let shreds = shredder.entries_to_shreds(&entries, true, 0).0;
assert!(shreds.last().unwrap().last_in_slot());
blockstore.insert_shreds(shreds, None, false)?;
blockstore.set_roots(&[0])?;
drop(blockstore);
let archive_path = ledger_path.join("genesis.tar.bz2");
let args = vec![
"jcfhS",
archive_path.to_str().unwrap(),
"-C",
ledger_path.to_str().unwrap(),
"genesis.bin",
"rocksdb",
"evm-state-genesis",
];
let output = std::process::Command::new("tar")
.args(&args)
.output()
.unwrap();
if !output.status.success() {
use std::str::from_utf8;
error!("tar stdout: {}", from_utf8(&output.stdout).unwrap_or("?"));
error!("tar stderr: {}", from_utf8(&output.stderr).unwrap_or("?"));
return Err(BlockstoreError::Io(IoError::new(
ErrorKind::Other,
format!(
"Error trying to generate snapshot archive: {}",
output.status
),
)));
}
{
let temp_dir = tempfile::tempdir_in(ledger_path).unwrap();
let unpack_check = unpack_genesis_archive(
&archive_path,
&temp_dir.into_path(),
max_genesis_archive_unpacked_size,
);
if let Err(unpack_err) = unpack_check {
let mut error_messages = String::new();
fs::rename(
&ledger_path.join("genesis.tar.bz2"),
ledger_path.join("genesis.tar.bz2.failed"),
)
.unwrap_or_else(|e| {
error_messages += &format!("/failed to stash problematic genesis.tar.bz2: {}", e)
});
fs::rename(
&ledger_path.join("genesis.bin"),
ledger_path.join("genesis.bin.failed"),
)
.unwrap_or_else(|e| {
error_messages += &format!("/failed to stash problematic genesis.bin: {}", e)
});
fs::rename(
&ledger_path.join("rocksdb"),
ledger_path.join("rocksdb.failed"),
)
.unwrap_or_else(|e| {
error_messages += &format!("/failed to stash problematic rocksdb: {}", e)
});
return Err(BlockstoreError::Io(IoError::new(
ErrorKind::Other,
format!(
"Error checking to unpack genesis archive: {}{}",
unpack_err, error_messages
),
)));
}
}
Ok(last_hash)
}
#[macro_export]
macro_rules! tmp_ledger_name {
() => {
&format!("{}-{}", file!(), line!())
};
}
#[macro_export]
macro_rules! get_tmp_ledger_path {
() => {
$crate::blockstore::get_ledger_path_from_name($crate::tmp_ledger_name!())
};
}
pub fn get_ledger_path_from_name(name: &str) -> PathBuf {
use std::env;
let out_dir = env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string());
let keypair = Keypair::new();
let path = [
out_dir,
"ledger".to_string(),
format!("{}-{}", name, keypair.pubkey()),
]
.iter()
.collect();
let _ignored = fs::remove_dir_all(&path);
path
}
#[macro_export]
macro_rules! create_new_tmp_ledger {
($genesis_config:expr) => {
$crate::blockstore::create_new_ledger_from_name(
$crate::tmp_ledger_name!(),
$genesis_config,
$crate::blockstore_db::AccessType::PrimaryOnly,
)
};
}
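// A shred's (slot, parent) pair is valid if the parent precedes the slot and
// is not older than the last root; writes to slot 0 are allowed only in the
// genesis case.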
pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> bool {
if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
if parent_slot >= slot {
return false;
}
if parent_slot < last_root {
return false;
}
}
true
}
pub fn create_new_ledger_from_name(
name: &str,
genesis_config: &GenesisConfig,
access_type: AccessType,
) -> (PathBuf, Hash) {
let ledger_path = get_ledger_path_from_name(name);
let temp_dir = TempDir::new().unwrap();
let evm_state_json = temp_dir.path().join("evm-state.json");
let _root_hash =
solana_sdk::genesis_config::evm_genesis::generate_evm_state_json(&evm_state_json).unwrap();
let blockhash = create_new_ledger(
&ledger_path,
Some(evm_state_json.as_ref()),
genesis_config,
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
access_type,
)
.unwrap();
(ledger_path, blockhash)
}
pub fn entries_to_test_shreds(
entries: Vec<Entry>,
slot: Slot,
parent_slot: Slot,
is_full_slot: bool,
version: u16,
) -> Vec<Shred> {
let shredder = Shredder::new(slot, parent_slot, 0.0, Arc::new(Keypair::new()), 0, version)
.expect("Failed to create entry shredder");
shredder.entries_to_shreds(&entries, is_full_slot, 0).0
}
pub fn make_slot_entries(
slot: Slot,
parent_slot: Slot,
num_entries: u64,
) -> (Vec<Shred>, Vec<Entry>) {
let entries = create_ticks(num_entries, 0, Hash::default());
let shreds = entries_to_test_shreds(entries.clone(), slot, parent_slot, true, 0);
(shreds, entries)
}
pub fn make_many_slot_entries(
start_slot: Slot,
num_slots: u64,
entries_per_slot: u64,
) -> (Vec<Shred>, Vec<Entry>) {
let mut shreds = vec![];
let mut entries = vec![];
for slot in start_slot..start_slot + num_slots {
let parent_slot = if slot == 0 { 0 } else { slot - 1 };
let (slot_shreds, slot_entries) = make_slot_entries(slot, parent_slot, entries_per_slot);
shreds.extend(slot_shreds);
entries.extend(slot_entries);
}
(shreds, entries)
}
pub fn make_chaining_slot_entries(
chain: &[u64],
entries_per_slot: u64,
) -> Vec<(Vec<Shred>, Vec<Entry>)> {
let mut slots_shreds_and_entries = vec![];
for (i, slot) in chain.iter().enumerate() {
let parent_slot = {
if *slot == 0 || i == 0 {
0
} else {
chain[i - 1]
}
};
let result = make_slot_entries(*slot, parent_slot, entries_per_slot);
slots_shreds_and_entries.push(result);
}
slots_shreds_and_entries
}
#[cfg(not(unix))]
fn adjust_ulimit_nofile(_enforce_ulimit_nofile: bool) -> Result<()> {
Ok(())
}
#[cfg(unix)]
fn adjust_ulimit_nofile(enforce_ulimit_nofile: bool) -> Result<()> {
let desired_nofile = 500000;
fn get_nofile() -> libc::rlimit {
let mut nofile = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
if unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &mut nofile) } != 0 {
warn!("getrlimit(RLIMIT_NOFILE) failed");
}
nofile
}
let mut nofile = get_nofile();
if nofile.rlim_cur < desired_nofile {
nofile.rlim_cur = desired_nofile;
if unsafe { libc::setrlimit(libc::RLIMIT_NOFILE, &nofile) } != 0 {
error!(
"Unable to increase the maximum open file descriptor limit to {}",
desired_nofile
);
if cfg!(target_os = "macos") {
error!(
"On mac OS you may need to run |sudo launchctl limit maxfiles {} {}| first",
desired_nofile, desired_nofile,
);
}
if enforce_ulimit_nofile {
return Err(BlockstoreError::UnableToSetOpenFileDescriptorLimit);
}
}
nofile = get_nofile();
}
info!("Maximum open file descriptors: {}", nofile.rlim_cur);
Ok(())
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::{
entry::{next_entry, next_entry_mut},
genesis_utils::{create_genesis_config, GenesisConfigInfo},
leader_schedule::{FixedSchedule, LeaderSchedule},
shred::{max_ticks_per_n_shreds, DataShredHeader},
};
use assert_matches::assert_matches;
use bincode::serialize;
use itertools::Itertools;
use rand::{seq::SliceRandom, thread_rng};
use solana_account_decoder::parse_token::UiTokenAmount;
use solana_runtime::bank::{Bank, RewardType};
use solana_sdk::{
hash::{self, hash, Hash},
instruction::CompiledInstruction,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::Signature,
transaction::TransactionError,
};
use solana_storage_proto::convert::generated;
use solana_transaction_status::{InnerInstructions, Reward, Rewards, TransactionTokenBalance};
use std::time::Duration;
pub(crate) fn make_slot_entries_with_transactions(num_entries: u64) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new();
for x in 0..num_entries {
let transaction = Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[solana_sdk::pubkey::new_rand()],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
);
entries.push(next_entry_mut(&mut Hash::default(), 0, vec![transaction]));
let mut tick = create_ticks(1, 0, hash(&serialize(&x).unwrap()));
entries.append(&mut tick);
}
entries
}
#[test]
fn test_create_new_ledger() {
let mint_total = 1_000_000_000_000;
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(mint_total);
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
let ledger = Blockstore::open(&ledger_path).unwrap();
let ticks = create_ticks(genesis_config.ticks_per_slot, 0, genesis_config.hash());
let entries = ledger.get_slot_entries(0, 0).unwrap();
assert_eq!(ticks, entries);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_get_bytes() {
let num_entries = max_ticks_per_n_shreds(1, None) + 1;
assert!(num_entries > 1);
let (mut shreds, _) = make_slot_entries(0, 0, num_entries);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
let last_shred = shreds.pop().unwrap();
assert!(last_shred.index() > 0);
ledger
.insert_shreds(vec![last_shred.clone()], None, false)
.unwrap();
let serialized_shred = ledger
.data_shred_cf
.get_bytes((0, last_shred.index() as u64))
.unwrap()
.unwrap();
let deserialized_shred = Shred::new_from_serialized_shred(serialized_shred).unwrap();
assert_eq!(last_shred, deserialized_shred);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_write_entries() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path!();
{
let ticks_per_slot = 10;
let num_slots = 10;
let ledger = Blockstore::open(&ledger_path).unwrap();
let mut ticks = vec![];
let mut shreds_per_slot = vec![];
for i in 0..num_slots {
let mut new_ticks = create_ticks(ticks_per_slot, 0, Hash::default());
let num_shreds = ledger
.write_entries(
i,
0,
0,
ticks_per_slot,
Some(i.saturating_sub(1)),
true,
&Arc::new(Keypair::new()),
new_ticks.clone(),
0,
)
.unwrap() as u64;
shreds_per_slot.push(num_shreds);
ticks.append(&mut new_ticks);
}
for i in 0..num_slots {
let meta = ledger.meta(i).unwrap().unwrap();
let num_shreds = shreds_per_slot[i as usize];
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.last_index, num_shreds - 1);
if i == num_slots - 1 {
assert!(meta.next_slots.is_empty());
} else {
assert_eq!(meta.next_slots, vec![i + 1]);
}
if i == 0 {
assert_eq!(meta.parent_slot, 0);
} else {
assert_eq!(meta.parent_slot, i - 1);
}
assert_eq!(
&ticks[(i * ticks_per_slot) as usize..((i + 1) * ticks_per_slot) as usize],
&ledger.get_slot_entries(i, 0).unwrap()[..]
);
}
}
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_put_get_simple() {
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
let meta = SlotMeta::new(0, 1);
ledger.meta_cf.put(0, &meta).unwrap();
let result = ledger
.meta_cf
.get(0)
.unwrap()
.expect("Expected meta object to exist");
assert_eq!(result, meta);
let erasure = vec![1u8; 16];
let erasure_key = (0, 0);
ledger
.code_shred_cf
.put_bytes(erasure_key, &erasure)
.unwrap();
let result = ledger
.code_shred_cf
.get_bytes(erasure_key)
.unwrap()
.expect("Expected erasure object to exist");
assert_eq!(result, erasure);
let data = vec![2u8; 16];
let data_key = (0, 0);
ledger.data_shred_cf.put_bytes(data_key, &data).unwrap();
let result = ledger
.data_shred_cf
.get_bytes(data_key)
.unwrap()
.expect("Expected data object to exist");
assert_eq!(result, data);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_read_shred_bytes() {
let slot = 0;
let (shreds, _) = make_slot_entries(slot, 0, 100);
let num_shreds = shreds.len() as u64;
let shred_bufs: Vec<_> = shreds.iter().map(|shred| shred.payload.clone()).collect();
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
let mut buf = [0; 4096];
let (_, bytes) = ledger.get_data_shreds(slot, 0, 1, &mut buf).unwrap();
assert_eq!(buf[..bytes], shred_bufs[0][..bytes]);
let (last_index, bytes2) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 1);
assert!(bytes2 > bytes);
{
let shred_data_1 = &buf[..bytes];
assert_eq!(shred_data_1, &shred_bufs[0][..bytes]);
let shred_data_2 = &buf[bytes..bytes2];
assert_eq!(shred_data_2, &shred_bufs[1][..bytes2 - bytes]);
}
let mut buf = vec![0; bytes + 1];
let (last_index, bytes3) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes3, bytes);
let mut buf = vec![0; bytes2 - 1];
let (last_index, bytes4) = ledger.get_data_shreds(slot, 0, 2, &mut buf).unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes4, bytes);
let mut buf = vec![0; bytes * 2];
let (last_index, bytes6) = ledger
.get_data_shreds(slot, num_shreds - 1, num_shreds, &mut buf)
.unwrap();
assert_eq!(last_index, num_shreds - 1);
{
let shred_data = &buf[..bytes6];
assert_eq!(shred_data, &shred_bufs[(num_shreds - 1) as usize][..bytes6]);
}
let (last_index, bytes6) = ledger
.get_data_shreds(slot, num_shreds, num_shreds + 2, &mut buf)
.unwrap();
assert_eq!(last_index, 0);
assert_eq!(bytes6, 0);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_shred_cleanup_check() {
let slot = 1;
let (shreds, _) = make_slot_entries(slot, 0, 100);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
let mut buf = [0; 4096];
assert!(ledger.get_data_shreds(slot, 0, 1, &mut buf).is_ok());
let max_purge_slot = 1;
ledger
.run_purge(0, max_purge_slot, PurgeType::PrimaryIndex)
.unwrap();
*ledger.lowest_cleanup_slot.write().unwrap() = max_purge_slot;
let mut buf = [0; 4096];
assert!(ledger.get_data_shreds(slot, 0, 1, &mut buf).is_err());
}
#[test]
fn test_insert_data_shreds_basic() {
let num_entries = max_ticks_per_n_shreds(1, None) + 1;
assert!(num_entries > 1);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
let num_shreds = shreds.len() as u64;
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
assert!(shreds.len() > 1);
let last_shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![last_shred], None, false).unwrap();
assert!(ledger.get_slot_entries(0, 0).unwrap().is_empty());
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected new metadata object to be created");
assert!(meta.consumed == 0 && meta.received == num_shreds);
ledger.insert_shreds(shreds, None, false).unwrap();
let result = ledger.get_slot_entries(0, 0).unwrap();
assert_eq!(result, entries);
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected new metadata object to exist");
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
assert!(meta.next_slots.is_empty());
assert!(meta.is_connected);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_data_shreds_reverse() {
let num_shreds = 10;
let num_entries = max_ticks_per_n_shreds(num_shreds, None);
let (mut shreds, entries) = make_slot_entries(0, 0, num_entries);
let num_shreds = shreds.len() as u64;
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
for i in (0..num_shreds).rev() {
let shred = shreds.pop().unwrap();
ledger.insert_shreds(vec![shred], None, false).unwrap();
let result = ledger.get_slot_entries(0, 0).unwrap();
let meta = ledger
.meta(0)
.unwrap()
.expect("Expected metadata object to exist");
assert_eq!(meta.last_index, num_shreds - 1);
if i != 0 {
assert_eq!(result.len(), 0);
assert!(meta.consumed == 0 && meta.received == num_shreds as u64);
} else {
assert_eq!(meta.parent_slot, 0);
assert_eq!(result, entries);
assert!(meta.consumed == num_shreds as u64 && meta.received == num_shreds as u64);
}
}
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
fn test_insert_slots() {
test_insert_data_shreds_slots("test_insert_data_shreds_slots_single", false);
test_insert_data_shreds_slots("test_insert_data_shreds_slots_bulk", true);
}
#[test]
pub fn test_get_slot_entries1() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let entries = create_ticks(8, 0, Hash::default());
let shreds = entries_to_test_shreds(entries[0..4].to_vec(), 1, 0, false, 0);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
let mut shreds1 = entries_to_test_shreds(entries[4..].to_vec(), 1, 0, false, 0);
for (i, b) in shreds1.iter_mut().enumerate() {
b.set_index(8 + i as u32);
}
blockstore
.insert_shreds(shreds1, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
blockstore.get_slot_entries(1, 0).unwrap()[2..4],
entries[2..4],
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
#[ignore]
pub fn test_get_slot_entries2() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_slots = 5_u64;
let mut index = 0;
for slot in 0..num_slots {
let entries = create_ticks(slot + 1, 0, Hash::default());
let last_entry = entries.last().unwrap().clone();
let mut shreds =
entries_to_test_shreds(entries, slot, slot.saturating_sub(1), false, 0);
for b in shreds.iter_mut() {
b.set_index(index);
b.set_slot(slot as u64);
index += 1;
}
blockstore
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
blockstore
.get_slot_entries(slot, u64::from(index - 1))
.unwrap(),
vec![last_entry],
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_get_slot_entries3() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_slots = 5_u64;
let shreds_per_slot = 5_u64;
let entry_serialized_size =
bincode::serialized_size(&create_ticks(1, 0, Hash::default())).unwrap();
let entries_per_slot =
(shreds_per_slot * PACKET_DATA_SIZE as u64) / entry_serialized_size;
for slot in 0..num_slots {
let entries = create_ticks(entries_per_slot, 0, Hash::default());
let shreds =
entries_to_test_shreds(entries.clone(), slot, slot.saturating_sub(1), false, 0);
assert!(shreds.len() as u64 >= shreds_per_slot);
blockstore
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(blockstore.get_slot_entries(slot, 0).unwrap(), entries);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_consecutive() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let min_entries = max_ticks_per_n_shreds(1, None) + 1;
for i in 0..4 {
let slot = i;
let parent_slot = if i == 0 { 0 } else { i - 1 };
let num_entries = min_entries * (i + 1);
let (shreds, original_entries) = make_slot_entries(slot, parent_slot, num_entries);
let num_shreds = shreds.len() as u64;
assert!(num_shreds > 1);
let mut even_shreds = vec![];
let mut odd_shreds = vec![];
for (i, shred) in shreds.into_iter().enumerate() {
if i % 2 == 0 {
even_shreds.push(shred);
} else {
odd_shreds.push(shred);
}
}
blockstore.insert_shreds(odd_shreds, None, false).unwrap();
assert_eq!(blockstore.get_slot_entries(slot, 0).unwrap(), vec![]);
let meta = blockstore.meta(slot).unwrap().unwrap();
if num_shreds % 2 == 0 {
assert_eq!(meta.received, num_shreds);
} else {
trace!("got here");
assert_eq!(meta.received, num_shreds - 1);
}
assert_eq!(meta.consumed, 0);
if num_shreds % 2 == 0 {
assert_eq!(meta.last_index, num_shreds - 1);
} else {
assert_eq!(meta.last_index, std::u64::MAX);
}
blockstore.insert_shreds(even_shreds, None, false).unwrap();
assert_eq!(
blockstore.get_slot_entries(slot, 0).unwrap(),
original_entries,
);
let meta = blockstore.meta(slot).unwrap().unwrap();
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.parent_slot, parent_slot);
assert_eq!(meta.last_index, num_shreds - 1);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_insert_data_shreds_duplicate() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_unique_entries = 10;
let (mut original_shreds, original_entries) =
make_slot_entries(0, 0, num_unique_entries);
original_shreds.remove(0);
blockstore
.insert_shreds(original_shreds, None, false)
.unwrap();
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), vec![]);
let duplicate_shreds = entries_to_test_shreds(original_entries.clone(), 0, 0, true, 0);
let num_shreds = duplicate_shreds.len() as u64;
blockstore
.insert_shreds(duplicate_shreds, None, false)
.unwrap();
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), original_entries);
let meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(meta.consumed, num_shreds);
assert_eq!(meta.received, num_shreds);
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.last_index, num_shreds - 1);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
fn test_data_set_completed_on_insert() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals { blockstore, .. } =
Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let slot = 0;
let num_entries = max_ticks_per_n_shreds(1, None) + 1;
let entries = create_ticks(num_entries, slot, Hash::default());
let shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
let num_shreds = shreds.len();
assert!(num_shreds > 1);
assert!(blockstore
.insert_shreds(shreds[1..].to_vec(), None, false)
.unwrap()
.0
.is_empty());
assert_eq!(
blockstore
.insert_shreds(vec![shreds[0].clone()], None, false)
.unwrap()
.0,
vec![CompletedDataSetInfo {
slot,
start_index: 0,
end_index: num_shreds as u32 - 1
}]
);
assert!(blockstore
.insert_shreds(shreds, None, false)
.unwrap()
.0
.is_empty());
}
#[test]
pub fn test_new_shreds_signal() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
ledger_signal_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 50;
let (mut shreds, _) = make_slot_entries(0, 0, entries_per_slot);
let shreds_per_slot = shreds.len() as u64;
ledger
.insert_shreds(vec![shreds.remove(1)], None, false)
.unwrap();
let timer = Duration::new(1, 0);
assert!(recvr.recv_timeout(timer).is_err());
ledger
.insert_shreds(vec![shreds.remove(0)], None, false)
.unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
let num_slots = shreds_per_slot;
let mut shreds = vec![];
let mut missing_shreds = vec![];
for slot in 1..num_slots + 1 {
let (mut slot_shreds, _) = make_slot_entries(slot, slot - 1, entries_per_slot);
let missing_shred = slot_shreds.remove(slot as usize - 1);
shreds.extend(slot_shreds);
missing_shreds.push(missing_shred);
}
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
let shreds: Vec<_> = (1..num_slots + 1)
.flat_map(|slot| {
let (mut shred, _) = make_slot_entries(slot, slot - 1, 1);
shred[0].set_index(2 * num_slots as u32);
shred
})
.collect();
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_err());
let missing_shreds2 = missing_shreds
.drain((num_slots / 2) as usize..)
.collect_vec();
ledger.insert_shreds(missing_shreds, None, false).unwrap();
assert!(recvr.recv_timeout(timer).is_ok());
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(missing_shreds2, None, false).unwrap();
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_completed_shreds_signal() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let (mut shreds, _) = make_slot_entries(0, 0, entries_per_slot);
let shred0 = shreds.remove(0);
ledger.insert_shreds(shreds, None, false).unwrap();
assert!(recvr.try_recv().is_err());
ledger.insert_shreds(vec![shred0], None, false).unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![0]);
}
#[test]
pub fn test_completed_shreds_signal_orphans() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let (mut orphan_child, _) = all_shreds.remove(2);
let (mut orphan_shreds, _) = all_shreds.remove(1);
let orphan_child0 = orphan_child.remove(0);
ledger.insert_shreds(orphan_child, None, false).unwrap();
assert!(recvr.try_recv().is_err());
ledger
.insert_shreds(vec![orphan_child0], None, false)
.unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[2]]);
let orphan_shred0 = orphan_shreds.remove(0);
ledger.insert_shreds(orphan_shreds, None, false).unwrap();
assert!(recvr.try_recv().is_err());
ledger
.insert_shreds(vec![orphan_shred0], None, false)
.unwrap();
assert_eq!(recvr.try_recv().unwrap(), vec![slots[1]]);
}
#[test]
pub fn test_completed_shreds_signal_many() {
let ledger_path = get_tmp_ledger_path!();
let BlockstoreSignals {
blockstore: ledger,
completed_slots_receiver: recvr,
..
} = Blockstore::open_with_signal(&ledger_path, None, true).unwrap();
let ledger = Arc::new(ledger);
let entries_per_slot = 10;
let mut slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot);
let disconnected_slot = 4;
let (shreds0, _) = all_shreds.remove(0);
let (shreds1, _) = all_shreds.remove(0);
let (shreds2, _) = all_shreds.remove(0);
let (shreds3, _) = make_slot_entries(disconnected_slot, 1, entries_per_slot);
let mut all_shreds: Vec<_> = vec![shreds0, shreds1, shreds2, shreds3]
.into_iter()
.flatten()
.collect();
all_shreds.shuffle(&mut thread_rng());
ledger.insert_shreds(all_shreds, None, false).unwrap();
let mut result = recvr.try_recv().unwrap();
result.sort_unstable();
slots.push(disconnected_slot);
slots.sort_unstable();
assert_eq!(result, slots);
}
#[test]
pub fn test_handle_chaining_basic() {
let blockstore_path = get_tmp_ledger_path!();
{
let entries_per_slot = 5;
let num_slots = 3;
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() / num_slots as usize;
let shreds1 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blockstore.insert_shreds(shreds1, None, false).unwrap();
let s1 = blockstore.meta(1).unwrap().unwrap();
assert!(s1.next_slots.is_empty());
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
let shreds2 = shreds
.drain(shreds_per_slot..2 * shreds_per_slot)
.collect_vec();
blockstore.insert_shreds(shreds2, None, false).unwrap();
let s2 = blockstore.meta(2).unwrap().unwrap();
assert!(s2.next_slots.is_empty());
assert!(!s2.is_connected);
assert_eq!(s2.parent_slot, 1);
assert_eq!(s2.last_index, shreds_per_slot as u64 - 1);
let s1 = blockstore.meta(1).unwrap().unwrap();
assert_eq!(s1.next_slots, vec![2]);
assert!(!s1.is_connected);
assert_eq!(s1.parent_slot, 0);
assert_eq!(s1.last_index, shreds_per_slot as u64 - 1);
blockstore.insert_shreds(shreds, None, false).unwrap();
for i in 0..3 {
let s = blockstore.meta(i).unwrap().unwrap();
if i != 2 {
assert_eq!(s.next_slots, vec![i + 1]);
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_handle_chaining_missing_slots() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_slots = 30;
let entries_per_slot = 5;
let mut slots = vec![];
let mut missing_slots = vec![];
let mut shreds_per_slot = 2;
for slot in 0..num_slots {
let parent_slot = {
if slot == 0 {
0
} else {
slot - 1
}
};
let (slot_shreds, _) = make_slot_entries(slot, parent_slot, entries_per_slot);
shreds_per_slot = slot_shreds.len();
if slot % 2 == 1 {
slots.extend(slot_shreds);
} else {
missing_slots.extend(slot_shreds);
}
}
blockstore.insert_shreds(slots, None, false).unwrap();
for i in 0..num_slots {
let s = blockstore.meta(i as u64).unwrap().unwrap();
if i % 2 == 0 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
assert_eq!(s.parent_slot, std::u64::MAX);
} else {
assert!(s.next_slots.is_empty());
assert_eq!(s.parent_slot, i - 1);
}
if i == 0 {
assert!(s.is_connected);
} else {
assert!(!s.is_connected);
}
}
blockstore
.insert_shreds(missing_slots, None, false)
.unwrap();
for i in 0..num_slots {
let s = blockstore.meta(i as u64).unwrap().unwrap();
if i != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
assert!(s.is_connected);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
#[test]
#[allow(clippy::cognitive_complexity)]
pub fn test_forward_chaining_is_connected() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_slots = 15;
let entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
assert!(entries_per_slot > 1);
let (mut shreds, _) = make_many_slot_entries(0, num_slots, entries_per_slot);
let shreds_per_slot = shreds.len() / num_slots as usize;
assert!(shreds_per_slot > 1);
let mut missing_shreds = vec![];
for slot in 0..num_slots {
let mut shreds_for_slot = shreds.drain(..shreds_per_slot).collect_vec();
if slot % 3 == 0 {
let shred0 = shreds_for_slot.remove(0);
missing_shreds.push(shred0);
blockstore
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
} else {
blockstore
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
}
}
for i in 0..num_slots {
let s = blockstore.meta(i as u64).unwrap().unwrap();
if i as u64 != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
if i != 0 {
assert!(!s.is_connected);
} else {
assert!(s.is_connected);
}
}
for slot_index in 0..num_slots {
if slot_index % 3 == 0 {
let shred = missing_shreds.remove(0);
blockstore.insert_shreds(vec![shred], None, false).unwrap();
for i in 0..num_slots {
let s = blockstore.meta(i as u64).unwrap().unwrap();
if i != num_slots - 1 {
assert_eq!(s.next_slots, vec![i as u64 + 1]);
} else {
assert!(s.next_slots.is_empty());
}
if i <= slot_index as u64 + 3 {
assert!(s.is_connected);
} else {
assert!(!s.is_connected);
}
if i == 0 {
assert_eq!(s.parent_slot, 0);
} else {
assert_eq!(s.parent_slot, i - 1);
}
assert_eq!(s.last_index, shreds_per_slot as u64 - 1);
}
}
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
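// get_slots_since should report the known children (next_slots) of each
// requested slot and silently omit slots that have no SlotMeta.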
#[test]
pub fn test_get_slots_since() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
assert!(blockstore.get_slots_since(&[0]).unwrap().is_empty());
let mut meta0 = SlotMeta::new(0, 0);
blockstore.meta_cf.put(0, &meta0).unwrap();
let expected: HashMap<u64, Vec<u64>> = vec![(0, vec![])].into_iter().collect();
assert_eq!(blockstore.get_slots_since(&[0]).unwrap(), expected);
meta0.next_slots = vec![1, 2];
blockstore.meta_cf.put(0, &meta0).unwrap();
let expected: HashMap<u64, Vec<u64>> = vec![(0, vec![1, 2])].into_iter().collect();
assert_eq!(blockstore.get_slots_since(&[0]).unwrap(), expected);
assert_eq!(blockstore.get_slots_since(&[0, 1]).unwrap(), expected);
let mut meta3 = SlotMeta::new(3, 1);
meta3.next_slots = vec![10, 5];
blockstore.meta_cf.put(3, &meta3).unwrap();
let expected: HashMap<u64, Vec<u64>> = vec![(0, vec![1, 2]), (3, vec![10, 5])]
.into_iter()
.collect();
assert_eq!(blockstore.get_slots_since(&[0, 1, 3]).unwrap(), expected);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
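// Inserting slots newest-first should mark the earliest missing ancestor as
// an orphan; the orphan set shrinks as parents arrive and must be empty once
// the chain back to slot 0 is complete.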
#[test]
fn test_orphans() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let entries_per_slot = 1;
let (mut shreds, _) = make_many_slot_entries(0, 3, entries_per_slot);
let shreds_per_slot = shreds.len() / 3;
let shreds_for_slot = shreds.drain((shreds_per_slot * 2)..).collect_vec();
blockstore
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
let meta = blockstore
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(is_orphan(&meta));
assert_eq!(
blockstore.orphans_iterator(0).unwrap().collect::<Vec<_>>(),
vec![1]
);
let shreds_for_slot = shreds.drain(shreds_per_slot..).collect_vec();
blockstore
.insert_shreds(shreds_for_slot, None, false)
.unwrap();
let meta = blockstore
.meta(1)
.expect("Expect database get to succeed")
.unwrap();
assert!(!is_orphan(&meta));
let meta = blockstore
.meta(0)
.expect("Expect database get to succeed")
.unwrap();
assert!(is_orphan(&meta));
assert_eq!(
blockstore.orphans_iterator(0).unwrap().collect::<Vec<_>>(),
vec![0]
);
let (shred4, _) = make_slot_entries(4, 0, 1);
let (shred5, _) = make_slot_entries(5, 1, 1);
blockstore.insert_shreds(shred4, None, false).unwrap();
blockstore.insert_shreds(shred5, None, false).unwrap();
assert_eq!(
blockstore.orphans_iterator(0).unwrap().collect::<Vec<_>>(),
vec![0]
);
blockstore.insert_shreds(shreds, None, false).unwrap();
for i in 0..3 {
let meta = blockstore
.meta(i)
.expect("Expect database get to succeed")
.unwrap();
assert!(!is_orphan(&meta));
}
assert!(blockstore.orphans_cf.is_empty().unwrap())
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
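// Shared body for the insert_data_shreds_slots tests: writes one shred per
// slot (index forced to 0, chained to the previous slot), either in bulk or
// one shred at a time, then checks the resulting SlotMetas.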
fn test_insert_data_shreds_slots(name: &str, should_bulk_write: bool) {
let blockstore_path = get_ledger_path_from_name(name);
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries = 20_u64;
let mut entries = vec![];
let mut shreds = vec![];
let mut num_shreds_per_slot = 0;
for slot in 0..num_entries {
let parent_slot = {
if slot == 0 {
0
} else {
slot - 1
}
};
let (mut shred, entry) = make_slot_entries(slot, parent_slot, 1);
num_shreds_per_slot = shred.len() as u64;
shred.iter_mut().for_each(|shred| shred.set_index(0));
shreds.extend(shred);
entries.extend(entry);
}
let num_shreds = shreds.len();
if should_bulk_write {
blockstore.insert_shreds(shreds, None, false).unwrap();
} else {
for _ in 0..num_shreds {
let shred = shreds.remove(0);
blockstore.insert_shreds(vec![shred], None, false).unwrap();
}
}
for i in 0..num_entries - 1 {
assert_eq!(
blockstore.get_slot_entries(i, 0).unwrap()[0],
entries[i as usize]
);
let meta = blockstore.meta(i).unwrap().unwrap();
assert_eq!(meta.received, 1);
assert_eq!(meta.last_index, 0);
if i != 0 {
assert_eq!(meta.parent_slot, i - 1);
assert_eq!(meta.consumed, 1);
} else {
assert_eq!(meta.parent_slot, 0);
assert_eq!(meta.consumed, num_shreds_per_slot);
}
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
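// Space the stored shred indexes `gap` apart so every index in between is
// missing, then probe find_missing_data_indexes with assorted windows and
// result caps.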
#[test]
fn test_find_missing_data_indexes() {
let slot = 0;
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let gap: u64 = 10;
assert!(gap > 3);
let num_entries = max_ticks_per_n_shreds(1, None) + 1;
let entries = create_ticks(num_entries, 0, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
let num_shreds = shreds.len();
assert!(num_shreds > 1);
for (i, s) in shreds.iter_mut().enumerate() {
s.set_index(i as u32 * gap as u32);
s.set_slot(slot);
}
blockstore.insert_shreds(shreds, None, false).unwrap();
let expected: Vec<u64> = (1..gap).collect();
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap, gap as usize),
expected
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 1, gap, (gap - 1) as usize),
expected,
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap - 1, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, gap as usize),
vec![gap - 2, gap - 1],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, gap - 2, gap, 1),
vec![gap - 2],
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap, 1),
vec![1],
);
let mut expected: Vec<u64> = (1..gap).collect();
expected.push(gap + 1);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap + 2) as usize),
expected,
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, gap + 2, (gap - 1) as usize),
&expected[..expected.len() - 1],
);
for i in 0..num_shreds as u64 {
for j in 0..i {
let expected: Vec<u64> = (j..i)
.flat_map(|k| {
let begin = k * gap + 1;
let end = (k + 1) * gap;
begin..end
})
.collect();
assert_eq!(
blockstore.find_missing_data_indexes(
slot,
0,
j * gap,
i * gap,
((i - j) * gap) as usize
),
expected,
);
}
}
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
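// Gaps are presumably only reported once they are older than the expected
// turbine propagation delay relative to `first_timestamp`: with a current
// timestamp nothing is missing yet; backdating it exposes the first gap.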
#[test]
fn test_find_missing_data_indexes_timeout() {
let slot = 0;
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let gap: u64 = 10;
let shreds: Vec<_> = (0..64)
.map(|i| {
Shred::new_from_data(
slot,
(i * gap) as u32,
0,
None,
false,
false,
i as u8,
0,
(i * gap) as u32,
)
})
.collect();
blockstore.insert_shreds(shreds, None, false).unwrap();
let empty: Vec<u64> = vec![];
assert_eq!(
blockstore.find_missing_data_indexes(slot, timestamp(), 0, 50, 1),
empty
);
let expected: Vec<_> = (1..=9).collect();
assert_eq!(
blockstore.find_missing_data_indexes(slot, timestamp() - 400, 0, 50, 9),
expected
);
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
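// Degenerate windows (empty range, start >= end, max_missing == 0) must
// return nothing; then a slot holding only indexes ONE and OTHER is probed
// across sliding start positions.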
#[test]
fn test_find_missing_data_indexes_sanity() {
let slot = 0;
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let empty: Vec<u64> = vec![];
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 0, 0, 1),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 5, 5, 1),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 4, 3, 1),
empty
);
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, 1, 2, 0),
empty
);
let entries = create_ticks(100, 0, Hash::default());
let mut shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
assert!(shreds.len() > 2);
shreds.drain(2..);
const ONE: u64 = 1;
const OTHER: u64 = 4;
shreds[0].set_index(ONE as u32);
shreds[1].set_index(OTHER as u32);
blockstore.insert_shreds(shreds, None, false).unwrap();
const STARTS: u64 = OTHER * 2;
const END: u64 = OTHER * 3;
const MAX: usize = 10;
for start in 0..STARTS {
let result = blockstore.find_missing_data_indexes(slot, 0, start, END, MAX);
let expected: Vec<u64> = (start..END).filter(|i| *i != ONE && *i != OTHER).collect();
assert_eq!(result, expected);
}
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
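// A gap-free slot must never report a missing index for any sub-range.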
#[test]
pub fn test_no_missing_shred_indexes() {
let slot = 0;
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries = 10;
let entries = create_ticks(num_entries, 0, Hash::default());
let shreds = entries_to_test_shreds(entries, slot, 0, true, 0);
let num_shreds = shreds.len();
blockstore.insert_shreds(shreds, None, false).unwrap();
let empty: Vec<u64> = vec![];
for i in 0..num_shreds as u64 {
for j in 0..i {
assert_eq!(
blockstore.find_missing_data_indexes(slot, 0, j, i, (i - j) as usize),
empty
);
}
}
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
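// Marking shred 7 last-in-slot conflicts with the already-received shred 8,
// so it must be rejected and the slot flagged as containing duplicates; a
// shred mutated after the slot is full must be rejected as well.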
#[test]
pub fn test_should_insert_data_shred() {
solana_logger::setup();
let (mut shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let last_root = RwLock::new(0);
blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
blockstore
.insert_shreds(shreds[8..9].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.received, 9);
let shred7 = {
if shreds[7].is_data() {
shreds[7].set_last_in_slot();
shreds[7].clone()
} else {
panic!("Shred in unexpected format")
}
};
assert_eq!(
blockstore.should_insert_data_shred(&shred7, &slot_meta, &last_root, None, false),
false
);
assert!(blockstore.has_duplicate_shreds_in_slot(0));
let mut shred8 = shreds[8].clone();
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
if shred8.is_data() {
shred8.set_slot(slot_meta.last_index + 1);
} else {
panic!("Shred in unexpected format")
}
assert_eq!(
blockstore.should_insert_data_shred(&shred8, &slot_meta, &last_root, None, false),
false
);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
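// A data shred counts as present either when its index falls below the
// consumed cutoff or when it is explicitly recorded in the data index.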
#[test]
pub fn test_is_data_shred_present() {
let (shreds, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let index_cf = blockstore.db.column::<cf::Index>();
blockstore
.insert_shreds(shreds[0..5].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, 5);
assert!(Blockstore::is_data_shred_present(
&shreds[1],
&slot_meta,
index.data(),
));
blockstore
.insert_shreds(shreds[6..7].to_vec(), None, false)
.unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
let index = index_cf.get(0).unwrap().unwrap();
assert!(Blockstore::is_data_shred_present(
&shreds[6],
&slot_meta,
index.data()
));
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
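// The first copy of a coding shred is accepted into the cache; an identical
// duplicate must be rejected and routed to the duplicate callback exactly once.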
#[test]
pub fn test_check_cache_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let slot = 1;
let (shred, coding) = Shredder::new_coding_shred_header(slot, 11, 11, 11, 11, 10, 0);
let coding_shred =
Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
let mut erasure_metas = HashMap::new();
let mut index_working_set = HashMap::new();
let mut just_received_coding_shreds = HashMap::new();
let mut index_meta_time = 0;
assert!(blockstore.check_cache_coding_shred(
coding_shred.clone(),
&mut erasure_metas,
&mut index_working_set,
&mut just_received_coding_shreds,
&mut index_meta_time,
&|_shred| {
panic!("no dupes");
},
false,
));
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
assert!(!blockstore.check_cache_coding_shred(
coding_shred,
&mut erasure_metas,
&mut index_working_set,
&mut just_received_coding_shreds,
&mut index_meta_time,
&|_shred| {
counter.fetch_add(1, Ordering::Relaxed);
},
false,
));
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
}
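// Exercises the sanity checks on coding-shred headers: valid headers pass
// (even when re-inserted), while inconsistent position/num_coding_shreds
// values, indexes that would overflow, or slots at/behind the last root fail.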
#[test]
pub fn test_should_insert_coding_shred() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let last_root = RwLock::new(0);
let slot = 1;
let (mut shred, coding) =
Shredder::new_coding_shred_header(slot, 11, 11, 11, 11, 10, 0);
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
blockstore
.insert_shreds(vec![coding_shred.clone()], None, false)
.unwrap();
{
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
shred.index += 1;
{
let coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
let index = coding_shred.coding_header.position - 1;
coding_shred.set_index(index as u32);
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = 0;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.coding_header.num_coding_shreds = coding_shred.coding_header.position;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
{
let mut coding_shred = Shred::new_empty_from_header(
shred.clone(),
DataShredHeader::default(),
coding.clone(),
);
coding_shred.common_header.fec_set_index = std::u32::MAX - 1;
coding_shred.coding_header.num_coding_shreds = 3;
coding_shred.common_header.index = std::u32::MAX - 1;
coding_shred.coding_header.position = 0;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
coding_shred.coding_header.num_coding_shreds = 2000;
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
coding_shred.coding_header.num_coding_shreds = 2;
assert!(Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
blockstore
.insert_shreds(vec![coding_shred], None, false)
.unwrap();
}
{
let mut coding_shred =
Shred::new_empty_from_header(shred, DataShredHeader::default(), coding);
coding_shred.set_slot(*last_root.read().unwrap());
assert!(!Blockstore::should_insert_coding_shred(
&coding_shred,
&last_root
));
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
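// Re-inserting a conflicting set of shreds for an already-full slot must
// leave the original SlotMeta intact and flag the slot as duplicate.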
#[test]
pub fn test_insert_multiple_is_last() {
solana_logger::setup();
let (shreds, _) = make_slot_entries(0, 0, 20);
let num_shreds = shreds.len() as u64;
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
assert_eq!(slot_meta.received, num_shreds);
assert_eq!(slot_meta.last_index, num_shreds - 1);
assert!(slot_meta.is_full());
let (shreds, _) = make_slot_entries(0, 0, 22);
blockstore.insert_shreds(shreds, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.consumed, num_shreds);
assert_eq!(slot_meta.received, num_shreds);
assert_eq!(slot_meta.last_index, num_shreds - 1);
assert!(slot_meta.is_full());
assert!(blockstore.has_duplicate_shreds_in_slot(0));
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
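// slot_data_iterator should yield nothing for an absent slot and exactly the
// stored shreds for a present one.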
#[test]
fn test_slot_data_iterator() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let shreds_per_slot = 10;
let slots = vec![2, 4, 8, 12];
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot);
let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds {
blockstore.insert_shreds(slot_shreds, None, false).unwrap();
}
let shred_iter = blockstore.slot_data_iterator(5, 0).unwrap();
let result: Vec<_> = shred_iter.collect();
assert_eq!(result, vec![]);
let shred_iter = blockstore.slot_data_iterator(8, 0).unwrap();
let result: Vec<Shred> = shred_iter
.filter_map(|(_, bytes)| Shred::new_from_serialized_shred(bytes.to_vec()).ok())
.collect();
assert_eq!(result.len(), slot_8_shreds.len());
assert_eq!(result, slot_8_shreds);
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
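// set_roots should mark every listed slot as a root and advance last_root to
// the maximum.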
#[test]
fn test_set_roots() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let chained_slots = vec![0, 2, 4, 7, 12, 15];
assert_eq!(blockstore.last_root(), 0);
blockstore.set_roots(&chained_slots).unwrap();
assert_eq!(blockstore.last_root(), 15);
for i in chained_slots {
assert!(blockstore.is_root(i));
}
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
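// A slot is "skipped" when it lies inside the rooted range but is not itself
// a root.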
#[test]
fn test_is_skipped() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let roots = vec![2, 4, 7, 12, 15];
blockstore.set_roots(&roots).unwrap();
for i in 0..20 {
if i < 2 || roots.contains(&i) || i > 15 {
assert!(!blockstore.is_skipped(i));
} else {
assert!(blockstore.is_skipped(i));
}
}
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
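// An empty blockstore should yield no SlotMetas regardless of the start bound.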
#[test]
fn test_iter_bounds() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore
.slot_meta_iterator(5)
.unwrap()
.for_each(|_| panic!());
drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
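// get_completed_data_ranges should translate completed-data end indexes into
// (start, end) ranges, cut off at the consumed index.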
#[test]
fn test_get_completed_data_ranges() {
let completed_data_end_indexes = vec![2, 4, 9, 11];
let start_index = 0;
let consumed = 1;
assert_eq!(
Blockstore::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
vec![]
);
let start_index = 0;
let consumed = 3;
assert_eq!(
Blockstore::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
vec![(0, 2)]
);
for i in 0..completed_data_end_indexes.len() {
for j in i..completed_data_end_indexes.len() {
let start_index = completed_data_end_indexes[i];
let consumed = completed_data_end_indexes[j] + 1;
let mut expected = vec![(start_index, start_index)];
expected.extend(
completed_data_end_indexes[i..=j]
.windows(2)
.map(|end_indexes| (end_indexes[0] + 1, end_indexes[1])),
);
assert_eq!(
Blockstore::get_completed_data_ranges(
start_index,
&completed_data_end_indexes[..],
consumed
),
expected
);
}
}
}
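// Appending a bogus "last" shred whose payload is not valid entry data should
// make get_slot_entries return an error for the slot.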
#[test]
fn test_get_slot_entries_with_shred_count_corruption() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_ticks = 8;
let entries = create_ticks(num_ticks, 0, Hash::default());
let slot = 1;
let shreds = entries_to_test_shreds(entries, slot, 0, false, 0);
let next_shred_index = shreds.len();
blockstore
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert_eq!(
blockstore.get_slot_entries(slot, 0).unwrap().len() as u64,
num_ticks
);
let shreds = vec![Shred::new_from_data(
slot,
next_shred_index as u32,
1,
Some(&[1, 1, 1]),
true,
true,
0,
0,
next_shred_index as u32,
)];
blockstore
.insert_shreds(shreds, None, false)
.expect("Expected successful write of shreds");
assert!(blockstore.get_slot_entries(slot, 0).is_err());
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
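// Duplicate shreds for slot 0 arriving alongside slots 2 and 3 must not be
// re-inserted, but slot 0's next_slots must still gain the new children.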
#[test]
fn test_no_insert_but_modify_slot_meta() {
let (shreds0, _) = make_slot_entries(0, 0, 200);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore
.insert_shreds(shreds0[0..5].to_vec(), None, false)
.unwrap();
let (mut shreds2, _) = make_slot_entries(2, 0, 200);
let (mut shreds3, _) = make_slot_entries(3, 0, 200);
shreds2.push(shreds0[1].clone());
shreds3.insert(0, shreds0[1].clone());
blockstore.insert_shreds(shreds2, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2]);
blockstore.insert_shreds(shreds3, None, false).unwrap();
let slot_meta = blockstore.meta(0).unwrap().unwrap();
assert_eq!(slot_meta.next_slots, vec![2, 3]);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
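// Shreds for a slot at or below the last root are normally discarded; the
// trusted flag bypasses that check.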
#[test]
fn test_trusted_insert_shreds() {
let (shreds1, _) = make_slot_entries(1, 0, 1);
let blockstore_path = get_tmp_ledger_path!();
let last_root = 100;
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_roots(&[last_root]).unwrap();
blockstore
.insert_shreds(shreds1[..].to_vec(), None, false)
.unwrap();
assert!(blockstore.get_data_shred(1, 0).unwrap().is_none());
blockstore
.insert_shreds(shreds1[..].to_vec(), None, true)
.unwrap();
assert!(blockstore.get_data_shred(1, 0).unwrap().is_some());
}
}
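// Builds two rooted slots of transactions with stored statuses and verifies
// get_confirmed_block: the error paths (unrooted slots, unavailable parent
// entries), the block contents and blockhashes, and block_time once written.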
#[test]
fn test_get_confirmed_block() {
let slot = 10;
let entries = make_slot_entries_with_transactions(100);
let blockhash = get_last_hash(entries.iter()).unwrap();
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
let more_shreds = entries_to_test_shreds(entries.clone(), slot + 1, slot, true, 0);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
ledger.insert_shreds(shreds, None, false).unwrap();
ledger.insert_shreds(more_shreds, None, false).unwrap();
ledger.set_roots(&[slot - 1, slot, slot + 1]).unwrap();
let parent_meta = SlotMeta {
parent_slot: std::u64::MAX,
..SlotMeta::default()
};
ledger
.put_meta_bytes(slot - 1, &serialize(&parent_meta).unwrap())
.unwrap();
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
for (i, _account_key) in transaction.message.account_keys.iter().enumerate() {
pre_balances.push(i as u64 * 10);
post_balances.push(i as u64 * 11);
}
let signature = transaction.signatures[0];
let status = TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
}
.into();
ledger
.transaction_status_cf
.put_protobuf((0, signature, slot), &status)
.unwrap();
let status = TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
}
.into();
ledger
.transaction_status_cf
.put_protobuf((0, signature, slot + 1), &status)
.unwrap();
TransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances,
post_balances,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
}),
}
})
.collect();
let confirmed_block_err = ledger.get_confirmed_block(slot - 1, true).unwrap_err();
assert_matches!(confirmed_block_err, BlockstoreError::SlotNotRooted);
let confirmed_block_err = ledger.get_confirmed_block(slot, true).unwrap_err();
assert_matches!(
confirmed_block_err,
BlockstoreError::ParentEntriesUnavailable
);
let confirmed_block = ledger.get_confirmed_block(slot, false).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let expected_block = ConfirmedBlock {
transactions: expected_transactions.clone(),
parent_slot: slot - 1,
blockhash: blockhash.to_string(),
previous_blockhash: Hash::default().to_string(),
rewards: vec![],
block_time: None,
};
assert_eq!(confirmed_block, expected_block);
let confirmed_block = ledger.get_confirmed_block(slot + 1, true).unwrap();
assert_eq!(confirmed_block.transactions.len(), 100);
let mut expected_block = ConfirmedBlock {
transactions: expected_transactions,
parent_slot: slot,
blockhash: blockhash.to_string(),
previous_blockhash: blockhash.to_string(),
rewards: vec![],
block_time: None,
};
assert_eq!(confirmed_block, expected_block);
let not_root = ledger.get_confirmed_block(slot + 2, true).unwrap_err();
assert_matches!(not_root, BlockstoreError::SlotNotRooted);
let timestamp = 1_576_183_541;
ledger.blocktime_cf.put(slot + 1, &timestamp).unwrap();
expected_block.block_time = Some(timestamp);
let confirmed_block = ledger.get_confirmed_block(slot + 1, true).unwrap();
assert_eq!(confirmed_block, expected_block);
drop(ledger);
Blockstore::destroy(&ledger_path).expect("Expected successful database destruction");
}
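// Round-trips TransactionStatusMeta through the TransactionStatus column for
// both an Err and an Ok status and checks that every field survives; a key
// that was never written reads back as None.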
#[test]
fn test_persist_transaction_status() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let pre_balances_vec = vec![1, 2, 3];
let post_balances_vec = vec![3, 2, 1];
let inner_instructions_vec = vec![InnerInstructions {
index: 0,
instructions: vec![CompiledInstruction::new(1, &(), vec![0])],
}];
let log_messages_vec = vec![String::from("Test message\n")];
let pre_token_balances_vec = vec![];
let post_token_balances_vec = vec![];
assert!(transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((
0,
Signature::default(),
0
))
.unwrap()
.is_none());
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound,
),
fee: 5u64,
pre_balances: pre_balances_vec.clone(),
post_balances: post_balances_vec.clone(),
inner_instructions: Some(inner_instructions_vec.clone()),
log_messages: Some(log_messages_vec.clone()),
pre_token_balances: Some(pre_token_balances_vec.clone()),
post_token_balances: Some(post_token_balances_vec.clone()),
}
.into();
assert!(transaction_status_cf
.put_protobuf((0, Signature::default(), 0), &status)
.is_ok());
let TransactionStatusMeta {
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
} = transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((
0,
Signature::default(),
0,
))
.unwrap()
.unwrap()
.try_into()
.unwrap();
assert_eq!(status, Err(TransactionError::AccountNotFound));
assert_eq!(fee, 5u64);
assert_eq!(pre_balances, pre_balances_vec);
assert_eq!(post_balances, post_balances_vec);
assert_eq!(inner_instructions.unwrap(), inner_instructions_vec);
assert_eq!(log_messages.unwrap(), log_messages_vec);
assert_eq!(pre_token_balances.unwrap(), pre_token_balances_vec);
assert_eq!(post_token_balances.unwrap(), post_token_balances_vec);
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 9u64,
pre_balances: pre_balances_vec.clone(),
post_balances: post_balances_vec.clone(),
inner_instructions: Some(inner_instructions_vec.clone()),
log_messages: Some(log_messages_vec.clone()),
pre_token_balances: Some(pre_token_balances_vec.clone()),
post_token_balances: Some(post_token_balances_vec.clone()),
}
.into();
assert!(transaction_status_cf
.put_protobuf((0, Signature::new(&[2u8; 64]), 9), &status)
.is_ok());
let TransactionStatusMeta {
status,
fee,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
} = transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((
0,
Signature::new(&[2u8; 64]),
9,
))
.unwrap()
.unwrap()
.try_into()
.unwrap();
assert_eq!(status, Ok(()));
assert_eq!(fee, 9u64);
assert_eq!(pre_balances, pre_balances_vec);
assert_eq!(post_balances, post_balances_vec);
assert_eq!(inner_instructions.unwrap(), inner_instructions_vec);
assert_eq!(log_messages.unwrap(), log_messages_vec);
assert_eq!(pre_token_balances.unwrap(), pre_token_balances_vec);
assert_eq!(post_token_balances.unwrap(), post_token_balances_vec);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
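// Walks the two-primary-index scheme for transaction statuses: writes land in
// the active index, purges freeze one index and activate the other, and the
// column iterators must always begin at the surviving entries.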
#[test]
#[allow(clippy::cognitive_complexity)]
fn test_transaction_status_index() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_index_cf = blockstore.db.column::<cf::TransactionStatusIndex>();
let slot0 = 10;
assert!(transaction_status_index_cf.get(0).unwrap().is_some());
assert!(transaction_status_index_cf.get(1).unwrap().is_some());
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
slot0,
Signature::new(&random_bytes),
vec![&Pubkey::new(&random_bytes[0..32])],
vec![&Pubkey::new(&random_bytes[32..])],
TransactionStatusMeta::default(),
)
.unwrap();
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta::default()
);
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 0);
assert_eq!(first_status_entry.2, slot0);
let first_address_entry = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_address_entry.0, 0);
assert_eq!(first_address_entry.2, slot0);
blockstore.run_purge(0, 8, PurgeType::PrimaryIndex).unwrap();
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: true,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta::default()
);
let slot1 = 20;
for _ in 0..5 {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
blockstore
.write_transaction_status(
slot1,
Signature::new(&random_bytes),
vec![&Pubkey::new(&random_bytes[0..32])],
vec![&Pubkey::new(&random_bytes[32..])],
TransactionStatusMeta::default(),
)
.unwrap();
}
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot0,
frozen: true,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot1,
frozen: false,
}
);
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 0);
assert_eq!(first_status_entry.2, 10);
let first_address_entry = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_address_entry.0, 0);
assert_eq!(first_address_entry.2, slot0);
let index1_first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(1),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(index1_first_status_entry.0, 1);
assert_eq!(index1_first_status_entry.2, slot1);
let index1_first_address_entry = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(1),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(index1_first_address_entry.0, 1);
assert_eq!(index1_first_address_entry.2, slot1);
blockstore
.run_purge(0, 18, PurgeType::PrimaryIndex)
.unwrap();
assert_eq!(
transaction_status_index_cf.get(0).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: 0,
frozen: false,
}
);
assert_eq!(
transaction_status_index_cf.get(1).unwrap().unwrap(),
TransactionStatusIndexMeta {
max_slot: slot1,
frozen: true,
}
);
let first_status_entry = blockstore
.db
.iter::<cf::TransactionStatus>(IteratorMode::From(
cf::TransactionStatus::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_status_entry.0, 1);
assert_eq!(first_status_entry.2, slot1);
let first_address_entry = blockstore
.db
.iter::<cf::AddressSignatures>(IteratorMode::From(
cf::AddressSignatures::as_index(0),
IteratorDirection::Forward,
))
.unwrap()
.next()
.unwrap()
.0;
assert_eq!(first_address_entry.0, 1);
assert_eq!(first_address_entry.2, slot1);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
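// get_transaction_status_with_counter should return the status at the highest
// rooted slot, along with a count of the index entries examined while
// searching.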
#[test]
fn test_get_transaction_status() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let pre_balances_vec = vec![1, 2, 3];
let post_balances_vec = vec![3, 2, 1];
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Ok(()),
fee: 42u64,
pre_balances: pre_balances_vec,
post_balances: post_balances_vec,
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
}
.into();
let signature1 = Signature::new(&[1u8; 64]);
let signature2 = Signature::new(&[2u8; 64]);
let signature3 = Signature::new(&[3u8; 64]);
let signature4 = Signature::new(&[4u8; 64]);
let signature5 = Signature::new(&[5u8; 64]);
let signature6 = Signature::new(&[6u8; 64]);
transaction_status_cf
.put_protobuf((0, signature2, 1), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature2, 2), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature4, 0), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature4, 1), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature5, 0), &status)
.unwrap();
transaction_status_cf
.put_protobuf((0, signature5, 1), &status)
.unwrap();
transaction_status_cf
.put_protobuf((1, signature4, 1), &status)
.unwrap();
transaction_status_cf
.put_protobuf((1, signature4, 2), &status)
.unwrap();
transaction_status_cf
.put_protobuf((1, signature5, 0), &status)
.unwrap();
transaction_status_cf
.put_protobuf((1, signature5, 1), &status)
.unwrap();
blockstore.set_roots(&[2]).unwrap();
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature2)
.unwrap();
let (slot, _status) = status.expect("signature2 should have a rooted status");
assert_eq!(slot, 2);
assert_eq!(counter, 2);
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature4)
.unwrap();
let (slot, _status) = status.expect("signature4 should have a rooted status");
assert_eq!(slot, 2);
assert_eq!(counter, 5);
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature5)
.unwrap();
assert_eq!(status, None);
assert_eq!(counter, 6);
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature1)
.unwrap();
assert_eq!(status, None);
assert_eq!(counter, 2);
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature3)
.unwrap();
assert_eq!(status, None);
assert_eq!(counter, 2);
let (status, counter) = blockstore
.get_transaction_status_with_counter(signature6)
.unwrap();
assert_eq!(status, None);
assert_eq!(counter, 2);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
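// get_confirmed_transaction should return the full transaction with its
// status meta, and None once the containing slot has been purged.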
#[test]
fn test_get_confirmed_transaction() {
let slot = 2;
let entries = make_slot_entries_with_transactions(5);
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
blockstore.insert_shreds(shreds, None, false).unwrap();
blockstore.set_roots(&[slot - 1, slot]).unwrap();
let expected_transactions: Vec<TransactionWithStatusMeta> = entries
.iter()
.cloned()
.filter(|entry| !entry.is_tick())
.flat_map(|entry| entry.transactions)
.map(|transaction| {
let mut pre_balances: Vec<u64> = vec![];
let mut post_balances: Vec<u64> = vec![];
for (i, _account_key) in transaction.message.account_keys.iter().enumerate() {
pre_balances.push(i as u64 * 10);
post_balances.push(i as u64 * 11);
}
let inner_instructions = Some(vec![InnerInstructions {
index: 0,
instructions: vec![CompiledInstruction::new(1, &(), vec![0])],
}]);
let log_messages = Some(vec![String::from("Test message\n")]);
let pre_token_balances = Some(vec![]);
let post_token_balances = Some(vec![]);
let signature = transaction.signatures[0];
let status = TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances: pre_balances.clone(),
post_balances: post_balances.clone(),
inner_instructions: inner_instructions.clone(),
log_messages: log_messages.clone(),
pre_token_balances: pre_token_balances.clone(),
post_token_balances: post_token_balances.clone(),
}
.into();
blockstore
.transaction_status_cf
.put_protobuf((0, signature, slot), &status)
.unwrap();
TransactionWithStatusMeta {
transaction,
meta: Some(TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances,
post_balances,
inner_instructions,
log_messages,
pre_token_balances,
post_token_balances,
}),
}
})
.collect();
for transaction in expected_transactions.clone() {
let signature = transaction.transaction.signatures[0];
assert_eq!(
blockstore.get_confirmed_transaction(signature).unwrap(),
Some(ConfirmedTransaction {
slot,
transaction,
block_time: None
})
);
}
blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap();
*blockstore.lowest_cleanup_slot.write().unwrap() = slot;
for TransactionWithStatusMeta { transaction, .. } in expected_transactions {
let signature = transaction.signatures[0];
assert_eq!(
blockstore.get_confirmed_transaction(signature).unwrap(),
None,
);
}
}
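// Looking up a signature that was never written should return None, not an
// error.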
#[test]
fn test_empty_transaction_status() {
let blockstore_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_roots(&[0]).unwrap();
assert_eq!(
blockstore
.get_confirmed_transaction(Signature::default())
.unwrap(),
None
);
}
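// Address signatures span both primary indexes (purges flip the active one);
// lookups honor the slot bounds, survive purges, and results stay ordered by
// slot even when the statuses were written out of order.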
#[test]
fn test_get_confirmed_signatures_for_address() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let address0 = solana_sdk::pubkey::new_rand();
let address1 = solana_sdk::pubkey::new_rand();
let slot0 = 10;
for x in 1..5 {
let signature = Signature::new(&[x; 64]);
blockstore
.write_transaction_status(
slot0,
signature,
vec![&address0],
vec![&address1],
TransactionStatusMeta::default(),
)
.unwrap();
}
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
let slot1 = 20;
for x in 5..9 {
let signature = Signature::new(&[x; 64]);
blockstore
.write_transaction_status(
slot1,
signature,
vec![&address0],
vec![&address1],
TransactionStatusMeta::default(),
)
.unwrap();
}
blockstore.set_roots(&[slot0, slot1]).unwrap();
let all0 = blockstore
.get_confirmed_signatures_for_address(address0, 0, 50)
.unwrap();
assert_eq!(all0.len(), 8);
for x in 1..9 {
let expected_signature = Signature::new(&[x; 64]);
assert_eq!(all0[x as usize - 1], expected_signature);
}
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 20, 50)
.unwrap()
.len(),
4
);
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 0, 10)
.unwrap()
.len(),
4
);
assert!(blockstore
.get_confirmed_signatures_for_address(address0, 1, 5)
.unwrap()
.is_empty());
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 1, 15)
.unwrap()
.len(),
4
);
let all1 = blockstore
.get_confirmed_signatures_for_address(address1, 0, 50)
.unwrap();
assert_eq!(all1.len(), 8);
for x in 1..9 {
let expected_signature = Signature::new(&[x; 64]);
assert_eq!(all1[x as usize - 1], expected_signature);
}
blockstore
.run_purge(0, 10, PurgeType::PrimaryIndex)
.unwrap();
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 0, 50)
.unwrap()
.len(),
4
);
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 20, 50)
.unwrap()
.len(),
4
);
assert!(blockstore
.get_confirmed_signatures_for_address(address0, 0, 10)
.unwrap()
.is_empty());
assert!(blockstore
.get_confirmed_signatures_for_address(address0, 1, 5)
.unwrap()
.is_empty());
assert_eq!(
blockstore
.get_confirmed_signatures_for_address(address0, 1, 25)
.unwrap()
.len(),
4
);
for slot in (21..25).rev() {
let random_bytes: Vec<u8> = (0..64).map(|_| rand::random::<u8>()).collect();
let signature = Signature::new(&random_bytes);
blockstore
.write_transaction_status(
slot,
signature,
vec![&address0],
vec![&address1],
TransactionStatusMeta::default(),
)
.unwrap();
}
blockstore.set_roots(&[21, 22, 23, 24]).unwrap();
let mut past_slot = 0;
for (slot, _) in blockstore.find_address_signatures(address0, 1, 25).unwrap() {
assert!(slot >= past_slot);
past_slot = slot;
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
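// Exercises get_confirmed_signatures_for_address2 pagination: before/until
// cursors, result limits, resumption across slots, and the newest-first
// ordering of signatures within a slot.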
#[test]
fn test_get_confirmed_signatures_for_address2() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
fn make_slot_entries_with_transaction_addresses(addresses: &[Pubkey]) -> Vec<Entry> {
let mut entries: Vec<Entry> = Vec::new();
for address in addresses {
let transaction = Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[*address],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
);
entries.push(next_entry_mut(&mut Hash::default(), 0, vec![transaction]));
let mut tick = create_ticks(1, 0, hash(&serialize(address).unwrap()));
entries.append(&mut tick);
}
entries
}
let address0 = solana_sdk::pubkey::new_rand();
let address1 = solana_sdk::pubkey::new_rand();
for slot in 2..=8 {
let entries = make_slot_entries_with_transaction_addresses(&[
address0, address1, address0, address1,
]);
let shreds = entries_to_test_shreds(entries.clone(), slot, slot - 1, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
for (i, entry) in entries.iter().enumerate() {
if slot == 4 && i == 2 {
blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap();
}
for transaction in &entry.transactions {
assert_eq!(transaction.signatures.len(), 1);
blockstore
.write_transaction_status(
slot,
transaction.signatures[0],
transaction.message.account_keys.iter().collect(),
vec![],
TransactionStatusMeta::default(),
)
.unwrap();
}
}
}
blockstore.set_roots(&[1, 2, 4, 5, 6, 7, 8]).unwrap();
let highest_confirmed_root = 8;
let all0 = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
None,
None,
usize::MAX,
)
.unwrap();
assert_eq!(all0.len(), 12);
let all1 = blockstore
.get_confirmed_signatures_for_address2(
address1,
highest_confirmed_root,
None,
None,
usize::MAX,
)
.unwrap();
assert_eq!(all1.len(), 12);
assert_ne!(all0, all1);
for i in 0..all0.len() {
let results = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
if i == 0 {
None
} else {
Some(all0[i - 1].signature)
},
None,
1,
)
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
}
for i in 0..all0.len() {
let results = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
if i == 0 {
None
} else {
Some(all0[i - 1].signature)
},
if i == all0.len() - 1 || i == all0.len() {
None
} else {
Some(all0[i + 1].signature)
},
10,
)
.unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0], all0[i], "Unexpected result for {}", i);
}
assert!(blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
Some(all0[all0.len() - 1].signature),
None,
1,
)
.unwrap()
.is_empty());
assert!(blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
None,
Some(all0[0].signature),
2,
)
.unwrap()
.is_empty());
assert!(all0.len() % 3 == 0);
for i in (0..all0.len()).step_by(3) {
let results = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
if i == 0 {
None
} else {
Some(all0[i - 1].signature)
},
None,
3,
)
.unwrap();
assert_eq!(results.len(), 3);
assert_eq!(results[0], all0[i]);
assert_eq!(results[1], all0[i + 1]);
assert_eq!(results[2], all0[i + 2]);
}
for i in (0..all1.len()).step_by(2) {
let results = blockstore
.get_confirmed_signatures_for_address2(
address1,
highest_confirmed_root,
if i == 0 {
None
} else {
Some(all1[i - 1].signature)
},
None,
2,
)
.unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].slot, results[1].slot);
assert!(results[0].signature >= results[1].signature);
assert_eq!(results[0], all1[i]);
assert_eq!(results[1], all1[i + 1]);
}
let results = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
Some(all1[0].signature),
None,
usize::MAX,
)
.unwrap();
assert!(!results.is_empty());
let results2 = blockstore
.get_confirmed_signatures_for_address2(
address0,
highest_confirmed_root,
Some(all1[0].signature),
Some(all1[4].signature),
usize::MAX,
)
.unwrap();
assert!(results2.len() < results.len());
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
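// get_last_hash returns None for an empty iterator and the hash of the final
// entry otherwise.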
#[test]
#[allow(clippy::same_item_push)]
fn test_get_last_hash() {
let mut entries: Vec<Entry> = vec![];
let empty_entries_iterator = entries.iter();
assert!(get_last_hash(empty_entries_iterator).is_none());
let mut prev_hash = hash::hash(&[42u8]);
for _ in 0..10 {
let entry = next_entry(&prev_hash, 1, vec![]);
prev_hash = entry.hash;
entries.push(entry);
}
let entries_iterator = entries.iter();
assert_eq!(get_last_hash(entries_iterator).unwrap(), entries[9].hash);
}
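// Transactions with stored statuses map to their metas in order; a
// transaction without a stored status maps to meta == None.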
#[test]
fn test_map_transactions_to_statuses() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let transaction_status_cf = blockstore.db.column::<cf::TransactionStatus>();
let slot = 0;
let mut transactions: Vec<Transaction> = vec![];
for x in 0..4 {
let transaction = Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[solana_sdk::pubkey::new_rand()],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
);
let status = TransactionStatusMeta {
status: solana_sdk::transaction::Result::<()>::Err(
TransactionError::AccountNotFound,
),
fee: x,
pre_balances: vec![],
post_balances: vec![],
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![]),
post_token_balances: Some(vec![]),
}
.into();
transaction_status_cf
.put_protobuf((0, transaction.signatures[0], slot), &status)
.unwrap();
transactions.push(transaction);
}
transactions.push(Transaction::new_with_compiled_instructions(
&[&Keypair::new()],
&[solana_sdk::pubkey::new_rand()],
Hash::default(),
vec![solana_sdk::pubkey::new_rand()],
vec![CompiledInstruction::new(1, &(), vec![0])],
));
let map = blockstore.map_transactions_to_statuses(slot, transactions.into_iter());
assert_eq!(map.len(), 5);
for (x, m) in map.iter().take(4).enumerate() {
assert_eq!(m.meta.as_ref().unwrap().fee, x as u64);
}
assert_eq!(map[4].meta, None);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
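// get_recent_perf_samples(k) should return the k newest samples, newest first.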
#[test]
fn test_write_get_perf_samples() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let num_entries: usize = 10;
let mut perf_samples: Vec<(Slot, PerfSample)> = vec![];
for x in 1..num_entries + 1 {
perf_samples.push((
x as u64 * 50,
PerfSample {
num_transactions: 1000 + x as u64,
num_slots: 50,
sample_period_secs: 20,
},
));
}
for (slot, sample) in perf_samples.iter() {
blockstore.write_perf_sample(*slot, sample).unwrap();
}
for x in 0..num_entries {
let mut expected_samples = perf_samples[num_entries - 1 - x..].to_vec();
expected_samples.sort_by(|a, b| b.0.cmp(&a.0));
assert_eq!(
blockstore.get_recent_perf_samples(x + 1).unwrap(),
expected_samples
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
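// lowest_slot ignores slot 0 and should advance past purged slots.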
#[test]
fn test_lowest_slot() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
for i in 0..10 {
let slot = i;
let (shreds, _) = make_slot_entries(slot, 0, 1);
blockstore.insert_shreds(shreds, None, false).unwrap();
}
assert_eq!(blockstore.lowest_slot(), 1);
blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap();
assert_eq!(blockstore.lowest_slot(), 6);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
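// Inserting only the coding shreds should let erasure recovery reconstruct
// and store every data shred payload.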
#[test]
fn test_recovery() {
let slot = 1;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore
.insert_shreds(coding_shreds, Some(&leader_schedule_cache), false)
.unwrap();
let shred_bufs: Vec<_> = data_shreds
.iter()
.map(|shred| shred.payload.clone())
.collect();
for (s, buf) in data_shreds.iter().zip(shred_bufs) {
assert_eq!(
blockstore
.get_data_shred(s.slot(), s.index() as u64)
.unwrap()
.unwrap(),
buf
);
}
verify_index_integrity(&blockstore, slot);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
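// Repeatedly inserts varying subsets of data and coding shreds (purging in
// between) and checks that the Index column stays in sync with the stored
// shreds after every pass.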
#[test]
fn test_index_integrity() {
let slot = 1;
let num_entries = 100;
let (data_shreds, coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, num_entries, 1.0);
assert!(data_shreds.len() > 3);
assert!(coding_shreds.len() > 3);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let all_shreds: Vec<_> = data_shreds
.iter()
.cloned()
.chain(coding_shreds.iter().cloned())
.collect();
blockstore
.insert_shreds(all_shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
blockstore
.insert_shreds(coding_shreds.clone(), Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
blockstore
.insert_shreds(
coding_shreds[..coding_shreds.len() - 1].to_vec(),
Some(&leader_schedule_cache),
false,
)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
let shreds: Vec<_> = data_shreds[..data_shreds.len() - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() - 1].iter().cloned())
.collect();
blockstore
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
let shreds: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
blockstore
.insert_shreds(shreds, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..]
.iter()
.cloned()
.chain(coding_shreds[coding_shreds.len() / 2 - 1..].iter().cloned())
.collect();
blockstore
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blockstore
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 1].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 1..data_shreds.len() / 2]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 1..coding_shreds.len() / 2]
.iter()
.cloned(),
)
.collect();
blockstore
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blockstore
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
let shreds1: Vec<_> = data_shreds[..data_shreds.len() / 2 - 2]
.iter()
.cloned()
.chain(coding_shreds[..coding_shreds.len() / 2 - 2].iter().cloned())
.collect();
let shreds2: Vec<_> = data_shreds[data_shreds.len() / 2 - 2..data_shreds.len() / 2 - 1]
.iter()
.cloned()
.chain(
coding_shreds[coding_shreds.len() / 2 - 2..coding_shreds.len() / 2 - 1]
.iter()
.cloned(),
)
.collect();
blockstore
.insert_shreds(shreds1, Some(&leader_schedule_cache), false)
.unwrap();
blockstore
.insert_shreds(shreds2, Some(&leader_schedule_cache), false)
.unwrap();
verify_index_integrity(&blockstore, slot);
blockstore.purge_and_compact_slots(0, slot);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
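// Helper: shreds `num_entries` of transactions at the given erasure rate and
// returns the data/coding shreds plus a leader schedule pinned to the
// shredding keypair, so recovered shreds pass leader verification.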
fn setup_erasure_shreds(
slot: u64,
parent_slot: u64,
num_entries: u64,
erasure_rate: f32,
) -> (Vec<Shred>, Vec<Shred>, Arc<LeaderScheduleCache>) {
let entries = make_slot_entries_with_transactions(num_entries);
let leader_keypair = Arc::new(Keypair::new());
let shredder = Shredder::new(
slot,
parent_slot,
erasure_rate,
leader_keypair.clone(),
0,
0,
)
.expect("Failed in creating shredder");
let (data_shreds, coding_shreds, _) = shredder.entries_to_shreds(&entries, true, 0);
let genesis_config = create_genesis_config(2).genesis_config;
let bank = Arc::new(Bank::new(&genesis_config));
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
let fixed_schedule = FixedSchedule {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
start_epoch: 0,
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));
(data_shreds, coding_shreds, Arc::new(leader_schedule_cache))
}
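// Helper: checks that every data/coding shred visible to the column iterators
// is individually retrievable and that the Index entry counts both kinds
// exactly.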
fn verify_index_integrity(blockstore: &Blockstore, slot: u64) {
let index = blockstore.get_index(slot).unwrap().unwrap();
let data_iter = blockstore.slot_data_iterator(slot, 0).unwrap();
let mut num_data = 0;
for ((slot, index), _) in data_iter {
num_data += 1;
assert!(blockstore.get_data_shred(slot, index).unwrap().is_some());
}
let num_data_in_index = index.data().num_shreds();
assert_eq!(num_data_in_index, num_data);
let coding_iter = blockstore.slot_coding_iterator(slot, 0).unwrap();
let mut num_coding = 0;
for ((slot, index), _) in coding_iter {
num_coding += 1;
assert!(blockstore.get_coding_shred(slot, index).unwrap().is_some());
}
let num_coding_in_index = index.coding().num_shreds();
assert_eq!(num_coding_in_index, num_coding);
}
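// Two different shreds at the same (slot, index) constitute a duplicate
// proof: storing one should make has_duplicate_shreds_in_slot true and the
// proof retrievable.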
#[test]
fn test_duplicate_slot() {
let slot = 0;
let entries1 = make_slot_entries_with_transactions(1);
let entries2 = make_slot_entries_with_transactions(1);
let leader_keypair = Arc::new(Keypair::new());
let shredder =
Shredder::new(slot, 0, 1.0, leader_keypair, 0, 0).expect("Failed in creating shredder");
let (shreds, _, _) = shredder.entries_to_shreds(&entries1, true, 0);
let (duplicate_shreds, _, _) = shredder.entries_to_shreds(&entries2, true, 0);
let shred = shreds[0].clone();
let duplicate_shred = duplicate_shreds[0].clone();
let non_duplicate_shred = shred.clone();
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore
.insert_shreds(vec![shred.clone()], None, false)
.unwrap();
assert!(!blockstore.has_duplicate_shreds_in_slot(slot));
assert_eq!(
blockstore.is_shred_duplicate(
slot,
0,
&duplicate_shred.payload,
duplicate_shred.is_data()
),
Some(shred.payload.clone())
);
assert!(blockstore
.is_shred_duplicate(
slot,
0,
&non_duplicate_shred.payload,
duplicate_shred.is_data()
)
.is_none());
blockstore
.store_duplicate_slot(slot, shred.payload.clone(), duplicate_shred.payload.clone())
.unwrap();
assert!(blockstore.has_duplicate_shreds_in_slot(slot));
let duplicate_proof = blockstore.get_duplicate_slot(slot).unwrap();
assert_eq!(duplicate_proof.shred1, shred.payload);
assert_eq!(duplicate_proof.shred2, duplicate_shred.payload);
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
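// Clearing an unconfirmed slot drops its shreds and dead flag but keeps the link to its
// child slot in the slot meta.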
#[test]
fn test_clear_unconfirmed_slot() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let unconfirmed_slot = 9;
let unconfirmed_child_slot = 10;
let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot];
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1)
.into_iter()
.flat_map(|x| x.0)
.collect();
blockstore.insert_shreds(shreds, None, false).unwrap();
assert!(blockstore
.get_data_shred(unconfirmed_slot, 0)
.unwrap()
.is_some());
assert!(blockstore
.get_data_shred(unconfirmed_slot, 1)
.unwrap()
.is_none());
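// Mark the slot dead, then clear it: the dead flag and shreds go away, but next_slots
// must still point at the child.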
blockstore.set_dead_slot(unconfirmed_slot).unwrap();
blockstore.clear_unconfirmed_slot(unconfirmed_slot);
assert!(!blockstore.is_dead(unconfirmed_slot));
assert_eq!(
blockstore
.meta(unconfirmed_slot)
.unwrap()
.unwrap()
.next_slots,
vec![unconfirmed_child_slot]
);
assert!(blockstore
.get_data_shred(unconfirmed_slot, 0)
.unwrap()
.is_none());
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
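// Inserting shreds 0..10 in order, each flagged as completing a data set, should yield a
// single-shred completed range (i, i) at every step.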
#[test]
fn test_update_completed_data_indexes() {
let mut completed_data_indexes: Vec<u32> = vec![];
let mut shred_index = ShredIndex::default();
for i in 0..10 {
shred_index.set_present(i as u64, true);
assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)]
);
assert_eq!(completed_data_indexes, (0..=i).collect::<Vec<u32>>());
}
}
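// Out-of-order arrival: a completed range is only reported once every shred between
// consecutive data-complete indexes is present.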
#[test]
fn test_update_completed_data_indexes_out_of_order() {
let mut completed_data_indexes = vec![];
let mut shred_index = ShredIndex::default();
shred_index.set_present(4, true);
assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());
shred_index.set_present(2, true);
assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert!(completed_data_indexes.is_empty());
shred_index.set_present(3, true);
assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty()
);
assert_eq!(completed_data_indexes, vec![3]);
shred_index.set_present(1, true);
assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)]
);
assert_eq!(completed_data_indexes, vec![1, 3]);
shred_index.set_present(0, true);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
);
assert_eq!(completed_data_indexes, vec![0, 1, 3]);
}
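// Rewards written in the deprecated bincode format and in protobuf must both read back
// as the same protobuf value via get_protobuf_or_bincode.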
#[test]
fn test_rewards_protobuf_backward_compatibility() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let rewards: Rewards = (0..100)
.map(|i| Reward {
pubkey: solana_sdk::pubkey::new_rand().to_string(),
lamports: 42 + i,
post_balance: std::u64::MAX,
reward_type: Some(RewardType::Fee),
})
.collect();
let protobuf_rewards: generated::Rewards = rewards.into();
let deprecated_rewards: StoredExtendedRewards = protobuf_rewards.clone().into();
for slot in 0..2 {
let data = serialize(&deprecated_rewards).unwrap();
blockstore.rewards_cf.put_bytes(slot, &data).unwrap();
}
for slot in 2..4 {
blockstore
.rewards_cf
.put_protobuf(slot, &protobuf_rewards)
.unwrap();
}
for slot in 0..4 {
assert_eq!(
blockstore
.rewards_cf
.get_protobuf_or_bincode::<StoredExtendedRewards>(slot)
.unwrap()
.unwrap(),
protobuf_rewards
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
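// Same backward-compatibility check for transaction statuses: bincode-era and protobuf
// entries must read back identically.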
#[test]
fn test_transaction_status_protobuf_backward_compatibility() {
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
let status = TransactionStatusMeta {
status: Ok(()),
fee: 42,
pre_balances: vec![1, 2, 3],
post_balances: vec![1, 2, 3],
inner_instructions: Some(vec![]),
log_messages: Some(vec![]),
pre_token_balances: Some(vec![TransactionTokenBalance {
account_index: 0,
mint: Pubkey::new_unique().to_string(),
ui_token_amount: UiTokenAmount {
ui_amount: Some(1.1),
decimals: 1,
amount: "11".to_string(),
ui_amount_string: "1.1".to_string(),
},
}]),
post_token_balances: Some(vec![TransactionTokenBalance {
account_index: 0,
mint: Pubkey::new_unique().to_string(),
ui_token_amount: UiTokenAmount {
ui_amount: None,
decimals: 1,
amount: "11".to_string(),
ui_amount_string: "1.1".to_string(),
},
}]),
};
let deprecated_status: StoredTransactionStatusMeta = status.clone().into();
let protobuf_status: generated::TransactionStatusMeta = status.into();
for slot in 0..2 {
let data = serialize(&deprecated_status).unwrap();
blockstore
.transaction_status_cf
.put_bytes((0, Signature::default(), slot), &data)
.unwrap();
}
for slot in 2..4 {
blockstore
.transaction_status_cf
.put_protobuf((0, Signature::default(), slot), &protobuf_status)
.unwrap();
}
for slot in 0..4 {
assert_eq!(
blockstore
.transaction_status_cf
.get_protobuf_or_bincode::<StoredTransactionStatusMeta>((
0,
Signature::default(),
slot
))
.unwrap()
.unwrap(),
protobuf_status
);
}
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
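// A shred with its data-complete flag cleared (but still last in slot) is stored as-is,
// and its entries remain recoverable.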
#[test]
fn test_remove_shred_data_complete_flag() {
let (mut shreds, entries) = make_slot_entries(0, 0, 1);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
shreds[0].unset_data_complete();
ledger.insert_shreds(shreds, None, false).unwrap();
let stored_shred = &ledger.get_data_shreds_for_slot(0, 0).unwrap()[0];
assert!(!stored_shred.data_complete());
assert!(stored_shred.last_in_slot());
assert_eq!(entries, ledger.get_any_valid_slot_entries(0, 0));
}
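// Builds a single entry holding `num_txs` transfers, intended to be large enough to span
// multiple data shreds when shredded.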
fn make_large_tx_entry(num_txs: usize) -> Entry {
let txs: Vec<_> = (0..num_txs)
.map(|_| {
let keypair0 = Keypair::new();
let to = solana_sdk::pubkey::new_rand();
solana_sdk::system_transaction::transfer(&keypair0, &to, 1, Hash::default())
})
.collect();
Entry::new(&Hash::default(), 1, txs)
}
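// Coding shreds generated for the same slot under different erasure configs conflict;
// inserting one from each should flag the slot as containing duplicate shreds.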
#[test]
fn erasure_multiple_config() {
solana_logger::setup();
let slot = 1;
let parent = 0;
let num_txs = 20;
let entry = make_large_tx_entry(num_txs);
let shreds = entries_to_test_shreds(vec![entry], slot, parent, true, 0);
assert!(shreds.len() > 1);
let ledger_path = get_tmp_ledger_path!();
let ledger = Blockstore::open(&ledger_path).unwrap();
let coding1 = Shredder::generate_coding_shreds(slot, 0.5f32, &shreds, 0x42, usize::MAX);
let coding2 = Shredder::generate_coding_shreds(slot, 1.0f32, &shreds, 0x42, usize::MAX);
for shred in &shreds {
info!("shred {:?}", shred);
}
for shred in &coding1 {
info!("coding1 {:?}", shred);
}
for shred in &coding2 {
info!("coding2 {:?}", shred);
}
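// Insert most of the data shreds, then one coding shred from each erasure config.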
ledger
.insert_shreds(shreds[..shreds.len() - 2].to_vec(), None, false)
.unwrap();
ledger
.insert_shreds(vec![coding1[0].clone(), coding2[1].clone()], None, false)
.unwrap();
assert!(ledger.has_duplicate_shreds_in_slot(slot));
}
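// A coding shred whose header claims u16::MAX coding shreds is invalid: the insert call
// succeeds, but nothing is stored for the slot.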
#[test]
fn test_large_num_coding() {
solana_logger::setup();
let slot = 1;
let (_data_shreds, mut coding_shreds, leader_schedule_cache) =
setup_erasure_shreds(slot, 0, 100, 1.0);
let blockstore_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&blockstore_path).unwrap();
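// Corrupt the coding header so it claims an absurdly large erasure set.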
coding_shreds[1].coding_header.num_coding_shreds = u16::MAX;
blockstore
.insert_shreds(
vec![coding_shreds[1].clone()],
Some(&leader_schedule_cache),
false,
)
.unwrap();
let res = blockstore.get_coding_shreds_for_slot(slot, 0).unwrap();
assert!(res.is_empty());
}
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
}