// bookdata/goodreads/interaction.rs
use hashbrown::HashSet;
use parquet_derive::ParquetRecordWriter;
use serde::Deserialize;
use crate::arrow::*;
use crate::goodreads::users::save_user_index;
use crate::ids::index::IdIndex;
use crate::parsing::dates::*;
use crate::parsing::*;
use crate::prelude::*;
/// Data-relative path of the Parquet file that [`IntWriter`] writes converted
/// Goodreads interaction records to (resolved via `BDPath::resolve` when opened).
pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");
/// One interaction row from the raw Goodreads interaction data, as deserialized
/// by serde.
///
/// Identifiers and dates arrive as strings; [`IntWriter`] parses and converts
/// them into an [`IntRecord`].
#[derive(Deserialize)]
pub struct RawInteraction {
    /// Goodreads user identifier; interned to an integer ID by the writer.
    pub user_id: String,
    /// Goodreads book identifier; the writer parses it as an `i32`.
    pub book_id: String,
    /// Hex-encoded review identifier; decoded by `decode_hex_i64_pair`.
    pub review_id: String,
    /// Whether the user marked the book as read. The alias also accepts the
    /// camel-case key `isRead` in the input.
    #[serde(alias = "isRead")]
    pub is_read: bool,
    /// Raw rating; the writer treats values that are not greater than 0.0 as
    /// "no rating".
    pub rating: f32,
    /// Date the interaction was added, in the form accepted by `parse_gr_date`.
    pub date_added: String,
    /// Date the interaction was last updated, in the form accepted by
    /// `parse_gr_date`.
    pub date_updated: String,
    /// Date the user finished reading; optional, passed through `trim_opt`
    /// before parsing (presumably blank when absent — TODO confirm).
    pub read_at: String,
    /// Date the user started reading; optional, passed through `trim_opt`
    /// before parsing (presumably blank when absent — TODO confirm).
    pub started_at: String,
}
/// A processed Goodreads interaction, one row of [`OUT_FILE`].
///
/// NOTE(review): the derived `ParquetRecordWriter` presumably emits columns in
/// field declaration order, so keep this order stable.
#[derive(ParquetRecordWriter)]
pub struct IntRecord {
    /// Sequential record number assigned by the writer, starting at 1.
    pub rec_id: u32,
    /// Review ID folded to 64 bits by XOR-ing its two decoded halves; distinct
    /// reviews can collide, which the writer reports as a warning.
    pub review_id: i64,
    /// Integer user ID from the writer's user index.
    pub user_id: i32,
    /// Numeric Goodreads book ID.
    pub book_id: i32,
    /// 1 if the interaction is marked as read, 0 otherwise.
    pub is_read: u8,
    /// Rating, or `None` when the raw rating was not greater than 0.0.
    pub rating: Option<f32>,
    /// When the interaction was added, as produced by `parse_gr_date` and
    /// checked by `check_ts` (units not visible here — TODO confirm).
    pub added: f32,
    /// When the interaction was last updated (same representation as `added`).
    pub updated: f32,
    /// When the user started reading, if recorded.
    pub read_started: Option<f32>,
    /// When the user finished reading, if recorded.
    pub read_finished: Option<f32>,
}
/// Streaming converter from [`RawInteraction`] rows to Parquet [`IntRecord`]s.
///
/// Besides writing the table, it interns user ID strings into integer IDs
/// and tracks folded review IDs so that collisions can be reported.
pub struct IntWriter {
    /// Destination table for converted records.
    writer: TableWriter<IntRecord>,
    /// Interning table from Goodreads user ID strings to integer IDs.
    users: IdIndex<String>,
    /// Every folded review ID seen so far; used only to detect duplicates.
    review_ids: HashSet<i64>,
    /// Count of rows handed to `write_object`; doubles as the most recently
    /// assigned `rec_id`.
    n_recs: u32,
}

impl IntWriter {
    /// Create a writer targeting [`OUT_FILE`], starting with an empty user
    /// index, no seen review IDs, and a zero record counter.
    ///
    /// # Errors
    ///
    /// Fails if the output path cannot be resolved or the table cannot be
    /// opened.
    pub fn open() -> Result<IntWriter> {
        let out_path = OUT_FILE.resolve()?;
        Ok(Self {
            writer: TableWriter::open(out_path)?,
            users: IdIndex::new(),
            review_ids: HashSet::new(),
            n_recs: 0,
        })
    }
}
/// [`DataSink`] support for the interaction writer.
impl DataSink for IntWriter {
    /// Files this sink reports as outputs; built from an empty slice passed to
    /// `path_list`.
    ///
    /// NOTE(review): `IntWriter` writes `OUT_FILE` and, on `finish`, a user
    /// index via `save_user_index`, yet no paths are listed here. Confirm
    /// whether callers rely on this being empty or whether those files should
    /// be reported.
    fn output_files(&self) -> Vec<PathBuf> {
        path_list(&[])
    }
}
/// Converts raw Goodreads interactions and streams them to the Parquet table.
impl ObjectWriter<RawInteraction> for IntWriter {
    /// Convert one [`RawInteraction`] into an [`IntRecord`] and write it.
    ///
    /// The record counter is advanced before any conversion, so `rec_id`s are
    /// 1-based and a row that later fails still consumes a number. Duplicate
    /// (folded) review IDs are logged as warnings but the row is still written.
    ///
    /// # Errors
    ///
    /// Propagates failures from any of these steps:
    /// - interning the user ID;
    /// - parsing the book ID;
    /// - decoding the review ID;
    /// - parsing any present date;
    /// - writing the record.
    fn write_object(&mut self, row: RawInteraction) -> Result<()> {
        self.n_recs += 1;
        let rec_id = self.n_recs;

        let user_id = self.users.intern_owned(row.user_id)?;
        let book_id: i32 = row.book_id.parse()?;

        // Squash the two decoded 64-bit halves of the review ID into one i64.
        // This can collide, so track what we've seen and warn on repeats.
        let review_id = {
            let (high, low) = decode_hex_i64_pair(&row.review_id)?;
            high ^ low
        };
        let newly_seen = self.review_ids.insert(review_id);
        if !newly_seen {
            warn!("review id {} duplicated ({})", review_id, row.review_id);
        }

        // Required dates: sanity-checked against a year-2000 floor.
        let added = parse_gr_date(&row.date_added).map(check_ts("added", 2000))?;
        let updated = parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?;
        // Optional reading dates: only parsed when `trim_opt` yields a value,
        // and checked against a looser year-1900 floor.
        let read_started = trim_opt(&row.started_at)
            .map(|text| parse_gr_date(text).map(check_ts("started", 1900)))
            .transpose()?;
        let read_finished = trim_opt(&row.read_at)
            .map(|text| parse_gr_date(text).map(check_ts("finished", 1900)))
            .transpose()?;

        let record = IntRecord {
            rec_id,
            review_id,
            user_id,
            book_id,
            is_read: u8::from(row.is_read),
            // Non-positive ratings mean "not rated".
            rating: Some(row.rating).filter(|&score| score > 0.0),
            added,
            updated,
            read_started,
            read_finished,
        };
        self.writer.write_object(record)?;
        Ok(())
    }

    /// Close the output table, then persist the user index.
    ///
    /// Returns the value reported by the table writer's `finish`. The user
    /// index is saved only after the table closes successfully.
    fn finish(self) -> Result<usize> {
        let user_count = self.users.len();
        info!(
            "wrote {} records for {} users, closing output",
            self.n_recs, user_count
        );
        let written = self.writer.finish()?;
        save_user_index(&self.users)?;
        Ok(written)
    }
}