1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//! BookCrossing interaction clustering.
use std::path::PathBuf;

use crate::prelude::*;
use polars::prelude::*;

// Command-line arguments for the `cluster-actions` subcommand.
//
// `exec` refuses to run unless at least one of `ratings` / `add_actions` is set; when both
// are set, `exec` takes the ratings path.
//
// NOTE(review): these notes deliberately use plain `//` comments. Assuming `Args` is clap's
// derive macro, `///` doc comments on the struct or its fields become user-visible help
// text, so adding or rewording them would change `--help` output — confirm before converting.
#[derive(Args, Debug)]
#[command(name = "cluster-actions")]
pub struct Cluster {
    /// Cluster ratings.
    // When set, `exec` keeps only rows with `rating > 0` and emits, per group, the median
    // rating (`rating`) and a row count (`nratings`).
    #[arg(long = "ratings")]
    ratings: bool,

    /// Cluster actions (implicit feedback).
    // When set without `ratings`, `exec` keeps every row and emits only a per-group row
    // count (`nactions`).
    #[arg(long = "add-actions")]
    add_actions: bool,

    /// The output file.
    // Destination handed to `save_df_parquet` by `exec`.
    #[arg(short = 'o', long = "output", name = "FILE")]
    outfile: PathBuf,
}

impl Command for Cluster {
    /// Aggregate BookCrossing interactions per (user, book cluster) and save the result.
    ///
    /// Reads `cleaned-ratings.csv`, inner-joins it on `isbn` against the `isbn`/`cluster`
    /// columns of `../book-links/isbn-clusters.parquet`, and groups by `user` and `cluster`
    /// (the cluster key is exposed as `item`).
    ///
    /// * Ratings mode (`--ratings`): only rows with `rating > 0` are kept; each group yields
    ///   the median `rating` and a count named `nratings`.
    /// * Actions mode (`--add-actions` alone): all rows are kept; each group yields a count
    ///   named `nactions`.
    ///
    /// If both flags are given, ratings mode is used.
    ///
    /// # Errors
    ///
    /// Returns an error when neither mode flag is set, and propagates any error from the
    /// working-directory check, opening the inputs, collecting the query, or writing the
    /// output file.
    fn exec(&self) -> Result<()> {
        let rating_mode = self.ratings;
        if !(rating_mode || self.add_actions) {
            error!("one of --ratings or --add-actions must be specified");
            return Err(anyhow!("no mode specified"));
        }
        // The relative input paths below depend on this check; presumably it verifies that
        // the process is running inside the `bx` directory.
        require_working_dir("bx")?;

        // ISBN -> cluster lookup, trimmed to the join key and the cluster id.
        let isbn_clusters =
            LazyFrame::scan_parquet("../book-links/isbn-clusters.parquet", default())?
                .select(&[col("isbn"), col("cluster")]);

        let mut interactions = LazyCsvReader::new("cleaned-ratings.csv")
            .has_header(true)
            .finish()?;
        if rating_mode {
            // Ratings mode only considers rows carrying a positive rating.
            interactions = interactions.filter(col("rating").gt(0));
        }

        let per_user_item = interactions
            .join(
                isbn_clusters,
                &[col("isbn")],
                &[col("isbn")],
                JoinType::Inner.into(),
            )
            .group_by(&[col("user"), col("cluster").alias("item")]);

        // The group key is renamed to `item`, so `cluster` is still available to count.
        let summaries = if rating_mode {
            vec![
                col("rating").median().alias("rating"),
                col("cluster").count().alias("nratings"),
            ]
        } else {
            vec![col("cluster").count().alias("nactions")]
        };
        let plan = per_user_item.agg(&summaries);

        info!("collecting results");
        let frame = plan.collect()?;

        info!("writing to {:?}", &self.outfile);
        save_df_parquet(frame, &self.outfile)?;

        Ok(())
    }
}