ESAA-Security Audit Skill
Skill by ara.so — Security Skills collection.
Overview
ESAA-Security applies the Event Sourcing for Autonomous Agents (ESAA) architecture to automated security auditing. It executes structured security audits across 16 security domains with 95 executable checks, governed by an immutable append-only event log. Every finding, classification, and remediation decision is recorded as a verifiable fact.
Key differentiators:
- Deterministic audits: the same repository state produces the same findings via event replay
- Hallucination prevention: schema-validated outputs with evidence requirements
- Complete audit trail: .roadmap/activity.jsonl records every check execution
- Governed agents: PARCER contracts enforce decision hygiene and token budgets
- Verifiable reports: SHA-256 hash verification from events to final output
Installation
# Clone the repository
git clone https://github.com/elzobrito/ESAA-Security.git
cd ESAA-Security
# Install Python dependencies
pip install -r requirements.txt
# Set up environment variables
export OPENAI_API_KEY=$YOUR_OPENAI_KEY
export ANTHROPIC_API_KEY=$YOUR_ANTHROPIC_KEY # if using Claude
export AUDIT_TARGET_REPO="/path/to/repo"
Requirements:
- Python 3.9+
- LLM API access (OpenAI GPT-4, Anthropic Claude, or compatible)
- Target repository must be readable by the audit agent
Repository Structure
.roadmap/ # Event sourcing core
├── activity.jsonl # Immutable event store
├── roadmap.json # Derived audit progress
├── issues.json # Structured findings
├── AGENT_CONTRACT.yaml # Agent boundaries
├── ORCHESTRATOR_CONTRACT.yaml # State mutation rules
└── PROJECTION_SPEC.md # Event → state mapping
playbooks/
├── playbooks.security.json # 95 security checks
└── global_input_contract.json # Input requirements
reports/
├── phase1/ # Reconnaissance
├── phase2/ # Domain audits
├── phase3/ # Risk classification
├── phase4/ # Recommendations
└── final/ # Compiled report
Core Concepts
Event Store (activity.jsonl)
Every audit action is an immutable event:
{
"event_id": "evt_001",
"timestamp": "2026-05-14T10:30:00Z",
"event_type": "task.started",
"task_id": "SEC-010",
"phase": "phase2",
"domain": "authentication",
"agent": "agent-impl"
}
{
"event_id": "evt_002",
"timestamp": "2026-05-14T10:32:15Z",
"event_type": "check.completed",
"task_id": "SEC-010",
"check_id": "AU-002",
"status": "fail",
"severity": "high",
"finding": "Password stored without bcrypt/argon2",
"evidence": {
"file": "auth/user.py",
"line": 45,
"code_snippet": "user.password = request.form['password']"
},
"hash": "a3f8b2..."
}
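The source does not say how the "hash" field is computed. As a minimal sketch, assume each record's hash covers the previous record's hash plus the canonical JSON of the event, so that editing or reordering any stored event is detectable. The names `event_hash`, `append_event`, `verify_chain`, and the `prev_hash` field are hypothetical and not part of ESAA-Security.

```python
import hashlib
import json


def event_hash(event: dict, prev_hash: str) -> str:
    """SHA-256 over the previous hash plus the canonical JSON of the event."""
    canonical = json.dumps(event, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + canonical).encode("utf-8")).hexdigest()


def append_event(log: list, event: dict) -> dict:
    """Append a new record to the in-memory log; existing records are never modified."""
    prev_hash = log[-1]["hash"] if log else ""
    record = dict(event, prev_hash=prev_hash)  # assumed field, for illustration only
    record["hash"] = event_hash(record, prev_hash)
    log.append(record)
    return record


def verify_chain(log: list) -> bool:
    """Recompute every hash; an edited or reordered record breaks the chain."""
    prev_hash = ""
    for record in log:
        body = {k: v for k, v in record.items() if k != "hash"}
        if record.get("prev_hash") != prev_hash:
            return False
        if record["hash"] != event_hash(body, prev_hash):
            return False
        prev_hash = record["hash"]
    return True
```

Writing each record as one JSON line to activity.jsonl would keep the same property on disk, provided the file is only ever opened in append mode.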
Read Model (roadmap.json)
Projection of audit progress (derived from events):
{
"version": "0.4.0",
"phases": {
"phase1": {
"status": "done",
"tasks": {
"SEC-001": {"status": "done", "output": "reports/phase1/tech-stack.md"}
}
},
"phase2": {
"status": "in_progress",
"domains": {
"authentication": {
"checks_passed": 5,
"checks_failed": 3,
"tasks": ["SEC-010", "SEC-011"]
}
}
}
}
}
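To illustrate what "derived from events" means, the per-domain counters above can be rebuilt by folding over the event list. This is a sketch only: it assumes every check.completed event carries a "domain" field (the sample event shows it only on task.started), and `project_domains` is a hypothetical helper, not the project's PROJECTION_SPEC.

```python
from collections import defaultdict


def project_domains(events: list) -> dict:
    """Fold check.completed events into per-domain pass/fail counters."""
    domains = defaultdict(
        lambda: {"checks_passed": 0, "checks_failed": 0, "tasks": []}
    )
    for event in events:
        if event.get("event_type") != "check.completed":
            continue  # other event types do not affect these counters
        entry = domains[event["domain"]]
        if event["status"] == "pass":
            entry["checks_passed"] += 1
        elif event["status"] == "fail":
            entry["checks_failed"] += 1
        if event["task_id"] not in entry["tasks"]:
            entry["tasks"].append(event["task_id"])
    return dict(domains)
```

Because the function reads nothing but its input, running it twice over the same events yields the same state, which is the property the replay check relies on.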
Running an Audit
Phase 1: Reconnaissance
# orchestrator.py
import os
from esaa_security import Orchestrator, Agent
# Initialize orchestrator
orchestrator = Orchestrator(
    event_store=".roadmap/activity.jsonl",
    roadmap_path=".roadmap/roadmap.json",
    target_repo=os.getenv("AUDIT_TARGET_REPO")
)
# Initialize reconnaissance agent
agent_spec = Agent(
    role="agent-spec",
    contract_path=".roadmap/AGENT_CONTRACT.yaml",
    parcer_profile="PARCER_PROFILE.agent-spec.yaml"
)
# Execute reconnaissance phase
recon_tasks = ["SEC-001", "SEC-002", "SEC-003", "SEC-004"]
for task_id in recon_tasks:
    result = agent_spec.execute_task(task_id, orchestrator.get_context())
    # The orchestrator, not the agent, validates the result and appends the event
    orchestrator.validate_and_append(task_id, result)
Task outputs:
- SEC-001: Tech stack inventory (languages, frameworks, dependencies)
- SEC-002: Architecture map (components, trust boundaries)
- SEC-003: Data flow diagram (inputs, storage, outputs)
- SEC-004: Attack surface enumeration (endpoints, file uploads, APIs)
Phase 2: Domain Audit Execution
import json
# Load security playbooks
with open("playbooks/playbooks.security.json") as f:
    playbooks = json.load(f)
# Initialize audit execution agent
agent_impl = Agent(
    role="agent-impl",
    contract_path=".roadmap/AGENT_CONTRACT.yaml",
    parcer_profile="PARCER_PROFILE.agent-impl.yaml"
)
# Execute checks for a domain (e.g., Authentication)
auth_checks = ["AU-001", "AU-002", "AU-003", "AU-004", "AU-005", "AU-006", "AU-007", "AU-008"]
for check_id in auth_checks:
    playbook = playbooks["checks"][check_id]
    result = agent_impl.execute_check(
        check_id=check_id,
        playbook=playbook,
        context=orchestrator.get_context()
    )
    # Orchestrator validates against schema
    orchestrator.validate_and_append(
        task_id=f"SEC-{check_id}",
        result=result
    )
Example check result:
# agent_impl output for AU-002 (Password Storage)
{
"check_id": "AU-002",
"status": "fail",
"severity": "critical",
"title": "Weak Password Hashing",
"description": "Passwords stored using SHA-256 instead of bcrypt/argon2",
"evidence": {
"files": ["auth/models.py"],
"lines": [67],
"code": "hashlib.sha256(password.encode()).hexdigest()"
},
"cwe": "CWE-916",
"owasp": "A02:2021 Cryptographic Failures",
"recommendation": "Replace SHA-256 with bcrypt (cost factor 12+)",
"references": [
"https://cheatsheetseries.owasp.org/cheatsheets/Password_Storage_Cheat_Sheet.html"
]
}
Phase 3: Risk Classification
# Initialize QA/risk agent
agent_qa = Agent(
role="agent-qa",
contract_path=".roadmap/AGENT_CONTRACT.yaml",
parcer_profile="PARCER_PROFILE.agent-qa.yaml"
)
# Classify vulnerabilities
findings = orchestrator.get_all_findings()
risk_matrix = agent_qa.classify_risks(findings)
# Write risk classification
with open("reports/phase3/risk-matrix.json", "w") as f:
    json.dump(risk_matrix, f, indent=2)
Risk matrix output:
{
"critical": [
{"id": "AU-002", "cvss": 9.1, "exploitability": "high"}
],
"high": [
{"id": "IV-003", "cvss": 7.5, "exploitability": "medium"}
],
"medium": [
{"id": "SH-001", "cvss": 5.3, "exploitability": "low"}
],
"remediation_priority": ["AU-002", "AZ-001", "IV-003"]
}
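The source does not state how remediation_priority is ordered. One plausible rule, shown here purely as an assumption, is to rank by severity tier first and by CVSS score within a tier. `SEVERITY_ORDER` and `remediation_priority` are hypothetical names.

```python
# Lower rank means "fix sooner"; the tiers mirror the keys of the risk matrix.
SEVERITY_ORDER = {"critical": 0, "high": 1, "medium": 2, "low": 3}


def remediation_priority(findings: list) -> list:
    """Order finding IDs by severity tier, then by descending CVSS, then by ID."""
    ranked = sorted(
        findings,
        key=lambda f: (SEVERITY_ORDER[f["severity"]], -f["cvss"], f["id"]),
    )
    return [f["id"] for f in ranked]
```

The final tie-break on the ID keeps the ordering stable across runs, which matters if the list is later hashed or compared.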
Phase 4: Report Generation
# Generate final report
report = agent_qa.generate_report(
findings=orchestrator.get_all_findings(),
risk_matrix=risk_matrix,
context=orchestrator.get_context()
)
# Write final outputs
with open("reports/final/security-audit-report.md", "w") as f:
    f.write(report["markdown"])
with open("reports/final/security-audit-report.json", "w") as f:
    json.dump(report["structured"], f, indent=2)
Event Replay and Verification
# Verify audit determinism
from esaa_security import EventReplay, HashVerifier
# Replay events from scratch
replayer = EventReplay(event_store=".roadmap/activity.jsonl")
replayed_roadmap = replayer.project_roadmap()
# Compare hash
original_hash = HashVerifier.compute_hash(".roadmap/roadmap.json")
replayed_hash = HashVerifier.compute_hash(replayed_roadmap)
assert original_hash == replayed_hash, "Non-deterministic projection detected"
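A hash comparison like the one above is only meaningful if both sides are serialized identically. The sketch below shows one way to do that with the standard library. It is an assumption about how such a hash could be computed, not a description of HashVerifier, and `canonical_hash` is a hypothetical name.

```python
import hashlib
import json


def canonical_hash(state: dict) -> str:
    """Hash a projection so that key order and whitespace do not affect the result."""
    canonical = json.dumps(
        state, sort_keys=True, separators=(",", ":"), ensure_ascii=True
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Hashing the raw bytes of roadmap.json instead would make the check sensitive to indentation and key order, and could report divergence where the state is actually identical.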
Security Domain Coverage
Critical Domains (8 total)
Secrets & Configuration (SC-001 to SC-008):
# Example: Check for hardcoded secrets
playbook = {
"check_id": "SC-001",
"title": "Hardcoded Secrets Detection",
"patterns": [
r'password\s*=\s*["\'][^"\']+["\']',
r'api_key\s*=\s*["\'][^"\']+["\']',
r'AWS_SECRET_ACCESS_KEY'
],
"severity": "critical"
}
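To show how patterns like these could produce evidence in the shape the event store expects, here is a minimal line-by-line scanner. It is an illustration only: `scan_text` is a hypothetical helper, and case-insensitive matching is an assumption the playbook does not specify.

```python
import re

PATTERNS = [
    r'password\s*=\s*["\'][^"\']+["\']',
    r'api_key\s*=\s*["\'][^"\']+["\']',
    r'AWS_SECRET_ACCESS_KEY',
]


def scan_text(path: str, text: str, patterns=PATTERNS) -> list:
    """Return one evidence record per line that matches any pattern."""
    compiled = [re.compile(p, re.IGNORECASE) for p in patterns]
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        if any(rx.search(line) for rx in compiled):
            findings.append(
                {"file": path, "line": lineno, "code_snippet": line.strip()}
            )
    return findings
```

A value read from the environment, such as `api_key = os.environ["API_KEY"]`, does not match, because the patterns require a quoted literal directly after the equals sign. In practice the snippet would likely need redaction before being written to a report.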
Authentication (AU-001 to AU-008):
- Password hashing strength
- MFA enforcement
- Session token generation
- Credential transmission (HTTPS)
Authorization (AZ-001 to AZ-006):
- RBAC implementation
- Privilege escalation checks
- IDOR vulnerabilities
- API authorization
Input Validation (IV-001 to IV-007):
- SQL injection (ORM usage, parameterized queries)
- XSS (output encoding)
- Command injection
- Path traversal
Data Security (DA-001 to DA-005):
- Encryption at rest
- PII handling
- Data retention policies
High Priority Domains (7 total)
AI/LLM Security (AI-001 to AI-005):
# Example: Check for prompt injection vulnerabilities
playbook = {
"check_id": "AI-001",
"title": "Prompt Injection Defense",
"checks": [
"user_input_sanitization",
"system_prompt_isolation",
"output_validation",
"context_length_limits"
],
"severity": "high"
}
Configuration
Agent Contract (.roadmap/AGENT_CONTRACT.yaml)
agent_impl:
  can:
    - read: ["**/*.py", "**/*.js", "**/*.java", "config/**"]
    - write: ["reports/phase2/**"]
    - execute_checks: true
  cannot:
    - write: [".roadmap/activity.jsonl", ".roadmap/roadmap.json"]
    - modify_state: true
    - append_events: true
  output_schema: "agent_result.schema.json"
  token_budget: 8000
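A contract of this shape can be enforced with a small path check in which deny rules take precedence over allow rules. The sketch below uses the standard library's fnmatch and a simplified dictionary form of the contract. The precedence rule, the dictionary layout, and `is_allowed` are all assumptions made for illustration.

```python
from fnmatch import fnmatchcase

# Simplified, hypothetical in-memory form of the agent_impl contract.
CONTRACT = {
    "can": {
        "read": ["**/*.py", "**/*.js", "**/*.java", "config/**"],
        "write": ["reports/phase2/**"],
    },
    "cannot": {
        "write": [".roadmap/activity.jsonl", ".roadmap/roadmap.json"],
    },
}


def is_allowed(contract: dict, action: str, path: str) -> bool:
    """Deny rules win; otherwise the path must match at least one allow pattern."""
    denied = contract.get("cannot", {}).get(action, [])
    if any(fnmatchcase(path, pattern) for pattern in denied):
        return False
    allowed = contract.get("can", {}).get(action, [])
    return any(fnmatchcase(path, pattern) for pattern in allowed)
```

Note that fnmatch treats `*` as matching any characters, including `/`, so `**/*.py` requires at least one directory separator and would not match a top-level `setup.py`. A real enforcement layer may want stricter glob semantics.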
PARCER Profile (Token Budgets)
# PARCER_PROFILE.agent-impl.yaml
budget:
  max_tokens: 8000
  per_check: 500
  context_window: 4000
fallback:
  strategy: "map_reduce"
  chunk_size: 2000
validation:
  require_evidence: true
  require_cwe_mapping: true
  schema: "agent_result.schema.json"
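The map_reduce fallback is not described further in the source. A rough sketch of the idea, assuming the input has already been tokenized into a list: if it fits the context window it is processed whole, otherwise it is split into chunk_size pieces (map) and the per-chunk findings are merged with duplicates removed (reduce). `plan_chunks` and `reduce_findings` are hypothetical names.

```python
def plan_chunks(tokens: list, context_window: int = 4000, chunk_size: int = 2000) -> list:
    """Return a single chunk if the input fits, else fixed-size chunks for the map step."""
    if len(tokens) <= context_window:
        return [tokens]
    return [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]


def reduce_findings(per_chunk: list) -> list:
    """Merge per-chunk finding lists, dropping duplicates of the same check and location."""
    seen = set()
    merged = []
    for findings in per_chunk:
        for finding in findings:
            key = (
                finding["check_id"],
                finding["evidence"]["file"],
                finding["evidence"]["line"],
            )
            if key not in seen:
                seen.add(key)
                merged.append(finding)
    return merged
```

With the defaults from the profile, a 5000-token input would be split into chunks of 2000, 2000, and 1000 tokens.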
Programmatic Usage
Custom Audit Pipeline
import os
from esaa_security import AuditPipeline, SecurityDomain
# Define custom domain subset
domains = [
    SecurityDomain.AUTHENTICATION,
    SecurityDomain.AUTHORIZATION,
    SecurityDomain.INPUT_VALIDATION,
    SecurityDomain.AI_LLM_SECURITY
]
# Initialize pipeline
pipeline = AuditPipeline(
    target_repo=os.getenv("AUDIT_TARGET_REPO"),
    domains=domains,
    event_store=".roadmap/activity.jsonl"
)
# Execute with streaming
for event in pipeline.execute_streaming():
    if event["event_type"] == "check.completed":
        print(f"✓ {event['check_id']}: {event['status']}")
    elif event["event_type"] == "finding.detected":
        print(f"⚠ {event['severity']}: {event['title']}")
# Get final report
report = pipeline.get_report()
Query Event Store
from esaa_security import EventQuery
query = EventQuery(".roadmap/activity.jsonl")
# Find all critical findings
critical = query.filter(
event_type="check.completed",
status="fail",
severity="critical"
).to_list()
# Get domain coverage
coverage = query.aggregate_by("domain")
# {"authentication": 8, "authorization": 6, ...}
# Audit timeline
timeline = query.timeline(group_by="1h")
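Because the event store is plain JSONL, the same kind of filtering and aggregation can be reproduced with the standard library alone, which is handy for ad hoc inspection. The helpers below are a hypothetical sketch and do not reflect how EventQuery is implemented.

```python
import json
from collections import Counter


def load_events(path: str) -> list:
    """Read one JSON object per non-empty line."""
    with open(path, encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


def filter_events(events: list, **criteria) -> list:
    """Keep events whose fields equal every given keyword argument."""
    return [
        event for event in events
        if all(event.get(key) == value for key, value in criteria.items())
    ]


def aggregate_by(events: list, field: str) -> dict:
    """Count events per distinct value of a field, skipping events that lack it."""
    return dict(Counter(event[field] for event in events if field in event))
```

For example, `filter_events(load_events(".roadmap/activity.jsonl"), status="fail", severity="critical")` would return the failed critical checks.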
Common Patterns
Incremental Audit (Skip Completed)
roadmap = orchestrator.load_roadmap()
for task_id in all_tasks:
    if roadmap.get_task_status(task_id) == "done":
        print(f"Skip {task_id} (already completed)")
        continue
    result = agent.execute_task(task_id)
    orchestrator.validate_and_append(task_id, result)
Parallel Domain Execution
from concurrent.futures import ThreadPoolExecutor, as_completed
def audit_domain(domain_name, checks):
    # Each worker uses its own agent instance
    agent = Agent(role="agent-impl")
    results = []
    for check_id in checks:
        result = agent.execute_check(check_id)
        results.append(result)
    return domain_name, results
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(audit_domain, domain, checks): domain
        for domain, checks in domain_map.items()
    }
    for future in as_completed(futures):
        domain, results = future.result()
        # Append from the main thread only, so writes to the event store stay serialized
        for result in results:
            orchestrator.validate_and_append(
                task_id=f"SEC-{result['check_id']}",
                result=result
            )
Export to SARIF
import json
from esaa_security import SARIFExporter
exporter = SARIFExporter(event_store=".roadmap/activity.jsonl")
sarif = exporter.to_sarif()
with open("security-audit.sarif", "w") as f:
    json.dump(sarif, f, indent=2)
Troubleshooting
Issue: Schema Validation Failure
# Error: agent_result failed schema validation
# Fix: Check output structure matches agent_result.schema.json
# Validate manually
from jsonschema import validate
import json
with open(".roadmap/agent_result.schema.json") as f:
    schema = json.load(f)
with open("reports/phase2/results/SEC-010.json") as f:
    result = json.load(f)
validate(instance=result, schema=schema)  # Raises ValidationError with details
Issue: Event Store Corruption
# Verify event store integrity
from esaa_security import EventStoreValidator
validator = EventStoreValidator(".roadmap/activity.jsonl")
errors = validator.validate()
if errors:
print("Corrupt events:")
for err in errors:
print(f"Line {err['line']}: {err['message']}")
else:
print("✓ Event store valid")
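For a sense of what such a validator might check, the sketch below parses the store line by line and reports malformed JSON, missing core fields, and duplicate event IDs, using the same line/message error shape as above. The required-field list and `validate_jsonl` are assumptions, not the behaviour of EventStoreValidator.

```python
import json

# Assumed minimum set of fields, taken from the sample events in this document.
REQUIRED_FIELDS = ("event_id", "timestamp", "event_type")


def validate_jsonl(lines) -> list:
    """Return a list of {"line", "message"} dicts; an empty list means no problems found."""
    errors = []
    seen_ids = set()
    for lineno, raw in enumerate(lines, start=1):
        if not raw.strip():
            continue  # tolerate blank lines
        try:
            event = json.loads(raw)
        except json.JSONDecodeError as exc:
            errors.append({"line": lineno, "message": f"invalid JSON: {exc.msg}"})
            continue
        if not isinstance(event, dict):
            errors.append({"line": lineno, "message": "event is not a JSON object"})
            continue
        missing = [name for name in REQUIRED_FIELDS if name not in event]
        if missing:
            errors.append({"line": lineno, "message": f"missing fields: {', '.join(missing)}"})
        elif event["event_id"] in seen_ids:
            errors.append({"line": lineno, "message": f"duplicate event_id {event['event_id']}"})
        else:
            seen_ids.add(event["event_id"])
    return errors
```

It accepts any iterable of strings, so an open file handle can be passed directly: `validate_jsonl(open(".roadmap/activity.jsonl", encoding="utf-8"))`.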
Issue: Non-Deterministic Replay
# Debug: Find which event causes divergence
from esaa_security import ReplayDebugger
debugger = ReplayDebugger(
event_store=".roadmap/activity.jsonl",
expected_roadmap=".roadmap/roadmap.json"
)
divergent_event = debugger.find_divergence()
print(f"Divergence at event: {divergent_event['event_id']}")
print(f"Expected: {divergent_event['expected_state']}")
print(f"Actual: {divergent_event['actual_state']}")
Issue: Agent Exceeds Token Budget
# Error: Agent exceeded 8000 token budget
# Fix: Enable Map-Reduce fallback in PARCER profile
# PARCER_PROFILE.agent-impl.yaml
fallback:
  strategy: "map_reduce"
  chunk_size: 2000
  max_chunks: 10
# Or reduce context window
budget:
  context_window: 3000  # from 4000
Issue: Missing Evidence in Findings
# Orchestrator rejects findings without evidence
# Fix: Ensure agent output includes code snippets
# Valid finding structure
{
"check_id": "IV-001",
"status": "fail",
"evidence": {
"file": "api/routes.py",
"line": 23,
"code_snippet": "query = f\"SELECT * FROM users WHERE id={user_id}\""
}
}
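The evidence requirement can be pre-checked on the agent side before a result is handed to the orchestrator. The sketch below assumes the three evidence keys shown in the valid structure above are all mandatory for failed checks. That rule and `evidence_errors` are illustrative assumptions rather than the orchestrator's actual schema.

```python
REQUIRED_EVIDENCE_KEYS = ("file", "line", "code_snippet")


def evidence_errors(finding: dict) -> list:
    """Return human-readable problems with a finding's evidence; empty means acceptable."""
    if finding.get("status") != "fail":
        return []  # in this sketch only failed checks must carry evidence
    evidence = finding.get("evidence")
    if not isinstance(evidence, dict):
        return ["missing evidence object"]
    errors = []
    for key in REQUIRED_EVIDENCE_KEYS:
        if evidence.get(key) in (None, ""):
            errors.append(f"evidence.{key} is missing or empty")
    return errors
```

Running this before validate_and_append gives the agent a chance to retry with a concrete snippet instead of having the whole result rejected.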
Integration with CI/CD
GitHub Actions
# .github/workflows/security-audit.yml
name: ESAA Security Audit
on: [push, pull_request]
permissions:
  contents: read
  security-events: write  # needed by the SARIF upload step
jobs:
  audit:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Run ESAA-Security Audit
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          AUDIT_TARGET_REPO: ${{ github.workspace }}
        run: |
          pip install -r requirements.txt
          python orchestrator.py --full-audit
      - name: Upload SARIF
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: security-audit.sarif
      - name: Fail on Critical Findings
        run: |
          python -c "import json, sys; \
          report = json.load(open('reports/final/security-audit-report.json')); \
          sys.exit(1 if report['critical_count'] > 0 else 0)"
Best Practices
- Always verify event store integrity before generating reports
- Use deterministic replay to validate audit reproducibility
- Configure token budgets per agent role to prevent runaway costs
- Enable Map-Reduce fallback for large repositories (>10k LOC)
- Review PARCER profiles to adjust validation strictness
- Export to SARIF for GitHub Security tab integration
- Archive the .roadmap/ directory for audit forensics

