Telemetry API
BreachResponse provides real-time telemetry streaming via Server-Sent Events (SSE) and log ingestion for the Python sentinel agent.
Endpoints
| Method | Path | Auth | Description |
|---|---|---|---|
| POST | /api/logs/ingest | Optional | Ingest telemetry from agent |
| GET | /api/logs/stream | -- | SSE stream of telemetry events |
| POST | /api/nodes/heartbeat | Optional | Agent heartbeat ping |
POST /api/logs/ingest
Receives telemetry data from the Python sentinel agent and broadcasts it to all SSE clients.
Body
{
"txHash": "0x8f2a9aac...",
"protocol": "TargetVault",
"type": "On-Chain Reentrancy Proposal",
"gasSaved": "pending operator approval",
"status": "PROPOSED"
}
| Parameter | Type | Required | Description |
|---|---|---|---|
txHash | string | No | Transaction hash |
protocol | string | No | Protocol name |
type | string | No | Exploit/event type |
gasSaved | string | No | Gas saved or "pending operator approval" |
status | string | No | SAFE, PROPOSED, MITIGATED, SCANNING |
Log-Level Ingestion
The agent also sends raw log lines via the Reporter:
{
"text": "[SENTINEL] Web3 connection established with RPC...",
"level": "INFO"
}
Response (200)
{
"success": true
}
Implementation
export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json(
{ success: false, error: 'Unauthorized' },
{ status: 401 }
);
}
const body = await request.json();
// Persist to database
await prisma.alert.create({
data: {
txHash: body.txHash,
protocol: body.protocol,
type: body.type,
gasSaved: body.gasSaved,
status: body.status,
}
});
// Emit real-time SSE event
const eventType = body.status === 'MITIGATED' ? 'ALERT' : 'LOG';
sseEmitter.emit('log', {
id: crypto.randomUUID(),
type: eventType,
timestamp: new Date().toISOString(),
data: body,
});
return NextResponse.json({ success: true });
}
GET /api/logs/stream
Server-Sent Events (SSE) endpoint that streams telemetry updates in real time.
Usage
const eventSource = new EventSource('http://localhost:3000/api/logs/stream');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('New event:', data);
};
eventSource.addEventListener('log', (event) => {
const data = JSON.parse(event.data);
// Update dashboard with new telemetry
});
Event Types
| Event Type | When Emitted |
|---|---|
log | Every telemetry event (SAFE, PROPOSED, SCANNING) |
alert | When status === 'MITIGATED' (critical events) |
SSE Event Format
event: log
data: {"id":"abc123...","type":"LOG","timestamp":"2026-06-21T12:00:00.000Z","data":{"txHash":"0x...","status":"SAFE"}}
event: log
data: {"id":"def456...","type":"ALERT","timestamp":"2026-06-21T12:00:03.000Z","data":{"txHash":"0x...","status":"MITIGATED"}}
Connection Management
The SSE connection remains open indefinitely. The client should implement reconnection logic:
let eventSource;
function connectSSE() {
eventSource = new EventSource('/api/logs/stream');
eventSource.onerror = () => {
// Reconnect after 3 seconds
eventSource.close();
setTimeout(connectSSE, 3000);
};
eventSource.addEventListener('log', handleTelemetry);
}
connectSSE();
Event Bus Architecture
┌──────────────┐ POST /api/logs/ingest ┌──────────────┐
│ Python │──────────────────────────────▶│ Next.js │
│ Agent │ │ Route │
└──────────────┘ └──────┬───────┘
│
sseEmitter.emit('log', event)
│
▼
┌──────────────┐
│ EventEmitter│
│ (singleton) │
└──────┬───────┘
│
SSE connections (1 per client)
│
┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Browser │ │ Browser │ │ Browser │
│ Client 1 │ │ Client 2 │ │ Client 3 │
└──────────┘ └──────────┘ └──────────┘
Note: The current implementation uses an in-process
EventEmitter. In production with multiple server instances, replace with Redis Pub/Sub (the code already has Upstash Redis integration for persistence).
POST /api/nodes/heartbeat
Receives heartbeat pings from the sentinel agent to keep its status active in the dashboard.
Body
{
"address": "0x9f758be3ae3D985713964339E2f0bD783fC6015c",
"name": "Sentinel.ax Node"
}
| Parameter | Type | Required | Description |
|---|---|---|---|
address | string | Yes | Agent wallet address (42-char hex) |
name | string | No | Agent display name (max 200 chars) |
Response (200)
{
"success": true,
"node": {
"id": "sentinel-ax-node",
"name": "Sentinel.ax Node",
"address": "0x9f758be3ae3D985713964339E2f0bD783fC6015c",
"status": "ACTIVE",
"latency": "8ms",
"events": 940,
"lastHeartbeat": "2026-06-21T12:00:03.000Z",
"registeredAt": "2026-01-03T00:00:00.000Z"
}
}
Implementation
export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json(
{ success: false, error: 'Unauthorized' },
{ status: 401 }
);
}
const { address, name } = await request.json();
const existing = await prisma.sentinelNode.findUnique({ where: { address } });
const node = existing
? await prisma.sentinelNode.update({
where: { id: existing.id },
data: {
lastHeartbeat: new Date(),
status: 'ACTIVE',
events: existing.events + 1
}
})
: await prisma.sentinelNode.create({
data: {
address,
name: name || 'Sentinel Node',
status: 'ACTIVE',
latency: '8ms',
events: 1
}
});
return NextResponse.json({ success: true, node });
}
If the address is unknown, the heartbeat creates a new sentinel node entry (auto-registration).
Agent Integration
Python Reporter (agent/reporter.py)
class Reporter:
@staticmethod
def log(text: str, level: str = "INFO"):
"""Sends a raw log line to the SSE endpoint."""
url = frontend_api_url("logs/ingest")
payload = {"text": text, "level": level}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
urllib.request.urlopen(req, timeout=2)
@staticmethod
def heartbeat(address: str, name: str = "Sentinel.ax Node"):
"""Sends a heartbeat ping to keep node active in UI."""
url = frontend_api_url("nodes/heartbeat")
payload = {"address": address, "name": name}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
urllib.request.urlopen(req, timeout=2)
The agent calls Reporter.heartbeat() on every scan iteration (~3 seconds) and Reporter.log() for important events. post_log_to_frontend() in agent/main.py sends structured telemetry with transaction details.
Telemetry Events (Database)
Events are persisted in the telemetry_logs table:
CREATE TABLE IF NOT EXISTS telemetry_logs (
id TEXT PRIMARY KEY,
text TEXT,
level TEXT,
tx_hash TEXT,
protocol TEXT,
verification_type TEXT,
gas_saved TEXT,
status TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
The prisma.alert adapter provides query access:
const recentAlerts = await prisma.alert.findMany({
orderBy: { createdAt: 'desc' },
take: 100
});
Performance
| Operation | Typical Latency |
|---|---|
POST /api/logs/ingest | 10-50ms (DB insert) |
| SSE event delivery (in-process) | <5ms |
POST /api/nodes/heartbeat | 10-50ms (DB upsert) |
GET /api/sentinels (DB) | 20-80ms |
GET /api/sentinels (memory) | <1ms |
Next Steps
- Sentinels API -- Manage monitored protocol contracts
- Deployment -- Production deployment guides