use std::error::Error;
use std::io::prelude::*;
use std::io::Lines;
use std::marker::PhantomData;
use std::path::Path;
use std::str::FromStr;
use anyhow::Result;
use indicatif::ProgressBar;
use log::*;
use serde::de::DeserializeOwned;
use super::compress::open_gzin_progress;
use super::ObjectWriter;
/// Line-oriented front end over a type-erased buffered reader.
///
/// The processor owns its input and is consumed by each of its iteration
/// methods (`lines`, `records`, `json_records`, `process_json`), so a given
/// instance can be read through exactly once.
pub struct LineProcessor {
/// Boxed buffered input that the line iterators are built from.
reader: Box<dyn BufRead>,
}
/// Iterator adapter that turns each input line into an `R` via `FromStr`.
///
/// Built by `LineProcessor::records`; the parsing itself lives in the
/// `Iterator` implementation for this type.
pub struct Records<R> {
/// Underlying line iterator over the boxed reader.
lines: Lines<Box<dyn BufRead>>,
/// Zero-sized marker recording the record type `R`; no `R` is stored.
phantom: PhantomData<R>,
}
/// Yields one parsed `R` per input line.
///
/// Each item is `Ok(record)` when the line was read and parsed successfully,
/// or `Err(_)` carrying either the I/O error from reading the line or the
/// `FromStr` error from parsing it. The iterator ends when the underlying
/// line iterator is exhausted.
impl<R: FromStr> Iterator for Records<R>
where
    R::Err: 'static + Error + Send + Sync,
{
    type Item = Result<R>;

    fn next(&mut self) -> Option<Self::Item> {
        // `?` on the Option ends iteration once there are no more lines.
        match self.lines.next()? {
            // Parse the text; the parse error is converted to the `Result` error type.
            Ok(text) => Some(text.parse::<R>().map_err(Into::into)),
            // Reading the line itself failed: surface the I/O error as the item.
            Err(io_err) => Some(Err(io_err.into())),
        }
    }
}
/// Iterator adapter that deserializes each input line as a JSON value of type `R`.
///
/// Built by `LineProcessor::json_records`; the deserialization itself lives
/// in the `Iterator` implementation for this type.
pub struct JSONRecords<R> {
/// Underlying line iterator over the boxed reader.
lines: Lines<Box<dyn BufRead>>,
/// Zero-sized marker recording the record type `R`; no `R` is stored.
phantom: PhantomData<R>,
}
/// Yields one deserialized `R` per input line.
///
/// Each item is `Ok(record)` when the line was read and decoded by
/// `serde_json::from_str`, or `Err(_)` carrying the I/O or JSON error.
/// The iterator ends when the underlying line iterator is exhausted.
impl<R: DeserializeOwned> Iterator for JSONRecords<R> {
    type Item = Result<R>;

    fn next(&mut self) -> Option<Self::Item> {
        // `?` on the Option ends iteration once there are no more lines.
        match self.lines.next()? {
            // Decode the whole line as a single JSON document.
            Ok(text) => Some(serde_json::from_str::<R>(&text).map_err(Into::into)),
            // Reading the line itself failed: surface the I/O error as the item.
            Err(io_err) => Some(Err(io_err.into())),
        }
    }
}
impl LineProcessor {
pub fn open_gzip(path: &Path, pb: ProgressBar) -> Result<LineProcessor> {
let read = open_gzin_progress(path, pb)?;
Ok(LineProcessor {
reader: Box::new(read),
})
}
#[allow(dead_code)]
pub fn lines(self) -> Lines<Box<dyn BufRead>> {
self.reader.lines()
}
pub fn records<R: FromStr>(self) -> Records<R> {
Records {
lines: self.reader.lines(),
phantom: PhantomData,
}
}
pub fn json_records<R: DeserializeOwned>(self) -> JSONRecords<R> {
JSONRecords {
lines: self.reader.lines(),
phantom: PhantomData,
}
}
pub fn process_json<W, R>(self, writer: &mut W) -> Result<usize>
where
R: DeserializeOwned,
W: ObjectWriter<R>,
{
let mut line_no = 0;
for line in self.json_records() {
line_no += 1;
let obj: R = line.map_err(|e| {
error!("error parsing line {}: {:?}", line_no, e);
e
})?;
writer.write_object(obj).map_err(|e| {
error!("error writing line {}: {:?}", line_no, e);
e
})?;
}
debug!("read {} lines", line_no);
Ok(line_no)
}
}