// bookdata/goodreads/review.rs
use parquet_derive::ParquetRecordWriter;
pub use serde::Deserialize;
use crate::arrow::*;
use crate::ids::index::IdIndex;
use crate::parsing::dates::*;
use crate::parsing::*;
use crate::prelude::*;
use super::ids::load_id_links;
use super::ids::BookLinkMap;
use super::users::load_user_index;
/// Name of the output file: it is passed to `TableWriter::open` when the
/// writer is created and reported back by `DataSink::output_files`.
const OUT_FILE: &str = "gr-reviews.parquet";
/// A review row as deserialized from the input, before any ID resolution.
///
/// This is the object type consumed by `ReviewWriter` through its
/// `ObjectWriter<RawReview>` implementation.
#[derive(Deserialize)]
pub struct RawReview {
/// User identifier string; interned through the user index when written.
pub user_id: String,
/// Book identifier as text; parsed into an `i32` when written.
pub book_id: String,
/// Review identifier as text; decoded with `decode_hex_i64_pair` when written.
pub review_id: String,
/// Rating value; anything not strictly greater than 0 is stored as missing.
pub rating: f32,
/// Review body, copied unchanged into the output record.
pub review_text: String,
/// Vote count, copied unchanged into the output record.
pub n_votes: i32,
/// Date string parsed with `parse_gr_date` to produce the `added` column.
pub date_added: String,
/// Date string parsed with `parse_gr_date` to produce the `updated` column.
pub date_updated: String,
/// Deserialized but not read by `ReviewWriter::write_object`.
pub read_at: String,
/// Deserialized but not read by `ReviewWriter::write_object`.
pub started_at: String,
}
/// One row of the review output table, as handed to the `TableWriter`.
#[derive(ParquetRecordWriter)]
pub struct ReviewRecord {
/// Sequential record number assigned by `ReviewWriter`; the first row gets 1.
pub rec_id: u32,
/// XOR of the two `i64` halves decoded from the raw review ID.
pub review_id: i64,
/// Integer user ID obtained by interning the raw user ID string.
pub user_id: i32,
/// Book ID parsed from the raw book ID string.
pub book_id: i32,
/// Work ID taken from the book link map entry for `book_id`, if it has one.
pub work_id: Option<i32>,
/// Cluster taken from the book link map entry for `book_id`.
pub cluster: i32,
/// Rating, or `None` when the raw rating is not strictly greater than 0.
pub rating: Option<f32>,
/// Review text, copied from `RawReview::review_text`.
pub review: String,
/// Vote count, copied from `RawReview::n_votes`.
pub n_votes: i32,
/// Value produced from `date_added` by `parse_gr_date` and `check_ts("added", 2000)`.
// NOTE(review): if this is an epoch timestamp in seconds, `f32` cannot hold it
// exactly — confirm the unit and intended precision against `check_ts`.
pub added: f32,
/// Value produced from `date_updated` by `parse_gr_date` and `check_ts("updated", 2000)`.
// NOTE(review): same precision question as `added` — confirm.
pub updated: f32,
}
/// Sink that turns `RawReview` objects into `ReviewRecord` rows.
pub struct ReviewWriter {
// Table writer opened on `OUT_FILE`.
writer: TableWriter<ReviewRecord>,
// User index from `load_user_index()`, frozen before use.
users: IdIndex<String>,
// Book link map from `load_id_links()`; supplies work ID and cluster per book.
books: BookLinkMap,
// Number of rows passed to `write_object` so far; also the source of `rec_id`.
n_recs: u32,
}
impl ReviewWriter {
    /// Create a review writer with its output table and lookup structures.
    ///
    /// Opens the table writer on `OUT_FILE`, then loads and freezes the user
    /// index, then loads the book link map. The record counter starts at zero.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any of those three steps.
    pub fn open() -> Result<ReviewWriter> {
        // Struct-literal fields are evaluated top to bottom, which keeps the
        // output file opened before either index is loaded.
        Ok(Self {
            writer: TableWriter::open(OUT_FILE)?,
            users: load_user_index()?.freeze(),
            books: load_id_links()?,
            n_recs: 0,
        })
    }
}
impl DataSink for ReviewWriter {
    /// List the files this sink produces: just `OUT_FILE`.
    fn output_files(&self) -> Vec<PathBuf> {
        let outputs = [OUT_FILE];
        path_list(&outputs)
    }
}
impl ObjectWriter<RawReview> for ReviewWriter {
    /// Convert one raw review into a `ReviewRecord` and pass it to the table writer.
    ///
    /// The record counter is incremented before anything else, so record
    /// numbers start at 1.
    ///
    /// # Errors
    ///
    /// Fails if interning the user ID fails, if the book ID is not a valid
    /// `i32`, if the review ID cannot be decoded, if the book ID has no entry
    /// in the link map ("unknown book ID"), if either date fails to parse, or
    /// if the underlying writer reports an error.
    fn write_object(&mut self, row: RawReview) -> Result<()> {
        self.n_recs += 1;

        let user_id = self.users.intern_owned(row.user_id)?;
        let book_id: i32 = row.book_id.parse()?;

        // The raw review ID decodes to two i64 halves; they are folded
        // together with XOR to give a single 64-bit key.
        let (id_high, id_low) = decode_hex_i64_pair(&row.review_id)?;

        let book_link = self
            .books
            .get(&book_id)
            .ok_or_else(|| anyhow!("unknown book ID"))?;

        // Ratings that are not strictly positive are treated as missing.
        let rating = Some(row.rating).filter(|score| *score > 0.0);
        let added = parse_gr_date(&row.date_added).map(check_ts("added", 2000))?;
        let updated = parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?;

        let record = ReviewRecord {
            rec_id: self.n_recs,
            review_id: id_high ^ id_low,
            user_id,
            book_id,
            work_id: book_link.work_id,
            cluster: book_link.cluster,
            rating,
            review: row.review_text,
            n_votes: row.n_votes,
            added,
            updated,
        };
        self.writer.write_object(record)?;
        Ok(())
    }

    /// Log the record and user counts, then finish the underlying table
    /// writer and return its result.
    fn finish(self) -> Result<usize> {
        let record_count = self.n_recs;
        let user_count = self.users.len();
        info!(
            "wrote {} records for {} users, closing output",
            record_count, user_count
        );
        self.writer.finish()
    }
}