1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use clap::Args;

use crate::{arrow::*, prelude::*};
use polars::prelude::*;

/// Path of the Parquet file that `LinkISBNIds::exec` scans as the ISBN table (the source of
/// the `isbn` and `isbn_id` columns it joins against).
// NOTE(review): the path is relative, so it presumably resolves against the process working
// directory; confirm against `scan_df_parquet`.
static ALL_ISBNS_FILE: &str = "book-links/all-isbns.parquet";

/// Link records to ISBN IDs.
// NOTE: the `///` comments on this struct and its fields are rendered by clap as
// command-line help text, so treat them as user-facing strings; maintainer notes
// belong in plain `//` comments like this one.
#[derive(Debug, Args)]
#[command(name = "link-isbn-ids")]
pub struct LinkISBNIds {
    /// Read record IDs from RECFLD.
    // Name of the input column holding the record identifier; it is carried through
    // to the output unchanged.
    #[arg(
        short = 'R',
        long = "record-id",
        name = "RECFLD",
        default_value = "rec_id"
    )]
    rec_field: String,

    /// Read ISBNs from FIELD.
    // May name several input columns; `exec` melts them into a single ISBN column
    // when more than one is given.
    #[arg(
        short = 'I',
        long = "isbn-field",
        name = "FIELD",
        default_value = "isbn"
    )]
    isbn_fields: Vec<String>,

    /// Write output to FILE.
    #[arg(short = 'o', long = "output", name = "FILE")]
    outfile: PathBuf,

    /// Read records from INFILE.
    // The help text uses the same placeholder as the argument name below.
    #[arg(name = "INFILE")]
    infile: PathBuf,
}

impl Command for LinkISBNIds {
    /// Link each input record to the IDs of the ISBNs it carries and save the links.
    ///
    /// Scans the ISBN table ([`ALL_ISBNS_FILE`]) and the input file, inner-joins them on
    /// the ISBN value, and writes the distinct (record ID, `isbn_id`) pairs, sorted by
    /// record ID, to the output file.
    ///
    /// # Errors
    ///
    /// Fails if either input cannot be scanned, if the query fails while collecting
    /// (for example because a named column is missing), if the collected frame has a
    /// null record ID or ISBN ID, or if the output cannot be written.
    fn exec(&self) -> Result<()> {
        info!("record field: {}", &self.rec_field);
        info!("ISBN fields: {:?}", &self.isbn_fields);

        let isbns = scan_df_parquet(ALL_ISBNS_FILE)?;
        let records = scan_df_parquet(&self.infile)?;

        // Put the ISBN values in a column named "isbn" whichever branch runs, so the
        // join and the null filter below never depend on what the input calls its
        // ISBN column (or on whether the join keeps the right-hand key column).
        let keyed = if let [field] = self.isbn_fields.as_slice() {
            // One column: expose it as "isbn". This is a no-op when the column already
            // has that name, which is the default.
            records.with_column(col(field.as_str()).alias("isbn"))
        } else {
            // Several columns: melt them into one "isbn" value column, keeping the
            // record ID and noting the originating column in "field".
            let mut melt = MeltArgs::default();
            melt.id_vars.push((&self.rec_field).into());
            for fld in &self.isbn_fields {
                melt.value_vars.push(fld.into());
            }
            melt.value_name = Some("isbn".into());
            melt.variable_name = Some("field".into());
            records.melt(melt)
        };

        let merged = keyed.join(
            isbns,
            &[col("isbn")],
            &[col("isbn")],
            JoinType::Inner.into(),
        );

        let filtered = merged
            .filter(col("isbn").is_not_null())
            .select(&[col(self.rec_field.as_str()), col("isbn_id")])
            .unique(None, UniqueKeepStrategy::First)
            .sort(self.rec_field.as_str(), default());

        info!("collecting results");
        let frame = filtered.collect()?;

        // Both output columns are checked for nulls before the frame is written with
        // the schema from `nonnull_schema`.
        if frame.column(&self.rec_field)?.null_count() > 0 {
            error!("final frame has null record IDs");
            return Err(anyhow!("data check failed"));
        }
        if frame.column("isbn_id")?.null_count() > 0 {
            error!("final frame has null ISBN IDs");
            return Err(anyhow!("data check failed"));
        }

        info!("saving {} links to {:?}", frame.height(), &self.outfile);
        let schema = nonnull_schema(&frame);
        let writer = open_parquet_writer(&self.outfile, schema)?;
        writer.write_and_finish(frame.iter_chunks(false))?;

        Ok(())
    }
}