Event Streaming¶
Real-time event streaming lets dashboards, monitoring tools, and automation subscribe to action outcomes as they happen -- without polling.
Acteon exposes a Server-Sent Events (SSE) endpoint at GET /v1/stream that pushes events as actions flow through the gateway pipeline.
How It Works¶
- A client opens an SSE connection to
GET /v1/stream - As actions are dispatched, the gateway broadcasts events to all subscribers
- Events are filtered server-side based on the caller's tenant grants and optional query-parameter filters
- Each event is delivered as a JSON-encoded SSE frame
event: action_dispatched
id: 550e8400-e29b-41d4-a716-446655440000
data: {"id":"550e8400-...","timestamp":"2026-02-07T14:30:00Z","type":"action_dispatched","outcome":{"Executed":{"status":"Success","body":null,"headers":{}}},"provider":"email","namespace":"alerts","tenant":"acme","action_type":"send_email","action_id":"661f9511-..."}
event: group_flushed
id: 771f0622-f30c-52e5-b827-557766551111
data: {"id":"771f0622-...","timestamp":"2026-02-07T14:30:05Z","type":"group_flushed","group_id":"grp-abc","event_count":5,"namespace":"alerts","tenant":"acme"}
Event Types¶
SSE event: tag | Description |
|---|---|
action_dispatched | An action was processed through the dispatch pipeline |
group_flushed | A batch of grouped events was flushed |
timeout | A state machine timeout fired |
chain_advanced | A task chain step was advanced |
approval_required | An action requires human approval |
lagged | Warning: the client fell behind and events were skipped |
Query Parameters¶
All filters are optional. When multiple filters are specified, they are combined with AND logic.
| Parameter | Type | Description |
|---|---|---|
namespace | string | Only receive events from this namespace |
action_type | string | Only receive events for this action type |
outcome | string | Only receive dispatch events with this outcome category (e.g., executed, suppressed, failed, throttled, rerouted, deduplicated) |
event_type | string | Only receive events of this type (e.g., action_dispatched, group_flushed, timeout, chain_advanced, approval_required) |
Authentication¶
The SSE endpoint sits behind the standard auth layer. Authenticate with a Bearer token or API key, the same as any other protected endpoint:
Tenant Isolation¶
Events are filtered based on the caller's tenant grants:
- Scoped callers only receive events for their authorized tenants
- Wildcard callers (admin) receive events for all tenants
- Tenant filtering happens server-side before events reach the client
Connection Limits¶
To prevent resource exhaustion, a configurable per-tenant limit on concurrent SSE connections is enforced (default: 10). When the limit is reached, new connections receive HTTP 429.
Configure in the server config:
Security¶
Stream events are sanitized before broadcast:
ProviderResponse.bodyis replaced withnull(may contain PII or secrets)ProviderResponse.headersare cleared (may contain auth tokens)- Approval URLs are redacted to
[redacted](HMAC-signed tokens)
This means SSE events carry enough metadata for monitoring (outcome status, provider, timing) without exposing sensitive payload data.
Backpressure¶
When a slow client can't keep up with the event rate, it receives a special lagged event indicating how many events were dropped:
The stream continues from the current position. This is preferable to disconnecting the client entirely.
Reconnection¶
SSE has built-in reconnection support. Each event carries a unique id field. On reconnect, browsers and SSE-capable clients send the Last-Event-ID header automatically.
Note: Events that occurred between disconnect and reconnect are not replayed from a buffer. For gap-free event history, use the Audit Trail API.
Keep-Alive¶
The server sends a keep-alive comment every 15 seconds to prevent proxy/load-balancer timeout disconnections.
Examples¶
curl¶
# All events
curl -N -H "Authorization: Bearer <token>" \
http://localhost:8080/v1/stream
# Only executed email actions in the alerts namespace
curl -N -H "Authorization: Bearer <token>" \
"http://localhost:8080/v1/stream?namespace=alerts&action_type=send_email&outcome=executed"
# Only group flush and timeout events
curl -N -H "Authorization: Bearer <token>" \
"http://localhost:8080/v1/stream?event_type=group_flushed"
Rust¶
use acteon_client::{ActeonClient, StreamFilter};
use futures::StreamExt;
let client = ActeonClient::new("http://localhost:8080")
.with_bearer_token("my-token");
let filter = StreamFilter::new()
.namespace("alerts")
.action_type("send_email");
let mut stream = client.stream(&filter).await?;
while let Some(item) = stream.next().await {
match item? {
acteon_client::StreamItem::Event(event) => {
println!("Event: {} in {}", event.id, event.namespace);
}
acteon_client::StreamItem::Lagged { skipped } => {
eprintln!("Warning: missed {skipped} events");
}
acteon_client::StreamItem::KeepAlive => {}
}
}
Python¶
import requests
url = "http://localhost:8080/v1/stream"
headers = {"Authorization": "Bearer <token>"}
params = {"namespace": "alerts", "outcome": "failed"}
with requests.get(url, headers=headers, params=params, stream=True) as resp:
for line in resp.iter_lines(decode_unicode=True):
if line.startswith("data:"):
import json
event = json.loads(line[5:].strip())
print(f"Event: {event['id']} - {event['type']}")
Node.js¶
import { EventSource } from "eventsource";
const es = new EventSource(
"http://localhost:8080/v1/stream?namespace=alerts",
{ headers: { Authorization: "Bearer <token>" } }
);
es.addEventListener("action_dispatched", (e) => {
const event = JSON.parse(e.data);
console.log(`Dispatched: ${event.action_id} -> ${event.provider}`);
});
es.addEventListener("lagged", (e) => {
const { skipped } = JSON.parse(e.data);
console.warn(`Missed ${skipped} events`);
});
Go¶
req, _ := http.NewRequest("GET", "http://localhost:8080/v1/stream?namespace=alerts", nil)
req.Header.Set("Authorization", "Bearer <token>")
resp, _ := http.DefaultClient.Do(req)
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "data:") {
data := strings.TrimPrefix(line, "data:")
fmt.Println("Event:", strings.TrimSpace(data))
}
}
Limitations¶
- Live monitoring only: Events that occur while no client is connected are not persisted. Use the Audit Trail for historical data.
- Process-local: In multi-instance deployments, SSE connections only see events from the instance they're connected to. Use sticky sessions at the load balancer level.
- HTTP/1.1 browser limit: Browsers allow at most 6 concurrent SSE connections per origin. This is not a limitation for server-side clients.
- No binary support: Events are JSON-encoded text. Binary payloads are not supported via SSE.