Skip to main content

Monitor Pipeline

The Monitor Pipeline is the first stage of BreachResponse. It continuously watches the Mantle Sepolia blockchain for activity targeting registered protocol contracts, capturing raw on-chain data and feeding it into the detection pipeline.


Architecture

┌──────────────────────────────────────────────────────────┐
│ MONITOR PIPELINE │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────┐ │
│ │ Block Poller │────▶│ Event Scanner│───▶│ Tx Filter │ │
│ │ │ │ │ │ │ │
│ │ eth_blockNum│ │ eth_getLogs │ │ Direct │ │
│ │ catch-up │ │ address[] │ │ calls to │ │
│ │ loop │ │ filter │ │ sentinels │ │
│ └─────────────┘ └──────┬───────┘ └─────┬─────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────┐ │
│ │ Log Grouper │ │
│ │ │ │
│ │ tx_to_logs[tx_hash] = logs │ │
│ │ len(logs) > 1 -> REENTRANCY│ │
│ └─────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────┐ │
│ │ Telemetry Bridge │ │
│ │ │ │
│ │ POST /api/logs/ingest │ │
│ │ POST /api/nodes/heartbeat │ │
│ └─────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────┘

Block Polling Loop

The sentinel agent's main loop (agent/main.py, run_sentinel_loop()) polls for new blocks every 3 seconds:

def run_sentinel_loop():
# Track block heights to avoid duplicate scans
last_scanned_block = w3.eth.block_number

while True:
# Heartbeat every iteration
Reporter.heartbeat(agent_address)

# Fetch updated registered protocols from DB
monitored_addresses = get_registered_protocols()

if w3 and w3.is_connected() and last_scanned_block is not None:
current_block = w3.eth.block_number
if current_block > last_scanned_block:
# Scan each new block since last check
for block_num in range(last_scanned_block + 1, current_block + 1):
scan_block(block_num, monitored_addresses)

last_scanned_block = current_block

time.sleep(3.0)

Key Characteristics

  • Block Range Scanning: The loop catches up to all blocks produced since the last scan. If the agent was offline for 10 blocks, it will scan all 10 in sequence.
  • No Gap: last_scanned_block is only updated after all intermediate blocks are processed.
  • Heartbeat: Each iteration sends a heartbeat ping to keep the agent visible as ACTIVE in the dashboard.

Event Log Filtering

For each new block, the agent queries all event logs emitted by registered sentinel contracts:

checksum_addresses = [Web3.to_checksum_address(addr) for addr in monitored_addresses]
logs = w3.eth.get_logs({
'fromBlock': block_num,
'toBlock': block_num,
'address': checksum_addresses
})

The eth_getLogs Call

This single RPC call retrieves all events emitted by all monitored contracts in a specific block. It's efficient because:

  1. It filters server-side (the Mantle RPC node only returns matching logs)
  2. It batches all monitored addresses into one request
  3. It covers both indexed and non-indexed event parameters

The response is an array of log objects:

[
{
"address": "0x9d9b602cfe69cff9706eac399808e84682ce94fb",
"topics": [
"0x7cfc8a5b8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a7c8a",
"0x000000000000000000000000abcdef1234567890abcdef1234567890abc"
],
"data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000",
"blockNumber": "0xbc61ab",
"transactionHash": "0x8f2a9aac...",
"logIndex": "0x1"
}
]

Reentrancy Detection via Log Grouping

The critical insight: a reentrancy attack produces multiple event logs from the same transaction on the same contract.

# Group logs by transaction hash
tx_to_logs = {}
for log in logs:
tx_hash = w3.to_hex(log['transactionHash'])
tx_to_logs[tx_hash] = tx_to_logs.get(tx_hash, []) + [log]

for tx_hash, logs_in_tx in tx_to_logs.items():
if len(logs_in_tx) > 1:
# REENTRANCY DETECTED

Why This Works

Consider the TargetVault.withdraw() function:

function withdraw(uint256 amount) external whenNotPaused {
require(balances[msg.sender] >= amount, "Insufficient balance");

(bool success, ) = msg.sender.call{value: amount}(""); // External call
require(success, "Transfer failed");

unchecked {
balances[msg.sender] -= amount; // State update AFTER external call
}

emit Withdrawn(msg.sender, amount); // Event emitted
}

When exploited:

  1. First call: Attacker.attack(amount) -> TargetVault.withdraw(amount)
  2. ETH sent to Attacker -> Attacker.receive() -> TargetVault.withdraw(amount) again
  3. Second call: TargetVault.withdraw(amount) -- state not yet updated, balance check passes
  4. Two Withdrawn events emitted in the same transaction

The log grouper catches both events and flags the transaction.


Direct Transaction Filtering

In addition to event logs, the agent inspects every transaction in each block for direct calls to monitored contracts:

block = w3.eth.get_block(block_num, full_transactions=True)
for tx in block.transactions:
to_addr = tx.get('to')
if to_addr and to_addr.lower() in monitored_addresses:
tx_hash_hex = w3.to_hex(tx['hash'])
print(f"[SCAN] Monitored direct interaction: Tx {tx_hash_hex} -> Target {to_addr}")

This catches interactions that may not emit events (e.g., view functions or failed transactions that reverted after partial execution).


