bookdata/goodreads/interaction.rs

1//! GoodReads interaction record schemas and processing.
2use hashbrown::HashSet;
3use parquet_derive::ParquetRecordWriter;
4use serde::Deserialize;
5
6use crate::arrow::*;
7use crate::goodreads::users::save_user_index;
8use crate::ids::index::IdIndex;
9use crate::parsing::dates::*;
10use crate::parsing::*;
11use crate::prelude::*;
12
13pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");
14
15/// Interaction records we read from JSON.
16#[derive(Deserialize)]
17pub struct RawInteraction {
18    pub user_id: String,
19    pub book_id: String,
20    pub review_id: String,
21    #[serde(alias = "isRead")]
22    pub is_read: bool,
23    pub rating: f32,
24    pub date_added: String,
25    pub date_updated: String,
26    pub read_at: String,
27    pub started_at: String,
28}
29
30/// GoodReads interaction records as actually written to the table.
31///
32/// This struct is written to `gr-interactions.parquet` and records actual interaction data.
33/// Timestamps are UNIX timestamps recorded as 64-bit integers; they do not use a Parquet
34/// timestamp time, due to out-of-range values causing problems when loaded into Python.
35#[derive(ParquetRecordWriter)]
36pub struct IntRecord {
37    pub rec_id: u32,
38    /// The review ID.
39    ///
40    /// This is derived from the hexadecimal review ID by interpreting the hexadecimal-encoded
41    /// review ID from the source data as two big-endian i64s and XORing them.  The import
42    /// process checks that this does not result in duplicate review IDs, and emits warnings
43    /// if any are encountered.
44    pub review_id: i64,
45    pub user_id: i32,
46    pub book_id: i32,
47    pub is_read: u8,
48    pub rating: Option<f32>,
49    pub added: f32,
50    pub updated: f32,
51    pub read_started: Option<f32>,
52    pub read_finished: Option<f32>,
53}
54
55/// Object writer to transform and write GoodReads interactions
56pub struct IntWriter {
57    writer: TableWriter<IntRecord>,
58    users: IdIndex<String>,
59    review_ids: HashSet<i64>,
60    n_recs: u32,
61}
62
63impl IntWriter {
64    /// Open a new output
65    pub fn open() -> Result<IntWriter> {
66        let writer = TableWriter::open(OUT_FILE.resolve()?)?;
67        Ok(IntWriter {
68            writer,
69            users: IdIndex::new(),
70            review_ids: HashSet::new(),
71            n_recs: 0,
72        })
73    }
74}
75
76impl DataSink for IntWriter {
77    fn output_files(&self) -> Vec<PathBuf> {
78        path_list(&[])
79    }
80}
81
82impl ObjectWriter<RawInteraction> for IntWriter {
83    /// Write a single interaction to the output
84    fn write_object(&mut self, row: RawInteraction) -> Result<()> {
85        self.n_recs += 1;
86        let rec_id = self.n_recs;
87        let user_id = self.users.intern_owned(row.user_id)?;
88        let book_id: i32 = row.book_id.parse()?;
89        let (rev_hi, rev_lo) = decode_hex_i64_pair(&row.review_id)?;
90        let review_id = rev_hi ^ rev_lo;
91        if !self.review_ids.insert(review_id) {
92            warn!("review id {} duplicated ({})", review_id, row.review_id);
93        }
94
95        self.writer.write_object(IntRecord {
96            rec_id,
97            review_id,
98            user_id,
99            book_id,
100            is_read: row.is_read as u8,
101            rating: if row.rating > 0.0 {
102                Some(row.rating)
103            } else {
104                None
105            },
106            added: parse_gr_date(&row.date_added).map(check_ts("added", 2000))?,
107            updated: parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?,
108            read_started: trim_opt(&row.started_at)
109                .map(parse_gr_date)
110                .transpose()?
111                .map(check_ts("started", 1900)),
112            read_finished: trim_opt(&row.read_at)
113                .map(parse_gr_date)
114                .transpose()?
115                .map(check_ts("finished", 1900)),
116        })?;
117
118        Ok(())
119    }
120
121    // Clean up and finalize output
122    fn finish(self) -> Result<usize> {
123        info!(
124            "wrote {} records for {} users, closing output",
125            self.n_recs,
126            self.users.len()
127        );
128        let res = self.writer.finish()?;
129        save_user_index(&self.users)?;
130        Ok(res)
131    }
132}