-
Notifications
You must be signed in to change notification settings - Fork 16
Fix segfault on unexpected node shutdown #1201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,13 +22,11 @@ namespace ErrorCodes | |
| RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext( | ||
| RemoteQueryExecutor & executor_, | ||
| bool suspend_when_query_sent_, | ||
| bool read_packet_type_separately_, | ||
| bool allow_retries_in_cluster_requests_) | ||
| bool read_packet_type_separately_) | ||
| : AsyncTaskExecutor(std::make_unique<Task>(*this)) | ||
| , executor(executor_) | ||
| , suspend_when_query_sent(suspend_when_query_sent_) | ||
| , read_packet_type_separately(read_packet_type_separately_) | ||
| , allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_) | ||
| { | ||
| if (-1 == pipe2(pipe_fd, O_NONBLOCK)) | ||
| throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe"); | ||
|
|
@@ -63,46 +61,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus | |
| { | ||
| while (true) | ||
| { | ||
| try | ||
| { | ||
| read_context.has_read_packet_part = PacketPart::None; | ||
|
|
||
| if (read_context.read_packet_type_separately) | ||
| { | ||
| read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); | ||
| read_context.has_read_packet_part = PacketPart::Type; | ||
| suspend_callback(); | ||
| } | ||
| read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); | ||
| read_context.has_read_packet_part = PacketPart::Body; | ||
| if (read_context.packet.type == Protocol::Server::Data) | ||
| read_context.has_data_packets = true; | ||
| } | ||
| catch (const Exception & e) | ||
| read_context.has_read_packet_part = PacketPart::None; | ||
|
|
||
| if (read_context.read_packet_type_separately) | ||
| { | ||
| /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. | ||
| /// If initiator did not process any data packets before, this fact can be ignored. | ||
| /// Unprocessed tasks will be executed on other nodes. | ||
| if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF | ||
| && !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards()) | ||
| { | ||
| read_context.has_read_packet_part = PacketPart::None; | ||
| } | ||
| else | ||
| throw; | ||
| read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback); | ||
| read_context.has_read_packet_part = PacketPart::Type; | ||
| suspend_callback(); | ||
| } | ||
| read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback); | ||
| read_context.has_read_packet_part = PacketPart::Body; | ||
| if (read_context.packet.type == Protocol::Server::Data) | ||
| read_context.has_data_packets = true; | ||
|
|
||
| suspend_callback(); | ||
| } | ||
| } | ||
| catch (const Exception &) | ||
| catch (const Exception & e) | ||
| { | ||
| if (!read_context.allow_retries_in_cluster_requests) | ||
| /// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes. | ||
| /// If initiator did not process any data packets before, this fact can be ignored. | ||
| /// Unprocessed tasks will be executed on other nodes. | ||
| if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF | ||
| && !read_context.has_data_packets.load() | ||
| && read_context.executor.skipUnavailableShards()) | ||
| { | ||
| read_context.packet.type = Protocol::Server::ConnectionLost; | ||
| read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); | ||
| read_context.has_read_packet_part = PacketPart::Body; | ||
| suspend_callback(); | ||
|
Comment on lines
+85
to
+92
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When handling an early EOF from a shard ( Useful? React with 👍 / 👎. |
||
| } | ||
| else | ||
| throw; | ||
| read_context.packet.type = Protocol::Server::ConnectionLost; | ||
| read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode()); | ||
| read_context.has_read_packet_part = PacketPart::Body; | ||
| suspend_callback(); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With
skip_unavailable_shardsenabled, an ATTEMPT_TO_READ_AFTER_EOF before any data now produces aConnectionLostpacket that is rethrown inRemoteQueryExecutor(caseProtocol::Server::ConnectionLostaround lines 732–742 ofRemoteQueryExecutor.cpp). The previous logic swallowed this exception when no data had been read so other shards could continue. This regression causes queries that should skip an unreachable shard to fail whenever the shard drops the socket before sending data.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception is rethrown only when tasks were not rescheduled on other replicas. Without rescheduling tasks can be lost forever, and response contain only part of data.