1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//! GoodReads interaction record schemas and processing.
use hashbrown::HashSet;
use serde::Deserialize;

use crate::arrow::*;
use crate::goodreads::users::save_user_index;
use crate::ids::index::IdIndex;
use crate::parsing::dates::*;
use crate::parsing::*;
use crate::prelude::*;

pub const OUT_FILE: BDPath<'static> = BDPath::new("goodreads/gr-interactions.parquet");

/// Interaction records we read from JSON.
#[derive(Deserialize)]
pub struct RawInteraction {
    pub user_id: String,
    pub book_id: String,
    pub review_id: String,
    #[serde(alias = "isRead")]
    pub is_read: bool,
    pub rating: f32,
    pub date_added: String,
    pub date_updated: String,
    pub read_at: String,
    pub started_at: String,
}

/// GoodReads interaction records as actually written to the table.
///
/// This struct is written to `gr-interactions.parquet` and records actual interaction data.
/// Timestamps are UNIX timestamps recorded as 64-bit integers; they do not use a Parquet
/// timestamp time, due to out-of-range values causing problems when loaded into Python.
#[derive(TableRow)]
pub struct IntRecord {
    pub rec_id: u32,
    /// The review ID.
    ///
    /// This is derived from the hexadecimal review ID by interpreting the hexadecimal-encoded
    /// review ID from the source data as two big-endian i64s and XORing them.  The import
    /// process checks that this does not result in duplicate review IDs, and emits warnings
    /// if any are encountered.
    pub review_id: i64,
    pub user_id: i32,
    pub book_id: i32,
    pub is_read: u8,
    pub rating: Option<f32>,
    pub added: f32,
    pub updated: f32,
    pub read_started: Option<f32>,
    pub read_finished: Option<f32>,
}

/// Object writer to transform and write GoodReads interactions
pub struct IntWriter {
    writer: TableWriter<IntRecord>,
    users: IdIndex<String>,
    review_ids: HashSet<i64>,
    n_recs: u32,
}

impl IntWriter {
    /// Open a new output
    pub fn open() -> Result<IntWriter> {
        let writer = TableWriter::open(OUT_FILE.resolve()?)?;
        Ok(IntWriter {
            writer,
            users: IdIndex::new(),
            review_ids: HashSet::new(),
            n_recs: 0,
        })
    }
}

impl DataSink for IntWriter {
    fn output_files(&self) -> Vec<PathBuf> {
        path_list(&[])
    }
}

impl ObjectWriter<RawInteraction> for IntWriter {
    /// Write a single interaction to the output
    fn write_object(&mut self, row: RawInteraction) -> Result<()> {
        self.n_recs += 1;
        let rec_id = self.n_recs;
        let user_id = self.users.intern_owned(row.user_id)?;
        let book_id: i32 = row.book_id.parse()?;
        let (rev_hi, rev_lo) = decode_hex_i64_pair(&row.review_id)?;
        let review_id = rev_hi ^ rev_lo;
        if !self.review_ids.insert(review_id) {
            warn!("review id {} duplicated ({})", review_id, row.review_id);
        }

        self.writer.write_object(IntRecord {
            rec_id,
            review_id,
            user_id,
            book_id,
            is_read: row.is_read as u8,
            rating: if row.rating > 0.0 {
                Some(row.rating)
            } else {
                None
            },
            added: parse_gr_date(&row.date_added).map(check_ts("added", 2000))?,
            updated: parse_gr_date(&row.date_updated).map(check_ts("updated", 2000))?,
            read_started: trim_opt(&row.started_at)
                .map(parse_gr_date)
                .transpose()?
                .map(check_ts("started", 1900)),
            read_finished: trim_opt(&row.read_at)
                .map(parse_gr_date)
                .transpose()?
                .map(check_ts("finished", 1900)),
        })?;

        Ok(())
    }

    // Clean up and finalize output
    fn finish(self) -> Result<usize> {
        info!(
            "wrote {} records for {} users, closing output",
            self.n_recs,
            self.users.len()
        );
        let res = self.writer.finish()?;
        save_user_index(&self.users)?;
        Ok(res)
    }
}