Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions temporalio/contrib/openai_agents/_trace_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
get_trace_provider,
)
from agents.tracing.scope import Scope
from agents.tracing.spans import NoOpSpan, Span
from agents.tracing.spans import NoOpSpan

import temporalio.activity
import temporalio.api.common.v1
Expand Down Expand Up @@ -402,48 +402,41 @@ async def signal_external_workflow(
def start_activity(
self, input: temporalio.worker.StartActivityInput
) -> temporalio.workflow.ActivityHandle:
# Use synchronous span pattern to avoid context detachment errors.
# Async callbacks (add_done_callback) fire in different context instances,
# breaking OTel's token validation. Instead, complete span immediately
# at orchestration point - execution time is captured by activity worker.
trace = get_trace_provider().get_current_trace()
span: Span | None = None
if trace:
span = custom_span(
with custom_span(
name="temporal:startActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)

):
pass # Span completes immediately in same context
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_activity(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
return self.next.start_activity(input)

async def start_child_workflow(
self, input: temporalio.worker.StartChildWorkflowInput
) -> temporalio.workflow.ChildWorkflowHandle:
# Use synchronous span pattern - see start_activity for explanation
trace = get_trace_provider().get_current_trace()
span: Span | None = None
if trace:
span = custom_span(
with custom_span(
name="temporal:startChildWorkflow", data={"workflow": input.workflow}
)
span.start(mark_as_current=True)
):
pass # Span completes immediately in same context
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = await self.next.start_child_workflow(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
return await self.next.start_child_workflow(input)

def start_local_activity(
self, input: temporalio.worker.StartLocalActivityInput
) -> temporalio.workflow.ActivityHandle:
# Use synchronous span pattern - see start_activity for explanation
trace = get_trace_provider().get_current_trace()
span: Span | None = None
if trace:
span = custom_span(
with custom_span(
name="temporal:startLocalActivity", data={"activity": input.activity}
)
span.start(mark_as_current=True)
):
pass # Span completes immediately in same context
set_header_from_context(input, temporalio.workflow.payload_converter())
handle = self.next.start_local_activity(input)
if span:
handle.add_done_callback(lambda _: span.finish()) # type: ignore
return handle
return self.next.start_local_activity(input)