Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
DataFusionError, Result,
};

use super::StatementOptions;
use super::{parse_utils::parse_column_level_option_tuples, StatementOptions};

/// Options for writing parquet files
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -141,13 +141,27 @@ impl TryFrom<(&ConfigOptions, &StatementOptions)> for ParquetWriterOptions {
"bloom_filter_enabled" => builder
.set_bloom_filter_enabled(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
"encoding" => builder
"encoding_default" => builder
.set_encoding(parse_encoding_string(value)?),
"encoding" => {
for (col, col_val) in parse_column_level_option_tuples(value)?.into_iter(){
println!("{} {}", col, col_val);
builder = builder.set_column_encoding(col, parse_encoding_string(&col_val)?);
};
builder
}
"dictionary_enabled" => builder
.set_dictionary_enabled(value.parse()
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?),
"compression" => builder
"compression_default" => builder
.set_compression(parse_compression_string(value)?),
"compression" => {
for (col, col_val) in parse_column_level_option_tuples(value)?.into_iter(){
println!("{} {}", col, col_val);
builder = builder.set_column_compression(col, parse_compression_string(&col_val)?)
}
builder
}
"statistics_enabled" => builder
.set_statistics_enabled(parse_statistics_string(value)?),
"max_statistics_size" => builder
Expand Down
42 changes: 42 additions & 0 deletions datafusion/common/src/file_options/parse_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use parquet::{
basic::{BrotliLevel, GzipLevel, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
schema::types::ColumnPath,
};

use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -181,3 +182,44 @@ pub(crate) fn parse_statistics_string(str_setting: &str) -> Result<EnabledStatis
))),
}
}

/// Parses a string which contains tuples which determine column level parquet settings
/// e.g. '(col1 snappy, col2 zstd(5), col3.nested.nested2 zstd(10))'
///
/// Returns one `(ColumnPath, value)` pair per comma-separated tuple. Nested
/// columns are addressed with dot-separated path segments
/// (e.g. `col3.nested.nested2`).
///
/// # Errors
/// Returns `DataFusionError::Configuration` if the input is not wrapped in
/// parentheses, or if any tuple lacks a space-separated value.
pub(crate) fn parse_column_level_option_tuples(
    col_options: &str,
) -> Result<Vec<(ColumnPath, String)>> {
    // Single quotes may survive from the SQL string literal; strip them
    // before inspecting the delimiters.
    let col_options = col_options.replace('\'', "").trim().to_owned();

    // The whole option list must be enclosed in parentheses.
    if !(col_options.starts_with('(') && col_options.ends_with(')')) {
        return Err(DataFusionError::Configuration(format!(
            "Unable to parse column level options specified as {col_options}"
        )));
    }

    // Drop the outer parentheses (both are single-byte ASCII, so byte
    // slicing is safe) and parse each "column value" tuple.
    col_options[1..col_options.len() - 1]
        .split(',')
        .map(|tuple| {
            tuple
                .trim()
                .split_once(' ')
                .map(|(col, setting)| {
                    (
                        // Dots separate nested field names in the column path.
                        ColumnPath::new(
                            col.split('.').map(str::to_owned).collect::<Vec<String>>(),
                        ),
                        setting.to_owned(),
                    )
                })
                // ok_or_else: only build the error string when a tuple is malformed.
                .ok_or_else(|| {
                    DataFusionError::Configuration(format!(
                        "Unable to parse column level options specified as {col_options}"
                    ))
                })
        })
        .collect()
}
12 changes: 7 additions & 5 deletions datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2,

# Copy to directory as multiple files
query IT
COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression 'zstd(10)');
COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression_default 'zstd(10)');
----
2

query TT
EXPLAIN COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression 'zstd(10)');
EXPLAIN COPY source_table TO 'test_files/scratch/copy/table' (format parquet, single_file_output false, compression_default 'zstd(10)');
----
logical_plan
CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)')
CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression_default 'zstd(10)')
--TableScan: source_table projection=[col1, col2]
physical_plan
InsertExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
Expand Down Expand Up @@ -70,7 +70,8 @@ COPY source_table
TO 'test_files/scratch/copy/table_with_options'
(format parquet,
single_file_output false,
compression 'snappy',
compression_default 'snappy',
compression '(col1 snappy, col2 zstd(5), col3.nested.nested2 zstd(10))',
max_row_group_size 12345,
data_pagesize_limit 1234,
write_batch_size 1234,
Expand All @@ -80,7 +81,8 @@ created_by 'DF copy.slt',
column_index_truncate_length 123,
data_page_row_count_limit 1234,
bloom_filter_enabled true,
encoding plain,
encoding_default plain,
encoding '(col1 delta_binary_packed, col2 plain)',
dictionary_enabled false,
statistics_enabled page,
max_statistics_size 123,
Expand Down