Building Custom Processors
Custom logic is injected into DeepZero by subclassing the typed abstractions provided in deepzero.engine.stage. See the Core Architecture to understand how these stages map to execution phases.
Processor Context (ProcessorContext)
The ctx object injected into every lifecycle hook provides pipeline-wide context and utilities:
- ctx.pipeline_dir: Root directory of the invoked pipeline, useful for resolving local templates or rulesets.
- ctx.global_config: TypedDict containing pipeline-level settings, knowledge, and the configured model.
- ctx.llm: A generic proxy instance (implementing LLMProtocol) tied to LiteLLM, abstracting provider APIs with native backoff/retry.
- ctx.log: Pre-configured, namespaced logging.Logger.
Execution Hooks
Processors implement specific lifecycle hooks invoked by the engine:
- validate(ctx: ProcessorContext) -> list[str]: Invoked during initial schema validation (via deepzero validate or prior to execution). Return a list of error strings if dependencies (e.g., missing binaries, unparseable YAMLs) fail.
- setup(global_config: dict) -> None: Executed exactly once before the thread pool activates.
- process(...): The core operation. Signature varies by ProcessorType.
- teardown() -> None: Executed upon pipeline completion or fatal interruption.
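As a rough illustration of the hooks listed above, the sketch below uses stand-in classes instead of the real deepzero.engine.stage types so that it runs on its own. StubContext, YaraScanner, the yara binary name, and the rules directory are all hypothetical; only the hook names and signatures are taken from the list.

```python
import logging
import shutil
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class StubContext:
    """Hypothetical stand-in for ProcessorContext (only the fields used here)."""
    pipeline_dir: Path
    global_config: dict = field(default_factory=dict)
    log: logging.Logger = field(default_factory=lambda: logging.getLogger("stub"))


class YaraScanner:
    """Hypothetical processor showing validate/setup/teardown."""

    required_binary = "yara"  # assumed external dependency, for illustration only

    def validate(self, ctx: StubContext) -> list[str]:
        # Collect every problem instead of stopping at the first one.
        errors: list[str] = []
        if shutil.which(self.required_binary) is None:
            errors.append(f"missing binary: {self.required_binary}")
        rules = ctx.pipeline_dir / "rules"  # assumed location of local rulesets
        if not rules.is_dir():
            errors.append(f"ruleset directory not found: {rules}")
        return errors

    def setup(self, global_config: dict) -> None:
        # Runs once, before any worker threads start: cache what process() needs.
        self.settings = dict(global_config.get("settings", {}))

    def teardown(self) -> None:
        # Release whatever setup() acquired.
        self.settings = {}
```

Returning an empty list from validate signals that nothing is missing. Gathering all errors in one pass lets a user fix every problem before re-running validation.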
Defining Processor Configuration
Use a nested @dataclass named Config to define and validate the YAML options a processor accepts. The engine parses the pipeline YAML dictionary, expands ${ENV_VARS} references automatically, and instantiates your Config object from the result.
from dataclasses import dataclass

from deepzero.engine.stage import MapProcessor, ProcessorContext, ProcessorEntry, ProcessorResult


class BinaryAnalyzer(MapProcessor):
    description = "Static heuristics extraction"

    @dataclass
    class Config:
        target_arch: str = "x86_64"
        max_entropy: float = 7.5

    def process(self, ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult:
        # Retrieve data emitted by an upstream IngestProcessor
        sha = entry.upstream_data("discover", "sha256", default="UNKNOWN")

        # ProcessorEntry binds to an isolated sandbox per-sample
        out_file = entry.sample_dir / "heuristics.json"
        out_file.write_text('{"entropy": 7.1}')

        # ProcessorResult determines routing
        if self.config.target_arch != "x86_64":
            return ProcessorResult.filter(reason="unsupported_arch")

        return ProcessorResult.ok(
            artifacts={"heuristics": "heuristics.json"},
            data={"analyzed": True, "entropy_ok": True},
        )
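The parse, expand, and instantiate step described at the start of this section can be approximated in a few lines. This is only a sketch of one way such loading could work, not DeepZero's actual loader. build_config is a hypothetical helper, and it relies on os.path.expandvars, which leaves undefined variables untouched rather than raising.

```python
import os
from dataclasses import dataclass, fields


@dataclass
class Config:
    target_arch: str = "x86_64"
    max_entropy: float = 7.5


def build_config(cls, raw: dict):
    """Hypothetical helper: reject unknown keys, expand ${VAR} in string values."""
    known = {f.name for f in fields(cls)}
    unknown = set(raw) - known
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    expanded = {
        key: os.path.expandvars(value) if isinstance(value, str) else value
        for key, value in raw.items()
    }
    # Keys missing from the YAML fall back to the dataclass defaults.
    return cls(**expanded)
```

For example, with DZ_ARCH=arm64 exported, build_config(Config, {"target_arch": "${DZ_ARCH}"}) yields a Config whose target_arch is "arm64" and whose max_entropy keeps its default. Plain dataclasses do not enforce field types at runtime, so any type checking would have to be added separately.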
Result Outcomes (ProcessorResult)
Map/BulkMap processors must return a strictly formatted ProcessorResult:
- .ok(data={...}, artifacts={...}): Flags execution as COMPLETED. The data dict is namespaced to the processor’s history block. artifacts maps keys to relative file paths.
- .filter(reason="...", data={...}): Terminates sample processing silently. SampleState.verdict mutates to FILTERED.
- .fail(error="..."): Terminates processing due to a fatal, unexpected error. SampleState.verdict mutates to FAILED.
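To make the three outcomes concrete, here is a self-contained sketch that mimics the constructors above with a stand-in class. StubResult, Outcome, and analyze are hypothetical names chosen for this example; the real ProcessorResult may store its fields differently.

```python
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional


class Outcome(Enum):
    COMPLETED = "completed"
    FILTERED = "filtered"
    FAILED = "failed"


@dataclass(frozen=True)
class StubResult:
    """Hypothetical stand-in for ProcessorResult."""
    outcome: Outcome
    data: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)
    reason: Optional[str] = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, data=None, artifacts=None):
        return cls(Outcome.COMPLETED, data or {}, artifacts or {})

    @classmethod
    def filter(cls, reason, data=None):
        return cls(Outcome.FILTERED, data or {}, reason=reason)

    @classmethod
    def fail(cls, error):
        return cls(Outcome.FAILED, error=error)


def analyze(raw_json: str, max_entropy: float = 7.5) -> StubResult:
    """Route a sample based on a heuristics JSON string."""
    try:
        entropy = float(json.loads(raw_json)["entropy"])
    except (ValueError, KeyError, TypeError) as exc:
        # Malformed input is treated as an unexpected error: fail.
        return StubResult.fail(error=f"unreadable heuristics: {exc!r}")
    if entropy > max_entropy:
        # An expected rejection: filter quietly and record why.
        return StubResult.filter(reason="entropy_too_high", data={"entropy": entropy})
    return StubResult.ok(data={"entropy": entropy},
                         artifacts={"heuristics": "heuristics.json"})
```

The distinction worth keeping in mind is that filter covers expected rejections (the sample is simply out of scope), while fail is reserved for conditions the processor could not handle.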
Accessing Upstream Data
Processors often depend on artifacts and data produced by upstream stages. The ProcessorEntry facade exposes lazy-loaded helper methods for retrieving them:
# Shorthand extraction of a specific field (with fallback)
sha = entry.upstream_data("discover", "sha256", default="")
# Full extraction of a previous stage's StageOutput object
output = entry.upstream("scan")
findings = output.data.get("finding_count", 0)
json_file = output.artifacts.get("findings_file")
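One way to picture the lazy loading and default fallback is the stand-in below. StubEntry, StubStageOutput, and the one-JSON-file-per-stage layout are assumptions made purely for illustration; the real ProcessorEntry may locate and cache upstream output differently.

```python
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class StubStageOutput:
    """Hypothetical stand-in for StageOutput."""
    data: dict = field(default_factory=dict)
    artifacts: dict = field(default_factory=dict)


class StubEntry:
    """Hypothetical stand-in for ProcessorEntry; reads a stage's output on first use."""

    def __init__(self, sample_dir: Path) -> None:
        self.sample_dir = sample_dir
        self._cache: dict[str, StubStageOutput] = {}

    def upstream(self, stage: str) -> StubStageOutput:
        if stage not in self._cache:
            # Assumed layout: <sample_dir>/<stage>.json with "data" and "artifacts" keys.
            path = self.sample_dir / f"{stage}.json"
            raw = json.loads(path.read_text()) if path.exists() else {}
            self._cache[stage] = StubStageOutput(
                raw.get("data", {}), raw.get("artifacts", {})
            )
        return self._cache[stage]

    def upstream_data(self, stage: str, key: str, default: Any = None) -> Any:
        # In this sketch, a missing stage and a missing key both fall back to the default.
        return self.upstream(stage).data.get(key, default)
```

In this sketch nothing is read from disk until a stage is first requested, and repeated lookups for the same stage reuse the cached object.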
Base Classes
Depending on your pipeline topology requirements, extend the appropriate base class:
| Base Class | Processor Type | process() Signature |
|---|---|---|
| IngestProcessor | ingest | (ctx, target: Path) → list[Sample] |
| MapProcessor | map | (ctx, entry: ProcessorEntry) → ProcessorResult |
| BulkMapProcessor | bulk_map | (ctx, entries: list[ProcessorEntry]) → list[ProcessorResult] |
| ReduceProcessor | reduce | (ctx, entries: list[ProcessorEntry]) → list[str] |