Engine Architecture
DeepZero’s orchestration engine executes a directed processing graph defined by pipelines. The engine guarantees fault-tolerant, resumable state management while fanning out parallel operations across a bounded thread pool (ThreadPoolExecutor).
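To make the fan-out and resumability concrete, here is a minimal sketch under stated assumptions. run_stage, the JSON checkpoint file, and the string-in/string-out stage function are all hypothetical, not DeepZero's API. Concurrency is bounded by ThreadPoolExecutor's max_workers, and each finished sample is checkpointed so that a rerun skips work already recorded.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable

# Hypothetical sketch, not DeepZero's engine: fan a per-sample function out
# over a bounded thread pool and checkpoint each result so a rerun resumes.


def run_stage(
    stage: str,
    sample_ids: list[str],
    fn: Callable[[str], str],
    state_file: Path,
    max_workers: int = 4,
) -> dict[str, str]:
    # Reload whatever a previous (possibly crashed) run already persisted.
    state: dict[str, dict[str, str]] = (
        json.loads(state_file.read_text()) if state_file.exists() else {}
    )
    done = state.setdefault(stage, {})
    pending = [sid for sid in sample_ids if sid not in done]

    # max_workers caps how many fn calls run concurrently.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order on the calling thread,
        # so only one thread ever writes the checkpoint file.
        for sid, result in zip(pending, pool.map(fn, pending)):
            done[sid] = result
            state_file.write_text(json.dumps(state))
    return done
```

Checkpointing from the calling thread, rather than inside the workers, keeps file writes single-threaded. A production store would likely also write atomically (for example, to a temporary file followed by os.replace).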
Component Lifecycles
As a sample progresses through a pipeline, the data model represents it with these abstractions:
- Sample: The foundational struct yielded by an IngestProcessor. It binds a physical source_path to a unique sample_id (typically a SHA-256 hash or deterministic UUID) and initializes the sample’s data dict.
- SampleState: The persistent record maintained by the StateStore. It tracks the sample’s verdict (PENDING, ACTIVE, FILTERED, FAILED, COMPLETED) and maintains a history mapping from each processor to its StageOutput.
- ProcessorEntry: The dynamically generated, memory-efficient facade passed into Map, BulkMap, and Reduce processors. It carries a lazy-load handle (_store) that retrieves historical execution data from disk only when .history or .upstream_data() is accessed.
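The three abstractions could be modelled roughly as follows. This is a hypothetical sketch: the SampleStatus enum name comes from the ReduceProcessor section, while StateStore.load, StageOutput's single data field, Sample.from_file, and the use of a content SHA-256 as sample_id are assumptions. The point is the lazy facade: ProcessorEntry holds only an ID and a store handle until history or upstream_data() is first accessed.

```python
import hashlib
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Protocol

# Hypothetical stand-ins for the data model; names beyond those in the
# text (load, from_file, StageOutput.data) are illustrative assumptions.


class SampleStatus(Enum):
    PENDING = "pending"
    ACTIVE = "active"
    FILTERED = "filtered"
    FAILED = "failed"
    COMPLETED = "completed"


@dataclass
class Sample:
    sample_id: str
    source_path: Path
    data: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_file(cls, path: Path) -> "Sample":
        # Content-addressed ID: identical bytes always yield the same ID.
        return cls(hashlib.sha256(path.read_bytes()).hexdigest(), path)


@dataclass
class StageOutput:
    data: dict[str, Any]


@dataclass
class SampleState:
    sample_id: str
    verdict: SampleStatus = SampleStatus.PENDING
    history: dict[str, StageOutput] = field(default_factory=dict)  # processor -> output


class StateStore(Protocol):
    def load(self, sample_id: str) -> SampleState: ...


class ProcessorEntry:
    """Lightweight handle: holds only an ID until history is first touched."""

    __slots__ = ("sample_id", "_store", "_state")

    def __init__(self, sample_id: str, store: StateStore) -> None:
        self.sample_id = sample_id
        self._store = store
        self._state: SampleState | None = None

    @property
    def history(self) -> dict[str, StageOutput]:
        if self._state is None:  # first access triggers the disk read
            self._state = self._store.load(self.sample_id)
        return self._state.history

    def upstream_data(self) -> dict[str, Any]:
        # Assumption: "upstream" means the most recently recorded stage.
        return list(self.history.values())[-1].data
```

Here upstream_data() takes no arguments and returns the latest stage's output, which is a guess at its semantics; the real method may select a specific processor instead.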
Processing Paradigms
Processors dictate execution topology and concurrency models. All inherit from the base Processor class, exposing lifecycle hooks: validate(), setup(), process(), and teardown(). For instructions on writing your own, see Building Custom Processors.
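A driver might sequence the four hooks as below. This is a sketch under assumptions: run_lifecycle is hypothetical, and process() is simplified to take a single item rather than the ctx-plus-input signatures listed for each processor type.

```python
from abc import ABC, abstractmethod
from typing import Any, Iterable

# Hypothetical sketch of how a driver might sequence the four hooks; the
# real engine's call order and error handling may differ.


class Processor(ABC):
    def validate(self) -> None:
        """Reject bad configuration before any resources are acquired."""

    def setup(self) -> None:
        """Acquire expensive resources once (clients, temp dirs, tool handles)."""

    @abstractmethod
    def process(self, item: Any) -> Any:
        """Do the per-invocation work."""

    def teardown(self) -> None:
        """Release resources; called even if process() raised."""


def run_lifecycle(proc: Processor, items: Iterable[Any]) -> list[Any]:
    proc.validate()
    proc.setup()
    try:
        return [proc.process(item) for item in items]
    finally:
        proc.teardown()
```

Calling setup() once and process() many times is what lets a processor amortize expensive initialization. Placing teardown() in a finally block guarantees cleanup after a processing error, although in this sketch a failure inside setup() skips teardown().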
IngestProcessor (1 → N)
- Type Signature: process(ctx: ProcessorContext, target: Path) -> list[Sample]
- Concurrency: Synchronous. Executes exactly once per pipeline run.
- Role: Generates the initial corpus. No upstream data exists prior to Ingest.
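A minimal ingest processor might look like this. FileIngest, the glob config key, and this ProcessorContext stand-in (a plain bag of config values) are hypothetical. It turns one target directory into N samples, using the SHA-256 of each file's contents as the sample_id.

```python
import hashlib
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class ProcessorContext:  # hypothetical stand-in: just a bag of config values
    config: dict[str, Any] = field(default_factory=dict)


@dataclass
class Sample:  # hypothetical stand-in for DeepZero's Sample
    sample_id: str
    source_path: Path
    data: dict[str, Any] = field(default_factory=dict)


class FileIngest:
    """Hypothetical 1 -> N ingest: one target directory becomes N samples."""

    def process(self, ctx: ProcessorContext, target: Path) -> list[Sample]:
        pattern = ctx.config.get("glob", "*")  # assumed config key
        seen: set[str] = set()
        samples: list[Sample] = []
        # Sort for a deterministic sample order across runs.
        for path in sorted(p for p in target.rglob(pattern) if p.is_file()):
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            if digest in seen:  # byte-identical file: keep the first one only
                continue
            seen.add(digest)
            samples.append(Sample(digest, path, {"size": path.stat().st_size}))
        return samples
```

Because content hashes collide for byte-identical files, the sketch keeps only the first file with a given digest so that IDs stay unique; a deterministic UUID derived from the path would instead keep duplicates as separate samples.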
MapProcessor (1 → 1)
- Type Signature: process(ctx: ProcessorContext, entry: ProcessorEntry) -> ProcessorResult
- Concurrency: Threaded fan-out, configured via the parallel: field in the pipeline YAML.
- Constraints: Must be strictly thread-safe. Avoid shared mutable state.
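The thread-safety constraint in practice: a sketch in which each call works only on its own entry and the single piece of shared state is guarded by a lock. Entry, Result, DigestMap, and fan_out are hypothetical stand-ins for ProcessorEntry, ProcessorResult, a user processor, and the engine's dispatcher; treating parallel: as ThreadPoolExecutor's max_workers is an assumption.

```python
import hashlib
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Entry:  # hypothetical stand-in for ProcessorEntry
    sample_id: str
    payload: bytes


@dataclass(frozen=True)
class Result:  # hypothetical stand-in for ProcessorResult
    sample_id: str
    data: dict[str, Any]


class DigestMap:
    """1 -> 1: each call reads only its own entry and returns a fresh Result."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.processed = 0  # the only shared mutable state, so it is locked

    def process(self, ctx: object, entry: Entry) -> Result:
        digest = hashlib.sha256(entry.payload).hexdigest()  # no shared data touched
        with self._lock:  # += on a shared attribute is not atomic
            self.processed += 1
        return Result(entry.sample_id, {"sha256": digest})


def fan_out(proc: DigestMap, ctx: object, entries: list[Entry], parallel: int) -> list[Result]:
    # `parallel` plays the role of the YAML parallel: field (assumed to be max_workers).
    with ThreadPoolExecutor(max_workers=parallel) as pool:
        return list(pool.map(lambda e: proc.process(ctx, e), entries))
```

Frozen dataclasses make accidental cross-thread mutation of inputs and outputs raise an error, which leaves the lock as the one deliberate point of sharing.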
BulkMapProcessor (N → N)
- Type Signature: process(ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[ProcessorResult]
- Concurrency: Synchronous execution over a subset or the totality of active samples.
- Role: Optimizes external process invocations (e.g., launching a monolithic JVM tool or a Semgrep batch scan) by amortizing heavy startup costs across multiple samples. Outputs must be index-aligned with the input entries list: the i-th result corresponds to the i-th entry.
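The index-alignment rule matters most when the external tool reorders or drops results. In this sketch, a second Python interpreter, started once via subprocess, stands in for a heavy batch tool such as a JVM scanner; it deliberately emits results keyed by ID in reverse order. Entry, Result, WordCountBulk, and the ok flag are hypothetical.

```python
import json
import subprocess
import sys
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Entry:  # hypothetical stand-in for ProcessorEntry
    sample_id: str
    text: str


@dataclass(frozen=True)
class Result:  # hypothetical stand-in for ProcessorResult
    ok: bool
    data: dict[str, Any]


# Stand-in "heavy tool": one interpreter launch handles the whole batch and
# emits results keyed by ID, in an order unrelated to the input order.
TOOL = (
    "import json, sys\n"
    "items = json.load(sys.stdin)\n"
    "out = {i['id']: len(i['text'].split()) for i in reversed(items)}\n"
    "json.dump(out, sys.stdout)\n"
)


class WordCountBulk:
    def process(self, ctx: object, entries: list[Entry]) -> list[Result]:
        payload = json.dumps([{"id": e.sample_id, "text": e.text} for e in entries])
        completed = subprocess.run(
            [sys.executable, "-c", TOOL],
            input=payload, capture_output=True, text=True, check=True,
        )
        by_id = json.loads(completed.stdout)
        # Re-align to input order: results[i] must describe entries[i].
        return [
            Result(True, {"words": by_id[e.sample_id]}) if e.sample_id in by_id
            else Result(False, {"error": "missing from tool output"})
            for e in entries
        ]
```

Looking results up by sample_id, instead of trusting the tool's output order, is what keeps results[i] paired with entries[i]. Samples the tool silently skipped still receive a placeholder, so both lists stay the same length.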
ReduceProcessor (N → M)
- Type Signature: process(ctx: ProcessorContext, entries: list[ProcessorEntry]) -> list[str]
- Concurrency: Global synchronization barrier. Threaded execution pauses until all active samples reach this stage.
- Role: Returns an ordered list of sample_id strings defining which samples survive. Any sample ID absent from the returned list is permanently tagged with SampleStatus.FILTERED.
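The survivor-list contract can be sketched as a reducer plus the engine-side step that applies it. TopK, apply_reduce, and the score field are hypothetical; only the rule that absent IDs become SampleStatus.FILTERED comes from the text above. Rejecting IDs the reducer invented is an extra safety check of this sketch, not documented behavior.

```python
from dataclasses import dataclass
from enum import Enum


class SampleStatus(Enum):  # subset of the statuses listed earlier
    ACTIVE = "active"
    FILTERED = "filtered"


@dataclass(frozen=True)
class Entry:  # hypothetical stand-in for ProcessorEntry
    sample_id: str
    score: float


class TopK:
    """Hypothetical N -> M reducer: keep the k highest-scoring samples, best first."""

    def __init__(self, k: int) -> None:
        self.k = k

    def process(self, ctx: object, entries: list[Entry]) -> list[str]:
        ranked = sorted(entries, key=lambda e: e.score, reverse=True)
        return [e.sample_id for e in ranked[: self.k]]


def apply_reduce(statuses: dict[str, SampleStatus], survivors: list[str]) -> None:
    """Engine side: any ACTIVE sample missing from survivors becomes FILTERED."""
    keep = set(survivors)
    unknown = keep.difference(statuses)
    if unknown:  # a reducer must not invent sample IDs
        raise ValueError(f"unknown sample IDs: {sorted(unknown)}")
    for sid, status in statuses.items():
        if status is SampleStatus.ACTIVE and sid not in keep:
            statuses[sid] = SampleStatus.FILTERED
```

Since the reducer receives every active entry at once, it can rank or deduplicate across the whole corpus, which is the reason the stage must wait at the barrier.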