Skip to main content

Telemetry API

BreachResponse provides real-time telemetry streaming via Server-Sent Events (SSE) and log ingestion for the Python sentinel agent.


Endpoints

MethodPathAuthDescription
POST/api/logs/ingestOptionalIngest telemetry from agent
GET/api/logs/stream--SSE stream of telemetry events
POST/api/nodes/heartbeatOptionalAgent heartbeat ping

POST /api/logs/ingest

Receives telemetry data from the Python sentinel agent and broadcasts it to all SSE clients.

Body

{
"txHash": "0x8f2a9aac...",
"protocol": "TargetVault",
"type": "On-Chain Reentrancy Proposal",
"gasSaved": "pending operator approval",
"status": "PROPOSED"
}
ParameterTypeRequiredDescription
txHashstringNoTransaction hash
protocolstringNoProtocol name
typestringNoExploit/event type
gasSavedstringNoGas saved or "pending operator approval"
statusstringNoSAFE, PROPOSED, MITIGATED, SCANNING

Log-Level Ingestion

The agent also sends raw log lines via the Reporter:

{
"text": "[SENTINEL] Web3 connection established with RPC...",
"level": "INFO"
}

Response (200)

{
"success": true
}

Implementation

export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json(
{ success: false, error: 'Unauthorized' },
{ status: 401 }
);
}

const body = await request.json();

// Persist to database
await prisma.alert.create({
data: {
txHash: body.txHash,
protocol: body.protocol,
type: body.type,
gasSaved: body.gasSaved,
status: body.status,
}
});

// Emit real-time SSE event
const eventType = body.status === 'MITIGATED' ? 'ALERT' : 'LOG';
sseEmitter.emit('log', {
id: crypto.randomUUID(),
type: eventType,
timestamp: new Date().toISOString(),
data: body,
});

return NextResponse.json({ success: true });
}

GET /api/logs/stream

Server-Sent Events (SSE) endpoint that streams telemetry updates in real time.

Usage

const eventSource = new EventSource('http://localhost:3000/api/logs/stream');

eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('New event:', data);
};

eventSource.addEventListener('log', (event) => {
const data = JSON.parse(event.data);
// Update dashboard with new telemetry
});

Event Types

Event TypeWhen Emitted
logEvery telemetry event (SAFE, PROPOSED, SCANNING)
alertWhen status === 'MITIGATED' (critical events)

SSE Event Format

event: log
data: {"id":"abc123...","type":"LOG","timestamp":"2026-06-21T12:00:00.000Z","data":{"txHash":"0x...","status":"SAFE"}}

event: log
data: {"id":"def456...","type":"ALERT","timestamp":"2026-06-21T12:00:03.000Z","data":{"txHash":"0x...","status":"MITIGATED"}}

Connection Management

The SSE connection remains open indefinitely. The client should implement reconnection logic:

let eventSource;

function connectSSE() {
eventSource = new EventSource('/api/logs/stream');

eventSource.onerror = () => {
// Reconnect after 3 seconds
eventSource.close();
setTimeout(connectSSE, 3000);
};

eventSource.addEventListener('log', handleTelemetry);
}

connectSSE();

Event Bus Architecture

┌──────────────┐ POST /api/logs/ingest ┌──────────────┐
│ Python │──────────────────────────────▶│ Next.js │
│ Agent │ │ Route │
└──────────────┘ └──────┬───────┘

sseEmitter.emit('log', event)


┌──────────────┐
│ EventEmitter│
│ (singleton) │
└──────┬───────┘

SSE connections (1 per client)

┌──────────────────────┼──────────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Browser │ │ Browser │ │ Browser │
│ Client 1 │ │ Client 2 │ │ Client 3 │
└──────────┘ └──────────┘ └──────────┘

Note: The current implementation uses an in-process EventEmitter. In production with multiple server instances, replace with Redis Pub/Sub (the code already has Upstash Redis integration for persistence).


POST /api/nodes/heartbeat

Receives heartbeat pings from the sentinel agent to keep its status active in the dashboard.

Body

{
"address": "0x9f758be3ae3D985713964339E2f0bD783fC6015c",
"name": "Sentinel.ax Node"
}
ParameterTypeRequiredDescription
addressstringYesAgent wallet address (42-char hex)
namestringNoAgent display name (max 200 chars)

Response (200)

{
"success": true,
"node": {
"id": "sentinel-ax-node",
"name": "Sentinel.ax Node",
"address": "0x9f758be3ae3D985713964339E2f0bD783fC6015c",
"status": "ACTIVE",
"latency": "8ms",
"events": 940,
"lastHeartbeat": "2026-06-21T12:00:03.000Z",
"registeredAt": "2026-01-03T00:00:00.000Z"
}
}

Implementation

export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json(
{ success: false, error: 'Unauthorized' },
{ status: 401 }
);
}

const { address, name } = await request.json();

const existing = await prisma.sentinelNode.findUnique({ where: { address } });

const node = existing
? await prisma.sentinelNode.update({
where: { id: existing.id },
data: {
lastHeartbeat: new Date(),
status: 'ACTIVE',
events: existing.events + 1
}
})
: await prisma.sentinelNode.create({
data: {
address,
name: name || 'Sentinel Node',
status: 'ACTIVE',
latency: '8ms',
events: 1
}
});

return NextResponse.json({ success: true, node });
}

If the address is unknown, the heartbeat creates a new sentinel node entry (auto-registration).


Agent Integration

Python Reporter (agent/reporter.py)

class Reporter:
@staticmethod
def log(text: str, level: str = "INFO"):
"""Sends a raw log line to the SSE endpoint."""
url = frontend_api_url("logs/ingest")
payload = {"text": text, "level": level}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
urllib.request.urlopen(req, timeout=2)

@staticmethod
def heartbeat(address: str, name: str = "Sentinel.ax Node"):
"""Sends a heartbeat ping to keep node active in UI."""
url = frontend_api_url("nodes/heartbeat")
payload = {"address": address, "name": name}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
urllib.request.urlopen(req, timeout=2)

The agent calls Reporter.heartbeat() on every scan iteration (~3 seconds) and Reporter.log() for important events. post_log_to_frontend() in agent/main.py sends structured telemetry with transaction details.


Telemetry Events (Database)

Events are persisted in the telemetry_logs table:

CREATE TABLE IF NOT EXISTS telemetry_logs (
id TEXT PRIMARY KEY,
text TEXT,
level TEXT,
tx_hash TEXT,
protocol TEXT,
verification_type TEXT,
gas_saved TEXT,
status TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

The prisma.alert adapter provides query access:

const recentAlerts = await prisma.alert.findMany({
orderBy: { createdAt: 'desc' },
take: 100
});

Performance

OperationTypical Latency
POST /api/logs/ingest10-50ms (DB insert)
SSE event delivery (in-process)<5ms
POST /api/nodes/heartbeat10-50ms (DB upsert)
GET /api/sentinels (DB)20-80ms
GET /api/sentinels (memory)<1ms

Next Steps