bookdata/goodreads/
interaction.rs1use hashbrown::HashSet;
3use parquet_derive::ParquetRecordWriter;
4use serde::Deserialize;
5
6use crate::arrow::*;
7use crate::goodreads::users::save_user_index;
8use crate::ids::index::IdIndex;
9use crate::parsing::dates::*;
10use crate::parsing::*;
11use crate::prelude::*;
12
13pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");
14
15#[derive(Deserialize)]
17pub struct RawInteraction {
18 pub user_id: String,
19 pub book_id: String,
20 pub review_id: String,
21 #[serde(alias = "isRead")]
22 pub is_read: bool,
23 pub rating: f32,
24 pub date_added: String,
25 pub date_updated: String,
26 pub read_at: String,
27 pub started_at: String,
28}
29
30#[derive(ParquetRecordWriter)]
36pub struct IntRecord {
37 pub rec_id: u32,
38 pub review_id: i64,
45 pub user_id: i32,
46 pub book_id: i32,
47 pub is_read: u8,
48 pub rating: Option<f32>,
49 pub added: f32,
50 pub updated: f32,
51 pub read_started: Option<f32>,
52 pub read_finished: Option<f32>,
53}
54
55pub struct IntWriter {
57 writer: TableWriter<IntRecord>,
58 users: IdIndex<String>,
59 review_ids: HashSet<i64>,
60 n_recs: u32,
61}
62
63impl IntWriter {
64 pub fn open() -> Result<IntWriter> {
66 let writer = TableWriter::open(OUT_FILE.resolve()?)?;
67 Ok(IntWriter {
68 writer,
69 users: IdIndex::new(),
70 review_ids: HashSet::new(),
71 n_recs: 0,
72 })
73 }
74}
75
76impl DataSink for IntWriter {
77 fn output_files(&self) -> Vec<PathBuf> {
78 path_list(&[])
79 }
80}
81
82impl ObjectWriter<RawInteraction> for IntWriter {
83 fn write_object(&mut self, row: RawInteraction) -> Result<()> {
85 self.n_recs += 1;
86 let rec_id = self.n_recs;
87 let user_id = self.users.intern_owned(row.user_id)?;
88 let book_id: i32 = row.book_id.parse()?;
89 let (rev_hi, rev_lo) = decode_hex_i64_pair(&row.review_id)?;
90 let review_id = rev_hi ^ rev_lo;
91 if !self.review_ids.insert(review_id) {
92 warn!("review id {} duplicated ({})", review_id, row.review_id);
93 }
94
95 self.writer.write_object(IntRecord {
96 rec_id,
97 review_id,
98 user_id,
99 book_id,
100 is_read: row.is_read as u8,
101 rating: if row.rating > 0.0 {
102 Some(row.rating)
103 } else {
104 None
105 },
106 added: parse_gr_date(&row.date_added).map(check_ts("added", 2000))?,
107 updated: parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?,
108 read_started: trim_opt(&row.started_at)
109 .map(parse_gr_date)
110 .transpose()?
111 .map(check_ts("started", 1900)),
112 read_finished: trim_opt(&row.read_at)
113 .map(parse_gr_date)
114 .transpose()?
115 .map(check_ts("finished", 1900)),
116 })?;
117
118 Ok(())
119 }
120
121 fn finish(self) -> Result<usize> {
123 info!(
124 "wrote {} records for {} users, closing output",
125 self.n_recs,
126 self.users.len()
127 );
128 let res = self.writer.finish()?;
129 save_user_index(&self.users)?;
130 Ok(res)
131 }
132}