Detect Pipeline
The Detect Pipeline is the intelligence layer of BreachResponse. It takes raw on-chain activity from the Monitor Pipeline and determines whether each transaction represents a genuine threat -- using heuristic scanning, AI classification, dual-model comparison, and decentralized consensus validation.
Pipeline Flow
Suspicious Transaction
│
▼
┌─────────────────────────────────────────┐
│ 1. Heuristic Pre-Filter │
│ │
│ Checks: calldata, gas, value, opcodes │
│ Output: score (0-9), threatType │
│ Threshold: score ≥ 3 -> suspicious │
└─────────────────┬───────────────────────┘
│ score ≥ 3
▼
┌─────────────────────────────────────────┐
│ 2. Agent-Side AI Analysis │
│ │
│ IncidentAnalyzer (agent Python) │
│ Model: GPT-4o-mini (or configured) │
│ Output: confidence (0.0-1.0) │
│ Threshold: > 0.9 -> verified threat │
└─────────────────┬───────────────────────┘
│ optional parallel path
▼
┌─────────────────────────────────────────┐
│ 3. Frontend AI Classification │
│ │
│ POST /api/analyze (Groq only) │
│ POST /api/compare (Groq + Hunyuan) │
│ POST /api/audit (Hunyuan + Groq, │
│ bytecode analysis) │
└─────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ 4. GenLayer Consensus Guard │
│ │
│ IncidentConsensusGuard │
│ nondeterministic LLM validation │
│ Leader-follower consensus │
│ Status: approved | rejected | escalated│
└─────────────────────────────────────────┘
1. Heuristic Pre-Filter
Before invoking AI models (which incur latency and API costs), transactions pass through a multi-factor heuristic scanner defined in frontend/src/lib/threatScan.ts:
export function detectThreatHeuristic(tx: HeuristicTx): {
  isThreat: boolean;
  threatType: string;
  score: number;
} {
  const value = tx.value ?? BigInt(0);
  const gas = tx.gas ?? BigInt(0);
  const input = tx.input ?? '0x';

  // Signature-based detection
  const hasReentrancySig = input.includes('2e1a7d4d') // withdraw(uint256)
    || input.includes('8456cb59'); // pause()
  const hasDelegateCall = input.includes('4e487b71') // Panic(uint256)
    || input.includes('f4'); // DELEGATECALL opcode
  const longCalldata = input.length > 200;

  // Anomaly scoring
  const highValue = value > BigInt(0);
  const highGas = gas > BigInt(200000);

  let score = 0;
  let matchedType = '';
  if (hasReentrancySig) { score += 3; matchedType = 'Reentrancy'; }
  if (hasDelegateCall || longCalldata) {
    score += 2;
    if (!matchedType) matchedType = 'Oracle Manipulation';
  }
  if (highValue) score += 1;
  if (highGas) score += 1;

  // Deterministic variance for demo variety
  const h = hashInt(tx.hash);
  score += h % 3;

  return {
    isThreat: score >= 3,
    threatType: matchedType || THREAT_TYPES[h % THREAT_TYPES.length],
    score,
  };
}
Detection Factors
| Factor | Weight | Signal |
|---|---|---|
| Reentrancy signature (2e1a7d4d, 8456cb59) | +3 | Known exploit function selectors in calldata |
| DELEGATECALL pattern (4e487b71 or f4 in calldata) | +2 (shared with long calldata) | Potential proxy manipulation |
| Long calldata (>200 chars) | +2 (shared with DELEGATECALL pattern) | Complex encoded calls -- possible exploit |
| Non-zero value | +1 | Value transfer -- potential drain |
| High gas (>200,000) | +1 | Expensive execution -- possible loop/recursion |
| Deterministic variance | 0-2 | Demo variety (hash-based, not security-relevant) |
Threshold
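Transactions scoring 3 or above are flagged as suspicious and proceed to AI classification. Transactions below the threshold are classified as SAFE and logged for dashboard display only. Because the demo variance term adds up to 2 points to the score, it can tip a transaction with a single +1 signal over the threshold.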
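To make the scoring arithmetic concrete, here is a minimal Python sketch that mirrors the rules of detectThreatHeuristic. It is an illustration, not the project's code: the name heuristic_score and the variance parameter (standing in for hashInt(tx.hash) % 3) are assumptions made for this example.

```python
# Hypothetical Python mirror of the scoring rules in detectThreatHeuristic.
REENTRANCY_SELECTORS = ("2e1a7d4d", "8456cb59")  # withdraw(uint256), pause()


def heuristic_score(input_hex: str, value: int = 0, gas: int = 0, variance: int = 0) -> dict:
    """Score a transaction; `variance` stands in for hashInt(tx.hash) % 3 (0-2)."""
    has_reentrancy_sig = any(sel in input_hex for sel in REENTRANCY_SELECTORS)
    has_delegatecall = "4e487b71" in input_hex or "f4" in input_hex
    long_calldata = len(input_hex) > 200

    score = 0
    if has_reentrancy_sig:
        score += 3
    if has_delegatecall or long_calldata:
        score += 2  # one shared +2, even when both signals fire
    if value > 0:
        score += 1
    if gas > 200_000:
        score += 1
    score += variance
    return {"score": score, "is_threat": score >= 3}


# A plain value transfer scores 1 and stays below the threshold.
plain = heuristic_score("0x", value=10**18, gas=21_000)
# A withdraw(uint256) call with high gas scores 3 + 1 = 4.
withdraw = heuristic_score("0x2e1a7d4d" + "0" * 64, gas=250_000)
```

With every signal present and the maximum variance of 2, the score tops out at 3 + 2 + 1 + 1 + 2 = 9.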
2. Agent-Side AI Analysis
The Python agent runs its own independent AI analysis through IncidentAnalyzer (agent/incident_analyzer.py):
import json
import os

from openai import OpenAI


class IncidentAnalyzer:
    def __init__(self, api_key=None):
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.model = os.getenv("LLM_MODEL", "gpt-4o-mini")
        self.client = OpenAI(
            api_key=self.api_key,
            base_url=os.getenv("OPENAI_BASE_URL")  # For Groq/Hunyuan compatibility
        )

    def analyze_exploit_payload(self, tx_calldata: str) -> float:
        prompt = f"""
        You are a Web3 Security Auditor. Analyze the following transaction.
        Determine if it is an exploit. Recommend emergency pause calldata.
        Assume the target contract implements IPausable.pause()
        with keccak256 selector 0x8456cb59.
        Input Data:
        {tx_calldata}
        Respond strictly in JSON format:
        {{
        "is_exploit": boolean,
        "confidence_score": float (0.0 to 1.0),
        "exploit_type": string,
        "recommended_calldata": string
        }}
        """
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are a smart contract active-defense AI."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.0
        )
        result = json.loads(response.choices[0].message.content)
        return float(result.get("confidence_score", 0.0))
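Since the returned confidence is compared against the 0.9 threshold, a malformed or out-of-range model response should not be able to raise an exception or exceed 1.0. The following is a minimal sketch of defensive parsing, assuming a hypothetical helper named extract_confidence that is not part of the agent code shown above.

```python
import json


def extract_confidence(raw) -> float:
    """Return confidence_score clamped to [0.0, 1.0]; 0.0 if the response is unusable."""
    try:
        result = json.loads(raw)
        score = float(result.get("confidence_score", 0.0))
    except (ValueError, TypeError, AttributeError):
        # Covers invalid JSON, non-string input, non-object JSON, and non-numeric scores.
        return 0.0
    if score != score:  # NaN never compares equal to itself
        return 0.0
    return max(0.0, min(1.0, score))
```

Returning 0.0 on any parsing problem keeps a broken response on the safe side of the threshold, so it can never trigger a rescue transaction on its own.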
Fallback Mode
If no OPENAI_API_KEY is configured, the analyzer falls back to keyword-based detection:
if not self.client:
    if "reentrant" in tx_calldata.lower() or "multiple" in tx_calldata.lower():
        return 0.98
    return 0.01
Response Formulation
When confidence exceeds the 0.9 threshold, the analyzer formulates a rescue transaction:
def formulate_rescue_transaction(self, target_address, exploit_type):
    pause_calldata = self.last_analysis.get("recommended_calldata", "0x8456cb59")
    return {
        "to": target_address,
        "data": pause_calldata,
        "gas_limit": 120000,
        "description": f"Emergency mitigation for {exploit_type}"
    }
The recommended calldata defaults to 0x8456cb59 (the pause() function selector), but the LLM can recommend alternative calldata based on the exploit type.
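Because the calldata comes from model output, one cautious option is to check it against a selector allowlist before it is placed in a transaction. The sketch below assumes a hypothetical helper safe_rescue_calldata and a hypothetical ALLOWED_SELECTORS set; neither appears in the agent code, and a real deployment would choose its own trusted selectors.

```python
import re

PAUSE_SELECTOR = "0x8456cb59"  # pause(), the default named in the text
# Hypothetical allowlist for this sketch; only pause() is trusted here.
ALLOWED_SELECTORS = {PAUSE_SELECTOR}
_HEX_CALLDATA = re.compile(r"0x(?:[0-9a-fA-F]{2})+")


def safe_rescue_calldata(recommended) -> str:
    """Return the recommended calldata only if it is well-formed hex with an allowed selector."""
    if not isinstance(recommended, str) or not _HEX_CALLDATA.fullmatch(recommended):
        return PAUSE_SELECTOR
    if len(recommended) < 10:  # "0x" plus a 4-byte selector
        return PAUSE_SELECTOR
    selector = recommended[:10].lower()
    return recommended if selector in ALLOWED_SELECTORS else PAUSE_SELECTOR
```

For example, calldata starting with 0xa9059cbb (the ERC-20 transfer(address,uint256) selector) would be replaced by the pause() default rather than passed through.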
3. Frontend AI Classification
Single-Model Analysis (POST /api/analyze)
Uses Llama 3.1 8B Instant (model ID llama-3.1-8b-instant) on Groq for fast, low-cost classification:
// frontend/src/app/api/analyze/route.ts
const res = await fetch('https://api.groq.com/openai/v1/chat/completions', {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${process.env.GROQ_API_KEY}`,
    'Content-Type': 'application/json',
  },
  body: JSON.stringify({
    model: 'llama-3.1-8b-instant',
    messages: [
      {
        role: 'system',
        content: 'You are a smart contract security AI running on a real-time threat detection platform...'
      },
      {
        role: 'user',
        content: `Analyze this smart contract security incident detected on Mantle network:
Protocol: ${protocol}
Transaction hash: ${txHash}
Detection signature: ${threatType}
Return this exact JSON:
{
"confidence": <float 0.70-0.99>,
"severity": "<CRITICAL|HIGH|MEDIUM>",
"evidencePoints": ["<evidence 1>", "<evidence 2>", "<evidence 3>"],
"recommendation": "<Pause protocol|Alert operators|Monitor only|Multisig review>",
"reasoning": "<1-2 sentences>",
"gasUsed": <integer>,
"expectedGas": <integer>,
"gasAnomalyFactor": <float>,
"gasAnomalyFlag": <true|false>,
"gasAnomalyReason": "<1 sentence>"
}`
      }
    ],
    temperature: 0.2,
    max_tokens: 350,
    response_format: { type: 'json_object' }
  })
});
Key parameters:
- temperature: 0.2 -- Low creativity for consistent threat assessment
- response_format: json_object -- Structured output, no markdown parsing needed
- max_tokens: 350 -- Compact responses for low latency
Dual-Model Comparison (POST /api/compare)
Runs Groq and Hunyuan in parallel and returns both verdicts:
// frontend/src/app/api/compare/route.ts
export async function POST(request: Request) {
  const { txHash, protocol, threatType } = await request.json();
  const prompt = buildUserPrompt(protocol, txHash, threatType);

  // Parallel execution -- both models respond simultaneously
  const [groq, hunyuan] = await Promise.all([
    callGroq(prompt),
    callHunyuan(prompt)
  ]);

  const agreement = groq.severity === hunyuan.severity;
  const consensusConfidence = (groq.confidence + hunyuan.confidence) / 2;

  return NextResponse.json({ groq, hunyuan, agreement, consensusConfidence });
}
Each model returns:
interface ModelVerdict {
  model: string;          // "llama-3.1-8b-instant" | "hunyuan-lite"
  provider: string;       // "Groq" | "Hunyuan"
  confidence: number;     // 0.0-1.0
  severity: string;       // "CRITICAL" | "HIGH" | "MEDIUM"
  recommendation: string;
  reasoning: string;
  latencyMs: number;      // Model response time
  source: 'live' | 'fallback';
}
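The parallel call pattern and the source field can be sketched together in Python with asyncio.gather. This is an illustration under assumptions: the names timed_call, compare, the stub providers, and the FALLBACK values are hypothetical and do not reflect the route's actual fallback verdict.

```python
import asyncio
import time

# Hypothetical placeholder verdict used when a provider call fails.
FALLBACK = {"confidence": 0.5, "severity": "MEDIUM", "source": "fallback"}


async def timed_call(call_model, prompt):
    """Run one provider call, recording latency and whether the result is live."""
    started = time.perf_counter()
    try:
        verdict = dict(await call_model(prompt))
        verdict["source"] = "live"
    except Exception:
        verdict = dict(FALLBACK)
    verdict["latencyMs"] = round((time.perf_counter() - started) * 1000)
    return verdict


async def compare(call_groq, call_hunyuan, prompt):
    """Query both providers concurrently and derive agreement and mean confidence."""
    groq, hunyuan = await asyncio.gather(
        timed_call(call_groq, prompt), timed_call(call_hunyuan, prompt)
    )
    return {
        "groq": groq,
        "hunyuan": hunyuan,
        "agreement": groq["severity"] == hunyuan["severity"],
        "consensusConfidence": (groq["confidence"] + hunyuan["confidence"]) / 2,
    }


# Stub providers standing in for the real API calls.
async def stub_groq(prompt):
    return {"confidence": 0.875, "severity": "HIGH"}


async def stub_hunyuan(prompt):
    return {"confidence": 0.75, "severity": "HIGH"}


async def stub_down(prompt):
    raise RuntimeError("provider unavailable")


both_live = asyncio.run(compare(stub_groq, stub_hunyuan, "prompt"))
one_down = asyncio.run(compare(stub_groq, stub_down, "prompt"))
```

Wrapping each call separately means one provider failing does not discard the other provider's live verdict. Note that agreement only compares severity, so two models can agree while their confidence values differ widely.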
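Hunyuan does not support response_format: json_object, so the compare endpoint uses a custom safeParse function. It first tries JSON.parse on the raw content and, if that fails, extracts the span from the first { to the last }, which discards markdown fences and surrounding prose: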
function safeParse(content: string): Record<string, unknown> {
  try { return JSON.parse(content); }
  catch {
    const match = content.match(/\{[\s\S]*\}/);
    if (match) { try { return JSON.parse(match[0]); } catch {} }
    return {};
  }
}
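The same two-step parse can be sketched in Python to show what it accepts and rejects. The function name safe_parse is a hypothetical port for illustration; it additionally returns an empty dict when the parsed value is not a JSON object.

```python
import json
import re


def safe_parse(content: str) -> dict:
    """Parse JSON directly, else fall back to the first '{' .. last '}' span."""
    try:
        parsed = json.loads(content)
    except ValueError:
        match = re.search(r"\{[\s\S]*\}", content)
        if not match:
            return {}
        try:
            parsed = json.loads(match.group(0))
        except ValueError:
            return {}
    return parsed if isinstance(parsed, dict) else {}


FENCE = "`" * 3  # built at runtime to avoid a literal fence inside this example
fenced_reply = FENCE + 'json\n{"severity": "HIGH", "confidence": 0.82}\n' + FENCE
parsed_reply = safe_parse(fenced_reply)
```

Because the pattern is greedy, a reply that contains a second brace pair in trailing prose would fail the inner parse and yield an empty result, so callers still need a fallback verdict.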
Contract Auditing (POST /api/audit)
A specialized AI endpoint that analyzes deployed contract bytecode:
// frontend/src/app/api/audit/route.ts
// 1. Fetch bytecode from Mantle RPC
const bytecode = await rpcCall('eth_getCode', [address, 'latest']);

// 2. Extract dangerous opcode patterns
const foundOpcodes = DANGEROUS_OPCODES.filter(op => {
  const regex = new RegExp(`(^|[0-9a-f]{2})${op.hex}([0-9a-f]{2}|$)`);
  return regex.test(hex);
});
// DANGEROUS_OPCODES:
// ff -- SELFDESTRUCT (CRITICAL)
// f4 -- DELEGATECALL (HIGH)
// f0 -- CREATE (MEDIUM)
// f5 -- CREATE2 (MEDIUM)

// 3. Extract PUSH4 function selectors (opcode 0x63)
for (let i = 0; i < hex.length - 10; i += 2) {
  if (hex[i] === '6' && hex[i + 1] === '3') {
    selectors.push('0x' + hex.slice(i + 2, i + 10));
  }
}

// 4. Try to fetch verified Solidity source from Sourcify
const soliditySource = await fetchSourceFromSourcify(address);

// 5. Build prompt with source (if available) or bytecode only
const prompt = sourceAvailable
  ? `Audit this Solidity smart contract...
Solidity source (first 6000 chars):
\`\`\`solidity
${soliditySource}
\`\`\``
  : `Audit this smart contract...
Bytecode size: ${hex.length / 2} bytes
Dangerous opcodes found: ${foundOpcodes.map(...)}
Function selectors extracted: ${uniqueSelectors.join(', ')}`;
The audit endpoint tries Hunyuan first, then falls back to Groq. If neither API key is configured, it returns a deterministic fallback audit with common vulnerability patterns.
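The opcode and selector scans above match hex patterns anywhere in the bytecode, so bytes that are really PUSH immediates can be counted as opcodes. A common refinement is a linear sweep that skips the data bytes following PUSH1 through PUSH32 (0x60 to 0x7f). The sketch below shows that approach in Python; scan_bytecode is a hypothetical name, and this is not what the audit route does.

```python
DANGEROUS = {0xFF: "SELFDESTRUCT", 0xF4: "DELEGATECALL", 0xF0: "CREATE", 0xF5: "CREATE2"}


def scan_bytecode(hex_code: str):
    """Linear sweep over EVM bytecode that skips PUSH immediates.

    Returns (sorted dangerous opcode names, PUSH4 immediates as hex strings).
    """
    raw = hex_code[2:] if hex_code.startswith("0x") else hex_code
    code = bytes.fromhex(raw)
    found, selectors = set(), []
    i = 0
    while i < len(code):
        op = code[i]
        if 0x60 <= op <= 0x7F:  # PUSH1..PUSH32 carry 1..32 immediate bytes
            size = op - 0x5F
            if op == 0x63 and i + 5 <= len(code):  # PUSH4: candidate selector
                selectors.append("0x" + code[i + 1:i + 5].hex())
            i += 1 + size  # skip the immediate so it is not read as opcodes
            continue
        if op in DANGEROUS:
            found.add(DANGEROUS[op])
        i += 1
    return sorted(found), selectors


# PUSH4 0xf4f4f4f4 followed by SELFDESTRUCT: the f4 bytes are data, not DELEGATECALL.
opcodes, selectors = scan_bytecode("0x63f4f4f4f4ff")
```

This remains an approximation: trailing metadata and embedded data regions can still be misread as code, and not every PUSH4 immediate is a function selector.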
4. GenLayer Consensus Guard
For incidents requiring strong validation guarantees, the system uses a GenLayer smart contract (contracts/genlayer/IncidentConsensusGuard.py).
Contract Structure
class IncidentConsensusGuard(gl.Contract):
    owner: Address
    operator: Address
    incidents: TreeMap[str, str]     # JSON-serialized incidents
    incident_ids: TreeMap[str, str]  # Index -> ID mapping
    incident_count: u256
Incident Lifecycle
submit_incident() -> status: pending_consensus
│
▼
evaluate_incident() -> leader runs LLM
│ validator re-runs LLM
├─ consensus YES -> status: approved (or escalated if low confidence)
└─ consensus NO -> status: rejected
│
▼
mark_executed() -> status: executed
Consensus Mechanism
The core of the consensus system is gl.vm.run_nondet_unsafe(leader_fn, validator_fn):
def evaluate_incident(self, incident_id: str) -> dict:
    incident = self._require_incident(incident_id)

    def leader_fn():
        prompt = f"""You are a decentralized incident-response validator...
        Incident id: {incident['id']}
        Protocol: {incident['protocol']}
        Threat type: {incident['threat_type']}
        Proposed action: {incident['proposed_action']}
        Primary LLM confidence 0-100: {incident['llm_confidence']}
        Return JSON with:
        - approved: boolean
        - severity: one of none, low, medium, high, critical
        - confidence: integer 0-100
        - action: one allowed action
        - rationale: short reason"""
        raw = gl.nondet.exec_prompt(prompt, response_format="json")
        return self._normalize_assessment(raw, incident["proposed_action"])

    def validator_fn(leaders_res):
        # Re-run the same LLM prompt independently of the leader
        validator_data = leader_fn()
        leader_data = leaders_res.calldata
        # Must agree on:
        # - approved boolean
        # - action string
        # - severity (within 1 level: high->critical ok, high->low not ok)
        # - confidence (within 20 points)
        if leader_data["approved"] != validator_data["approved"]:
            return False
        if leader_data["action"] != validator_data["action"]:
            return False
        if abs(severity_weight(leader_data) - severity_weight(validator_data)) > 1:
            return False
        if abs(int(leader_data["confidence"]) - int(validator_data["confidence"])) > 20:
            return False
        return True

    result = gl.vm.run_nondet_unsafe(leader_fn, validator_fn)
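The agreement rules in validator_fn can be exercised outside the contract as a pure function. The sketch below is an assumption-laden illustration: assessments_agree is a hypothetical name, and the SEVERITY_WEIGHT mapping assumes the five severities from the prompt are ordered none < low < medium < high < critical with a step of 1, which may differ from the contract's severity_weight.

```python
# Assumed ordering of the severities listed in the prompt; the contract's
# actual severity_weight may use different values.
SEVERITY_WEIGHT = {"none": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


def assessments_agree(leader: dict, validator: dict,
                      max_severity_gap: int = 1, max_confidence_gap: int = 20) -> bool:
    """Return True when two assessments are close enough to count as consensus."""
    if leader["approved"] != validator["approved"]:
        return False
    if leader["action"] != validator["action"]:
        return False
    gap = abs(SEVERITY_WEIGHT[leader["severity"]] - SEVERITY_WEIGHT[validator["severity"]])
    if gap > max_severity_gap:
        return False
    return abs(int(leader["confidence"]) - int(validator["confidence"])) <= max_confidence_gap


leader = {"approved": True, "action": "pause_protocol", "severity": "high", "confidence": 85}
close = {"approved": True, "action": "pause_protocol", "severity": "critical", "confidence": 70}
far = {"approved": True, "action": "pause_protocol", "severity": "low", "confidence": 85}
```

Here leader and close agree (one severity level and 15 confidence points apart), while leader and far do not (two severity levels apart).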
Allowlisted Actions
The contract only permits five actions:
ALLOWED_ACTIONS = {
    "monitor_only": True,
    "alert": True,
    "pause_protocol": True,
    "quarantine_address": True,
    "multisig_proposal": True,
}
Any other action (transfer_funds, withdraw, ownership transfer) is rejected at the contract level.
Autonomous Threshold
If the primary LLM confidence is below 70%, the incident is automatically escalated to multisig:
if confidence < 70:
    approved = False
    action = "multisig_proposal"
    rationale = "LLM confidence below autonomous threshold. Escalate. " + rationale
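The allowlist and the autonomous threshold can be combined into one normalization step, sketched below. The function normalize_assessment and its choice to raise ValueError for a disallowed action are assumptions for illustration; the contract's own _normalize_assessment may behave differently.

```python
ALLOWED_ACTIONS = {
    "monitor_only", "alert", "pause_protocol", "quarantine_address", "multisig_proposal",
}
AUTONOMOUS_THRESHOLD = 70  # confidence below this is escalated, per the text


def normalize_assessment(action: str, approved: bool, confidence: int, rationale: str = "") -> dict:
    """Reject non-allowlisted actions, then escalate low-confidence assessments."""
    if action not in ALLOWED_ACTIONS:
        # Assumption: this sketch rejects by raising; the contract may reject differently.
        raise ValueError(f"action not allowed: {action}")
    if confidence < AUTONOMOUS_THRESHOLD:
        approved = False
        action = "multisig_proposal"
        rationale = "LLM confidence below autonomous threshold. Escalate. " + rationale
    return {"approved": approved, "action": action,
            "confidence": confidence, "rationale": rationale}


escalated = normalize_assessment("pause_protocol", True, 65, "Possible reentrancy.")
autonomous = normalize_assessment("pause_protocol", True, 90)
```

In this sketch a confidence of 65 turns an approved pause into an unapproved multisig proposal, while a confidence of 90 passes through unchanged.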
Frontend Integration
The TypeScript client (frontend/src/lib/genlayerConsensus.ts) wraps these contract calls:
export class IncidentConsensusGuardClient {
  async submitIncident(input: ConsensusIncidentInput) {
    return this.client.writeContract({
      address: GENLAYER_CONSENSUS_GUARD_ADDRESS,
      functionName: 'submit_incident',
      args: [input.incidentId, input.protocol, input.txHash,
             input.threatType, input.proposedAction,
             input.llmReasoning, input.confidence],
    });
  }
  async evaluateIncident(incidentId: string) { ... }
  async getIncident(incidentId: string) { ... }
  async listIncidents() { ... }
}
Confidence Levels and Actions
| Primary Confidence | Consensus Confidence | Status | Action |
|---|---|---|---|
| ≥ 90% | ≥ 80% | approved | Execute approved action |
| ≥ 70% | ≥ 70% | approved | Execute approved action |
| ≥ 70% | < 70% | escalated | Multisig proposal |
| < 70% | any | escalated | Multisig proposal |
| any | rejected | rejected | False positive -- monitor only |
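Read as a decision procedure, the table reduces to three checks. The sketch below uses a hypothetical function resolve_status and assumes that a failed consensus takes precedence over the confidence rows, which the table does not state explicitly.

```python
def resolve_status(primary_confidence: int, consensus_reached: bool,
                   consensus_confidence: int) -> str:
    """Map the table's inputs to a status: approved, escalated, or rejected."""
    if not consensus_reached:
        # Assumption: a rejected consensus wins over any confidence value.
        return "rejected"
    if primary_confidence < 70 or consensus_confidence < 70:
        return "escalated"
    return "approved"
```

Under this reading the first table row (primary at least 90%, consensus at least 80%) is a special case of the second, since both confidences already clear 70%.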
Fallback Behavior
Every AI endpoint has a deterministic fallback for when API keys are not configured or API calls fail:
| Endpoint | Fallback |
|---|---|
| /api/analyze | Fixed verdict: 0.91 confidence, CRITICAL severity, reentrancy evidence |
| /api/compare | Both models return identical fallback verdicts, agreement: true |
| /api/audit | Static analysis result: HIGH risk, reentrancy + unchecked returns |
| Agent IncidentAnalyzer | Keyword matching: reentrant -> 0.98, otherwise 0.01 |
This ensures the system always produces output, so the dashboard never shows errors, while the source field (live vs fallback) tells operators whether a result came from a live model call or from the fallback.
Next Steps
- Respond Pipeline -- How verified threats trigger emergency responses
- GenLayer Consensus -- Deep dive into the consensus guard contract