Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::plan::TableScanPlan;
use datafusion::logical_plan::LogicalPlan;
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScanPlan};
use datafusion::prelude::{AvroReadOptions, CsvReadOptions};
use datafusion::sql::parser::FileType;

Expand Down Expand Up @@ -270,13 +269,13 @@ impl BallistaContext {

let plan = ctx.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
} => match file_type {
}) => match file_type {
FileType::CSV => {
self.register_csv(
name,
Expand Down
8 changes: 4 additions & 4 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use datafusion::logical_plan::window_frames::{
};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, digest, exp, floor, ln, log10, log2, round, signum,
sin, sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
LogicalPlan, LogicalPlanBuilder, Operator,
sin, sqrt, tan, trunc, Column, CreateExternalTable, DFField, DFSchema, Expr,
JoinConstraint, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
Expand Down Expand Up @@ -271,13 +271,13 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
let pb_file_type: protobuf::FileType =
create_extern_table.file_type.try_into()?;

Ok(LogicalPlan::CreateExternalTable {
Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {
schema: pb_schema.try_into()?,
name: create_extern_table.name.clone(),
location: create_extern_table.location.clone(),
file_type: pb_file_type.into(),
has_header: create_extern_table.has_header,
})
}))
}
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan = convert_box_required!(analyze.input)?;
Expand Down
18 changes: 10 additions & 8 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ mod roundtrip_tests {
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit},
datasource::object_store::local::LocalFileSystem,
logical_plan::{
col, Expr, LogicalPlan, LogicalPlanBuilder, Partitioning, ToDFSchema,
col, CreateExternalTable, Expr, LogicalPlan, LogicalPlanBuilder,
Partitioning, ToDFSchema,
},
physical_plan::functions::BuiltinScalarFunction::Sqrt,
prelude::*,
Expand Down Expand Up @@ -655,13 +656,14 @@ mod roundtrip_tests {
];

