Repo: Clarifai, published Jul 15, 2025

Clarifai/pipeline-engine

Python

Language: Python

License: NOASSERTION

Stars: 1

Forks: 1

Open issues: 0

Created: 2025-07-15T19:15:16Z

Pushed: 2025-08-21T15:39:51Z

Default branch: main

Fork: no

Archived: no

README:

video-pipeline (title todo)
===========================

A simple pipeline engine in Python with minimal dependencies.

Features

  • Simple and easy to use
  • Minimal dependencies
  • Multi-threaded with multi-process support
  • Adaptive rate limiting
  • DAG-based execution
  • Extensible and customizable

Installation

    git clone git@github.com:deigen/video-pipeline.git
    cd video-pipeline
    pip install .

Usage

Example detect + track pipeline with multiprocessing and two detector instances:

    import pipeline as pl
    from pipeline.detector import HFDetector
    from pipeline.tracker import Tracker

    def main():
        reader = pl.VideoReader("video.mp4")

        detector = pl.Multiprocess(HFDetector, model_name='PekingU/rtdetr_v2_r18vd')
        detector.num_instances(2)  # Run two instances of the detector in parallel

        tracker = pl.Multiprocess(Tracker)

        # The lambda refers to `engine`, which is assigned further down; the name
        # is only looked up when the printer is called, after the engine exists.
        printer = pl.Print(
            lambda data:
            f'pts={data.pts} tracked={len(data.tracked_objects.tracker_id)} detected={len(data.detections["boxes"])} fps={engine.get_fps():.3f}'
        )

        engine = pl.PipelineEngine()

        engine.add(
            reader
            | detector
            | tracker
            | printer
        )

        engine.run()

    if __name__ == "__main__":
        main()

See the [examples](examples) directory for additional examples, including real-time processing, adaptive rate limiting, writing to video files and rtsp streams, and other components.

Implementation Overview
=======================

Each Component in the pipeline runs in its own thread, consuming data as it becomes ready for that component. Data is sent to each component by the pipeline Engine, which continuously checks dependencies between components. Dependencies are declared using the | operator.
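One way a `|` chain could record dependencies is by overloading `__or__`. The sketch below is an assumption about the general technique, not the library's code; `Node` and its `upstream` list are hypothetical names used only for illustration.

```python
class Node:
    """Hypothetical stand-in for a pipeline component."""

    def __init__(self, name):
        self.name = name
        self.upstream = []  # components this one depends on

    def __or__(self, other):
        # `a | b` records that b depends on a, then returns b so that a
        # chain like `a | b | c` keeps extending from the right-hand side.
        other.upstream.append(self)
        return other


reader, detector, tracker = Node("reader"), Node("detector"), Node("tracker")
reader | detector | tracker

deps = {n.name: [u.name for u in n.upstream] for n in (reader, detector, tracker)}
print(deps)
```

An engine given the last node of such a chain can walk the `upstream` links to recover the whole dependency graph.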

Components can also be run in a separate process by wrapping them in a Multiprocess, and can run with more than one process instance using .num_instances(n).

Even though components run in separate threads/processes, data (frames) are always passed through each component *in order*. This means that operations like tracking can be done on the in-order serialized stream, even when other components are run in parallel.
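The in-order guarantee can be illustrated with the standard library alone. This is a minimal sketch of the idea, not the engine's scheduling code: `detect` and `run_in_order` are hypothetical names, and a thread pool stands in for parallel component instances.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def detect(frame_id):
    # Simulated work with a variable duration, so frames can finish
    # in a different order than they were submitted.
    time.sleep(random.uniform(0.001, 0.01))
    return frame_id, f"detections-{frame_id}"


def run_in_order(frame_ids, workers=2):
    emitted = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Futures are kept in submission order and awaited in that order,
        # so results are released downstream in frame order.
        futures = [pool.submit(detect, i) for i in frame_ids]
        for future in futures:
            emitted.append(future.result())
    return emitted


results = run_in_order(range(8))
print([frame_id for frame_id, _ in results])
```

Because each result is released only after all earlier frames, a stateful downstream step such as a tracker always sees frames in sequence.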

In contrast to some other pipeline engines, sources and sinks are not handled as special components. Instead, they are just regular components that can add or read data fields. The engine creates an infinite stream of empty data objects; the first components (e.g. VideoReader) populate them, and they are then passed through the rest of the pipeline.

![Pipeline Illustration](readme-pipeline.png)

Components

Components are the building blocks of the pipeline. Each runs in its own thread (or threads) and processes data as it becomes available. Data processing is done by the process method, which takes a FrameData object as input and modifies it in place.

    import numpy as np
    from pipeline import FrameData

    class MyComponent:
        def process(self, data: FrameData):
            # Process the data and modify it in place
            frame = data.frame  # Access the frame
            data.img_mean = np.asarray(frame).mean()  # Example processing

FrameData class

The FrameData class is used to pass data between components. It is a simple storage class that holds key-value pairs. Key names ("frame", "pts", etc.) are established by convention and can be remapped to different names during pipeline setup.

Mapping Field Names

Data fields used by components can be mapped during pipeline setup in order to run components that expect different field names, using the .fields() method. For example:

    import pipeline as pl

    class Annotate(pl.Component):
        def process(self, data: pl.FrameData):
            # Annotate the frame with detection boxes using an outside annotate_frame function
            data.annotated_frame = annotate_frame(data.frame, data.detections)

    # video writer whose data.frame in its process function is mapped to data.annotated_frame
    writer = pl.VideoWriter("output.mp4").fields(frame='annotated_frame')
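How field remapping might work can be sketched with a small attribute proxy. This is an assumption for illustration only; the README does not describe how `.fields()` is implemented, and `FieldView` and `writer_process` are hypothetical names.

```python
from types import SimpleNamespace


class FieldView:
    """Hypothetical proxy that renames attributes on a data object."""

    def __init__(self, data, mapping):
        # Store internals directly, bypassing our own __setattr__.
        object.__setattr__(self, "_data", data)
        object.__setattr__(self, "_mapping", mapping)

    def __getattr__(self, name):
        # Called only when normal lookup fails, i.e. for data fields.
        return getattr(self._data, self._mapping.get(name, name))

    def __setattr__(self, name, value):
        setattr(self._data, self._mapping.get(name, name), value)


def writer_process(data):
    # A component written against the conventional field name "frame".
    return f"writing {data.frame}"


data = SimpleNamespace(frame="raw", annotated_frame="raw+boxes")
view = FieldView(data, {"frame": "annotated_frame"})
print(writer_process(view))
```

The component keeps using `data.frame`, while the view redirects reads and writes to `annotated_frame` on the underlying object.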

Dropping Frames and Rate Limiting

If a component decides a frame should be dropped from further processing, it can signal this to the engine by raising a Drop exception. The engine will then mark the frame as dropped and skip processing by downstream components.
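The drop behaviour can be sketched as follows. The `Drop` class here is a local stand-in (where the real exception is imported from is not stated above), and `run_frame` is a hypothetical, single-threaded simplification of what the engine does per frame.

```python
class Drop(Exception):
    """Local stand-in for the Drop signal described above."""


def skip_odd(data):
    # Hypothetical filter: drop frames with an odd timestamp.
    if data["pts"] % 2:
        raise Drop()


def tag(data):
    data["tagged"] = True


def run_frame(data, components):
    """Run components in order; stop at the first Drop."""
    for component in components:
        try:
            component(data)
        except Drop:
            data["dropped"] = True  # downstream components are skipped
            return data
    data["dropped"] = False
    return data


frames = [run_frame({"pts": pts}, [skip_odd, tag]) for pts in range(4)]
print([(f["pts"], f["dropped"], f.get("tagged", False)) for f in frames])
```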

Rate limiting can be done by using the FixedRateLimiter or AdaptiveRateLimiter components, which limit their output rate either to a fixed number of frames per second, or adaptively to match the throughput of the pipeline. Frames are dropped if the output rate is too high (though this can be configured to sleep instead of dropping frames). See the [examples](examples) directory for examples of rate limiting.
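A fixed-rate limiter that drops frames can be sketched in a few lines. This is not the FixedRateLimiter source: `FixedRateSketch` is a hypothetical class, `Drop` is a local stand-in, and timestamps are passed in as integer milliseconds so the example is deterministic. The sleep-instead-of-drop option and the adaptive variant are not shown.

```python
class Drop(Exception):
    """Local stand-in for the Drop signal."""


class FixedRateSketch:
    def __init__(self, fps):
        # Integer milliseconds avoid floating-point rounding in comparisons.
        self.min_interval_ms = 1000 // fps
        self.last_emit_ms = None

    def process(self, pts_ms):
        too_soon = (
            self.last_emit_ms is not None
            and pts_ms - self.last_emit_ms < self.min_interval_ms
        )
        if too_soon:
            raise Drop()
        self.last_emit_ms = pts_ms


limiter = FixedRateSketch(fps=25)  # at most one frame every 40 ms
passed = []
for pts_ms in range(0, 200, 10):  # a 100 fps source, 20 frames
    try:
        limiter.process(pts_ms)
        passed.append(pts_ms)
    except Drop:
        pass
print(passed)
```

With a 100 fps input and a 25 fps limit, every fourth frame passes and the rest are dropped.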

Stopping the Pipeline

The pipeline can be ended by calling the stop() method on the PipelineEngine instance, or by any component raising the StreamEnd exception. Either one tells the engine to stop creating new data objects, finish processing any in-flight data, and exit.
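The shutdown flow, together with the "empty data object" model from the overview, can be sketched in a single thread. Everything here is a hypothetical simplification: `StreamEnd` is a local stand-in, and `ListReader`, `Upper`, and `run` are illustrative names rather than library classes.

```python
from types import SimpleNamespace


class StreamEnd(Exception):
    """Local stand-in for the StreamEnd signal."""


class ListReader:
    """Source component: fills each empty data object with the next item."""

    def __init__(self, frames):
        self._frames = iter(frames)

    def process(self, data):
        try:
            data.frame = next(self._frames)
        except StopIteration:
            raise StreamEnd() from None  # no more input: end the stream


class Upper:
    def process(self, data):
        data.text = data.frame.upper()


def run(components):
    finished = []
    while True:
        data = SimpleNamespace()  # engine creates an empty data object
        try:
            for component in components:
                component.process(data)
        except StreamEnd:
            break  # stop creating new data objects and exit
        finished.append(data)
    return finished


out = run([ListReader(["a", "b", "c"]), Upper()])
print([d.text for d in out])
```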
