Building Custom Processors

Add custom logic to DeepZero by subclassing the typed base classes in deepzero.engine.stage. See Core Architecture for how these stages map to execution phases.

Processor Context (ProcessorContext)

The ctx object passed to lifecycle hooks such as validate and process provides shared context and utilities:

  • ctx.pipeline_dir: Root directory of the invoked pipeline, useful for resolving local templates or rulesets.
  • ctx.global_config: TypedDict containing pipeline-level settings, knowledge, and the configured model.
  • ctx.llm: A provider-agnostic proxy (implementing LLMProtocol) backed by LiteLLM, with built-in backoff and retry.
  • ctx.log: Pre-configured, namespaced logging.Logger.
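
As a rough illustration of how these attributes might be used together, the sketch below resolves a ruleset relative to the pipeline root and logs the lookup. FakeContext is a hypothetical stand-in so the snippet runs without DeepZero installed; the "model" key, the paths, and resolve_ruleset are assumptions made for the example, and ctx.llm is left out because its call signature is not described here.

```python
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class FakeContext:
    """Hypothetical stand-in exposing the documented ProcessorContext attributes."""

    pipeline_dir: Path
    global_config: dict[str, Any] = field(default_factory=dict)
    log: logging.Logger = field(
        default_factory=lambda: logging.getLogger("deepzero.demo")
    )


def resolve_ruleset(ctx: FakeContext, relative_name: str) -> Path:
    """Resolve a ruleset path relative to the pipeline root and log the lookup."""
    path = ctx.pipeline_dir / relative_name
    ctx.log.debug("resolved ruleset %s -> %s", relative_name, path)
    return path


# Illustrative values only; the "model" key is an assumption.
ctx = FakeContext(
    pipeline_dir=Path("/opt/pipelines/demo"),
    global_config={"model": "example-model"},
)
rules = resolve_ruleset(ctx, "rules/base.yar")
```

Inside a real processor the same pattern would use the ctx argument passed to the hook instead of a hand-built object.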

Execution Hooks

Processors implement specific lifecycle hooks invoked by the engine:

  1. validate(ctx: ProcessorContext) -> list[str]: Invoked during initial schema validation (via deepzero validate or prior to execution). Return one error string per failed dependency check (e.g., a missing binary or an unparseable YAML file).
  2. setup(global_config: dict) -> None: Executed exactly once before the thread pool activates.
  3. process(...): The core operation. Signature varies by ProcessorType.
  4. teardown() -> None: Executed upon pipeline completion or fatal interruption.
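
The hooks above can be sketched as follows. This is a minimal illustration, not the engine's own code: ToolBackedProcessor is a plain class rather than a real subclass of a deepzero.engine.stage base class, and required_binaries and _cache are hypothetical names chosen for the example. The validate hook uses the standard library's shutil.which to detect missing binaries.

```python
import shutil


class ToolBackedProcessor:
    """Hypothetical processor; a real one would subclass a DeepZero base class."""

    # Hypothetical attribute: external tools this processor shells out to.
    required_binaries: tuple[str, ...] = ("git",)

    def validate(self, ctx) -> list[str]:
        """Return one error string per binary that is missing from PATH."""
        errors = []
        for name in self.required_binaries:
            if shutil.which(name) is None:
                errors.append(f"required binary not found on PATH: {name}")
        return errors

    def setup(self, global_config: dict) -> None:
        """Allocate state once, before any sample is processed."""
        self._cache = {}

    def teardown(self) -> None:
        """Release state when the pipeline completes or is interrupted."""
        self._cache.clear()
```

Reporting every failed check from validate, rather than stopping at the first, lets a single validation pass surface all missing dependencies.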

Defining Processor Configuration

Use a nested @dataclass named Config to declare the YAML options your processor accepts, along with their types and defaults. The engine parses the pipeline YAML dictionary, expands ${ENV_VARS} references automatically, and instantiates your Config object, which is a standard Python dataclass.

from dataclasses import dataclass
from deepzero.engine.stage import MapProcessor, ProcessorContext, ProcessorEntry, ProcessorResult

class BinaryAnalyzer(MapProcessor):
    description = "Static heuristics extraction"

    @dataclass
    class Config:
        target_arch: str = "x86_64"
        max_entropy: float = 7.5

    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult:
        # Retrieve data emitted by an upstream IngestProcessor
        sha = entry.upstream_data("discover", "sha256", default="UNKNOWN")
        ctx.log.debug("analyzing sample %s", sha)

        # ProcessorResult determines routing: filter unsupported targets
        # before writing any artifacts
        if self.config.target_arch != "x86_64":
            return ProcessorResult.filter(reason="unsupported_arch")

        # Each ProcessorEntry is bound to an isolated per-sample directory
        out_file = entry.sample_dir / "heuristics.json"
        out_file.write_text('{"entropy": 7.1}')

        return ProcessorResult.ok(
            artifacts={"heuristics": "heuristics.json"},
            data={"analyzed": True, "entropy_ok": True}
        )
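
To make the configuration step more concrete, here is one way the hydration described above could work. It is a sketch under stated assumptions, not DeepZero's actual implementation: build_config is a hypothetical helper, rejecting unknown keys is a design choice made for the example, and environment expansion is done with the standard library's os.path.expandvars.

```python
import os
from dataclasses import dataclass, fields


@dataclass
class Config:
    target_arch: str = "x86_64"
    max_entropy: float = 7.5


def build_config(raw: dict) -> Config:
    """Expand ${VAR} references in string values, reject unknown keys, build Config."""
    known = {f.name for f in fields(Config)}
    unknown = set(raw) - known
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    expanded = {
        key: os.path.expandvars(value) if isinstance(value, str) else value
        for key, value in raw.items()
    }
    return Config(**expanded)


# DEMO_ARCH is a made-up variable used only for this example.
os.environ["DEMO_ARCH"] = "arm64"
cfg = build_config({"target_arch": "${DEMO_ARCH}", "max_entropy": 6.0})
```

Plain dataclasses do not enforce field types at runtime, so any stricter checking (for example, rejecting a string passed for max_entropy) would have to be added explicitly, such as in a __post_init__ method.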

Result Outcomes (ProcessorResult)

Map and BulkMap processors must return a ProcessorResult built with one of the following constructors:

  • .ok(data={...}, artifacts={...}): Marks execution as COMPLETED. The data dict is namespaced to the processor's history block. artifacts maps keys to relative file paths.
  • .filter(reason="...", data={...}): Stops processing of the sample silently. SampleState.verdict is set to FILTERED.
  • .fail(error="..."): Stops processing because of a fatal, unexpected error. SampleState.verdict is set to FAILED.
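
The three outcomes can be illustrated with a small stand-in. DemoResult below only mirrors the documented constructor names so the snippet runs on its own; its fields, the status strings, and the classify helper are assumptions made for the example rather than the real ProcessorResult API.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class DemoResult:
    """Hypothetical stand-in mirroring the three documented constructors."""

    status: str
    data: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)
    reason: Optional[str] = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, data=None, artifacts=None):
        return cls("COMPLETED", data or {}, artifacts or {})

    @classmethod
    def filter(cls, reason, data=None):
        return cls("FILTERED", data or {}, reason=reason)

    @classmethod
    def fail(cls, error):
        return cls("FAILED", error=error)


def classify(entropy: float, max_entropy: float) -> DemoResult:
    """Pick an outcome: fail on bad input, filter on policy, otherwise ok."""
    if entropy < 0:
        return DemoResult.fail(error=f"invalid entropy value: {entropy}")
    if entropy > max_entropy:
        return DemoResult.filter(
            reason="entropy_above_threshold", data={"entropy": entropy}
        )
    return DemoResult.ok(data={"entropy": entropy})
```

The distinction the sketch tries to show is that filter records an expected, policy-driven exit, while fail is reserved for conditions that should not occur.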

Accessing Upstream Data

Processors usually depend on artifacts and data produced by earlier stages. The ProcessorEntry facade exposes lazy-loaded helper methods for retrieving them:

# Shorthand extraction of a specific field (with fallback)
sha = entry.upstream_data("discover", "sha256", default="")

# Full extraction of a previous stage's StageOutput object
output = entry.upstream("scan")
findings = output.data.get("finding_count", 0)
json_file = output.artifacts.get("findings_file")
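
For readers who want a mental model of the two helpers, the stand-in below shows one plausible behavior. Everything in it is an assumption for illustration: DemoEntry and DemoStageOutput are hypothetical classes, and whether the real upstream() raises or returns an empty object for a missing stage is not stated here.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class DemoStageOutput:
    """Hypothetical stand-in for a stage's recorded output."""

    data: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)


class DemoEntry:
    """Hypothetical stand-in for the upstream-access part of ProcessorEntry."""

    def __init__(self, history: dict[str, DemoStageOutput]):
        self._history = history

    def upstream(self, stage: str) -> DemoStageOutput:
        # Assumption: a missing stage raises KeyError in this sketch.
        return self._history[stage]

    def upstream_data(self, stage: str, key: str, default: Any = None) -> Any:
        # Fall back to the default when either the stage or the key is absent.
        output = self._history.get(stage)
        if output is None:
            return default
        return output.data.get(key, default)


entry = DemoEntry({"discover": DemoStageOutput(data={"sha256": "abc123"})})
sha = entry.upstream_data("discover", "sha256", default="")
missing = entry.upstream_data("scan", "finding_count", default=0)
```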

Base Classes

Depending on your pipeline topology requirements, extend the appropriate base class:

Base Class        Processor Type  process() Signature
----------------  --------------  ------------------------------------------------------------
IngestProcessor   ingest          (ctx, target: Path) → list[Sample]
MapProcessor      map             (ctx, entry: ProcessorEntry) → ProcessorResult
BulkMapProcessor  bulk_map        (ctx, entries: list[ProcessorEntry]) → list[ProcessorResult]
ReduceProcessor   reduce          (ctx, entries: list[ProcessorEntry]) → list[str]
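
As an example of the batch-shaped signatures, the sketch below models a bulk_map-style process() with stand-in types. BatchEntry, BatchResult, bulk_process, and the size threshold are hypothetical, and the sketch assumes that the returned list must contain one result per entry in the same order, which the table does not state explicitly.

```python
from dataclasses import dataclass


@dataclass
class BatchEntry:
    """Hypothetical stand-in for ProcessorEntry."""

    sample_id: str
    size_bytes: int


@dataclass
class BatchResult:
    """Hypothetical stand-in for ProcessorResult."""

    status: str
    data: dict


def bulk_process(
    entries: list[BatchEntry], max_size: int = 1_000_000
) -> list[BatchResult]:
    """Return one result per entry, positionally aligned with the input list."""
    results = []
    for entry in entries:
        if entry.size_bytes > max_size:
            results.append(BatchResult("FILTERED", {"reason": "too_large"}))
        else:
            results.append(BatchResult("COMPLETED", {"size_bytes": entry.size_bytes}))
    return results


batch = [BatchEntry("a", 10), BatchEntry("b", 5_000_000)]
outcomes = bulk_process(batch)
```

A bulk processor is mainly worthwhile when the per-batch work (for example, a single invocation of an external tool over many files) is cheaper than repeating it per sample.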