
batch SGP span upserts #331

Open: alvinkam2001 wants to merge 2 commits into main from alvinkam/sgp-batched-upsert

Conversation

@alvinkam2001 commented Apr 21, 2026

https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans

Greptile Summary

This PR implements true SGP batch upserts. It introduces on_spans_start/on_spans_end methods on AsyncTracingProcessor and refactors AsyncSpanQueue._process_items to group spans by processor before dispatching, so each processor receives all of its spans in one call per drain cycle instead of one HTTP request per span. SGPAsyncTracingProcessor overrides both batched methods to send a single upsert_batch request per event type.
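The grouping step described above can be sketched in a few lines. This is an illustration of the dispatch shape, not the PR's exact code: `RecordingProcessor` and `process_items` are stand-ins for the real `AsyncTracingProcessor` and `AsyncSpanQueue._process_items`.

```python
import asyncio
from collections import defaultdict

class RecordingProcessor:
    """Records batched calls so we can see one call per drain cycle."""
    def __init__(self):
        self.calls = []

    async def on_spans_start(self, spans):
        # The processor receives the whole batch in a single call,
        # instead of one call (and one HTTP request) per span.
        self.calls.append(list(spans))

async def process_items(items):
    # items: (span, processors) pairs, all carrying the same event type.
    by_processor = defaultdict(list)
    for span, processors in items:
        for p in processors:
            by_processor[p].append(span)
    # One batched dispatch per processor, run concurrently.
    await asyncio.gather(
        *(p.on_spans_start(spans) for p, spans in by_processor.items())
    )

proc = RecordingProcessor()
asyncio.run(process_items([("span1", [proc]), ("span2", [proc]), ("span3", [proc])]))
print(len(proc.calls))  # one batched call containing all three spans
```

Three enqueued spans reach the processor as a single `on_spans_start` call, which is what lets the SGP processor turn them into one `upsert_batch` request.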

Confidence Score: 5/5

Safe to merge; the batching logic is sound and well-tested, and the one remaining finding is a minor error-visibility concern in the default fallback only used by non-SGP processors.

The primary change (SGP batching) is correctly implemented and thoroughly tested. The only finding is a P2 concern about return_exceptions=False in the default fan-out fallback, which does not affect the SGP processor (it overrides the batched methods) and is not a current defect on the changed path.

The single finding is in src/agentex/lib/core/tracing/processors/tracing_processor_interface.py: the `return_exceptions=False` in the default `on_spans_start`/`on_spans_end` fallback.

Important Files Changed

| Filename | Overview |
| --- | --- |
| `src/agentex/lib/core/tracing/processors/tracing_processor_interface.py` | Adds batched `on_spans_start`/`on_spans_end` methods with a default fan-out fallback; the default uses `return_exceptions=False`, which silently drops errors after the first failure in a batch. |
| `src/agentex/lib/core/tracing/span_queue.py` | Refactors `_process_items` to group spans by processor and dispatch to the batched methods, enabling a single HTTP call per drain cycle per processor; error handling is upgraded to processor-level granularity. |
| `src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py` | Implements `on_spans_start`/`on_spans_end` overrides for true batch upserts; `on_span_start`/`on_span_end` now delegate to the batched variants. Span tracking moves to before the upsert call (a minor ordering change from the original). |
| `tests/lib/core/tracing/processors/test_sgp_tracing_processor.py` | Adds two tests verifying that N spans produce exactly one `upsert_batch` HTTP call for both the start and end paths. |
| `tests/lib/core/tracing/test_span_queue.py` | Updates the mock processor to implement `on_spans_start`/`on_spans_end` fan-out; adds a `TestAsyncSpanQueueBatchedDispatch` class verifying single-call-per-processor dispatch. |
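The shape of the new "N spans, one HTTP call" tests can be sketched with a mocked client. `FakeSGPProcessor` and its `upsert_batch` usage are illustrative stand-ins, not the repo's exact test code:

```python
import asyncio
from unittest.mock import AsyncMock

class FakeSGPProcessor:
    """Illustrative processor: one upsert_batch call for the whole batch."""
    def __init__(self, client):
        self.client = client

    async def on_spans_start(self, spans):
        # Build all payloads first, then issue a single batched request.
        await self.client.upsert_batch(items=[{"id": s} for s in spans])

client = AsyncMock()
proc = FakeSGPProcessor(client)
asyncio.run(proc.on_spans_start(["s1", "s2", "s3"]))

# Three spans in, exactly one awaited HTTP call out.
client.upsert_batch.assert_awaited_once()
print(client.upsert_batch.await_count)
```

Asserting on `await_count` (rather than inspecting payloads span by span) is what pins down the batching behavior itself.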

Sequence Diagram

```mermaid
sequenceDiagram
    participant T as Tracer
    participant Q as AsyncSpanQueue
    participant PI as _process_items
    participant P as SGPAsyncTracingProcessor
    participant SGP as SGP API

    T->>Q: enqueue(START, span1, [P])
    T->>Q: enqueue(START, span2, [P])
    T->>Q: enqueue(START, span3, [P])
    Note over Q: drain batch collects spans
    Q->>PI: _process_items([item1, item2, item3])
    Note over PI: group by processor → {P: [span1, span2, span3]}
    PI->>P: on_spans_start([span1, span2, span3])
    Note over P: build sgp_spans list, store in _spans
    P->>SGP: upsert_batch(items=[s1, s2, s3])
    SGP-->>P: 200 OK

    T->>Q: enqueue(END, span1, [P])
    T->>Q: enqueue(END, span2, [P])
    T->>Q: enqueue(END, span3, [P])
    Q->>PI: _process_items([end1, end2, end3])
    Note over PI: group by processor → {P: [span1, span2, span3]}
    PI->>P: on_spans_end([span1, span2, span3])
    Note over P: pop from _spans, update fields
    P->>SGP: upsert_batch(items=[s1, s2, s3])
    SGP-->>P: 200 OK
```



Reviews (1): last reviewed commit "fix linting issues".

Greptile also left 1 inline comment on this PR.

@declan-scale (Contributor)

@greptile

Comment on lines +49 to +53:

```python
        await asyncio.gather(*(self.on_span_start(s) for s in spans), return_exceptions=False)

    async def on_spans_end(self, spans: list[Span]) -> None:
        """Batched variant of on_span_end. See on_spans_start for details."""
        await asyncio.gather(*(self.on_span_end(s) for s in spans), return_exceptions=False)
```

**P2: `return_exceptions=False` silently drops errors in the default fan-out**

With return_exceptions=False (the default), asyncio.gather raises the first exception immediately and the remaining in-flight coroutines continue running orphaned — their results and exceptions are never observed. The old _process_items code used return_exceptions=True and logged every exception individually. If any processor using the default fallback (not SGP) has a mix of failing and succeeding spans in the same batch, only the first failure is reported; subsequent failures are silently swallowed.

Note: the test helper _make_processor in test_span_queue.py fans out using return_exceptions=True, so tests would not catch this discrepancy.

Suggested change:

```suggestion
        await asyncio.gather(*(self.on_span_start(s) for s in spans), return_exceptions=True)

    async def on_spans_end(self, spans: list[Span]) -> None:
        """Batched variant of on_span_end. See on_spans_start for details."""
        await asyncio.gather(*(self.on_span_end(s) for s in spans), return_exceptions=True)
```
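The behavioral difference the comment describes is easy to reproduce standalone. With `return_exceptions=True` every outcome is observed and each failure can be logged; with `return_exceptions=False` only the first failure surfaces to the caller:

```python
import asyncio

async def work(i):
    # Even-numbered "spans" fail; odd ones succeed.
    if i % 2 == 0:
        raise ValueError(f"span {i} failed")
    return i

async def main():
    # return_exceptions=True: exceptions come back as results, so every
    # failure in the batch is visible (the old _process_items behavior).
    results = await asyncio.gather(*(work(i) for i in range(4)),
                                   return_exceptions=True)
    failures = [r for r in results if isinstance(r, Exception)]

    # return_exceptions=False: gather re-raises the first failure; the
    # second failure is never observed by the caller.
    first_error = None
    try:
        await asyncio.gather(*(work(i) for i in range(4)),
                             return_exceptions=False)
    except ValueError as e:
        first_error = str(e)
    return len(failures), first_error

observed_failures, first_error = asyncio.run(main())
print(observed_failures, first_error)  # 2 span 0 failed
```

Two of the four coroutines fail, but the `return_exceptions=False` path reports only `span 0 failed`, which is exactly the error-visibility gap flagged for the default fallback.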

```diff
     @staticmethod
     async def _process_items(items: list[_SpanQueueItem]) -> None:
-        """Process a list of span events concurrently."""
+        """Dispatch a batch of same-event-type items to each processor in one call.
```

Is it guaranteed that all these items are the same event-type?


Never mind, I see we dispatch them ourselves.

```python
        if not items:
            return

        event_type = items[0].event_type
```

Are all the event_types the same?
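As the thread resolves above, yes: the queue splits drained items by event type before calling `_process_items`, so reading `items[0].event_type` is safe. A rough sketch of that invariant (illustrative names, not the repo's exact code):

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum

class SpanEvent(Enum):
    START = "start"
    END = "end"

@dataclass(frozen=True)
class QueueItem:
    event_type: SpanEvent
    span: str

def split_by_event_type(drained):
    # Partition a mixed drain into per-event-type batches, so every
    # batch handed to _process_items is homogeneous.
    batches = defaultdict(list)
    for item in drained:
        batches[item.event_type].append(item)
    return batches

drained = [QueueItem(SpanEvent.START, "s1"),
           QueueItem(SpanEvent.END, "s0"),
           QueueItem(SpanEvent.START, "s2")]
batches = split_by_event_type(drained)
print([i.span for i in batches[SpanEvent.START]])  # ['s1', 's2']
print([i.span for i in batches[SpanEvent.END]])    # ['s0']
```

Because each batch is homogeneous, a per-batch dispatcher only needs to branch once on `items[0].event_type` rather than per item.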
