use crate::{
crds::{Crds, VersionedCrdsValue},
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{CrdsFilter, CrdsGossipPull, ProcessPullStats},
crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE},
crds_value::{CrdsData, CrdsValue, CrdsValueLabel},
duplicate_shred::{self, DuplicateShredIndex, LeaderScheduleFn, MAX_DUPLICATE_SHREDS},
};
use rayon::ThreadPool;
use solana_ledger::shred::Shred;
use solana_sdk::{
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
};
use std::collections::{HashMap, HashSet};
pub const CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS: usize = 500;
pub struct CrdsGossip {
pub crds: Crds,
pub id: Pubkey,
pub shred_version: u16,
pub push: CrdsGossipPush,
pub pull: CrdsGossipPull,
}
impl Default for CrdsGossip {
fn default() -> Self {
CrdsGossip {
crds: Crds::default(),
id: Pubkey::default(),
shred_version: 0,
push: CrdsGossipPush::default(),
pull: CrdsGossipPull::default(),
}
}
}
impl CrdsGossip {
pub fn set_self(&mut self, id: &Pubkey) {
self.id = *id;
}
pub fn set_shred_version(&mut self, shred_version: u16) {
self.shred_version = shred_version;
}
pub fn process_push_message(
&mut self,
from: &Pubkey,
values: Vec<CrdsValue>,
now: u64,
) -> Vec<VersionedCrdsValue> {
values
.into_iter()
.filter_map(|val| {
let res = self
.push
.process_push_message(&mut self.crds, from, val, now);
if let Ok(Some(val)) = res {
self.pull
.record_old_hash(val.value_hash, val.local_timestamp);
Some(val)
} else {
None
}
})
.collect()
}
pub fn prune_received_cache(
&mut self,
labels: Vec<CrdsValueLabel>,
stakes: &HashMap<Pubkey, u64>,
) -> HashMap<Pubkey, HashSet<Pubkey>> {
let id = &self.id;
let push = &mut self.push;
let mut prune_map: HashMap<Pubkey, HashSet<_>> = HashMap::new();
for origin in labels.iter().map(|k| k.pubkey()) {
let peers = push.prune_received_cache(id, &origin, stakes);
for from in peers {
prune_map.entry(from).or_default().insert(origin);
}
}
prune_map
}
pub fn process_push_messages(&mut self, pending_push_messages: Vec<(CrdsValue, u64)>) {
for (push_message, timestamp) in pending_push_messages {
let _ =
self.push
.process_push_message(&mut self.crds, &self.id, push_message, timestamp);
}
}
pub fn new_push_messages(
&mut self,
pending_push_messages: Vec<(CrdsValue, u64)>,
now: u64,
) -> (Pubkey, HashMap<Pubkey, Vec<CrdsValue>>) {
self.process_push_messages(pending_push_messages);
let push_messages = self.push.new_push_messages(&self.crds, now);
(self.id, push_messages)
}
pub(crate) fn push_duplicate_shred(
&mut self,
keypair: &Keypair,
shred: &Shred,
other_payload: &[u8],
leader_schedule: Option<impl LeaderScheduleFn>,
max_payload_size: usize,
) -> Result<(), duplicate_shred::Error> {
let pubkey = keypair.pubkey();
let shred_slot = shred.slot();
if self
.crds
.get_records(&pubkey)
.any(|value| match &value.value.data {
CrdsData::DuplicateShred(_, value) => value.slot == shred_slot,
_ => false,
})
{
return Ok(());
}
let chunks = duplicate_shred::from_shred(
shred.clone(),
pubkey,
Vec::from(other_payload),
leader_schedule,
timestamp(),
max_payload_size,
)?;
let mut num_dup_shreds = 0;
let offset = self
.crds
.get_records(&pubkey)
.filter_map(|value| match &value.value.data {
CrdsData::DuplicateShred(ix, value) => {
num_dup_shreds += 1;
Some((value.wallclock, *ix))
}
_ => None,
})
.min()
.map(|(_ , ix)| ix)
.unwrap_or(0);
let offset = if num_dup_shreds < MAX_DUPLICATE_SHREDS {
num_dup_shreds
} else {
offset
};
let entries = chunks
.enumerate()
.map(|(k, chunk)| {
let index = (offset + k as DuplicateShredIndex) % MAX_DUPLICATE_SHREDS;
let data = CrdsData::DuplicateShred(index, chunk);
CrdsValue::new_signed(data, keypair)
})
.collect();
self.process_push_message(&pubkey, entries, timestamp());
Ok(())
}
pub fn process_prune_msg(
&self,
peer: &Pubkey,
destination: &Pubkey,
origin: &[Pubkey],
wallclock: u64,
now: u64,
) -> Result<(), CrdsGossipError> {
let expired = now > wallclock + self.push.prune_timeout;
if expired {
return Err(CrdsGossipError::PruneMessageTimeout);
}
if self.id == *destination {
self.push.process_prune_msg(&self.id, peer, origin);
Ok(())
} else {
Err(CrdsGossipError::BadPruneDestination)
}
}
pub fn refresh_push_active_set(
&mut self,
stakes: &HashMap<Pubkey, u64>,
gossip_validators: Option<&HashSet<Pubkey>>,
) {
self.push.refresh_push_active_set(
&self.crds,
stakes,
gossip_validators,
&self.id,
self.shred_version,
self.pull.pull_request_time.len(),
CRDS_GOSSIP_NUM_ACTIVE,
)
}
pub fn new_pull_request(
&self,
thread_pool: &ThreadPool,
now: u64,
gossip_validators: Option<&HashSet<Pubkey>>,
stakes: &HashMap<Pubkey, u64>,
bloom_size: usize,
) -> Result<(Pubkey, Vec<CrdsFilter>, CrdsValue), CrdsGossipError> {
self.pull.new_pull_request(
thread_pool,
&self.crds,
&self.id,
self.shred_version,
now,
gossip_validators,
stakes,
bloom_size,
)
}
pub fn mark_pull_request_creation_time(&mut self, from: &Pubkey, now: u64) {
self.pull.mark_pull_request_creation_time(from, now)
}
pub fn process_pull_requests<I>(&mut self, callers: I, now: u64)
where
I: IntoIterator<Item = CrdsValue>,
{
self.pull
.process_pull_requests(&mut self.crds, callers, now);
}
pub fn generate_pull_responses(
&self,
filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize,
now: u64,
) -> Vec<Vec<CrdsValue>> {
self.pull
.generate_pull_responses(&self.crds, filters, output_size_limit, now)
}
pub fn filter_pull_responses(
&self,
timeouts: &HashMap<Pubkey, u64>,
response: Vec<CrdsValue>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
) -> (Vec<VersionedCrdsValue>, Vec<VersionedCrdsValue>, Vec<Hash>) {
self.pull
.filter_pull_responses(&self.crds, timeouts, response, now, process_pull_stats)
}
pub fn process_pull_responses(
&mut self,
from: &Pubkey,
responses: Vec<VersionedCrdsValue>,
responses_expired_timeout: Vec<VersionedCrdsValue>,
failed_inserts: Vec<Hash>,
now: u64,
process_pull_stats: &mut ProcessPullStats,
) {
let success = self.pull.process_pull_responses(
&mut self.crds,
from,
responses,
responses_expired_timeout,
failed_inserts,
now,
process_pull_stats,
);
self.push.push_pull_responses(success, now);
}
pub fn make_timeouts_test(&self) -> HashMap<Pubkey, u64> {
self.make_timeouts(&HashMap::new(), self.pull.crds_timeout)
}
pub fn make_timeouts(
&self,
stakes: &HashMap<Pubkey, u64>,
epoch_ms: u64,
) -> HashMap<Pubkey, u64> {
self.pull.make_timeouts(&self.id, stakes, epoch_ms)
}
pub fn purge(
&mut self,
thread_pool: &ThreadPool,
now: u64,
timeouts: &HashMap<Pubkey, u64>,
) -> usize {
let mut rv = 0;
if now > self.push.msg_timeout {
let min = now - self.push.msg_timeout;
self.push.purge_old_pending_push_messages(&self.crds, min);
}
if now > 5 * self.push.msg_timeout {
let min = now - 5 * self.push.msg_timeout;
self.push.purge_old_received_cache(min);
}
if now > self.pull.crds_timeout {
let min = self.pull.crds_timeout;
assert_eq!(timeouts[&self.id], std::u64::MAX);
assert_eq!(timeouts[&Pubkey::default()], min);
rv = self
.pull
.purge_active(thread_pool, &mut self.crds, now, &timeouts);
}
if now > 5 * self.pull.crds_timeout {
let min = now - 5 * self.pull.crds_timeout;
self.pull.purge_purged(min);
}
self.pull.purge_failed_inserts(now);
rv
}
pub(crate) fn mock_clone(&self) -> Self {
Self {
crds: self.crds.clone(),
push: self.push.mock_clone(),
pull: self.pull.clone(),
..*self
}
}
}
pub fn get_stake<S: std::hash::BuildHasher>(id: &Pubkey, stakes: &HashMap<Pubkey, u64, S>) -> f32 {
let bal = f64::from(u32::max_value()).min(*stakes.get(id).unwrap_or(&0) as f64);
1_f32.max((bal as f32).ln())
}
pub fn get_weight(max_weight: f32, time_since_last_selected: u32, stake: f32) -> f32 {
let mut weight = time_since_last_selected as f32 * stake;
if weight.is_infinite() {
weight = max_weight;
}
1.0_f32.max(weight.min(max_weight))
}
#[cfg(test)]
mod test {
use super::*;
use crate::contact_info::ContactInfo;
use crate::crds_value::CrdsData;
use solana_sdk::hash::hash;
use solana_sdk::timing::timestamp;
#[test]
fn test_prune_errors() {
let mut crds_gossip = CrdsGossip {
id: Pubkey::new(&[0; 32]),
..CrdsGossip::default()
};
let id = crds_gossip.id;
let ci = ContactInfo::new_localhost(&Pubkey::new(&[1; 32]), 0);
let prune_pubkey = Pubkey::new(&[2; 32]);
crds_gossip
.crds
.insert(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ci.clone())),
0,
)
.unwrap();
crds_gossip.refresh_push_active_set(&HashMap::new(), None);
let now = timestamp();
let mut res = crds_gossip.process_prune_msg(
&ci.id,
&Pubkey::new(hash(&[1; 32]).as_ref()),
&[prune_pubkey],
now,
now,
);
assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination));
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, now);
res.unwrap();
let timeout = now + crds_gossip.push.prune_timeout * 2;
res = crds_gossip.process_prune_msg(&ci.id, &id, &[prune_pubkey], now, timeout);
assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout));
}
}