1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//! Cluster Amazon ratings.
use crate::prelude::*;
use polars::prelude::*;

/// Group Amazon ratings into clusters.
#[derive(Args, Debug)]
#[command(name = "cluster-ratings")]
pub struct ClusterRatings {
    /// Rating output file
    #[arg(short = 'o', long = "output", name = "FILE")]
    ratings_out: PathBuf,

    /// Input file to cluster
    #[arg(name = "INPUT")]
    infile: PathBuf,
}

impl Command for ClusterRatings {
    fn exec(&self) -> Result<()> {
        let isbns = LazyFrame::scan_parquet("book-links/isbn-clusters.parquet", default())?;
        let isbns = isbns.select(&[col("isbn"), col("cluster")]);

        let ratings = LazyFrame::scan_parquet(&self.infile, default())?;

        let joined = ratings.join(isbns, &[col("asin")], &[col("isbn")], JoinType::Inner.into());
        let joined = joined
            .select(&[
                col("user"),
                col("cluster").alias("item"),
                col("rating"),
                col("timestamp"),
            ])
            .sort("timestamp", default());

        let actions = joined.group_by(&[col("user"), col("item")]).agg(&[
            col("rating").median().alias("rating"),
            col("rating").last().alias("last_rating"),
            col("timestamp").min().alias("first_time"),
            col("timestamp").max().alias("last_time"),
            col("item").count().alias("nratings"),
        ]);

        info!("collecting results");
        let actions = actions.collect()?;

        info!("saving {} records", actions.height());
        save_df_parquet(actions, &self.ratings_out)?;

        Ok(())
    }
}