1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
//! The `pubsub` module implements a threaded subscription service on client RPC request

use crate::{
    rpc_pubsub::{RpcSolPubSub, RpcSolPubSubImpl},
    rpc_subscriptions::RpcSubscriptions,
};
use jsonrpc_pubsub::{PubSubHandler, Session};
use jsonrpc_ws_server::{RequestContext, ServerBuilder};
use std::{
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::{self, sleep, Builder, JoinHandle},
    time::Duration,
};

#[derive(Debug, Clone)]
pub struct PubSubConfig {
    pub enable_vote_subscription: bool,

    // See the corresponding fields in
    // https://github.com/paritytech/ws-rs/blob/be4d47575bae55c60d9f51b47480d355492a94fc/src/lib.rs#L131
    // for a complete description of each field in this struct
    pub max_connections: usize,
    pub max_fragment_size: usize,
    pub max_in_buffer_capacity: usize,
    pub max_out_buffer_capacity: usize,
}

impl Default for PubSubConfig {
    fn default() -> Self {
        Self {
            enable_vote_subscription: false,
            max_connections: 1000, // Arbitrary, default of 100 is too low
            max_fragment_size: 50 * 1024, // 50KB
            max_in_buffer_capacity: 50 * 1024, // 50KB
            max_out_buffer_capacity: 15 * 1024 * 1024, // max account size (10MB), then 5MB extra for base64 encoding overhead/etc
        }
    }
}

pub struct PubSubService {
    thread_hdl: JoinHandle<()>,
}

impl PubSubService {
    pub fn new(
        pubsub_config: PubSubConfig,
        subscriptions: &Arc<RpcSubscriptions>,
        pubsub_addr: SocketAddr,
        exit: &Arc<AtomicBool>,
    ) -> Self {
        info!("rpc_pubsub bound to {:?}", pubsub_addr);
        let rpc = RpcSolPubSubImpl::new(subscriptions.clone());
        let exit_ = exit.clone();

        // TODO: Once https://github.com/paritytech/jsonrpc/pull/594 lands, use
        // `ServerBuilder::max_in_buffer_capacity()` and `Server::max_out_buffer_capacity() methods
        // instead of only `ServerBuilder::max_payload`
        let max_payload = *[
            pubsub_config.max_fragment_size,
            pubsub_config.max_in_buffer_capacity,
            pubsub_config.max_out_buffer_capacity,
        ]
        .iter()
        .max()
        .unwrap();
        info!("rpc_pubsub max_payload: {}", max_payload);

        let thread_hdl = Builder::new()
            .name("solana-pubsub".to_string())
            .spawn(move || {
                let mut io = PubSubHandler::default();
                io.extend_with(rpc.to_delegate());

                let server = ServerBuilder::with_meta_extractor(io, |context: &RequestContext| {
                    info!("New pubsub connection");
                    let session = Arc::new(Session::new(context.sender()));
                    session.on_drop(|| {
                        info!("Pubsub connection dropped");
                    });
                    session
                })
                .max_connections(pubsub_config.max_connections)
                .max_payload(max_payload)
                .start(&pubsub_addr);

                if let Err(e) = server {
                    warn!(
                        "Pubsub service unavailable error: {:?}. \n\
                           Also, check that port {} is not already in use by another application",
                        e,
                        pubsub_addr.port()
                    );
                    return;
                }
                while !exit_.load(Ordering::Relaxed) {
                    sleep(Duration::from_millis(100));
                }
                server.unwrap().close();
            })
            .unwrap();
        Self { thread_hdl }
    }

    pub fn close(self) -> thread::Result<()> {
        self.join()
    }

    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank;
    use solana_runtime::{
        bank::Bank,
        bank_forks::BankForks,
        commitment::BlockCommitmentCache,
        genesis_utils::{create_genesis_config, GenesisConfigInfo},
    };
    use std::{
        net::{IpAddr, Ipv4Addr},
        sync::RwLock,
    };

    #[test]
    fn test_pubsub_new() {
        let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0);
        let exit = Arc::new(AtomicBool::new(false));
        let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
        let optimistically_confirmed_bank =
            OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
        let subscriptions = Arc::new(RpcSubscriptions::new(
            &exit,
            bank_forks,
            Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())),
            optimistically_confirmed_bank,
        ));
        let pubsub_service =
            PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr, &exit);
        let thread = pubsub_service.thread_hdl.thread();
        assert_eq!(thread.name().unwrap(), "solana-pubsub");
    }
}