1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
//! OpenLibrary edition schemas.
use friendly::scalar;

use crate::arrow::*;
use crate::cleaning::isbns::clean_asin_chars;
use crate::cleaning::isbns::clean_isbn_chars;
use crate::ids::index::IdIndex;
use crate::prelude::*;

pub use super::source::OLEditionRecord;
use super::source::Row;
use super::subject::SubjectEntry;
use super::subject::SubjectType;

/// An edition row in the extracted Parquet.
#[derive(TableRow)]
pub struct EditionRec {
    pub id: i32,
    pub key: String,
    pub title: Option<String>,
}

/// Link between edition and work.
#[derive(TableRow)]
pub struct LinkRec {
    pub edition: i32,
    pub work: i32,
}

/// Edition ISBN record.
#[derive(TableRow)]
pub struct ISBNrec {
    pub edition: i32,
    pub isbn: String,
}

/// Edition author record.
#[derive(TableRow)]
pub struct EditionAuthorRec {
    pub edition: i32,
    pub pos: i16,
    pub author: i32,
}

/// Edition-subject record in extracted Parquet.
#[derive(TableRow)]
pub struct EditionSubjectRec {
    pub id: i32,
    pub subj_type: SubjectType,
    pub subject: String,
}

impl From<SubjectEntry> for EditionSubjectRec {
    fn from(value: SubjectEntry) -> Self {
        EditionSubjectRec {
            id: value.entity,
            subj_type: value.subj_type.into(),
            subject: value.subject,
        }
    }
}

/// Process edition records into Parquet.
///
/// This must be run **after** the author and work processors.
pub struct EditionProcessor {
    last_id: i32,
    author_ids: IdIndex<String>,
    work_ids: IdIndex<String>,
    rec_writer: TableWriter<EditionRec>,
    link_writer: TableWriter<LinkRec>,
    isbn_writer: TableWriter<ISBNrec>,
    author_writer: TableWriter<EditionAuthorRec>,
    subject_writer: TableWriter<EditionSubjectRec>,
}

impl EditionProcessor {
    pub fn new() -> Result<EditionProcessor> {
        Ok(EditionProcessor {
            last_id: 0,
            author_ids: IdIndex::load_standard("author-ids-after-works.parquet")?,
            work_ids: IdIndex::load_standard("works.parquet")?,
            rec_writer: TableWriter::open("editions.parquet")?,
            link_writer: TableWriter::open("edition-works.parquet")?,
            isbn_writer: TableWriter::open("edition-isbns.parquet")?,
            author_writer: TableWriter::open("edition-authors.parquet")?,
            subject_writer: TableWriter::open("edition-subjects.parquet")?,
        })
    }

    fn save_isbns(
        &mut self,
        edition: i32,
        isbns: Vec<String>,
        clean: fn(&str) -> String,
    ) -> Result<()> {
        for isbn in isbns {
            let isbn = clean(&isbn);
            // filter but with a reasonable threshold of error
            if isbn.len() >= 8 {
                self.isbn_writer.write_object(ISBNrec { edition, isbn })?;
            }
        }

        Ok(())
    }
}

impl ObjectWriter<Row<OLEditionRecord>> for EditionProcessor {
    fn write_object(&mut self, row: Row<OLEditionRecord>) -> Result<()> {
        self.last_id += 1;
        let id = self.last_id;

        self.rec_writer.write_object(EditionRec {
            id,
            key: row.key.clone(),
            title: row.record.title.clone(),
        })?;

        self.save_isbns(id, row.record.isbn_10, clean_isbn_chars)?;
        self.save_isbns(id, row.record.isbn_13, clean_isbn_chars)?;
        self.save_isbns(id, row.record.asin, clean_asin_chars)?;

        for work in row.record.works {
            let work = self.work_ids.intern_owned(work.key)?;
            self.link_writer
                .write_object(LinkRec { edition: id, work })?;
        }

        for pos in 0..row.record.authors.len() {
            let akey = row.record.authors[pos].key();
            if let Some(akey) = akey {
                let aid = self.author_ids.intern(akey)?;
                let pos = pos as i16;
                self.author_writer.write_object(EditionAuthorRec {
                    edition: id,
                    pos,
                    author: aid,
                })?;
            }
        }

        for sr in row.record.subjects.subject_records(id) {
            self.subject_writer.write_object(sr.into())?;
        }

        Ok(())
    }

    fn finish(self) -> Result<usize> {
        let n = self.rec_writer.finish()?;
        info!("wrote {} edition records", scalar(n));
        let n = self.author_writer.finish()?;
        info!("wrote {} edition-author records", scalar(n));
        let n = self.link_writer.finish()?;
        info!("wrote {} edition-work records", scalar(n));
        let n = self.isbn_writer.finish()?;
        info!("wrote {} edition-isbn records", scalar(n));
        let n = self.subject_writer.finish()?;
        info!("wrote {} edition-subject records", scalar(n));
        self.author_ids.save_standard("all-authors.parquet")?;
        self.work_ids.save_standard("all-works.parquet")?;
        Ok(self.last_id as usize)
    }
}

impl EditionProcessor {}