1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//! Scan Amazon ratings.
use csv;
use std::fs::File;

use crate::amazon::*;
use crate::arrow::*;
use crate::ids::index::IdIndex;
use crate::prelude::*;
use crate::util::logging::data_progress;

/// Scan an Amazon rating CSV file into Parquet.
// NOTE(review): `Args` is presumably clap's derive macro, re-exported through
// the prelude (confirm). If so, the `///` doc comments in this struct double
// as the CLI help text, so editing them changes user-visible output; that is
// why the maintainer notes here are plain `//` comments.
#[derive(Args, Debug)]
#[command(name = "scan-ratings")]
pub struct ScanRatings {
    /// Swap user and item columns (for AZ 2018 data)
    // When set, `exec` exchanges each row's `user` and `asin` fields before
    // the user is interned and the row is written.
    #[arg(long = "swap-id-columns")]
    swap_columns: bool,

    /// Rating output file
    // Path handed to `TableWriter::open` in `exec`; it is opened before the
    // input file is.
    #[arg(short = 'o', long = "rating-output", name = "FILE")]
    ratings_out: PathBuf,

    /// Input file
    // Path opened with `File::open` in `exec` and read as CSV with
    // `has_headers(false)`, i.e. the first line is treated as data.
    #[arg(name = "INPUT")]
    infile: PathBuf,
}

impl Command for ScanRatings {
    /// Stream the input CSV row by row into the rating output table.
    ///
    /// The output writer is opened first, then the input file. Each record is
    /// deserialized as a `SourceRating`; its user key is replaced by the
    /// identifier returned from `IdIndex::intern`, and the result is written
    /// as a `RatingRow`. With `swap_columns` set, the record's `user` and
    /// `asin` fields trade places before interning.
    ///
    /// # Errors
    ///
    /// Propagates any failure from opening either file, reading file
    /// metadata, deserializing a record, interning a user key, or writing and
    /// finishing the output table.
    fn exec(&self) -> Result<()> {
        info!("scanning Amazon rating CSV from {}", self.infile.display());
        info!("writing to {}", self.ratings_out.display());
        let mut writer = TableWriter::open(&self.ratings_out)?;

        // Progress is tracked against the input file's size in bytes, with
        // the reader wrapped so that consumed bytes advance the bar.
        let file = File::open(&self.infile)?;
        let progress = data_progress(file.metadata()?.len());
        progress.set_prefix("ratings");

        // The file is read without a header row: every line is a record.
        let reader = csv::ReaderBuilder::new()
            .has_headers(false)
            .from_reader(progress.wrap_read(file));

        // The index lives only for the duration of this scan; it is not
        // persisted anywhere in this function.
        let mut users: IdIndex<String> = IdIndex::new();
        for record in reader.into_deserialize::<SourceRating>() {
            let rec = record?;
            // Pick which source field plays the user role and which the ASIN.
            let (user_key, asin) = if self.swap_columns {
                (rec.asin, rec.user)
            } else {
                (rec.user, rec.asin)
            };
            let user = users.intern(user_key.as_str())?;
            writer.write_object(RatingRow {
                user,
                asin,
                rating: rec.rating,
                timestamp: rec.timestamp,
            })?;
        }

        writer.finish()?;
        Ok(())
    }
}