use std::io::prelude::*;
use std::io::{self, copy};
use std::mem::drop;
use std::thread::{spawn, JoinHandle};
use anyhow::Result;
use friendly::bytes;
use log::*;
use os_pipe::{pipe, PipeReader, PipeWriter};
use crate::util::timing::Timer;
pub struct ThreadRead {
read: PipeReader,
handle: Option<JoinHandle<io::Result<u64>>>,
}
pub struct ThreadWrite {
write: Option<PipeWriter>,
handle: Option<JoinHandle<io::Result<u64>>>,
}
impl ThreadRead {
pub fn new<R: Read + Send + 'static>(chan: R) -> Result<ThreadRead> {
let (read, writer) = pipe()?;
let jh = spawn(move || {
let mut src = chan;
let mut dst = writer;
let timer = Timer::new();
let res = copy(&mut src, &mut dst);
if let Ok(size) = res {
info!("copied {} in {}", bytes(size), timer);
}
res
});
Ok(ThreadRead {
read,
handle: Some(jh),
})
}
}
impl Read for ThreadRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let size = self.read.read(buf)?;
if size == 0 {
if let Some(h) = self.handle.take() {
let res = h.join().expect("thread error");
let sz = res?;
debug!("thread copied {} bytes", sz);
}
}
Ok(size)
}
}
impl ThreadWrite {
pub fn new<W: Write + Send + 'static>(chan: W) -> Result<ThreadWrite> {
let (read, writer) = pipe()?;
let jh = spawn(move || {
let mut src = read;
let mut dst = chan;
copy(&mut src, &mut dst)
});
Ok(ThreadWrite {
write: Some(writer),
handle: Some(jh),
})
}
fn do_close(&mut self) -> io::Result<u64> {
if let Some(mut write) = self.write.take() {
write.flush()?;
drop(write);
}
if let Some(h) = self.handle.take() {
let res = h.join().expect("thread error");
let sz = res?;
debug!("thread copied {} bytes", sz);
Ok(sz)
} else {
Err(io::ErrorKind::BrokenPipe.into())
}
}
#[allow(dead_code)]
pub fn close(mut self) -> io::Result<u64> {
self.do_close()
}
}
impl Drop for ThreadWrite {
fn drop(&mut self) {
if self.write.is_some() || self.handle.is_some() {
self.do_close().expect("unclosed background thread failed");
}
}
}
impl Write for ThreadWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Some(ref mut write) = self.write {
write.write(buf)
} else {
Err(io::ErrorKind::BrokenPipe.into())
}
}
fn flush(&mut self) -> io::Result<()> {
if let Some(ref mut write) = self.write {
write.flush()
} else {
Err(io::ErrorKind::BrokenPipe.into())
}
}
}