// bookdata/cli/amazon/cluster_ratings.rs
use crate::prelude::*;
use polars::prelude::*;
// NOTE: clap's derive turns the doc comments below into `--help` text, so they
// are written as user-facing descriptions.

/// Aggregate Amazon ratings by book cluster.
///
/// Ratings are joined to the ISBN cluster table and summarized per (user, cluster) pair.
#[derive(Args, Debug)]
#[command(name = "cluster-ratings")]
pub struct ClusterRatings {
    /// Output file for the clustered ratings
    #[arg(short = 'o', long = "output", name = "FILE")]
    ratings_out: PathBuf,

    /// Input ratings file in Parquet format
    #[arg(name = "INPUT")]
    infile: PathBuf,
}
impl Command for ClusterRatings {
    /// Build and run the rating-clustering pipeline.
    ///
    /// Steps, all expressed as one lazy query that is collected at the end:
    /// 1. scan `book-links/isbn-clusters.parquet` (path relative to the working
    ///    directory) and keep its `isbn` and `cluster` columns;
    /// 2. scan the input ratings and inner-join `asin` against `isbn`, so ratings
    ///    without a matching ISBN are dropped;
    /// 3. rename `cluster` to `item`, order the rows by `timestamp`, and aggregate
    ///    per (`user`, `item`) pair;
    /// 4. write the collected frame to the output path.
    ///
    /// # Errors
    ///
    /// Propagates any error from scanning either input, collecting the query,
    /// or saving the result.
    fn exec(&self) -> Result<()> {
        // ISBN -> cluster lookup table; only the two columns the join needs.
        let cluster_map = LazyFrame::scan_parquet("book-links/isbn-clusters.parquet", default())?
            .select(&[col("isbn"), col("cluster")]);

        // Ratings keyed by cluster instead of ASIN, ordered by time.
        // NOTE(review): the `last_rating` aggregate below presumably relies on this
        // ordering being preserved within each group — confirm against polars docs.
        let by_time = LazyFrame::scan_parquet(&self.infile, default())?
            .join(
                cluster_map,
                &[col("asin")],
                &[col("isbn")],
                JoinType::Inner.into(),
            )
            .select(&[
                col("user"),
                col("cluster").alias("item"),
                col("rating"),
                col("timestamp"),
            ])
            .sort("timestamp", default());

        // One output row per (user, item) pair.
        let group_keys = [col("user"), col("item")];
        let summaries = [
            col("rating").median().alias("rating"),
            col("rating").last().alias("last_rating"),
            col("timestamp").min().alias("first_time"),
            col("timestamp").max().alias("last_time"),
            col("item").count().alias("nratings"),
        ];
        let plan = by_time.group_by(&group_keys).agg(&summaries);

        info!("collecting results");
        let frame = plan.collect()?;

        info!("saving {} records", frame.height());
        save_df_parquet(frame, &self.ratings_out)?;

        Ok(())
    }
}