1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use crate::{
rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
rpc_subscriptions::RpcSubscriptions,
};
use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
};
/// Tunable settings for the websocket pubsub service.
#[derive(Clone, Debug)]
pub struct PubSubConfig {
    /// Toggle for vote subscriptions; disabled by default.
    /// NOTE(review): not read anywhere in this file — presumably consumed by
    /// the subscription layer; confirm against its users.
    pub enable_vote_subscription: bool,
    /// Connection limit handed to the websocket server builder.
    pub max_connections: usize,
    /// Fragment size limit. `PubSubService::new` takes the largest of this and
    /// the two buffer capacities below as the server's maximum payload.
    pub max_fragment_size: usize,
    /// Input buffer capacity; participates in the maximum-payload calculation.
    pub max_in_buffer_capacity: usize,
    /// Output buffer capacity; participates in the maximum-payload calculation.
    pub max_out_buffer_capacity: usize,
}

impl Default for PubSubConfig {
    /// Builds the stock configuration: vote subscription off, 1000 connections,
    /// `50 * 1024` for the fragment and input limits, and `15 * 1024 * 1024`
    /// for the output limit.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        PubSubConfig {
            enable_vote_subscription: false,
            max_connections: 1000,
            max_fragment_size: 50 * KIB,
            max_in_buffer_capacity: 50 * KIB,
            max_out_buffer_capacity: 15 * MIB,
        }
    }
}
/// Handle to the background thread that hosts the pubsub websocket server.
pub struct PubSubService {
    thread_hdl: JoinHandle<()>,
}

impl PubSubService {
    /// Spawns the `solana-pubsub` thread, which starts a websocket server on
    /// `pubsub_addr` and keeps it running until `exit` is observed as `true`.
    ///
    /// The server's payload limit is the largest of the three size settings in
    /// `pubsub_config`. If the server fails to start, the thread logs a warning
    /// and returns immediately; `new` itself still returns a service handle.
    ///
    /// # Panics
    ///
    /// Panics if the service thread cannot be spawned.
    pub fn new(
        pubsub_config: PubSubConfig,
        subscriptions: &Arc<RpcSubscriptions>,
        pubsub_addr: SocketAddr,
        exit: &Arc<AtomicBool>,
    ) -> Self {
        info!("rpc_pubsub bound to {:?}", pubsub_addr);
        let pubsub_impl = RpcSolPubSubImpl::new(Arc::clone(subscriptions));

        // A single payload limit is passed to the server, so use the most
        // permissive of the three configured sizes.
        let max_payload = pubsub_config
            .max_fragment_size
            .max(pubsub_config.max_in_buffer_capacity)
            .max(pubsub_config.max_out_buffer_capacity);
        info!("rpc_pubsub max_payload: {}", max_payload);

        let max_connections = pubsub_config.max_connections;
        let exit_flag = Arc::clone(exit);

        let thread_hdl = Builder::new()
            .name(String::from("solana-pubsub"))
            .spawn(move || {
                let mut handler = PubSubHandler::default();
                handler.extend_with(pubsub_impl.to_delegate());

                // Every incoming connection gets its own session as metadata.
                let extract_session = |context: &RequestContext| {
                    info!("New pubsub connection");
                    let session = Arc::new(Session::new(context.sender()));
                    session.on_drop(|| {
                        info!("Pubsub connection dropped");
                    });
                    session
                };

                let start_result = ServerBuilder::with_meta_extractor(handler, extract_session)
                    .max_connections(max_connections)
                    .max_payload(max_payload)
                    .start(&pubsub_addr);

                let server = match start_result {
                    Ok(server) => server,
                    Err(err) => {
                        warn!(
                            "Pubsub service unavailable error: {:?}. \n\
                             Also, check that port {} is not already in use by another application",
                            err,
                            pubsub_addr.port()
                        );
                        return;
                    }
                };

                // Poll the shared exit flag; the server keeps running while
                // this thread sleeps between checks.
                let poll_interval = Duration::from_millis(100);
                while !exit_flag.load(Ordering::Relaxed) {
                    sleep(poll_interval);
                }
                server.close();
            })
            .unwrap();

        PubSubService { thread_hdl }
    }

    /// Waits for the service thread to finish by delegating to `join`.
    ///
    /// This does not set the exit flag; the thread only stops once the `exit`
    /// flag passed to `new` has been set elsewhere.
    pub fn close(self) -> thread::Result<()> {
        self.join()
    }

    /// Blocks until the service thread terminates and returns its result.
    pub fn join(self) -> thread::Result<()> {
        let PubSubService { thread_hdl } = self;
        thread_hdl.join()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
    use solana_runtime::{
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        genesis_utils::{create_genesis_config, GenesisConfigInfo},
    };
    use std::{
        net::{IpAddr, Ipv4Addr},
        sync::RwLock,
    };

    /// Constructing the service must spawn a thread named `solana-pubsub`.
    #[test]
    fn test_pubsub_new() {
        // 0.0.0.0 with port 0.
        let listen_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0);
        let exit = Arc::new(AtomicBool::new(false));

        // Minimal bank state needed to construct the subscriptions object.
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let root_bank = Bank::new(&genesis_config);
        let bank_forks = Arc::new(RwLock::new(BankForks::new(root_bank)));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests()));
        let subscriptions = Arc::new(RpcSubscriptions::new(
            &exit,
            bank_forks,
            commitment_cache,
            optimistically_confirmed_bank,
        ));

        let service =
            PubSubService::new(PubSubConfig::default(), &subscriptions, listen_addr, &exit);

        let service_thread = service.thread_hdl.thread();
        assert_eq!(service_thread.name(), Some("solana-pubsub"));
    }
}