Engine Architecture

DeepZero’s orchestration engine executes pipelines as a directed processing graph in which data flows strictly forward from one stage to the next. The engine guarantees fault-tolerant, resumable state management and fans parallel operations out across a bounded thread pool (ThreadPoolExecutor).

Pipeline flow: INGEST (1 → N) → MAP → MAP → MAP → REDUCE (N → M)

Component Lifecycles

As a sample moves through a pipeline, it is represented by three successive abstractions:

  1. Sample: The foundational record returned by an IngestProcessor. It binds a physical source_path to a unique sample_id (typically a SHA-256 hash or a deterministic UUID) and initializes the sample’s data dict.
  2. SampleState: The persistent record maintained by the StateStore. It tracks the sample’s status (PENDING, ACTIVE, FILTERED, FAILED, or COMPLETED) and keeps a history that maps each processor to the StageOutput it produced for this sample.
  3. ProcessorEntry: A dynamically generated, memory-efficient facade passed to Map, BulkMap, and Reduce processors. It carries a lazy-loading handle (_store) that retrieves historical execution data from disk only when .history is accessed or .upstream_data() is called.
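The three abstractions can be sketched with plain dataclasses. This is a simplified model, not DeepZero’s real classes: the field types, the make_sample helper, the store callable, and the upstream_data(processor) signature are assumptions, and a plain dict stands in for StageOutput.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable


class SampleStatus(Enum):
    PENDING = "pending"
    ACTIVE = "active"
    FILTERED = "filtered"
    FAILED = "failed"
    COMPLETED = "completed"


@dataclass
class Sample:
    sample_id: str
    source_path: Path
    data: dict[str, Any] = field(default_factory=dict)


def make_sample(source_path: Path, content: bytes) -> Sample:
    # Hypothetical helper: a content hash gives a deterministic sample_id.
    return Sample(sample_id=hashlib.sha256(content).hexdigest(), source_path=source_path)


@dataclass
class SampleState:
    sample_id: str
    status: SampleStatus = SampleStatus.PENDING
    # processor name -> that processor's output (a plain dict stands in for StageOutput)
    history: dict[str, dict[str, Any]] = field(default_factory=dict)


class ProcessorEntry:
    """Simplified facade: history is fetched through _store on first access only."""

    def __init__(self, sample: Sample, store: Callable[[str], dict[str, dict[str, Any]]]) -> None:
        self.sample = sample
        self._store = store  # hypothetical loader: sample_id -> history mapping
        self._history: dict[str, dict[str, Any]] | None = None

    @property
    def history(self) -> dict[str, dict[str, Any]]:
        if self._history is None:  # first access triggers the (disk) load
            self._history = self._store(self.sample.sample_id)
        return self._history

    def upstream_data(self, processor: str) -> dict[str, Any]:
        # Assumed signature: look up one upstream processor's output by name.
        return self.history.get(processor, {})
```

Constructing a ProcessorEntry costs only a reference to the sample and the loader; the history mapping is read once, on first use, and then cached on the entry.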

Processing Paradigms

Each processor type defines its stage’s execution topology and concurrency model. All processors inherit from the base Processor class, which exposes four lifecycle hooks: validate(), setup(), process(), and teardown(). For instructions on writing your own, see Building Custom Processors.
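A minimal sketch of how the four hooks might be driven, assuming the engine calls validate() before setup() and always runs teardown(), even when process() raises. The class below is a simplified stand-in for DeepZero’s Processor, and run_stage and the generic payload parameter are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Any


class Processor(ABC):
    """Simplified stand-in for the base class and its lifecycle hooks."""

    def validate(self) -> None:
        """Check configuration before any resources are acquired."""

    def setup(self) -> None:
        """Acquire resources (clients, temp dirs, subprocesses)."""

    @abstractmethod
    def process(self, ctx: Any, payload: Any) -> Any:
        """Do the stage's work; payload is a target, entry, or entry list."""

    def teardown(self) -> None:
        """Release resources acquired in setup()."""


def run_stage(proc: Processor, ctx: Any, payload: Any) -> Any:
    # Hypothetical driver: validate, then set up, then process,
    # with teardown guaranteed by the finally clause.
    proc.validate()
    proc.setup()
    try:
        return proc.process(ctx, payload)
    finally:
        proc.teardown()
```

Putting teardown() in a finally clause keeps external resources from leaking when a stage fails, which matters for a resumable engine that may rerun the same stage later.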

IngestProcessor (1 → N)

  • Type Signature: process(ctx: ProcessorContext, target: Path) -> list[Sample]
  • Concurrency: Synchronous. Executes precisely once per pipeline run.
  • Role: Generates the initial corpus. No upstream data exists prior to Ingest.
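As an illustration, a minimal directory-walking ingest might look like the following. DirectoryIngest is hypothetical, the Sample dataclass is a simplified stand-in, and ctx is typed as Any because ProcessorContext’s fields are not described here. Using a SHA-256 of the file contents as sample_id means byte-identical files collide, so this sketch keeps only the first one it sees.

```python
from __future__ import annotations

import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class Sample:  # simplified stand-in for DeepZero's Sample
    sample_id: str
    source_path: Path
    data: dict = field(default_factory=dict)


class DirectoryIngest:
    """Hypothetical IngestProcessor: one Sample per unique regular file under target."""

    def process(self, ctx: Any, target: Path) -> list[Sample]:
        samples: list[Sample] = []
        seen: set[str] = set()
        # Sorting makes the output order deterministic across runs.
        for path in sorted(target.rglob("*")):
            if not path.is_file():
                continue
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            if digest in seen:  # identical content -> identical ID; keep the first
                continue
            seen.add(digest)
            samples.append(
                Sample(sample_id=digest, source_path=path, data={"size": path.stat().st_size})
            )
        return samples
```

If duplicate files must remain distinct samples, an ID derived from the path (for example uuid.uuid5 over the relative path) avoids the collision, at the cost of no longer recognizing identical content.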

MapProcessor (1 → 1)

  • Type Signature: process(ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult
  • Concurrency: Threaded fan-out. Configured via the parallel: field in pipeline YAML.
  • Constraints: Must be strictly thread-safe. Avoid shared mutable state.
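A minimal sketch of the threaded fan-out, assuming parallel: maps to the pool’s max_workers and that a failing sample is recorded rather than allowed to abort the batch. fan_out and measure are hypothetical; for brevity each task receives a sample_id string instead of a ProcessorEntry and returns a dict instead of a ProcessorResult.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable


def fan_out(
    fn: Callable[[str], dict[str, Any]],
    sample_ids: list[str],
    parallel: int,
) -> tuple[dict[str, dict[str, Any]], dict[str, str]]:
    """Run fn once per sample on a bounded pool; isolate per-sample failures."""
    results: dict[str, dict[str, Any]] = {}
    failures: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=parallel) as pool:
        futures = {pool.submit(fn, sid): sid for sid in sample_ids}
        for fut in as_completed(futures):
            sid = futures[fut]
            try:
                results[sid] = fut.result()
            except Exception as exc:  # one bad sample must not sink the others
                failures[sid] = repr(exc)
    return results, failures


def measure(sample_id: str) -> dict[str, Any]:
    # Pure function: it reads only its argument, so concurrent calls are safe.
    if sample_id.startswith("bad"):
        raise ValueError("unparseable sample")
    return {"length": len(sample_id)}
```

Only the calling thread writes to results and failures, so those dictionaries need no lock; the thread-safety burden falls entirely on fn, which is why Map processors should avoid shared mutable state.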

BulkMapProcessor (N → N)

  • Type Signature: process(ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[ProcessorResult]
  • Concurrency: Synchronous; a single call processes a batch containing some or all of the active samples.
  • Role: Batches external process invocations (e.g., a monolithic JVM tool or a Semgrep batch scan) so that heavy startup costs are amortized across many samples. The returned list must be index-aligned with the input entries list: the result at position i belongs to entries[i].

ReduceProcessor (N → M)

  • Type Signature: process(ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[str]
  • Concurrency: Global synchronization barrier. The stage runs only after every active sample has reached it, so all upstream threaded work must finish first.
  • Role: Returns an ordered list of sample_id strings defining which samples survive. Any sample ID absent from the returned list is permanently tagged with SampleStatus.FILTERED.
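A minimal sketch of the barrier and filtering semantics, assuming an in-memory status map in place of the StateStore. The barrier falls out of consuming every Map result before the reducer runs. top_k_reduce, apply_reduce, score, and run_map_then_reduce are hypothetical names, and rejecting unknown IDs is a defensive choice made here, not documented behavior.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from enum import Enum


class SampleStatus(Enum):  # simplified stand-in
    PENDING = "pending"
    ACTIVE = "active"
    FILTERED = "filtered"
    FAILED = "failed"
    COMPLETED = "completed"


def score(sample_id: str) -> float:
    return float(len(sample_id))  # placeholder metric for the Map stage


def top_k_reduce(scores: dict[str, float], k: int) -> list[str]:
    """Hypothetical ReduceProcessor body: keep the k highest-scoring IDs, best first."""
    ranked = sorted(scores, key=lambda sid: (-scores[sid], sid))  # ties broken by ID
    return ranked[:k]


def apply_reduce(status: dict[str, SampleStatus], survivors: list[str]) -> None:
    """Mark every active sample missing from survivors as FILTERED."""
    keep = set(survivors)
    unknown = keep - status.keys()
    if unknown:
        raise ValueError(f"reducer returned unknown IDs: {sorted(unknown)}")
    for sid, st in list(status.items()):
        if st is SampleStatus.ACTIVE and sid not in keep:
            status[sid] = SampleStatus.FILTERED


def run_map_then_reduce(ids: list[str], parallel: int, k: int) -> dict[str, SampleStatus]:
    with ThreadPoolExecutor(max_workers=parallel) as pool:
        # Building the dict consumes every Map result, so nothing below
        # runs until all threaded work has finished: this is the barrier.
        scores = dict(zip(ids, pool.map(score, ids)))
    status = {sid: SampleStatus.ACTIVE for sid in ids}
    apply_reduce(status, top_k_reduce(scores, k))
    return status
```

Here N = 4 samples reduce to M = 2 survivors when k is 2; the filtered samples stay in the status map, which is what lets a later run see that they were filtered rather than never processed.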