for file in filetypes.iter() {
let create_table_node = LogicalPlan::CreateExternalTable {
schema: df_schema_ref.clone(),
name: String::from("TestName"),
location: String::from("employee.csv"),
file_type: *file,
has_header: true,
};
let create_table_node =
LogicalPlan::CreateExternalTable(CreateExternalTable {
schema: df_schema_ref.clone(),
name: String::from("TestName"),
location: String::from("employee.csv"),
file_type: *file,
has_header: true,
});

roundtrip_test!(create_table_node);
}
Expand Down
12 changes: 6 additions & 6 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingTable;
use datafusion::logical_plan::{
exprlist_to_fields,
plan::TableScanPlan,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, Expr, JoinConstraint, JoinType, LogicalPlan,
Column, CreateExternalTable, Expr, JoinConstraint, JoinType, LogicalPlan,
TableScanPlan,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
Expand Down Expand Up @@ -945,13 +945,13 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
},
)),
}),
LogicalPlan::CreateExternalTable {
LogicalPlan::CreateExternalTable(CreateExternalTable {
name,
location,
file_type,
has_header,
schema: df_schema,
} => {
}) => {
use datafusion::sql::parser::FileType;

let pb_file_type: protobuf::FileType = match file_type {
Expand Down Expand Up @@ -1009,10 +1009,10 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
))),
})
}
LogicalPlan::CreateMemoryTable { .. } => Err(proto_error(
LogicalPlan::CreateMemoryTable(_) => Err(proto_error(
"Error converting CreateMemoryTable. Not yet supported in Ballista",
)),
LogicalPlan::DropTable { .. } => Err(proto_error(
LogicalPlan::DropTable(_) => Err(proto_error(
"Error converting DropTable. Not yet supported in Ballista",
)),
}
Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl QueryPlanner for BallistaQueryPlanner {
_ctx_state: &ExecutionContextState,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
match logical_plan {
LogicalPlan::CreateExternalTable { .. } => {
LogicalPlan::CreateExternalTable(_) => {
// table state is managed locally in the BallistaContext, not in the scheduler
Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
}
Expand Down
13 changes: 7 additions & 6 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::execution::dataframe_impl::DataFrameImpl;
use crate::logical_plan::{
FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::constant_folding::ConstantFolding;
Expand Down Expand Up @@ -191,13 +192,13 @@ impl ExecutionContext {
pub async fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
LogicalPlan::CreateExternalTable {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref schema,
ref name,
ref location,
ref file_type,
ref has_header,
} => {
}) => {
let file_format = match file_type {
FileType::CSV => {
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
Expand Down Expand Up @@ -241,7 +242,7 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}

LogicalPlan::CreateMemoryTable { input, name } => {
LogicalPlan::CreateMemoryTable(CreateMemoryTable { name, input }) => {
let plan = self.optimize(&input)?;
let physical = Arc::new(DataFrameImpl::new(self.state.clone(), &plan));

Expand All @@ -256,7 +257,7 @@ impl ExecutionContext {
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}

LogicalPlan::DropTable { name, if_exist, .. } => {
LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
let returned = self.deregister_table(name.as_str())?;
if !if_exist && returned.is_none() {
Err(DataFusionError::Execution(format!(
Expand Down Expand Up @@ -1174,7 +1175,7 @@ impl FunctionRegistry for ExecutionContextState {
#[cfg(test)]
mod tests {
use super::*;
use crate::logical_plan::plan::TableScanPlan;
use crate::logical_plan::TableScanPlan;
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::{make_scalar_function, Volatility};
use crate::physical_plan::{collect, collect_partitioned};
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ pub use expr::{
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
JoinConstraint, JoinType, LogicalPlan, Partitioning, PlanType, PlanVisitor,
CreateExternalTable, CreateMemoryTable, DropTable, JoinConstraint, JoinType,
LogicalPlan, Partitioning, PlanType, PlanVisitor, TableScanPlan,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
117 changes: 71 additions & 46 deletions datafusion/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,41 @@ pub struct TableScanPlan {
pub limit: Option<usize>,
}

/// Creates an in memory table.
///
/// This struct is the payload carried by the
/// `LogicalPlan::CreateMemoryTable` variant.
#[derive(Clone)]
pub struct CreateMemoryTable {
/// The name of the table to create
pub name: String,
/// The input logical plan. `LogicalPlan::schema` reports this plan's
/// schema as the schema of the `CreateMemoryTable` node, and the plan is
/// returned as the node's only input.
pub input: Arc<LogicalPlan>,
}

/// Creates an external table.
///
/// This struct is the payload carried by the
/// `LogicalPlan::CreateExternalTable` variant. The node has no input plans.
#[derive(Clone)]
pub struct CreateExternalTable {
/// The table schema; also returned as the schema of the plan node
pub schema: DFSchemaRef,
/// The table name
pub name: String,
/// The physical location
pub location: String,
/// The file type of physical file
pub file_type: FileType,
/// Whether the CSV file contains a header. For `FileType::CSV` this value
/// is forwarded to the CSV format's `with_has_header` setting.
pub has_header: bool,
}

/// Drops a table.
///
/// This struct is the payload carried by the `LogicalPlan::DropTable`
/// variant. The node has no input plans.
#[derive(Clone)]
pub struct DropTable {
/// The name of the table to drop
pub name: String,
/// Whether a missing table is tolerated. When this is `false` and no table
/// named `name` is registered, the execution context reports an error
/// instead of silently succeeding.
pub if_exist: bool,
/// Dummy schema; returned by `LogicalPlan::schema` for this node
pub schema: DFSchemaRef,
}

/// Produces a relation with string representations of
/// various parts of the plan
#[derive(Clone)]
Expand Down Expand Up @@ -230,34 +265,11 @@ pub enum LogicalPlan {
input: Arc<LogicalPlan>,
},
/// Creates an external table.
CreateExternalTable {
/// The table schema
schema: DFSchemaRef,
/// The table name
name: String,
/// The physical location
location: String,
/// The file type of physical file
file_type: FileType,
/// Whether the CSV file contains a header
has_header: bool,
},
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
CreateMemoryTable {
/// The table name
name: String,
/// The logical plan
input: Arc<LogicalPlan>,
},
CreateMemoryTable(CreateMemoryTable),
/// Drops a table.
DropTable {
/// The table name
name: String,
/// If the table exists
if_exist: bool,
/// Dummy schema
schema: DFSchemaRef,
},
DropTable(DropTable),
/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down Expand Up @@ -295,13 +307,17 @@ impl LogicalPlan {
LogicalPlan::CrossJoin { schema, .. } => schema,
LogicalPlan::Repartition { input, .. } => input.schema(),
LogicalPlan::Limit { input, .. } => input.schema(),
LogicalPlan::CreateExternalTable { schema, .. } => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union { schema, .. } => schema,
LogicalPlan::CreateMemoryTable { input, .. } => input.schema(),
LogicalPlan::DropTable { schema, .. } => schema,
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.schema()
}
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}

Expand Down Expand Up @@ -342,13 +358,15 @@ impl LogicalPlan {
LogicalPlan::Explain(ExplainPlan { schema, .. })
| LogicalPlan::Analyze(AnalyzePlan { schema, .. })
| LogicalPlan::EmptyRelation { schema, .. }
| LogicalPlan::CreateExternalTable { schema, .. } => vec![schema],
| LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
vec![schema]
}
LogicalPlan::Limit { input, .. }
| LogicalPlan::Repartition { input, .. }
| LogicalPlan::Sort { input, .. }
| LogicalPlan::CreateMemoryTable { input, .. }
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
| LogicalPlan::Filter { input, .. } => input.all_schemas(),
LogicalPlan::DropTable { .. } => vec![],
LogicalPlan::DropTable(_) => vec![],
}
}

Expand Down Expand Up @@ -393,9 +411,9 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Limit { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::CreateMemoryTable { .. }
| LogicalPlan::DropTable { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::Explain { .. }
Expand All @@ -422,13 +440,15 @@ impl LogicalPlan {
LogicalPlan::Union { inputs, .. } => inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
LogicalPlan::CreateMemoryTable { input, .. } => vec![input],
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
vec![input]
}
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::DropTable { .. } => vec![],
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_) => vec![],
}
}

Expand Down Expand Up @@ -561,7 +581,9 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit { input, .. } => input.accept(visitor)?,
LogicalPlan::CreateMemoryTable { input, .. } => input.accept(visitor)?,
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Extension(extension) => {
for input in extension.node.inputs() {
if !input.accept(visitor)? {
Expand All @@ -576,8 +598,8 @@ impl LogicalPlan {
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable { .. }
| LogicalPlan::DropTable { .. } => true,
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::DropTable(_) => true,
};
if !recurse {
return Ok(false);
Expand Down Expand Up @@ -889,15 +911,18 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit { ref n, .. } => write!(f, "Limit: {}", n),
LogicalPlan::CreateExternalTable { ref name, .. } => {
LogicalPlan::CreateExternalTable(CreateExternalTable {
ref name,
..
}) => {
write!(f, "CreateExternalTable: {:?}", name)
}
LogicalPlan::CreateMemoryTable { ref name, .. } => {
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name, ..
}) => {
write!(f, "CreateMemoryTable: {:?}", name)
}
LogicalPlan::DropTable {
ref name, if_exist, ..
} => {
LogicalPlan::DropTable(DropTable { name, if_exist, .. }) => {
write!(f, "DropTable: {:?} if not exist:={}", name, if_exist)
}
LogicalPlan::Explain { .. } => write!(f, "Explain"),
Expand Down
Loading