use std::fmt;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Weak};
use std::sync::{Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::style::ProgressStyle;
use crate::utils::{duration_to_secs, secs_to_duration, Estimate};
use console::Term;
#[derive(Clone, Debug)]
struct ProgressDrawState {
pub lines: Vec<String>,
pub orphan_lines: usize,
pub finished: bool,
pub force_draw: bool,
pub move_cursor: bool,
pub ts: Instant,
}
#[derive(Debug)]
enum Status {
InProgress,
DoneVisible,
DoneHidden,
}
enum ProgressDrawTargetKind {
Term(Term, Option<ProgressDrawState>, Option<Duration>),
Remote(usize, Mutex<Sender<(usize, ProgressDrawState)>>),
Hidden,
}
pub struct ProgressDrawTarget {
kind: ProgressDrawTargetKind,
}
impl ProgressDrawTarget {
pub fn stdout() -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stdout(), 15)
}
pub fn stderr() -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stderr(), 15)
}
pub fn stdout_with_hz(refresh_rate: u64) -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stdout(), refresh_rate)
}
pub fn stderr_with_hz(refresh_rate: u64) -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stderr(), refresh_rate)
}
pub fn stdout_nohz() -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stdout(), None)
}
pub fn stderr_nohz() -> ProgressDrawTarget {
ProgressDrawTarget::to_term(Term::buffered_stderr(), None)
}
pub fn to_term(term: Term, refresh_rate: impl Into<Option<u64>>) -> ProgressDrawTarget {
let rate = refresh_rate.into().map(|x| Duration::from_millis(1000 / x));
ProgressDrawTarget {
kind: ProgressDrawTargetKind::Term(term, None, rate),
}
}
pub fn hidden() -> ProgressDrawTarget {
ProgressDrawTarget {
kind: ProgressDrawTargetKind::Hidden,
}
}
pub fn is_hidden(&self) -> bool {
match self.kind {
ProgressDrawTargetKind::Hidden => true,
ProgressDrawTargetKind::Term(ref term, ..) => !term.is_term(),
_ => false,
}
}
fn apply_draw_state(&mut self, draw_state: ProgressDrawState) -> io::Result<()> {
if self.is_hidden() {
return Ok(());
}
match self.kind {
ProgressDrawTargetKind::Term(ref term, ref mut last_state, rate) => {
let last_draw = last_state.as_ref().map(|x| x.ts);
if draw_state.finished
|| draw_state.force_draw
|| rate.is_none()
|| last_draw.is_none()
|| last_draw.unwrap().elapsed() > rate.unwrap()
{
if let Some(ref last_state) = *last_state {
if !draw_state.lines.is_empty() && draw_state.move_cursor {
last_state.move_cursor(term)?;
} else {
last_state.clear_term(term)?;
}
}
draw_state.draw_to_term(term)?;
term.flush()?;
*last_state = Some(draw_state);
}
}
ProgressDrawTargetKind::Remote(idx, ref chan) => {
return chan
.lock()
.unwrap()
.send((idx, draw_state))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
}
ProgressDrawTargetKind::Hidden => {}
}
Ok(())
}
fn disconnect(&self) {
match self.kind {
ProgressDrawTargetKind::Term(_, _, _) => {}
ProgressDrawTargetKind::Remote(idx, ref chan) => {
chan.lock()
.unwrap()
.send((
idx,
ProgressDrawState {
lines: vec![],
orphan_lines: 0,
finished: true,
force_draw: false,
move_cursor: false,
ts: Instant::now(),
},
))
.ok();
}
ProgressDrawTargetKind::Hidden => {}
};
}
}
impl ProgressDrawState {
pub fn clear_term(&self, term: &Term) -> io::Result<()> {
term.clear_last_lines(self.lines.len() - self.orphan_lines)
}
pub fn move_cursor(&self, term: &Term) -> io::Result<()> {
term.move_cursor_up(self.lines.len() - self.orphan_lines)
}
pub fn draw_to_term(&self, term: &Term) -> io::Result<()> {
for line in &self.lines {
term.write_line(line)?;
}
Ok(())
}
}
pub(crate) struct ProgressState {
pub(crate) style: ProgressStyle,
pub(crate) pos: u64,
pub(crate) len: u64,
pub(crate) tick: u64,
pub(crate) started: Instant,
draw_target: ProgressDrawTarget,
width: Option<u16>,
message: String,
prefix: String,
draw_delta: u64,
draw_next: u64,
status: Status,
est: Estimate,
tick_thread: Option<thread::JoinHandle<()>>,
steady_tick: u64,
}
impl ProgressState {
pub fn current_tick_str(&self) -> &str {
if self.is_finished() {
self.style.get_final_tick_str()
} else {
self.style.get_tick_str(self.tick)
}
}
pub fn is_finished(&self) -> bool {
match self.status {
Status::InProgress => false,
Status::DoneVisible => true,
Status::DoneHidden => true,
}
}
pub fn should_render(&self) -> bool {
match self.status {
Status::DoneHidden => false,
_ => true,
}
}
pub fn fraction(&self) -> f32 {
let pct = match (self.pos, self.len) {
(_, 0) => 1.0,
(0, _) => 0.0,
(pos, len) => pos as f32 / len as f32,
};
pct.max(0.0).min(1.0)
}
pub fn position(&self) -> (u64, u64) {
(self.pos, self.len)
}
pub fn message(&self) -> &str {
&self.message
}
pub fn prefix(&self) -> &str {
&self.prefix
}
pub fn width(&self) -> usize {
if let Some(width) = self.width {
width as usize
} else {
Term::stderr().size().1 as usize
}
}
pub fn avg_time_per_step(&self) -> Duration {
self.est.time_per_step()
}
pub fn eta(&self) -> Duration {
if self.len == !0 || self.is_finished() {
return Duration::new(0, 0);
}
let t = duration_to_secs(self.avg_time_per_step());
secs_to_duration(t * self.len.saturating_sub(self.pos) as f64 + 0.75)
}
pub fn per_sec(&self) -> u64 {
let avg_time = self.avg_time_per_step().as_nanos();
if avg_time == 0 {
0
} else {
(1_000_000_000 / avg_time) as u64
}
}
}
#[derive(Clone)]
pub struct ProgressBar {
state: Arc<RwLock<ProgressState>>,
}
impl fmt::Debug for ProgressBar {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProgressBar").finish()
}
}
impl ProgressBar {
pub fn new(len: u64) -> ProgressBar {
ProgressBar::with_draw_target(len, ProgressDrawTarget::stderr())
}
pub fn hidden() -> ProgressBar {
ProgressBar::with_draw_target(!0, ProgressDrawTarget::hidden())
}
pub fn with_draw_target(len: u64, target: ProgressDrawTarget) -> ProgressBar {
ProgressBar {
state: Arc::new(RwLock::new(ProgressState {
style: ProgressStyle::default_bar(),
draw_target: target,
width: None,
message: "".into(),
prefix: "".into(),
pos: 0,
len,
tick: 0,
draw_delta: 0,
draw_next: 0,
status: Status::InProgress,
started: Instant::now(),
est: Estimate::new(),
tick_thread: None,
steady_tick: 0,
})),
}
}
pub fn with_style(self, style: ProgressStyle) -> ProgressBar {
self.state.write().unwrap().style = style;
self
}
pub fn new_spinner() -> ProgressBar {
let rv = ProgressBar::new(!0);
rv.set_style(ProgressStyle::default_spinner());
rv
}
pub fn set_style(&self, style: ProgressStyle) {
self.state.write().unwrap().style = style;
}
pub fn enable_steady_tick(&self, ms: u64) {
let mut state = self.state.write().unwrap();
state.steady_tick = ms;
if state.tick_thread.is_some() {
return;
}
let state_arc = Arc::downgrade(&self.state);
state.tick_thread = Some(thread::spawn(move || loop {
thread::sleep(Duration::from_millis(ms));
if let Some(state_arc) = state_arc.upgrade() {
let mut state = state_arc.write().unwrap();
if state.is_finished() || state.steady_tick == 0 {
state.steady_tick = 0;
state.tick_thread = None;
break;
}
if state.tick != 0 {
state.tick = state.tick.saturating_add(1);
}
draw_state(&mut state).ok();
} else {
break;
}
}));
::std::mem::drop(state);
self.tick();
}
pub fn disable_steady_tick(&self) {
self.enable_steady_tick(0);
}
pub fn set_draw_delta(&self, n: u64) {
let mut state = self.state.write().unwrap();
state.draw_delta = n;
state.draw_next = state.pos.saturating_add(state.draw_delta);
}
pub fn tick(&self) {
self.update_and_draw(|state| {
if state.steady_tick == 0 || state.tick == 0 {
state.tick = state.tick.saturating_add(1);
}
});
}
pub fn inc(&self, delta: u64) {
self.update_and_draw(|state| {
state.pos = state.pos.saturating_add(delta);
if state.steady_tick == 0 || state.tick == 0 {
state.tick = state.tick.saturating_add(1);
}
})
}
pub fn is_hidden(&self) -> bool {
self.state.read().unwrap().draw_target.is_hidden()
}
pub fn is_finished(&self) -> bool {
self.state.read().unwrap().is_finished()
}
pub fn println<I: Into<String>>(&self, msg: I) {
let mut state = self.state.write().unwrap();
let mut lines: Vec<String> = msg.into().lines().map(Into::into).collect();
let orphan_lines = lines.len();
if state.should_render() {
lines.extend(state.style.format_state(&*state));
}
let draw_state = ProgressDrawState {
lines,
orphan_lines,
finished: state.is_finished(),
force_draw: true,
move_cursor: false,
ts: Instant::now(),
};
state.draw_target.apply_draw_state(draw_state).ok();
}
pub fn set_position(&self, pos: u64) {
self.update_and_draw(|state| {
state.draw_next = pos;
state.pos = pos;
if state.steady_tick == 0 || state.tick == 0 {
state.tick = state.tick.saturating_add(1);
}
})
}
pub fn set_length(&self, len: u64) {
self.update_and_draw(|state| {
state.len = len;
})
}
pub fn inc_length(&self, delta: u64) {
self.update_and_draw(|state| {
state.len = state.len.saturating_add(delta);
})
}
pub fn set_prefix(&self, prefix: &str) {
let prefix = prefix.to_string();
self.update_and_draw(|state| {
state.prefix = prefix;
if state.steady_tick == 0 || state.tick == 0 {
state.tick = state.tick.saturating_add(1);
}
})
}
pub fn set_message(&self, msg: &str) {
let msg = msg.to_string();
self.update_and_draw(|state| {
state.message = msg;
if state.steady_tick == 0 || state.tick == 0 {
state.tick = state.tick.saturating_add(1);
}
})
}
pub fn downgrade(&self) -> WeakProgressBar {
WeakProgressBar {
state: Arc::downgrade(&self.state),
}
}
pub fn reset_eta(&self) {
self.update_and_draw(|state| {
state.est.reset();
});
}
pub fn reset_elapsed(&self) {
self.update_and_draw(|state| {
state.started = Instant::now();
});
}
pub fn reset(&self) {
self.reset_eta();
self.reset_elapsed();
self.update_and_draw(|state| {
state.draw_next = 0;
state.pos = 0;
state.status = Status::InProgress;
});
}
pub fn finish(&self) {
self.update_and_draw(|state| {
state.pos = state.len;
state.draw_next = state.pos;
state.status = Status::DoneVisible;
});
}
pub fn finish_at_current_pos(&self) {
self.update_and_draw(|state| {
state.draw_next = state.pos;
state.status = Status::DoneVisible;
});
}
pub fn finish_with_message(&self, msg: &str) {
let msg = msg.to_string();
self.update_and_draw(|state| {
state.message = msg;
state.pos = state.len;
state.draw_next = state.pos;
state.status = Status::DoneVisible;
});
}
pub fn finish_and_clear(&self) {
self.update_and_draw(|state| {
state.pos = state.len;
state.draw_next = state.pos;
state.status = Status::DoneHidden;
});
}
pub fn abandon(&self) {
self.update_and_draw(|state| {
state.status = Status::DoneVisible;
});
}
pub fn abandon_with_message(&self, msg: &str) {
let msg = msg.to_string();
self.update_and_draw(|state| {
state.message = msg;
state.status = Status::DoneVisible;
});
}
pub fn set_draw_target(&self, target: ProgressDrawTarget) {
let mut state = self.state.write().unwrap();
state.draw_target.disconnect();
state.draw_target = target;
}
pub fn wrap_iter<It: Iterator>(&self, it: It) -> ProgressBarIter<It> {
ProgressBarIter {
bar: self.clone(),
it,
}
}
pub fn wrap_read<R: io::Read>(&self, read: R) -> ProgressBarWrap<R> {
ProgressBarWrap {
bar: self.clone(),
wrap: read,
}
}
pub fn wrap_write<W: io::Write>(&self, write: W) -> ProgressBarWrap<W> {
ProgressBarWrap {
bar: self.clone(),
wrap: write,
}
}
fn update_and_draw<F: FnOnce(&mut ProgressState)>(&self, f: F) {
let mut draw = false;
{
let mut state = self.state.write().unwrap();
let old_pos = state.pos;
f(&mut state);
let new_pos = state.pos;
if new_pos != old_pos {
state.est.record_step(new_pos);
}
if new_pos >= state.draw_next {
state.draw_next = new_pos.saturating_add(state.draw_delta);
draw = true;
}
}
if draw {
self.draw().ok();
}
}
fn draw(&self) -> io::Result<()> {
draw_state(&mut self.state.write().unwrap())
}
pub fn position(&self) -> u64 {
self.state.read().unwrap().pos
}
pub fn length(&self) -> u64 {
self.state.read().unwrap().len
}
}
fn draw_state(state: &mut ProgressState) -> io::Result<()> {
if state.draw_target.is_hidden() {
return Ok(());
}
let draw_state = ProgressDrawState {
lines: if state.should_render() {
state.style.format_state(&*state)
} else {
vec![]
},
orphan_lines: 0,
finished: state.is_finished(),
force_draw: false,
move_cursor: false,
ts: Instant::now(),
};
state.draw_target.apply_draw_state(draw_state)
}
#[derive(Clone)]
pub struct WeakProgressBar {
state: Weak<RwLock<ProgressState>>,
}
impl WeakProgressBar {
pub fn upgrade(&self) -> Option<ProgressBar> {
self.state.upgrade().map(|state| ProgressBar { state })
}
}
impl Drop for ProgressState {
fn drop(&mut self) {
if self.is_finished() {
return;
}
self.status = Status::DoneHidden;
if self.pos >= self.draw_next {
self.draw_next = self.pos.saturating_add(self.draw_delta);
draw_state(self).ok();
}
}
}
#[test]
fn test_pbar_zero() {
let pb = ProgressBar::new(0);
assert_eq!(pb.state.read().unwrap().fraction(), 1.0);
}
#[test]
fn test_pbar_maxu64() {
let pb = ProgressBar::new(!0);
assert_eq!(pb.state.read().unwrap().fraction(), 0.0);
}
#[test]
fn test_pbar_overflow() {
let pb = ProgressBar::new(1);
pb.set_draw_target(ProgressDrawTarget::hidden());
pb.inc(2);
pb.finish();
}
#[test]
fn test_get_position() {
let pb = ProgressBar::new(1);
pb.set_draw_target(ProgressDrawTarget::hidden());
pb.inc(2);
let pos = pb.position();
assert_eq!(pos, 2);
}
#[test]
fn test_weak_pb() {
let pb = ProgressBar::new(0);
let weak = pb.downgrade();
assert!(weak.upgrade().is_some());
::std::mem::drop(pb);
assert!(weak.upgrade().is_none());
}
struct MultiObject {
done: bool,
draw_state: Option<ProgressDrawState>,
}
struct MultiProgressState {
objects: Vec<MultiObject>,
ordering: Vec<usize>,
draw_target: ProgressDrawTarget,
move_cursor: bool,
}
pub struct MultiProgress {
state: RwLock<MultiProgressState>,
joining: AtomicBool,
tx: Sender<(usize, ProgressDrawState)>,
rx: Receiver<(usize, ProgressDrawState)>,
}
impl fmt::Debug for MultiProgress {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("MultiProgress").finish()
}
}
unsafe impl Sync for MultiProgress {}
impl Default for MultiProgress {
fn default() -> MultiProgress {
MultiProgress::with_draw_target(ProgressDrawTarget::stderr())
}
}
impl MultiProgress {
pub fn new() -> MultiProgress {
MultiProgress::default()
}
pub fn with_draw_target(draw_target: ProgressDrawTarget) -> MultiProgress {
let (tx, rx) = channel();
MultiProgress {
state: RwLock::new(MultiProgressState {
objects: vec![],
ordering: vec![],
draw_target,
move_cursor: false,
}),
joining: AtomicBool::new(false),
tx,
rx,
}
}
pub fn set_draw_target(&self, target: ProgressDrawTarget) {
let mut state = self.state.write().unwrap();
state.draw_target.disconnect();
state.draw_target = target;
}
pub fn set_move_cursor(&self, move_cursor: bool) {
self.state.write().unwrap().move_cursor = move_cursor;
}
pub fn add(&self, pb: ProgressBar) -> ProgressBar {
let mut state = self.state.write().unwrap();
let object_idx = state.objects.len();
state.objects.push(MultiObject {
done: false,
draw_state: None,
});
state.ordering.push(object_idx);
pb.set_draw_target(ProgressDrawTarget {
kind: ProgressDrawTargetKind::Remote(object_idx, Mutex::new(self.tx.clone())),
});
pb
}
pub fn insert(&self, index: usize, pb: ProgressBar) -> ProgressBar {
let mut state = self.state.write().unwrap();
let object_idx = state.objects.len();
state.objects.push(MultiObject {
done: false,
draw_state: None,
});
if index > state.ordering.len() {
state.ordering.push(object_idx);
} else {
state.ordering.insert(index, object_idx);
}
pb.set_draw_target(ProgressDrawTarget {
kind: ProgressDrawTargetKind::Remote(object_idx, Mutex::new(self.tx.clone())),
});
pb
}
pub fn join(&self) -> io::Result<()> {
self.join_impl(false)
}
pub fn join_and_clear(&self) -> io::Result<()> {
self.join_impl(true)
}
fn is_done(&self) -> bool {
let state = self.state.read().unwrap();
if state.objects.is_empty() {
return true;
}
for obj in &state.objects {
if !obj.done {
return false;
}
}
true
}
fn join_impl(&self, clear: bool) -> io::Result<()> {
if self.joining.load(Ordering::Acquire) {
panic!("Already joining!");
}
self.joining.store(true, Ordering::Release);
let move_cursor = self.state.read().unwrap().move_cursor;
const MAX_GROUP_SIZE: usize = 32;
let mut recv_peek = None;
let mut grouped = 0usize;
let mut orphan_lines: Vec<String> = Vec::new();
while !self.is_done() {
let (idx, draw_state) = if let Some(peeked) = recv_peek.take() {
peeked
} else {
self.rx.recv().unwrap()
};
let ts = draw_state.ts;
let force_draw = draw_state.finished || draw_state.force_draw;
let mut state = self.state.write().unwrap();
if draw_state.finished {
state.objects[idx].done = true;
}
let lines = if draw_state.orphan_lines > 0 {
let split = draw_state.lines.split_at(draw_state.orphan_lines);
orphan_lines.extend_from_slice(split.0);
split.1.to_vec()
} else {
draw_state.lines
};
let draw_state = ProgressDrawState {
lines,
orphan_lines: 0,
..draw_state
};
state.objects[idx].draw_state = Some(draw_state);
if state.draw_target.is_hidden() {
continue;
}
debug_assert!(recv_peek.is_none());
if grouped >= MAX_GROUP_SIZE {
grouped = 0;
} else if let Ok(state) = self.rx.try_recv() {
recv_peek = Some(state);
grouped += 1;
continue;
} else {
grouped = 0;
}
let mut lines = vec![];
let orphan_lines_count = orphan_lines.len();
lines.append(&mut orphan_lines);
for index in state.ordering.iter() {
let obj = &state.objects[*index];
if let Some(ref draw_state) = obj.draw_state {
lines.extend_from_slice(&draw_state.lines[..]);
}
}
let finished = !state.objects.iter().any(|ref x| !x.done);
state.draw_target.apply_draw_state(ProgressDrawState {
lines,
orphan_lines: orphan_lines_count,
force_draw: force_draw || orphan_lines_count > 0,
move_cursor,
finished,
ts,
})?;
}
if clear {
let mut state = self.state.write().unwrap();
state.draw_target.apply_draw_state(ProgressDrawState {
lines: vec![],
orphan_lines: 0,
finished: true,
force_draw: true,
move_cursor,
ts: Instant::now(),
})?;
}
self.joining.store(false, Ordering::Release);
Ok(())
}
}
#[derive(Debug)]
pub struct ProgressBarIter<I> {
bar: ProgressBar,
it: I,
}
impl<I: Iterator> Iterator for ProgressBarIter<I> {
type Item = I::Item;
fn next(&mut self) -> Option<Self::Item> {
let item = self.it.next();
if item.is_some() {
self.bar.inc(1);
}
item
}
}
#[derive(Debug)]
pub struct ProgressBarWrap<W> {
bar: ProgressBar,
wrap: W,
}
impl<R: io::Read> io::Read for ProgressBarWrap<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let inc = self.wrap.read(buf)?;
self.bar.inc(inc as u64);
Ok(inc)
}
}
impl<S: io::Seek> io::Seek for ProgressBarWrap<S> {
fn seek(&mut self, f: io::SeekFrom) -> io::Result<u64> {
self.wrap.seek(f).map(|pos| {
self.bar.set_position(pos);
pos
})
}
}
impl<W: io::Write> io::Write for ProgressBarWrap<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.wrap.write(buf).map(|inc| {
self.bar.inc(inc as u64);
inc
})
}
fn flush(&mut self) -> io::Result<()> {
self.wrap.flush()
}
fn write_vectored(&mut self, bufs: &[io::IoSlice]) -> io::Result<usize> {
self.wrap.write_vectored(bufs).map(|inc| {
self.bar.inc(inc as u64);
inc
})
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.wrap.write_all(buf).map(|()| {
self.bar.inc(buf.len() as u64);
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn late_pb_drop() {
let pb = ProgressBar::new(10);
let mpb = MultiProgress::new();
mpb.add(pb.clone());
}
#[test]
fn it_can_wrap_a_reader() {
let bytes = &b"I am an implementation of io::Read"[..];
let pb = ProgressBar::new(bytes.len() as u64);
let mut reader = pb.wrap_read(bytes);
let mut writer = Vec::new();
io::copy(&mut reader, &mut writer).unwrap();
assert_eq!(writer, bytes);
}
#[test]
fn it_can_wrap_a_writer() {
let bytes = b"implementation of io::Read";
let mut reader = &bytes[..];
let pb = ProgressBar::new(bytes.len() as u64);
let writer = Vec::new();
let mut writer = pb.wrap_write(writer);
io::copy(&mut reader, &mut writer).unwrap();
assert_eq!(writer.wrap, bytes);
}
#[test]
fn progress_bar_sync_send() {
let _: Box<dyn Sync> = Box::new(ProgressBar::new(1));
let _: Box<dyn Send> = Box::new(ProgressBar::new(1));
let _: Box<dyn Sync> = Box::new(MultiProgress::new());
let _: Box<dyn Send> = Box::new(MultiProgress::new());
}
}