Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/a2a/client/transports/http_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,22 @@ async def send_http_stream_request(
async with aconnect_sse(
httpx_client, method, url, **kwargs
) as event_source:
event_source.response.raise_for_status()
try:
event_source.response.raise_for_status()
except httpx.HTTPStatusError as e:
# Read upfront streaming error content immediately, otherwise lower-level handlers
# (e.g. response.json()) crash with 'ResponseNotRead' Access errors.
await event_source.response.aread()
raise e

# If the response is not a stream, read it standardly (e.g., upfront JSON-RPC error payload)
if 'text/event-stream' not in event_source.response.headers.get(
'content-type', ''
):
content = await event_source.response.aread()
yield content.decode('utf-8')
return

async for sse in event_source.aiter_sse():
if not sse.data:
continue
Expand Down
26 changes: 13 additions & 13 deletions src/a2a/compat/v0_3/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.types.a2a_pb2 import AgentCard
from a2a.utils.errors import A2AError, InvalidParamsError
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
from a2a.utils.helpers import maybe_await, validate


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -170,17 +170,17 @@ async def _handler(
context, _handler, a2a_v0_3_pb2.SendMessageResponse()
)

@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def SendStreamingMessage(
self,
request: a2a_v0_3_pb2.SendMessageRequest,
context: grpc.aio.ServicerContext,
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
"""Handles the 'SendStreamingMessage' gRPC method (v0.3)."""

@validate(
lambda _: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def _handler(
server_context: ServerCallContext,
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
Expand Down Expand Up @@ -233,17 +233,17 @@ async def _handler(

return await self._handle_unary(context, _handler, a2a_v0_3_pb2.Task())

@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def TaskSubscription(
self,
request: a2a_v0_3_pb2.TaskSubscriptionRequest,
context: grpc.aio.ServicerContext,
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
"""Handles the 'TaskSubscription' gRPC method (v0.3)."""

@validate(
lambda _: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
async def _handler(
server_context: ServerCallContext,
) -> AsyncIterable[a2a_v0_3_pb2.StreamResponse]:
Expand All @@ -260,17 +260,17 @@ async def _handler(
async for item in self._handle_stream(context, _handler):
yield item

@validate(
lambda self: self.agent_card.capabilities.push_notifications,
'Push notifications are not supported by the agent',
)
async def CreateTaskPushNotificationConfig(
self,
request: a2a_v0_3_pb2.CreateTaskPushNotificationConfigRequest,
context: grpc.aio.ServicerContext,
) -> a2a_v0_3_pb2.TaskPushNotificationConfig:
"""Handles the 'CreateTaskPushNotificationConfig' gRPC method (v0.3)."""

@validate(
lambda _: self.agent_card.capabilities.push_notifications,
'Push notifications are not supported by the agent',
)
async def _handler(
server_context: ServerCallContext,
) -> a2a_v0_3_pb2.TaskPushNotificationConfig:
Expand Down
5 changes: 2 additions & 3 deletions src/a2a/compat/v0_3/rest_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from a2a.utils import constants
from a2a.utils.helpers import (
validate,
validate_async_generator,
validate_version,
)
from a2a.utils.telemetry import SpanKind, trace_class
Expand Down Expand Up @@ -85,7 +84,7 @@ async def on_message_send(
return MessageToDict(pb2_v03_resp)

@validate_version(constants.PROTOCOL_VERSION_0_3)
@validate_async_generator(
@validate(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
Expand Down Expand Up @@ -143,7 +142,7 @@ async def on_cancel_task(
return MessageToDict(pb2_v03_task)

@validate_version(constants.PROTOCOL_VERSION_0_3)
@validate_async_generator(
@validate(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
Expand Down
Loading
Loading