Protocol Registration

The agent dynamically fetches the list of monitored addresses from the frontend API:

def get_registered_protocols():
url = frontend_api_url("sentinels")
req = urllib.request.Request(url, method='GET')
with urllib.request.urlopen(req, timeout=3) as res:
data = json.loads(res.read().decode('utf-8'))
db_addrs = [node['address'].lower() for node in data if 'address' in node]
return db_addrs + ["0x596Ff2Ca0f781a2CED29EC685cD1ba038378dE02".lower()]

This includes:

  1. All sentinels registered in the PostgreSQL database via GET /api/sentinels
  2. A hardcoded local simulation vault address for development/testing

Registrations update in real time -- the agent fetches the latest list on every scan iteration.


Telemetry Bridge

The agent bridges its internal state to the frontend dashboard through two API endpoints:

Log Ingestion

def post_log_to_frontend(tx_hash, protocol, exploit_type, gas_saved, status):
url = frontend_api_url("logs/ingest")
payload = {
"txHash": tx_hash,
"protocol": protocol,
"type": exploit_type,
"gasSaved": gas_saved,
"status": status
}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, headers=ingest_headers(), method='POST')
with urllib.request.urlopen(req, timeout=3) as res:
return res.read()

The frontend route (POST /api/logs/ingest) receives this, persists to PostgreSQL, emits an SSE event, and returns a response:

export async function POST(request: Request) {
if (!isAuthorizedIngest(request)) {
return NextResponse.json({ success: false, error: 'Unauthorized' }, { status: 401 });
}
const body = await request.json();
await prisma.alert.create({ data: { ... } });
sseEmitter.emit('log', { id, type, timestamp, data: body });
return NextResponse.json({ success: true });
}

Heartbeat

Reporter.heartbeat(agent_address)

The heartbeat route (POST /api/nodes/heartbeat) updates the sentinel node's lastHeartbeat and events count, or creates a new node if the address is unknown:

export async function POST(request: Request) {
const { address, name } = await request.json();
const existing = await prisma.sentinelNode.findUnique({ where: { address } });
const node = existing
? await prisma.sentinelNode.update({
where: { id: existing.id },
data: { lastHeartbeat: new Date(), status: 'ACTIVE', events: existing.events + 1 }
})
: await prisma.sentinelNode.create({ data: { address, name, status: 'ACTIVE', ... } });
return NextResponse.json({ success: true, node });
}

Simulated Activity

To keep the dashboard populated during development and testing, the agent generates simulated safe transactions on every iteration:

tx_num = random.randint(100000, 999999)
sim_tx_hash = f"0x{random.randint(10, 99)}fa...{tx_num}"
sim_protocol = random.choice(protocols)

print(f"[SCAN] Mempool scan tx: {sim_tx_hash} -> Targeting {sim_protocol} :: SAFE")
post_log_to_frontend(
tx_hash=sim_tx_hash,
protocol=sim_protocol,
exploit_type="Normal Transfer",
gas_saved="0 mETH",
status="SAFE"
)

Every 12 iterations, a controlled anomaly scenario is also generated:

if step % 12 == 0:
# Controlled reentrancy detection scenario
# Marked as CONTROLLED SCENARIO -- never broadcasts real transactions
post_log_to_frontend(
tx_hash=sim_exploit_hash,
protocol="TargetVault",
exploit_type="Reentrancy Response Proposal",
gas_saved="pending operator approval",
status="PROPOSED"
)

These simulated events are clearly marked in logs and never trigger actual on-chain actions.


Error Handling

The monitor pipeline is designed to be resilient:

try:
# Block scanning
...
except Exception as block_err:
print(f"[SENTINEL] Error during block scan iteration: {block_err}")
# Loop continues -- next iteration will catch up

try:
# Log querying
...
except Exception as log_err:
print(f"[SENTINEL] Error querying logs in block {block_num}: {log_err}")
# Continue to next block

try:
# Transaction inspection
...
except Exception as tx_err:
print(f"[SENTINEL] Error checking block txs: {tx_err}")

try:
# Frontend bridge
...
except Exception as e:
print(f"[FRONTEND-BRIDGE] Warning: Failed to sync log to Next.js dashboard: {e}")
# Agent continues without frontend -- telemetry is best-effort
  • RPC failures do not crash the agent
  • Frontend unavailability does not stop monitoring
  • The loop always catches up to missed blocks on next iteration
  • A 5-second backoff on critical loop errors prevents tight restart loops

Performance Considerations

RPC Call Optimization

CallFrequencyCost
eth_blockNumberEvery 3 secondsLightweight (1 RPC)
eth_getLogs (per block)Per new blockModerate (filtered server-side)
eth_getBlock(full)Per new blockHeavy (full tx data) -- only when registered protocols exist
GET /api/sentinelsEvery 3 secondsLightweight (DB query)
POST /api/logs/ingestPer eventLightweight (DB insert)
POST /api/nodes/heartbeatEvery 3 secondsLightweight (DB upsert)

Mantle Sepolia Characteristics

Mantle Sepolia produces blocks approximately every 2 seconds. The 3-second scan interval means the agent typically processes 1-2 blocks per iteration, well within the timeout limits.


Next Steps