Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ pub(crate) fn spawn_buffered(
builder.spawn(async move {
while let Some(item) = input.next().await {
if sender.send(item).await.is_err() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason an item can't be sent via a channel I think is if the other end (the Receiver) was dropped. This can certainly happen if some other part of the query errors, but I also think it can happen with the plan shuts down early due to a LIMIT or something similar.

Thus I am not sure we should propagate an error in this case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Removed the error.

return;
// receiver dropped when query is shutdown early (e.g., limit) or error,
// no need to return propagate the send error.
return Ok(());
}
}

Ok(())
});

builder.build()
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,11 @@ fn read_spill_as_stream(
let sender = builder.tx();

builder.spawn_blocking(move || {
if let Err(e) = read_spill(sender, path.path()) {
let result = read_spill(sender, path.path());
if let Err(e) = &result {
error!("Failure while reading spill file: {:?}. Error: {}", path, e);
}
result
});

Ok(builder.build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,8 @@ mod tests {
// This causes the MergeStream to wait for more input
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}

Ok(())
});

streams.push(builder.build());
Expand Down
27 changes: 19 additions & 8 deletions datafusion/core/src/physical_plan/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use std::task::Poll;

use crate::physical_plan::displayable;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::internal_err;
use datafusion_common::DataFusionError;
use datafusion_common::Result;
use datafusion_common::{exec_err, internal_err};
use datafusion_execution::TaskContext;
use futures::stream::BoxStream;
use futures::{Future, Stream, StreamExt};
Expand All @@ -50,7 +50,7 @@ pub struct RecordBatchReceiverStreamBuilder {
tx: Sender<Result<RecordBatch>>,
rx: Receiver<Result<RecordBatch>>,
schema: SchemaRef,
join_set: JoinSet<()>,
join_set: JoinSet<Result<()>>,
}

impl RecordBatchReceiverStreamBuilder {
Expand Down Expand Up @@ -78,7 +78,7 @@ impl RecordBatchReceiverStreamBuilder {
/// retrieved from `Self::tx`
pub fn spawn<F>(&mut self, task: F)
where
F: Future<Output = ()>,
F: Future<Output = Result<()>>,
F: Send + 'static,
{
self.join_set.spawn(task);
Expand All @@ -91,7 +91,7 @@ impl RecordBatchReceiverStreamBuilder {
/// retrieved from `Self::tx`
pub fn spawn_blocking<F>(&mut self, f: F)
where
F: FnOnce(),
F: FnOnce() -> Result<()>,
F: Send + 'static,
{
self.join_set.spawn_blocking(f);
Expand Down Expand Up @@ -120,7 +120,7 @@ impl RecordBatchReceiverStreamBuilder {
"Stopping execution: error executing input: {}",
displayable(input.as_ref()).one_line()
);
return;
return Ok(());
}
Ok(stream) => stream,
};
Expand All @@ -137,7 +137,7 @@ impl RecordBatchReceiverStreamBuilder {
"Stopping execution: output is gone, plan cancelling: {}",
displayable(input.as_ref()).one_line()
);
return;
return Ok(());
}

// stop after the first error is encontered (don't
Expand All @@ -147,9 +147,11 @@ impl RecordBatchReceiverStreamBuilder {
"Stopping execution: plan returned error: {}",
displayable(input.as_ref()).one_line()
);
return;
return Ok(());
}
}

Ok(())
});
}

Expand All @@ -169,7 +171,16 @@ impl RecordBatchReceiverStreamBuilder {
let check = async move {
while let Some(result) = join_set.join_next().await {
match result {
Ok(()) => continue, // nothing to report
Ok(task_result) => {
match task_result {
// nothing to report
Ok(_) => continue,
// This means a blocking task error
Err(e) => {
return Some(exec_err!("Spawned Task error: {e}"));
}
}
}
// This means a tokio task error, likely a panic
Err(e) => {
if e.is_panic() {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ impl ExecutionPlan for MockExec {
println!("ERROR batch via delayed stream: {e}");
}
}

Ok(())
});
// returned stream simply reads off the rx stream
Ok(builder.build())
Expand Down Expand Up @@ -364,6 +366,8 @@ impl ExecutionPlan for BarrierExec {
println!("ERROR batch via barrier stream stream: {e}");
}
}

Ok(())
});

// returned stream simply reads off the rx stream
Expand Down