Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions python/restate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from .retry_policy import InvocationRetryPolicy
# pylint: disable=line-too-long
from .context import DurablePromise, RestateDurableFuture, RestateDurableCallFuture, RestateDurableSleepFuture, SendHandle, RunOptions
from .exceptions import TerminalError
from .exceptions import TerminalError, SdkInternalBaseException, is_internal_exception
from .asyncio import as_completed, gather, wait_completed, select

from .endpoint import app
Expand Down Expand Up @@ -67,5 +67,7 @@ def test_harness(app, # type: ignore
"select",
"logging",
"RestateLoggingFilter",
"InvocationRetryPolicy"
"InvocationRetryPolicy",
"SdkInternalBaseException",
"is_internal_exception"
]
47 changes: 45 additions & 2 deletions python/restate/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,53 @@
#
"""This module contains the restate exceptions"""

class TerminalError(Exception):
"""This exception is raised to indicate a termination of the execution"""
# pylint: disable=C0301

class TerminalError(Exception):
"""This exception is thrown to indicate that Restate should not retry."""
def __init__(self, message: str, status_code: int = 500) -> None:
super().__init__(message)
self.message = message
self.status_code = status_code


class SdkInternalBaseException(Exception):
"""This exception is internal, and you should not catch it.
If you need to distinguish with other exceptions, use is_internal_exception."""
def __init__(self, message: str) -> None:
super().__init__(
message +
"""
This exception is safe to ignore. If you see it, you might be using a try/catch all statement.

Don't do:
try:
# Code
except:
# This catches all exceptions, including the SdkInternalBaseException!

Do instead:
try:
# Code
except TerminalError:
# In Restate handlers you typically want to catch TerminalError only

Or remove the try/except altogether if you don't need it.
For further info on error handling, refer to https://docs.restate.dev/develop/python/error-handling
""")

class SuspendedException(SdkInternalBaseException):
"""This exception is raised to indicate that the execution is suspended"""
def __init__(self) -> None:
super().__init__("Invocation got suspended, Restate will resume this invocation when progress can be made.")

class SdkInternalException(SdkInternalBaseException):
"""This exception is raised to indicate that the execution raised a retryable error."""
def __init__(self) -> None:
super().__init__("Invocation attempt raised a retryable error.\n"
"Restate will retry executing this invocation from the point where it left off.")


def is_internal_exception(e) -> bool:
"""Returns true if the exception is an internal Restate exception"""
return isinstance(e, SdkInternalBaseException)
27 changes: 9 additions & 18 deletions python/restate/server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import time

from restate.context import DurablePromise, AttemptFinishedEvent, HandlerType, ObjectContext, Request, RestateDurableCallFuture, RestateDurableFuture, RunAction, SendHandle, RestateDurableSleepFuture, RunOptions, P
from restate.exceptions import TerminalError
from restate.exceptions import TerminalError, SdkInternalBaseException, SdkInternalException, SuspendedException
from restate.handler import Handler, handler_from_callable, invoke_handler
from restate.serde import BytesSerde, DefaultSerde, JsonSerde, Serde
from restate.server_types import ReceiveChannel, Send
Expand Down Expand Up @@ -273,19 +273,6 @@ def update_restate_context_is_replaying(vm: VMWrapper):
"""Update the context var 'restate_context_is_replaying'. This should be called after each vm.sys_*"""
restate_context_is_replaying.set(vm.is_replaying())

async def cancel_current_task():
"""Cancel the current task"""
current_task = asyncio.current_task()
if current_task is not None:
# Cancel through asyncio API
current_task.cancel(
"Cancelled by Restate SDK, you should not call any Context method after this exception is thrown."
)
# Sleep 0 will pop up the cancellation
await asyncio.sleep(0)
else:
raise asyncio.CancelledError("Cancelled by Restate SDK, you should not call any Context method after this exception is thrown.")

# pylint: disable=R0902
class ServerInvocationContext(ObjectContext):
"""This class implements the context for the restate framework based on the server."""
Expand Down Expand Up @@ -327,6 +314,8 @@ async def enter(self):
# pylint: disable=W0718
except asyncio.CancelledError:
pass
except SdkInternalBaseException:
pass
except DisconnectedException:
raise
except Exception as e:
Expand Down Expand Up @@ -393,11 +382,11 @@ async def must_take_notification(self, handle):
await self.take_and_send_output()
# Print this exception, might be relevant for the user
traceback.print_exception(res)
await cancel_current_task()
raise SdkInternalException() from res
if isinstance(res, Suspended):
# We might need to write out something at this point.
await self.take_and_send_output()
await cancel_current_task()
raise SuspendedException()
if isinstance(res, NotReady):
raise ValueError(f"Unexpected value error: {handle}")
if res is None:
Expand All @@ -414,9 +403,9 @@ async def create_poll_or_cancel_coroutine(self, handles: typing.List[int]) -> No
if isinstance(do_progress_response, BaseException):
# Print this exception, might be relevant for the user
traceback.print_exception(do_progress_response)
await cancel_current_task()
raise SdkInternalException() from do_progress_response
if isinstance(do_progress_response, Suspended):
await cancel_current_task()
raise SuspendedException()
if isinstance(do_progress_response, DoProgressAnyCompleted):
# One of the handles completed
return
Expand Down Expand Up @@ -565,6 +554,8 @@ async def create_run_coroutine(self,
self.vm.propose_run_completion_failure(handle, failure)
except asyncio.CancelledError as e:
raise e from None
except SdkInternalBaseException as e:
raise e from None
# pylint: disable=W0718
except Exception as e:
end = time.time()
Expand Down
15 changes: 7 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,23 +784,22 @@ impl ErrorFormatter for PythonErrorFormatter {
fn display_closed_error(&self, f: &mut fmt::Formatter<'_>, event: &str) -> fmt::Result {
write!(f, "Execution is suspended, but the handler is still attempting to make progress (calling '{event}'). This can happen:

* If you don't need to handle task cancellation, just avoid catch all statements. Don't do:
* If you use try/catch all statements.
Don't do:
try:
# Code
except:
# This catches all exceptions, including the asyncio.CancelledError!
# This catches all exceptions, including the SdkInternalBaseException!
# '{event}' <- This operation prints this exception

Do instead:
try:
# Code
except TerminalException:
# In Restate handlers you typically want to catch TerminalException only
except TerminalError:
# In Restate handlers you typically want to catch TerminalError only

* To catch ctx.run/ctx.run_typed errors, check https://docs.restate.dev/develop/python/durable-steps#run for more details.

* If the asyncio.CancelledError is caught, you must not run any Context operation in the except arm.
Check https://docs.python.org/3/library/asyncio-task.html#task-cancellation for more details on task cancellation.
Or remove the try/except altogether if you don't need it.
For further info on error handling, refer to https://docs.restate.dev/develop/python/error-handling

* If you use the context after the handler completed, e.g. moving the context to another thread.
Check https://docs.restate.dev/develop/python/concurrent-tasks for more details on how to create durable concurrent tasks in Python.")
Expand Down