//! Extract basic information from a Parquet file.
use std::fmt::Debug;
use std::fs::File;
use std::io::{stdout, Read, Seek, Write};
use std::mem::drop;

use polars_arrow::datatypes::Field;
use serde::Serialize;

use friendly::{bytes, scalar};
use polars_parquet::read::schema::parquet_to_arrow_schema;
use polars_parquet::read::*;

use crate::prelude::*;

use super::Command;

/// Extract basic information from a Parquet file.
#[derive(Args, Debug)]
#[command(name = "collect-isbns")]
pub struct PQInfo {
    /// Check the length by decoding the file.
    #[arg(long = "check-length")]
    check_length: bool,

    /// Path to the output JSON file.
    #[arg(short = 'o', long = "output")]
    out_file: Option<PathBuf>,

    /// Path to the Parquet file.
    #[arg(name = "FILE")]
    source_file: PathBuf,
}

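/// Summary information about a Parquet file, serialized to the JSON output.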
#[derive(Debug, Serialize)]
struct InfoStruct {
    row_count: usize,
    file_size: u64,
    fields: Vec<FieldStruct>,
}

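/// JSON-serializable description of a single schema field.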
#[derive(Debug, Serialize)]
struct FieldStruct {
    name: String,
    data_type: String,
    nullable: bool,
}

impl From<&Field> for FieldStruct {
    fn from(f: &Field) -> Self {
        FieldStruct {
            name: f.name.clone(),
            data_type: format!("{:?}", f.data_type),
            nullable: f.is_nullable,
        }
    }
}

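/// Decode every record batch in the file and compare the total row count
/// against the expected count, warning on a mismatch.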
fn check_length<R: Read + Seek>(reader: &mut FileReader<R>, expected: usize) -> Result<()> {
    let mut len = 0;
    for chunk in reader {
        let chunk = chunk?;
        let clen = chunk.len();
        let arrays = chunk.into_arrays();
        let types = arrays.iter().map(|a| a.data_type()).collect::<Vec<_>>();
        debug!("chunk length: {}", clen);
        debug!("chunk types: {:?}", types);
        len += clen;
    }

    if len != expected {
        warn!("expected {} rows but decoded {}", expected, len);
    }

    Ok(())
}

impl Command for PQInfo {
    fn exec(&self) -> Result<()> {
        info!("reading {:?}", self.source_file);

        let mut pqf = File::open(&self.source_file)?;
        let fmeta = pqf.metadata()?;
        info!("file size: {}", bytes(fmeta.len()));

        let meta = read_metadata(&mut pqf)?;

        info!("row count: {}", scalar(meta.num_rows));
        info!("row groups: {}", meta.row_groups.len());
        let rc2: usize = meta.row_groups.iter().map(|rg| rg.num_rows()).sum();
        if rc2 != meta.num_rows {
            warn!("row group total {} != file total {}", rc2, meta.num_rows);
        }

        info!("decoding schema");
        let schema = meta.schema();
        let fields = parquet_to_arrow_schema(schema.fields());

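        // Print the decoded Arrow schema fields to stdout.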
        let out = stdout();
        let mut ol = out.lock();
        for field in &fields {
            writeln!(&mut ol, "{:?}", field)?;
        }
        drop(ol);

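        // If an output path was given, write a JSON summary of the file.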
        if let Some(ref file) = self.out_file {
            let mut out = File::create(file)?;
            let info = InfoStruct {
                row_count: meta.num_rows,
                file_size: fmeta.len(),
                fields: fields.iter().map(|f| f.into()).collect(),
            };

            serde_json::to_writer_pretty(&mut out, &info)?;
        }

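        // If requested, decode the whole file to verify the row count.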
        if self.check_length {
            let schema = infer_schema(&meta)?;
            let mut reader = FileReader::new(pqf, meta.row_groups, schema, None, None, None);
            check_length(&mut reader, meta.num_rows)?;
        }

        Ok(())
    }
}