1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use solana_ledger::blockstore::Blockstore;
use solana_runtime::commitment::BlockCommitmentCache;
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;
const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;
pub struct BigTableUploadService {
thread: JoinHandle<()>,
}
impl BigTableUploadService {
pub fn new(
runtime_handle: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime_handle,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
exit,
)
})
.unwrap();
Self { thread }
}
fn run(
runtime: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let mut start_slot = 0;
let mut start_evm_block = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let end_slot = block_commitment_cache
.read()
.unwrap()
.highest_confirmed_root()
.saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);
if end_slot <= start_slot {
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
start_slot,
Some(end_slot),
true,
false,
exit.clone(),
));
match result {
Ok(()) => start_slot = end_slot,
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
let end_block = blockstore
.get_last_available_evm_block()
.unwrap_or_default()
.unwrap_or_default();
if end_block <= start_evm_block {
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
let result =
runtime.block_on(solana_ledger::bigtable_upload::upload_evm_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
start_evm_block,
Some(end_block),
false,
false,
exit.clone(),
));
match result {
Ok(not_confirmed_blocks) => {
start_evm_block = end_block - not_confirmed_blocks;
}
Err(err) => {
warn!("bigtable: upload_evm_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}
}
pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}