Monitor Pipeline
The Monitor Pipeline is the first stage of BreachResponse. It continuously watches the Mantle Sepolia blockchain for activity targeting registered protocol contracts, capturing raw on-chain data and feeding it into the detection pipeline.
Architecture
┌──────────────────────────────────────────────────────────┐
│ MONITOR PIPELINE │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────┐ │
│ │ Block Poller │────▶│ Event Scanner│───▶│ Tx Filter │ │
│ │ │ │ │ │ │ │
│ │ eth_blockNum│ │ eth_getLogs │ │ Direct │ │
│ │ catch-up │ │ address[] │ │ calls to │ │
│ │ loop │ │ filter │ │ sentinels │ │
│ └─────────────┘ └──────┬───────┘ └─────┬─────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────┐ │
│ │ Log Grouper │ │
│ │ │ │
│ │ tx_to_logs[tx_hash] = logs │ │
│ │ len(logs) > 1 -> REENTRANCY│ │
│ └─────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────┐ │
│ │ Telemetry Bridge │ │
│ │ │ │
│ │ POST /api/logs/ingest │ │
│ │ POST /api/nodes/heartbeat │ │
│ └─────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────┘
Block Polling Loop
The sentinel agent's main loop (agent/main.py, run_sentinel_loop()) polls for new blocks every 3 seconds:
def run_sentinel_loop():
# Track block heights to avoid duplicate scans
last_scanned_block = w3.eth.block_number
while True:
# Heartbeat every iteration
Reporter.heartbeat(agent_address)
# Fetch updated registered protocols from DB
monitored_addresses = get_registered_protocols()
if w3 and w3.is_connected() and last_scanned_block is not None:
current_block = w3.eth.block_number
if current_block > last_scanned_block:
# Scan each new block since last check
for block_num in range(last_scanned_block + 1, current_block + 1):
scan_block(block_num, monitored_addresses)
last_scanned_block = current_block
time.sleep(3.0)
Key Characteristics
- Block Range Scanning: The loop catches up to all blocks produced since the last scan. If the agent was offline for 10 blocks, it will scan all 10 in sequence.
- No Gap:
last_scanned_blockis only updated after all intermediate blocks are processed. - Heartbeat: Each iteration sends a heartbeat ping to keep the agent visible as
ACTIVEin the dashboard.
Event Log Filtering
For each new block, the agent queries all event logs emitted by registered sentinel contracts:
checksum_addresses = [Web3.to_checksum_address(addr) for addr in monitored_addresses]
logs = w3.eth.get_logs({
'fromBlock': block_num,
'toBlock': block_num,
'address': checksum_addresses
})
The eth_getLogs Call
This single RPC call retrieves all events emitted by all monitored contracts in a specific block. It's efficient because:
- It filters server-side (the Mantle RPC node only returns matching logs)
- It batches all monitored addresses into one request
- It covers both indexed and non-indexed event parameters
The response is an array of log objects:
[
{
"address": "0x9d9b602cfe69cff9706eac399808e84682ce94fb",
"topics": [
"0x7cfc8a5b8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a",
"0x000000000000000000000000abcdef1234567890abcdef1234567890abc"
],
"data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000",
"blockNumber": "0xbc61ab",
"transactionHash": "0x8f2a9aac...",
"logIndex": "0x1"
}
]
Reentrancy Detection via Log Grouping
The critical insight: a reentrancy attack produces multiple event logs from the same transaction on the same contract.
# Group logs by transaction hash
tx_to_logs = {}
for log in logs:
tx_hash = w3.to_hex(log['transactionHash'])
tx_to_logs[tx_hash] = tx_to_logs.get(tx_hash, []) + [log]
for tx_hash, logs_in_tx in tx_to_logs.items():
if len(logs_in_tx) > 1:
# REENTRANCY DETECTED
Why This Works
Consider the TargetVault.withdraw() function:
function withdraw(uint256 amount) external whenNotPaused {
require(balances[msg.sender] >= amount, "Insufficient balance");
(bool success, ) = msg.sender.call{value: amount}(""); // External call
require(success, "Transfer failed");
unchecked {
balances[msg.sender] -= amount; // State update AFTER external call
}
emit Withdrawn(msg.sender, amount); // Event emitted
}
When exploited:
- First call:
Attacker.attack(amount)->TargetVault.withdraw(amount) - ETH sent to
Attacker->Attacker.receive()->TargetVault.withdraw(amount)again - Second call:
TargetVault.withdraw(amount)-- state not yet updated, balance check passes - Two
Withdrawnevents emitted in the same transaction
The log grouper catches both events and flags the transaction.
Direct Transaction Filtering
In addition to event logs, the agent inspects every transaction in each block for direct calls to monitored contracts:
block = w3.eth.get_block(block_num, full_transactions=True)
for tx in block.transactions:
to_addr = tx.get('to')
if to_addr and to_addr.lower() in monitored_addresses:
tx_hash_hex = w3.to_hex(tx['hash'])
print(f"[SCAN] Monitored direct interaction: Tx {tx_hash_hex} -> Target {to_addr}")
This catches interactions that may not emit events (e.g., view functions or failed transactions that reverted after partial execution).
Protocol Registration
The agent dynamically fetches the list of monitored addresses from the frontend API:
def get_registered_protocols():
url = frontend_api_url("sentinels")
req = urllib.request.Request(url, method='GET')
with urllib.request.urlopen(req, timeout=3) as res:
data = json.loads(res.read().decode('utf-8'))
db_addrs = [node['address'].lower() for node in data if 'address' in node]
return db_addrs + ["0x596Ff2Ca0f781a2CED29EC685cD1ba038378dE02".lower()]
This includes:
- All sentinels registered in the PostgreSQL database via
GET /api/sentinels - A hardcoded local simulation vault address for development/testing
Registrations update in real time -- the agent fetches the latest list on every scan iteration.
Telemetry Bridge
The agent bridges its internal state to the frontend dashboard through two API endpoints:
Log Ingestion
def post_log_to_frontend(tx_hash, protocol, exploit_type, gas_saved, status):
url = frontend_api_url("logs/ingest")
payload = {
"txHash": tx_hash,
"protocol": protocol,
"type": exploit_type,
"gasSaved": gas_saved,
"status": status
}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
with urllib.request.urlopen(req, timeout=3) as res:
return res.read()
The frontend route (POST /api/logs/ingest) receives this, persists to PostgreSQL, emits an SSE event, and returns a response:
export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json({ success: false, error: 'Unauthorized' }, { status: 401 });
}
const body = await request.json();
await prisma.alert.create({ data: { ... } });
sseEmitter.emit('log', { id, type, timestamp, data: body });
return NextResponse.json({ success: true });
}
Heartbeat
Reporter.heartbeat(agent_address)
The heartbeat route (POST /api/nodes/heartbeat) updates the sentinel node's lastHeartbeat and events count, or creates a new node if the address is unknown:
export async function POST(request: Request) {
const { address, name } = await request.json();
const existing = await prisma.sentinelNode.findUnique({ where: { address } });
const node = existing
? await prisma.sentinelNode.update({
where: { id: existing.id },
data: { lastHeartbeat: new Date(), status: 'ACTIVE', events: existing.events + 1 }
})
: await prisma.sentinelNode.create({ data: { address, name, status: 'ACTIVE', ... } });
return NextResponse.json({ success: true, node });
}
Simulated Activity
To keep the dashboard populated during development and testing, the agent generates simulated safe transactions on every iteration:
tx_num = random.randint(100000, 999999)
sim_tx_hash = f"0x{random.randint(10, 99)}fa...{tx_num}"
sim_protocol = random.choice(protocols)
print(f"[SCAN] Mempool scan tx: {sim_tx_hash} -> Targeting {sim_protocol} :: SAFE")
post_log_to_frontend(
tx_hash=sim_tx_hash,
protocol=sim_protocol,
exploit_type="Normal Transfer",
gas_saved="0 mETH",
status="SAFE"
)
Every 12 iterations, a controlled anomaly scenario is also generated:
if step % 12 == 0:
# Controlled reentrancy detection scenario
# Marked as CONTROLLED SCENARIO -- never broadcasts real transactions
post_log_to_frontend(
tx_hash=sim_exploit_hash,
protocol="TargetVault",
exploit_type="Reentrancy Response Proposal",
gas_saved="pending operator approval",
status="PROPOSED"
)
These simulated events are clearly marked in logs and never trigger actual on-chain actions.
Error Handling
The monitor pipeline is designed to be resilient:
try:
# Block scanning
...
except Exception as block_err:
print(f"[SENTINEL] Error during block scan iteration: {block_err}")
# Loop continues -- next iteration will catch up
try:
# Log querying
...
except Exception as log_err:
print(f"[SENTINEL] Error querying logs in block {block_num}: {log_err}")
# Continue to next block
try:
# Transaction inspection
...
except Exception as tx_err:
print(f"[SENTINEL] Error checking block txs: {tx_err}")
try:
# Frontend bridge
...
except Exception as e:
print(f"[FRONTEND-BRIDGE] Warning: Failed to sync log to Next.js dashboard: {e}")
# Agent continues without frontend -- telemetry is best-effort
- RPC failures do not crash the agent
- Frontend unavailability does not stop monitoring
- The loop always catches up to missed blocks on next iteration
- A 5-second backoff on critical loop errors prevents tight restart loops
Performance Considerations
RPC Call Optimization
| Call | Frequency | Cost |
|---|---|---|
eth_blockNumber | Every 3 seconds | Lightweight (1 RPC) |
eth_getLogs (per block) | Per new block | Moderate (filtered server-side) |
eth_getBlock(full) | Per new block | Heavy (full tx data) -- only when registered protocols exist |
GET /api/sentinels | Every 3 seconds | Lightweight (DB query) |
POST /api/logs/ingest | Per event | Lightweight (DB insert) |
POST /api/nodes/heartbeat | Every 3 seconds | Lightweight (DB upsert) |
Mantle Sepolia Characteristics
Mantle Sepolia produces blocks approximately every 2 seconds. The 3-second scan interval means the agent typically processes 1-2 blocks per iteration, well within the timeout limits.
Next Steps
- Detect Pipeline -- How suspicious transactions are classified by AI
- Sentinel Registry -- The on-chain contract that governs protocol registration