bookdata/openlib/
work.rs

1//! OpenLibrary work schemas.
2use friendly::scalar;
3use parquet_derive::ParquetRecordWriter;
4
5use crate::arrow::*;
6use crate::prelude::*;
7
8use super::key::parse_ol_key;
9use super::key::KS_WORK;
10pub use super::source::OLWorkRecord;
11use super::source::Row;
12use super::subject::SubjectEntry;
13
/// Work row in extracted Parquet.
///
/// One of these is written per source row by `WorkProcessor::write_object`.
#[derive(Debug, Clone, ParquetRecordWriter)]
pub struct WorkRec {
    /// Work identifier, parsed from `key` by `parse_ol_key` with `KS_WORK`.
    pub id: u32,
    /// Key of the source row, copied as-is.
    pub key: String,
    /// Title taken from the source record; `None` when the record has none.
    pub title: Option<String>,
}
21
/// Work-author link in extracted Parquet.
///
/// `WorkProcessor::write_object` emits one of these for each author entry of
/// a work whose `id()` call yields `Some` identifier.
#[derive(Debug, Clone, ParquetRecordWriter)]
pub struct WorkAuthorRec {
    /// Identifier of the work (the same value stored in `WorkRec::id`).
    pub id: u32,
    /// Zero-based index of the author within the work record's author list.
    pub pos: i16,
    /// Author identifier, as returned by the author entry's `id()`.
    pub author: u32,
}
29
/// Work-subject record in extracted Parquet.
///
/// Built from a `SubjectEntry` through the `From` implementation below.
#[derive(Debug, Clone, ParquetRecordWriter)]
pub struct WorkSubjectRec {
    /// Taken from `SubjectEntry::entity`, cast to `u32`.
    /// NOTE(review): presumably the work identifier handed to
    /// `subject_records` — confirm against the subject module.
    pub id: u32,
    /// Subject type code, produced by converting `SubjectEntry::subj_type`
    /// into a `u8`.
    pub subj_type: u8,
    /// Subject text, moved out of the `SubjectEntry`.
    pub subject: String,
}
37
38impl From<SubjectEntry> for WorkSubjectRec {
39    fn from(value: SubjectEntry) -> Self {
40        WorkSubjectRec {
41            id: value.entity as u32,
42            subj_type: value.subj_type.into(),
43            subject: value.subject,
44        }
45    }
46}
47
/// Process work source records into Parquet.
///
/// This must be run **after** the author processor.
pub struct WorkProcessor {
    /// Writer for the work rows themselves (`works.parquet`).
    rec_writer: TableWriter<WorkRec>,
    /// Writer for work-author links (`work-authors.parquet`).
    author_writer: TableWriter<WorkAuthorRec>,
    /// Writer for work-subject rows (`work-subjects.parquet`).
    subject_writer: TableWriter<WorkSubjectRec>,
}
56
57impl WorkProcessor {
58    /// Create a new work processor.
59    pub fn new() -> Result<WorkProcessor> {
60        Ok(WorkProcessor {
61            rec_writer: TableWriter::open("works.parquet")?,
62            author_writer: TableWriter::open("work-authors.parquet")?,
63            subject_writer: TableWriter::open("work-subjects.parquet")?,
64        })
65    }
66}
67
68impl ObjectWriter<Row<OLWorkRecord>> for WorkProcessor {
69    fn write_object(&mut self, row: Row<OLWorkRecord>) -> Result<()> {
70        let id = parse_ol_key(&row.key, KS_WORK)?;
71
72        self.rec_writer.write_object(WorkRec {
73            id,
74            key: row.key.clone(),
75            title: row.record.title.clone(),
76        })?;
77
78        for pos in 0..row.record.authors.len() {
79            let akey = row.record.authors[pos].id()?;
80            if let Some(aid) = akey {
81                let pos = pos as i16;
82                self.author_writer.write_object(WorkAuthorRec {
83                    id,
84                    pos,
85                    author: aid,
86                })?;
87            }
88        }
89
90        for sr in row.record.subjects.subject_records(id) {
91            self.subject_writer.write_object(sr.into())?;
92        }
93
94        Ok(())
95    }
96
97    fn finish(self) -> Result<usize> {
98        let nr = self.rec_writer.finish()?;
99        info!("wrote {} work records", scalar(nr));
100        let na = self.author_writer.finish()?;
101        info!("wrote {} work-author records", scalar(na));
102        let ns = self.subject_writer.finish()?;
103        info!("wrote {} work-subject records", scalar(ns));
104        Ok(nr)
105    }
106}