#![allow(clippy::integer_arithmetic)]
use log::*;
use serde::{Deserialize, Serialize};
use solana_sdk::{
clock::{Slot, UnixTimestamp},
pubkey::Pubkey,
signature::Signature,
sysvar::is_sysvar_id,
transaction::{Transaction, TransactionError},
};
use solana_storage_proto::convert::generated;
use solana_storage_proto::convert::generated_evm;
use solana_storage_proto::convert::tx_by_addr;
use solana_transaction_status::{
ConfirmedBlock, ConfirmedTransaction, ConfirmedTransactionStatusWithSignature, Reward,
TransactionByAddrInfo, TransactionConfirmationStatus, TransactionStatus, TransactionStatusMeta,
TransactionWithStatusMeta,
};
use std::{collections::HashMap, convert::TryInto};
use thiserror::Error;
#[macro_use]
extern crate serde_derive;
mod access_token;
mod bigtable;
mod compression;
mod root_ca_certificate;
/// Errors returned by the ledger storage API in this file.
#[derive(Debug, Error)]
pub enum Error {
/// A failure reported by the `bigtable` module.
#[error("BigTable: {0}")]
BigTableError(bigtable::Error),
/// An I/O failure.
#[error("I/O Error: {0}")]
IoError(std::io::Error),
/// NOTE(review): not constructed anywhere in the visible part of this file.
#[error("Transaction encoded is not supported")]
UnsupportedTransactionEncoding,
/// The requested block row does not exist; carries the slot (or EVM block
/// number) that was looked up.
#[error("Block not found: {0}")]
BlockNotFound(Slot),
/// The `tx` table has no row for the requested signature.
#[error("Signature not found")]
SignatureNotFound,
}
impl std::convert::From<bigtable::Error> for Error {
fn from(err: bigtable::Error) -> Self {
Self::BigTableError(err)
}
}
impl std::convert::From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::IoError(err)
}
}
/// Result alias whose error type is this file's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Render `slot` as a row key: lowercase hexadecimal, zero-padded to 16 digits.
fn slot_to_key(slot: Slot) -> String {
    // A u64 needs at most 16 hex digits, so every key has the same width.
    const KEY_HEX_DIGITS: usize = 16;
    format!("{:0width$x}", slot, width = KEY_HEX_DIGITS)
}
/// Parse a hexadecimal row key back into a slot.
///
/// Returns `None`, after logging a warning, when `key` is not a valid
/// base-16 number.
fn key_to_slot(key: &str) -> Option<Slot> {
    Slot::from_str_radix(key, 16)
        .map_err(|parse_err| {
            warn!(
                "Failed to parse object key as a slot: {}: {}",
                key, parse_err
            );
        })
        .ok()
}
/// Serde-serializable counterpart of `ConfirmedBlock`.
///
/// This is the type decoded when a `blocks` cell holds bincode data;
/// `upload_confirmed_block` writes the protobuf representation instead.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlock {
previous_blockhash: String,
blockhash: String,
parent_slot: Slot,
// Transactions with their reduced status metadata.
transactions: Vec<StoredConfirmedBlockTransaction>,
// Rewards reduced to pubkey and lamports.
rewards: StoredConfirmedBlockRewards,
block_time: Option<UnixTimestamp>,
}
impl From<ConfirmedBlock> for StoredConfirmedBlock {
fn from(confirmed_block: ConfirmedBlock) -> Self {
let ConfirmedBlock {
previous_blockhash,
blockhash,
parent_slot,
transactions,
rewards,
block_time,
} = confirmed_block;
Self {
previous_blockhash,
blockhash,
parent_slot,
transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
block_time,
}
}
}
impl From<StoredConfirmedBlock> for ConfirmedBlock {
fn from(confirmed_block: StoredConfirmedBlock) -> Self {
let StoredConfirmedBlock {
previous_blockhash,
blockhash,
parent_slot,
transactions,
rewards,
block_time,
} = confirmed_block;
Self {
previous_blockhash,
blockhash,
parent_slot,
transactions: transactions.into_iter().map(|tx| tx.into()).collect(),
rewards: rewards.into_iter().map(|reward| reward.into()).collect(),
block_time,
}
}
}
/// Serde-serializable counterpart of `TransactionWithStatusMeta`: the
/// transaction plus an optional, reduced status record.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransaction {
transaction: Transaction,
meta: Option<StoredConfirmedBlockTransactionStatusMeta>,
}
impl From<TransactionWithStatusMeta> for StoredConfirmedBlockTransaction {
fn from(value: TransactionWithStatusMeta) -> Self {
Self {
transaction: value.transaction,
meta: value.meta.map(|meta| meta.into()),
}
}
}
impl From<StoredConfirmedBlockTransaction> for TransactionWithStatusMeta {
fn from(value: StoredConfirmedBlockTransaction) -> Self {
Self {
transaction: value.transaction,
meta: value.meta.map(|meta| meta.into()),
}
}
}
/// Reduced, serde-serializable form of `TransactionStatusMeta`.
///
/// Only the error, fee and balances are kept; converting back fills the
/// inner instructions, log messages and token balances with `None`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockTransactionStatusMeta {
// `None` when the transaction succeeded.
err: Option<TransactionError>,
fee: u64,
pre_balances: Vec<u64>,
post_balances: Vec<u64>,
}
impl From<StoredConfirmedBlockTransactionStatusMeta> for TransactionStatusMeta {
fn from(value: StoredConfirmedBlockTransactionStatusMeta) -> Self {
let StoredConfirmedBlockTransactionStatusMeta {
err,
fee,
pre_balances,
post_balances,
} = value;
let status = match &err {
None => Ok(()),
Some(err) => Err(err.clone()),
};
Self {
status,
fee,
pre_balances,
post_balances,
inner_instructions: None,
log_messages: None,
pre_token_balances: None,
post_token_balances: None,
}
}
}
impl From<TransactionStatusMeta> for StoredConfirmedBlockTransactionStatusMeta {
fn from(value: TransactionStatusMeta) -> Self {
let TransactionStatusMeta {
status,
fee,
pre_balances,
post_balances,
..
} = value;
Self {
err: status.err(),
fee,
pre_balances,
post_balances,
}
}
}
/// The stored rewards of one block.
type StoredConfirmedBlockRewards = Vec<StoredConfirmedBlockReward>;
/// Reduced, serde-serializable form of `Reward`.
///
/// Only the pubkey and lamports are kept; converting back sets
/// `post_balance` to 0 and `reward_type` to `None`.
#[derive(Serialize, Deserialize)]
struct StoredConfirmedBlockReward {
pubkey: String,
lamports: i64,
}
impl From<StoredConfirmedBlockReward> for Reward {
fn from(value: StoredConfirmedBlockReward) -> Self {
let StoredConfirmedBlockReward { pubkey, lamports } = value;
Self {
pubkey,
lamports,
post_balance: 0,
reward_type: None,
}
}
}
impl From<Reward> for StoredConfirmedBlockReward {
fn from(value: Reward) -> Self {
let Reward {
pubkey, lamports, ..
} = value;
Self { pubkey, lamports }
}
}
/// Value stored as bincode in the `tx` table, keyed by the transaction
/// signature rendered as a string.
#[derive(Serialize, Deserialize)]
struct TransactionInfo {
// Slot of the block that contains the transaction.
slot: Slot,
// Position of the transaction within that block's transaction list.
index: u32,
// `None` when the transaction succeeded.
err: Option<TransactionError>,
// `upload_confirmed_block` always writes `None` here.
memo: Option<String>,
}
impl From<TransactionInfo> for TransactionStatus {
    /// Build a status from a stored `tx` record: it is reported as finalized
    /// with no confirmation count, and `status` mirrors `err`.
    fn from(info: TransactionInfo) -> Self {
        let slot = info.slot;
        let err = info.err;
        // `err` is also returned on its own, so `status` needs its own copy.
        let status = match err.clone() {
            Some(tx_err) => Err(tx_err),
            None => Ok(()),
        };
        Self {
            slot,
            confirmations: None,
            status,
            err,
            confirmation_status: Some(TransactionConfirmationStatus::Finalized),
        }
    }
}
/// Entry of a `tx-by-addr` row as decoded from bincode cell data.
///
/// It has no block time; converting to `TransactionByAddrInfo` sets
/// `block_time` to `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct LegacyTransactionByAddrInfo {
pub signature: Signature,
// `None` when the transaction succeeded.
pub err: Option<TransactionError>,
// Position of the transaction within its block.
pub index: u32,
pub memo: Option<String>,
}
impl From<LegacyTransactionByAddrInfo> for TransactionByAddrInfo {
fn from(legacy: LegacyTransactionByAddrInfo) -> Self {
let LegacyTransactionByAddrInfo {
signature,
err,
index,
memo,
} = legacy;
Self {
signature,
err,
index,
memo,
block_time: None,
}
}
}
/// Handle for reading and writing ledger data (blocks, transactions and EVM
/// blocks) through a `bigtable::BigTableConnection`.
#[derive(Clone)]
pub struct LedgerStorage {
// Each operation obtains its own client from this connection.
connection: bigtable::BigTableConnection,
}
impl LedgerStorage {
pub async fn new(read_only: bool, timeout: Option<std::time::Duration>) -> Result<Self> {
let connection =
bigtable::BigTableConnection::new("solana-ledger", read_only, timeout).await?;
Ok(Self { connection })
}
/// Return the slot of the first row in the `blocks` table.
///
/// Yields `Ok(None)` when the table has no rows or when the first row key
/// cannot be parsed as a slot.
pub async fn get_first_available_block(&self) -> Result<Option<Slot>> {
    let mut client = self.connection.client();
    let first_key = client
        .get_row_keys("blocks", None, None, 1)
        .await?
        .into_iter()
        .next();
    Ok(first_key.and_then(|key| key_to_slot(&key)))
}
/// List the slots of up to `limit` rows of the `blocks` table, starting at
/// the row key for `start_slot`.
///
/// Row keys that cannot be parsed as a slot are skipped.
pub async fn get_confirmed_blocks(&self, start_slot: Slot, limit: usize) -> Result<Vec<Slot>> {
    let mut client = self.connection.client();
    let row_keys = client
        .get_row_keys("blocks", Some(slot_to_key(start_slot)), None, limit as i64)
        .await?;
    let mut slots = Vec::new();
    for row_key in row_keys {
        if let Some(slot) = key_to_slot(&row_key) {
            slots.push(slot);
        }
    }
    Ok(slots)
}
pub async fn get_confirmed_block(&self, slot: Slot) -> Result<ConfirmedBlock> {
let mut bigtable = self.connection.client();
let block_cell_data = bigtable
.get_protobuf_or_bincode_cell::<StoredConfirmedBlock, generated::ConfirmedBlock>(
"blocks",
slot_to_key(slot),
)
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::BlockNotFound(slot),
_ => err.into(),
})?;
Ok(match block_cell_data {
bigtable::CellData::Bincode(block) => block.into(),
bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
bigtable::Error::ObjectCorrupt(format!("blocks/{}", slot_to_key(slot)))
})?,
})
}
pub async fn get_confirmed_block_hash(&self, slot: Slot) -> Result<String> {
let mut bigtable = self.connection.client();
let block = bigtable
.get_bincode_cell::<StoredConfirmedBlock>("blocks", slot_to_key(slot))
.await?;
Ok(block.blockhash)
}
pub async fn get_signature_status(&self, signature: &Signature) -> Result<TransactionStatus> {
let mut bigtable = self.connection.client();
let transaction_info = bigtable
.get_bincode_cell::<TransactionInfo>("tx", signature.to_string())
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::SignatureNotFound,
_ => err.into(),
})?;
Ok(transaction_info.into())
}
/// Fetch the confirmed transaction identified by `signature`.
///
/// The `tx` row gives the slot and the index within the block; the block is
/// then loaded and the transaction at that index is returned together with
/// the block time.
///
/// Returns `Ok(None)`, after logging a warning, when the stored data is
/// inconsistent: the index is out of range, or the transaction at that
/// index does not carry `signature` as its first signature. A stored
/// transaction with no signatures is treated as a mismatch instead of
/// causing an out-of-bounds panic.
///
/// # Errors
/// * `Error::SignatureNotFound` when the `tx` row does not exist.
/// * Any error from `get_confirmed_block` or from `bigtable`.
pub async fn get_confirmed_transaction(
    &self,
    signature: &Signature,
) -> Result<Option<ConfirmedTransaction>> {
    let mut bigtable = self.connection.client();

    // Figure out which block the transaction is located in.
    let TransactionInfo { slot, index, .. } = bigtable
        .get_bincode_cell("tx", signature.to_string())
        .await
        .map_err(|err| match err {
            bigtable::Error::RowNotFound => Error::SignatureNotFound,
            _ => err.into(),
        })?;

    // Load the block and extract the transaction.
    let block = self.get_confirmed_block(slot).await?;
    match block.transactions.into_iter().nth(index as usize) {
        None => {
            warn!("Transaction info for {} is corrupt", signature);
            Ok(None)
        }
        Some(bucket_block_transaction) => {
            // `first()` avoids indexing into an empty signature list.
            if bucket_block_transaction.transaction.signatures.first() != Some(signature) {
                warn!(
                    "Transaction info or confirmed block for {} is corrupt",
                    signature
                );
                Ok(None)
            } else {
                Ok(Some(ConfirmedTransaction {
                    slot,
                    transaction: bucket_block_transaction,
                    block_time: block.block_time,
                }))
            }
        }
    }
}
/// List confirmed transaction signatures that reference `address`.
///
/// Rows are read from the `tx-by-addr` table, whose keys have the form
/// `<address>/<slot_to_key(!slot)>`. `before_signature` and `until_signature`
/// optionally bound the listing: entries in the `before` slot with an index
/// at or above that transaction's index are skipped, as are entries in the
/// `until` slot with an index at or below that transaction's index.
///
/// Each returned tuple pairs the signature's status record with the
/// transaction's index inside its block.
///
/// # Errors
/// Returns any `bigtable` error from looking up the boundary signatures or
/// reading the rows, and `bigtable::Error::ObjectCorrupt` when a row key or
/// cell cannot be decoded.
///
/// NOTE(review): `limit` is checked only after an entry is pushed, so the
/// loop below can still return one entry when `limit` is 0 — confirm callers
/// never pass 0.
pub async fn get_confirmed_signatures_for_address(
&self,
address: &Pubkey,
before_signature: Option<&Signature>,
until_signature: Option<&Signature>,
limit: usize,
) -> Result<
Vec<(
ConfirmedTransactionStatusWithSignature,
u32,
)>,
> {
let mut bigtable = self.connection.client();
let address_prefix = format!("{}/", address);
// Figure out where to start listing from, based on `before_signature`.
// With no `before_signature` the start is Slot::MAX / index 0, which makes
// the `>=` filter below skip every entry stored for slot Slot::MAX.
let (first_slot, before_transaction_index) = match before_signature {
None => (Slot::MAX, 0),
Some(before_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", before_signature.to_string())
.await?;
(slot, index)
}
};
// Figure out where to stop, based on `until_signature`.
// NOTE(review): with no `until_signature` the end is slot 0 / index
// u32::MAX, so the `<=` filter below skips every entry stored for slot 0.
let (last_slot, until_transaction_index) = match until_signature {
None => (0, u32::MAX),
Some(until_signature) => {
let TransactionInfo { slot, index, .. } = bigtable
.get_bincode_cell("tx", until_signature.to_string())
.await?;
(slot, index)
}
};
let mut infos = vec![];
// Number of entries stored in the starting slot's row; any failure to read
// that row (including the row not existing) counts as 0.
let starting_slot_tx_len = bigtable
.get_protobuf_or_bincode_cell::<Vec<LegacyTransactionByAddrInfo>, tx_by_addr::TransactionByAddr>(
"tx-by-addr",
format!("{}{}", address_prefix, slot_to_key(!first_slot)),
)
.await
.map(|cell_data| {
match cell_data {
bigtable::CellData::Bincode(tx_by_addr) => tx_by_addr.len(),
bigtable::CellData::Protobuf(tx_by_addr) => tx_by_addr.tx_by_addrs.len(),
}
})
.unwrap_or(0);
// Request `limit` plus the starting slot's entry count, since entries of
// the starting slot may be filtered out below.
let tx_by_addr_data = bigtable
.get_row_data(
"tx-by-addr",
Some(format!("{}{}", address_prefix, slot_to_key(!first_slot))),
Some(format!("{}{}", address_prefix, slot_to_key(!last_slot))),
limit as i64 + starting_slot_tx_len as i64,
)
.await?;
'outer: for (row_key, data) in tx_by_addr_data {
// The key suffix is the bitwise-inverted slot; invert it again to recover
// the slot.
let slot = !key_to_slot(&row_key[address_prefix.len()..]).ok_or_else(|| {
bigtable::Error::ObjectCorrupt(format!(
"Failed to convert key to slot: tx-by-addr/{}",
row_key
))
})?;
let deserialized_cell_data = bigtable::deserialize_protobuf_or_bincode_cell_data::<
Vec<LegacyTransactionByAddrInfo>,
tx_by_addr::TransactionByAddr,
>(&data, "tx-by-addr", row_key.clone())?;
// Normalize both encodings to `TransactionByAddrInfo`.
let mut cell_data: Vec<TransactionByAddrInfo> = match deserialized_cell_data {
bigtable::CellData::Bincode(tx_by_addr) => {
tx_by_addr.into_iter().map(|legacy| legacy.into()).collect()
}
bigtable::CellData::Protobuf(tx_by_addr) => {
tx_by_addr.try_into().map_err(|error| {
bigtable::Error::ObjectCorrupt(format!(
"Failed to deserialize: {}: tx-by-addr/{}",
error,
row_key.clone()
))
})?
}
};
// Entries are visited in the reverse of their stored order (presumably
// stored by ascending index, so this yields newest first — confirm).
cell_data.reverse();
for tx_by_addr_info in cell_data.into_iter() {
// Skip entries at or after the `before` transaction in its slot.
if slot == first_slot && tx_by_addr_info.index >= before_transaction_index {
continue;
}
// Skip entries at or before the `until` transaction in its slot.
if slot == last_slot && tx_by_addr_info.index <= until_transaction_index {
continue;
}
infos.push((
ConfirmedTransactionStatusWithSignature {
signature: tx_by_addr_info.signature,
slot,
err: tx_by_addr_info.err,
memo: tx_by_addr_info.memo,
block_time: tx_by_addr_info.block_time,
},
tx_by_addr_info.index,
));
// Stop as soon as `limit` entries have been collected.
if infos.len() >= limit {
break 'outer;
}
}
}
Ok(infos)
}
/// Upload a confirmed block and the index rows derived from it.
///
/// Three tables are written, in this order:
/// 1. `tx` (bincode): one `TransactionInfo` per transaction, keyed by its
///    first signature.
/// 2. `tx-by-addr` (protobuf): one row per non-sysvar account key, keyed by
///    `<address>/<slot_to_key(!slot)>`, listing the block's transactions
///    that reference that address.
/// 3. `blocks` (protobuf): the block itself, keyed by `slot_to_key(slot)`.
///
/// The block row is written last, after both index tables.
///
/// # Errors
/// Returns the first write error; tables already written are left in place.
///
/// # Panics
/// Indexing `transaction.signatures[0]` panics if a transaction has no
/// signatures.
pub async fn upload_confirmed_block(
&self,
slot: Slot,
confirmed_block: ConfirmedBlock,
) -> Result<()> {
let mut bytes_written = 0;
// Keys borrow from `confirmed_block`; the map is consumed below, before the
// block itself is moved into `blocks_cells`.
let mut by_addr: HashMap<&Pubkey, Vec<TransactionByAddrInfo>> = HashMap::new();
let mut tx_cells = vec![];
for (index, transaction_with_meta) in confirmed_block.transactions.iter().enumerate() {
let TransactionWithStatusMeta { meta, transaction } = transaction_with_meta;
// A transaction without metadata is recorded with no error.
let err = meta.as_ref().and_then(|meta| meta.status.clone().err());
let index = index as u32;
let signature = transaction.signatures[0];
for address in &transaction.message.account_keys {
// Sysvar accounts are not indexed.
if !is_sysvar_id(&address) {
by_addr
.entry(address)
.or_default()
.push(TransactionByAddrInfo {
signature,
err: err.clone(),
index,
memo: None,
block_time: confirmed_block.block_time,
});
}
}
tx_cells.push((
signature.to_string(),
TransactionInfo {
slot,
index,
err,
memo: None,
},
));
}
// The slot is bitwise-inverted in the key, matching the keys that
// `get_confirmed_signatures_for_address` reads.
let tx_by_addr_cells: Vec<_> = by_addr
.into_iter()
.map(|(address, transaction_info_by_addr)| {
(
format!("{}/{}", address, slot_to_key(!slot)),
tx_by_addr::TransactionByAddr {
tx_by_addrs: transaction_info_by_addr
.into_iter()
.map(|by_addr| by_addr.into())
.collect(),
},
)
})
.collect();
if !tx_cells.is_empty() {
bytes_written += self
.connection
.put_bincode_cells_with_retry::<TransactionInfo>("tx", &tx_cells)
.await?;
}
if !tx_by_addr_cells.is_empty() {
bytes_written += self
.connection
.put_protobuf_cells_with_retry::<tx_by_addr::TransactionByAddr>(
"tx-by-addr",
&tx_by_addr_cells,
)
.await?;
}
// Capture the count before `confirmed_block` is moved into the cell.
let num_transactions = confirmed_block.transactions.len();
let blocks_cells = [(slot_to_key(slot), confirmed_block.into())];
bytes_written += self
.connection
.put_protobuf_cells_with_retry::<generated::ConfirmedBlock>("blocks", &blocks_cells)
.await?;
info!(
"uploaded block for slot {}: {} transactions, {} bytes",
slot, num_transactions, bytes_written
);
Ok(())
}
/// Return the block number of the first row in the `evm-blocks` table.
///
/// Yields `Ok(None)` when the table has no rows or when the first row key
/// cannot be parsed as a number.
pub async fn get_evm_first_available_block(&self) -> Result<Option<evm_state::BlockNum>> {
    let mut client = self.connection.client();
    let first_key = client
        .get_row_keys("evm-blocks", None, None, 1)
        .await?
        .into_iter()
        .next();
    Ok(first_key.and_then(|key| key_to_slot(&key)))
}
/// List the block numbers of up to `limit` rows of the `evm-blocks` table,
/// starting at the row key for `start_block`.
///
/// Row keys that cannot be parsed as a number are skipped.
pub async fn get_evm_confirmed_blocks(
    &self,
    start_block: evm_state::BlockNum,
    limit: usize,
) -> Result<Vec<evm_state::BlockNum>> {
    let mut client = self.connection.client();
    let row_keys = client
        .get_row_keys(
            "evm-blocks",
            Some(slot_to_key(start_block)),
            None,
            limit as i64,
        )
        .await?;
    let mut block_nums = Vec::new();
    for row_key in row_keys {
        if let Some(block_num) = key_to_slot(&row_key) {
            block_nums.push(block_num);
        }
    }
    Ok(block_nums)
}
/// Resolve an EVM block hash to its block number using the
/// `evm-blocks-by-hash` table.
///
/// # Errors
/// Any `bigtable` error is returned wrapped; a missing row is not remapped.
pub async fn get_evm_block_by_hash(
    &self,
    block_hash: evm_state::H256,
) -> Result<evm_state::BlockNum> {
    let mut client = self.connection.client();
    let row_key = evm_rpc::Hex(block_hash).to_string();
    let block_num: u64 = client
        .get_bincode_cell("evm-blocks-by-hash", row_key)
        .await?;
    Ok(block_num)
}
/// Fetch the EVM block header stored for `block_num`.
///
/// Headers are read from the `evm-blocks` table, the same table that
/// `upload_evm_block` writes them to. The cell may hold either bincode or
/// protobuf data.
///
/// # Errors
/// * `Error::BlockNotFound(block_num)` when the row does not exist.
/// * `bigtable::Error::ObjectCorrupt` (wrapped) when protobuf data cannot be
///   converted into a header.
/// * Any other `bigtable` error, wrapped unchanged.
pub async fn get_evm_confirmed_block_header(
    &self,
    block_num: evm_state::BlockNum,
) -> Result<evm_state::BlockHeader> {
    let mut bigtable = self.connection.client();
    let block_cell_data = bigtable
        .get_protobuf_or_bincode_cell::<evm_state::BlockHeader, generated_evm::EvmBlockHeader>(
            // Must match the table name used when uploading headers.
            "evm-blocks",
            slot_to_key(block_num),
        )
        .await
        .map_err(|err| match err {
            bigtable::Error::RowNotFound => Error::BlockNotFound(block_num),
            _ => err.into(),
        })?;
    Ok(match block_cell_data {
        bigtable::CellData::Bincode(block) => block,
        bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
            bigtable::Error::ObjectCorrupt(format!("evm-blocks/{}", slot_to_key(block_num)))
        })?,
    })
}
pub async fn get_evm_confirmed_full_block(
&self,
block_num: evm_state::BlockNum,
) -> Result<evm_state::Block> {
let mut bigtable = self.connection.client();
let block_cell_data = bigtable
.get_protobuf_or_bincode_cell::<evm_state::Block, generated_evm::EvmFullBlock>(
"evm-full-blocks",
slot_to_key(block_num),
)
.await
.map_err(|err| match err {
bigtable::Error::RowNotFound => Error::BlockNotFound(block_num),
_ => err.into(),
})?;
Ok(match block_cell_data {
bigtable::CellData::Bincode(block) => block,
bigtable::CellData::Protobuf(block) => block.try_into().map_err(|_err| {
bigtable::Error::ObjectCorrupt(format!(
"evm-full-blocks/{}",
slot_to_key(block_num)
))
})?,
})
}
pub async fn get_evm_confirmed_receipt(
&self,
hash: &evm_state::H256,
) -> Result<Option<evm_state::TransactionReceipt>> {
let mut bigtable = self.connection.client();
let tx_cell = bigtable
.get_protobuf_or_bincode_cell::<evm_state::TransactionReceipt, generated_evm::TransactionReceipt>(
"evm-tx",
evm_rpc::Hex(*hash).to_string(),
)
.await
.map(Some)
.or_else(|err| match err {
bigtable::Error::RowNotFound => Ok(None),
_ => Err(err),
})?;
let tx_cell = if let Some(tx_cell) = tx_cell {
tx_cell
} else {
return Ok(None);
};
Ok(Some(match tx_cell {
bigtable::CellData::Bincode(tx) => tx,
bigtable::CellData::Protobuf(tx) => tx.try_into().map_err(|_err| {
bigtable::Error::ObjectCorrupt(format!(
"evm-tx/{}",
evm_rpc::Hex(*hash).to_string()
))
})?,
}))
}
/// Upload an EVM block and the rows derived from it.
///
/// Four tables are written, in this order:
/// 1. `evm-tx` (protobuf): one receipt per transaction, keyed by its hash;
///    skipped when the block has no transactions.
/// 2. `evm-blocks-by-hash` (bincode): header hash -> header block number.
/// 3. `evm-blocks` (protobuf): the header, keyed by `slot_to_key(block_num)`.
/// 4. `evm-full-blocks` (protobuf): the full block, under the same key.
///
/// # Errors
/// Returns the first write error; tables already written are left in place.
pub async fn upload_evm_block(
    &self,
    block_num: evm_state::BlockNum,
    full_block: evm_state::Block,
) -> Result<()> {
    let num_transactions = full_block.transactions.len();
    let tx_cells: Vec<_> = full_block
        .transactions
        .iter()
        .map(|(hash, tx)| (evm_rpc::Hex(*hash).to_string(), tx.clone().into()))
        .collect();
    let tx_bytes = if tx_cells.is_empty() {
        0
    } else {
        self.connection
            .put_protobuf_cells_with_retry::<generated_evm::TransactionReceipt>(
                "evm-tx", &tx_cells,
            )
            .await?
    };

    let block_by_hash_cells = [(
        evm_rpc::Hex(full_block.header.hash()).to_string(),
        full_block.header.block_number,
    )];
    let by_hash_bytes = self
        .connection
        .put_bincode_cells_with_retry("evm-blocks-by-hash", &block_by_hash_cells)
        .await?;

    let block_header_cells = [(slot_to_key(block_num), full_block.header.clone().into())];
    let header_bytes = self
        .connection
        .put_protobuf_cells_with_retry::<generated_evm::EvmBlockHeader>(
            "evm-blocks",
            &block_header_cells,
        )
        .await?;

    // The full block is moved into its cell, so it is written last.
    let blocks_cells = [(slot_to_key(block_num), full_block.into())];
    let full_block_bytes = self
        .connection
        .put_protobuf_cells_with_retry::<generated_evm::EvmFullBlock>(
            "evm-full-blocks",
            &blocks_cells,
        )
        .await?;

    let bytes_written = tx_bytes + by_hash_bytes + header_bytes + full_block_bytes;
    info!(
        "uploaded block num {}: {} transactions, {} bytes",
        block_num, num_transactions, bytes_written
    );
    Ok(())
}
}
#[cfg(test)]
mod test {
    use super::*;

    /// Row keys are 16 lowercase hex digits at both ends of the slot range.
    #[test]
    fn test_slot_to_key() {
        let cases: [(Slot, &str); 2] = [(0, "0000000000000000"), (!0, "ffffffffffffffff")];
        for (slot, expected) in cases.iter() {
            assert_eq!(slot_to_key(*slot), *expected);
        }
    }
}