Conversation
@greptile

```python
        await asyncio.gather(*(self.on_span_start(s) for s in spans), return_exceptions=False)

    async def on_spans_end(self, spans: list[Span]) -> None:
        """Batched variant of on_span_end. See on_spans_start for details."""
        await asyncio.gather(*(self.on_span_end(s) for s in spans), return_exceptions=False)
```
**`return_exceptions=False` silently drops errors in the default fan-out**
With `return_exceptions=False` (the default), `asyncio.gather` raises the first exception immediately and the remaining in-flight coroutines continue running orphaned — their results and exceptions are never observed. The old `_process_items` code used `return_exceptions=True` and logged every exception individually. If any processor using the default fallback (not SGP) has a mix of failing and succeeding spans in the same batch, only the first failure is reported; subsequent failures are silently swallowed.
Note: the test helper `_make_processor` in `test_span_queue.py` fans out using `return_exceptions=True`, so tests would not catch this discrepancy.
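For illustration, a standalone sketch of the difference; the `flaky` coroutine here is a hypothetical stand-in for a per-span processor call, not code from this PR:

```python
import asyncio

async def flaky(i: int) -> int:
    # Hypothetical stand-in for a per-span processor call; even items fail.
    if i % 2 == 0:
        raise RuntimeError(f"span {i} failed")
    return i

async def fan_out(n: int) -> list[int]:
    # With return_exceptions=True every coroutine runs to completion and
    # each failure comes back as a value, so all of them can be logged.
    # With the default (False), gather would raise on span 0 and the
    # failure of span 2 would never be observed.
    results = await asyncio.gather(*(flaky(i) for i in range(n)), return_exceptions=True)
    ok: list[int] = []
    for i, r in enumerate(results):
        if isinstance(r, BaseException):
            print(f"item {i} failed: {r}")  # every failure is reported
        else:
            ok.append(r)
    return ok

print(asyncio.run(fan_out(4)))  # prints both failures, then [1, 3]
```

With `return_exceptions=False` the same call would surface only the `RuntimeError` for span 0; span 2's failure would be lost exactly as the comment describes.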
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/core/tracing/processors/tracing_processor_interface.py
Line: 49-53
Comment: `return_exceptions=False` silently drops errors in the default fan-out (see above).
```suggestion
        await asyncio.gather(*(self.on_span_start(s) for s in spans), return_exceptions=True)

    async def on_spans_end(self, spans: list[Span]) -> None:
        """Batched variant of on_span_end. See on_spans_start for details."""
        await asyncio.gather(*(self.on_span_end(s) for s in spans), return_exceptions=True)
```
How can I resolve this? If you propose a fix, please make it concise.

```diff
  @staticmethod
  async def _process_items(items: list[_SpanQueueItem]) -> None:
-     """Process a list of span events concurrently."""
+     """Dispatch a batch of same-event-type items to each processor in one call.
```
Is it guaranteed that all these items are the same event-type?
Nevermind, see we dispatch them ourselves
```python
if not items:
    return

event_type = items[0].event_type
```
Are all the event_types the same?
https://linear.app/scale-epd/issue/AGX1-198/actually-use-sgp-batching-for-spans
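The grouping step this thread discusses can be sketched as follows. `Processor` and the `(span, processors)` tuple shape are hypothetical simplifications of `_SpanQueueItem`, and `on_spans_start` stands in for whichever event method the batch shares:

```python
import asyncio
from collections import defaultdict

class Processor:
    """Hypothetical processor that records each batch it receives."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.batches: list[list[str]] = []

    async def on_spans_start(self, spans: list[str]) -> None:
        self.batches.append(spans)

async def process_items(items: list[tuple[str, list[Processor]]]) -> None:
    # All items in one drain batch share the same event type, so it is
    # enough to group spans by processor and make one call per processor.
    by_proc: defaultdict[Processor, list[str]] = defaultdict(list)
    for span, procs in items:
        for p in procs:
            by_proc[p].append(span)
    await asyncio.gather(
        *(p.on_spans_start(spans) for p, spans in by_proc.items()),
        return_exceptions=True,  # surface every failure, not just the first
    )

p = Processor("sgp")
asyncio.run(process_items([("s1", [p]), ("s2", [p]), ("s3", [p])]))
print(p.batches)  # [['s1', 's2', 's3']]: one call carrying the whole batch
```

This is why the same-event-type question resolves itself: the queue dispatches each drained batch per event type before grouping, so `items[0].event_type` is representative of the whole list.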
Greptile Summary
This PR implements true SGP batch upserts by introducing `on_spans_start`/`on_spans_end` methods on `AsyncTracingProcessor` and refactoring `AsyncSpanQueue._process_items` to group spans by processor before dispatching, so each processor receives all spans in one call per drain cycle instead of one HTTP request per span. `SGPAsyncTracingProcessor` overrides both batched methods to send a single `upsert_batch` request per event type.
Confidence Score: 5/5
Safe to merge; the batching logic is sound and well-tested, and the one remaining finding is a minor error-visibility concern in the default fallback only used by non-SGP processors.
The primary change (SGP batching) is correctly implemented and thoroughly tested. The only finding is a P2 concern about `return_exceptions=False` in the default fan-out fallback, which does not affect the SGP processor (it overrides the batched methods) and is not a current defect on the changed path.
`src/agentex/lib/core/tracing/processors/tracing_processor_interface.py` — the `return_exceptions=False` in the default `on_spans_start`/`on_spans_end` fallback.
Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant T as Tracer
    participant Q as AsyncSpanQueue
    participant PI as _process_items
    participant P as SGPAsyncTracingProcessor
    participant SGP as SGP API
    T->>Q: enqueue(START, span1, [P])
    T->>Q: enqueue(START, span2, [P])
    T->>Q: enqueue(START, span3, [P])
    Note over Q: drain batch collects spans
    Q->>PI: _process_items([item1, item2, item3])
    Note over PI: group by processor → {P: [span1, span2, span3]}
    PI->>P: on_spans_start([span1, span2, span3])
    Note over P: build sgp_spans list, store in _spans
    P->>SGP: upsert_batch(items=[s1, s2, s3])
    SGP-->>P: 200 OK
    T->>Q: enqueue(END, span1, [P])
    T->>Q: enqueue(END, span2, [P])
    T->>Q: enqueue(END, span3, [P])
    Q->>PI: _process_items([end1, end2, end3])
    Note over PI: group by processor → {P: [span1, span2, span3]}
    PI->>P: on_spans_end([span1, span2, span3])
    Note over P: pop from _spans, update fields
    P->>SGP: upsert_batch(items=[s1, s2, s3])
    SGP-->>P: 200 OK
```
Reviews (1): Last reviewed commit: "fix linting issues"