
During my time as an AI Engineer, I was tasked with building an asynchronous, modular pipeline for real-time AI inference and decision-making.
The functional requirements were the following:
Solution
The proposed solution is to have each pipeline block derive from an abstract Block class like this:
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Optional


class Block(ABC):
    def __init__(self, id: str) -> None:
        self.id = id
        # Injected by the pipeline; routes this block's output downstream.
        self._output_callback: Optional[Callable[[BlockOutput], Awaitable[None]]] = None

    def set_output_callback(self, callback: Callable[[BlockOutput], Awaitable[None]]) -> None:
        """Called by the pipeline to inject the router's output handler."""
        self._output_callback = callback

    @abstractmethod
    async def transform(self, input: BlockOutput) -> BlockOutput: ...

    async def process_input(self, input: BlockOutput) -> None:
        """Process data and emit the result through the injected callback."""
        result = await self.transform(input)
        if self._output_callback is not None:
            await self._output_callback(result)

    @abstractmethod
    async def run(self) -> None: ...
This is the blueprint for a pipeline block. Because the output handler is injected by the pipeline rather than hard-coded, this structure can route a single block's output to one or more connected downstream blocks, as sketched below.
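To make that wiring concrete, here is a minimal sketch of how blocks could be subclassed and connected. It reuses the Block class above, but everything else is a hypothetical name introduced only for illustration: BlockOutput (modelled here as a simple dataclass; the real type lives in the actual codebase), UppercaseBlock, PrintBlock, and the connect helper.

import asyncio
from dataclasses import dataclass


# Hypothetical stand-in for the pipeline's real data container.
@dataclass
class BlockOutput:
    source_id: str
    payload: str


class UppercaseBlock(Block):
    async def transform(self, input: BlockOutput) -> BlockOutput:
        return BlockOutput(source_id=self.id, payload=input.payload.upper())

    async def run(self) -> None:
        pass  # a real block would consume its input queue or stream here


class PrintBlock(Block):
    async def transform(self, input: BlockOutput) -> BlockOutput:
        print(f"{self.id} received: {input.payload}")
        return input

    async def run(self) -> None:
        pass


def connect(source: Block, targets: list[Block]) -> None:
    """Fan one block's output out to every connected downstream block."""
    async def route(output: BlockOutput) -> None:
        await asyncio.gather(*(t.process_input(output) for t in targets))
    source.set_output_callback(route)


async def main() -> None:
    upper = UppercaseBlock("upper")
    sink_a, sink_b = PrintBlock("sink_a"), PrintBlock("sink_b")
    connect(upper, [sink_a, sink_b])  # one output routed to two blocks
    await upper.process_input(BlockOutput(source_id="ingress", payload="hello"))


asyncio.run(main())

Because the routing callback is injected, the same block can be reused in different topologies without changing its code; in this sketch the fan-out is just an asyncio.gather over the downstream blocks.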
Last Update
Currently, I am rebuilding this pipeline in Rust; its strong type system should make the codebase easier to maintain.
Disclaimer
While the pipeline itself is open-source, the use cases are company property; therefore, I am not allowed to disclose much about the pipeline architecture either.
For additional info, you can contact me at [email protected].