pipeline.webp

During my time as AI Engineer I was tasked with building an asynchronous, modular pipeline for real-time AI inference and decisional processes.

The functional requirements were the following:

Solution

The solution proposed is to have each pipeline block to be derived from an abstract block class like this:

class Block(ABC):
  def __init__(self, id: str) -> None:
    self.id = id
    self._output_callback: Optional[Callable[[BlockOutput], Awaitable[None]]] = None
  
  def set_output_callback(self, callback: Callable[[BlockOutput], Awaitable[None]]) -> None:
    """Called by pipeline to inject the router's output handler"""
    self._output_callback = callback
  
  @abstractmethod
  async def transform(self, input: BlockOutput) -> BlockOutput: ...
  
  async def process_input(self, input: BlockOutput) -> None:
    """Process data and emit results"""
    result = await self._transform(input)
    await self._output_callback(result)
  
  @abstractmethod
  async def run(self) -> None: ...

This is the blueprint for a pipeline block. This structure can be used to route a single block’s output to one or more connected subsequent blocks.

Last Update

Currently, I am re-building this pipeline in Rust. The strong language typedness should result in an easier to manage codebase.

Disclaimer

While the pipeline itself is open-source, the use cases are company property, therefore I am not allowed to disclose much about the pipeline architecture either.

For additional info, you can contact me at [email protected] .