1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Scan MARC records.  See [ScanMARC] for documentation.
use std::path::PathBuf;
use std::time::Instant;

use log::*;

use clap::Args;
use glob::glob;

use crate::io::{log_file_info, open_gzin_progress};
use crate::prelude::*;

use crate::marc::book_fields::BookOutput;
use crate::marc::flat_fields::FieldOutput;
use crate::marc::parse::{scan_records, scan_records_delim};
use crate::marc::MARCRecord;
use crate::util::logging::data_progress;

/// Scan MARC records and extract basic information.
///
/// This tool scans MARC-XML records, in either raw or delimited-line format,
/// and writes the fields to a Parquet file of flat field records.  It has two
/// modes: normal, which simply writes MARC fields to the Parquet file, and
/// 'book mode', which only saves books and produces additional output files
/// summarizing book record information and book ISBNs.
// NOTE(maintainers): the `///` comments on this struct and its fields are
// picked up by clap's derive as command-line help text, so rewording them
// changes what `--help` prints.  Implementation notes go in `//` comments.
#[derive(Args, Debug)]
#[command(name = "scan-marc")]
pub struct ScanMARC {
    /// Output files for normal mode.
    // Only read when book mode is off; `exec` substitutes
    // "marc-fields.parquet" when this is not given.
    #[arg(short = 'o', long = "output")]
    output: Option<PathBuf>,

    /// Prefix for output files in book mode.
    // Only read when book mode is on; `exec` substitutes "book" when this is
    // not given.
    #[arg(short = 'p', long = "output-prefix")]
    prefix: Option<String>,

    /// Turn on book mode.
    // Chooses between the `BookOutput` and `FieldOutput` writers in `exec`.
    #[arg(long = "book-mode")]
    book_mode: bool,

    /// Read in line mode
    // Makes `process_records` call `scan_records_delim` instead of
    // `scan_records` for every input file.
    #[arg(short = 'L', long = "line-mode")]
    line_mode: bool,

    /// Glob for files to parse.
    // When set, `find_files` returns the glob matches and the positional FILE
    // arguments are not used at all.
    #[arg(short = 'G', long = "glob")]
    glob: Option<String>,

    /// Input files to parse (GZ-compressed)
    // Only consulted by `find_files` when no glob is given.
    #[arg(name = "FILE")]
    files: Vec<PathBuf>,
}

impl Command for ScanMARC {
    /// Run the scan, picking the output writer from the operating mode.
    ///
    /// In normal mode the records go to a `FieldOutput` opened at the
    /// requested output path (`marc-fields.parquet` if none was given).  In
    /// book mode they go to a `BookOutput` opened with the requested prefix
    /// (`book` if none was given).  Any error from opening the writer or from
    /// processing the input is returned to the caller.
    fn exec(&self) -> Result<()> {
        if !self.book_mode {
            // normal mode: one flat-field output file
            let path = self
                .output
                .clone()
                .unwrap_or_else(|| PathBuf::from("marc-fields.parquet"));
            let sink = FieldOutput::open(&path)?;
            return self.process_records(sink);
        }

        // book mode: a family of outputs sharing a prefix
        let prefix = self.prefix.as_deref().unwrap_or("book");
        let sink = BookOutput::open(prefix)?;
        self.process_records(sink)
    }
}

impl ScanMARC {
    /// Work out which input files to scan.
    ///
    /// A glob pattern, when present, wins: it is expanded and its matches are
    /// returned, and the explicitly listed files are not used.  Without a
    /// glob, a copy of the listed files is returned.  Errors from parsing the
    /// pattern or from any individual match are propagated.
    fn find_files(&self) -> Result<Vec<PathBuf>> {
        match self.glob.as_deref() {
            Some(pattern) => {
                info!("scanning for files {}", pattern);
                // collecting into a Result stops at the first failed match
                let matches = glob(pattern)?.collect::<std::result::Result<Vec<_>, _>>()?;
                Ok(matches)
            }
            None => Ok(self.files.clone()),
        }
    }

    /// Scan every input file into `output`, then finish the output and log a
    /// summary.
    ///
    /// Each file is opened through `open_gzin_progress` and scanned with
    /// `scan_records_delim` in line mode or `scan_records` otherwise; a
    /// per-file record count and timing are logged as each file completes.
    /// After the last file the writer is finished, overall totals are logged,
    /// and information about the output files is logged.  The first error
    /// encountered aborts processing and is returned.
    fn process_records<W: ObjectWriter<MARCRecord> + DataSink + Send + Sync + 'static>(
        &self,
        mut output: W,
    ) -> Result<()> {
        // start the clock before file discovery so it is included in the total
        let started = Instant::now();
        let inputs = self.find_files()?;
        let mut total_records = 0;

        for path in &inputs {
            let path = path.as_path();
            let file_started = Instant::now();
            info!("reading from compressed file {}", path.display());

            // keep our own handle alive for the whole iteration; the reader
            // gets a clone
            let progress = data_progress(0);
            let reader = open_gzin_progress(path, progress.clone())?;

            let count = if self.line_mode {
                scan_records_delim(reader, &mut output)?
            } else {
                scan_records(reader, &mut output)?
            };

            let file_secs = file_started.elapsed().as_secs_f32();
            info!(
                "processed {} records from {} in {:.2}s",
                count,
                path.display(),
                file_secs
            );
            total_records += count;
        }

        // the file list must be captured before `finish` is called on the writer
        let out_files = output.output_files();
        let fields_written = output.finish()?;

        let total_secs = started.elapsed().as_secs_f32();
        info!(
            "imported {} fields from {} records from {} files in {:.2}s",
            fields_written,
            total_records,
            inputs.len(),
            total_secs
        );
        log_file_info(&out_files)?;

        Ok(())
    }
}