Custom Pipeline Extension Guide¶
Transformer already support Custom Pipeline via https://github.com/huggingface/diffusers/blob/main/docs/source/en/using-diffusers/custom_pipeline_overview.md
This guide demonstrates how to use the newly added features for extending vLLM-Omni's diffusion pipeline with custom functionality.
Overview¶
Three main features enable custom pipeline extension:
WorkerWrapperBase: A wrapper class that enables dynamic worker extension with custom functionalityload_format: A parameter that controls how diffusion models are loaded, including support for custom pipelinesCustomPipelineWorkerExtension: An extension class that enables pipeline re-initialization with custom implementations
Features¶
WorkerWrapperBase¶
WorkerWrapperBase is a wrapper class that creates DiffusionWorker instances with optional extension support. It enables dynamic inheritance, allowing you to add custom methods and functionality to workers without modifying the base worker class.
Key capabilities: - Dynamic worker class extension via worker_extension_cls - Support for custom pipeline initialization via custom_pipeline_args - Method delegation to underlying worker - Attribute access forwarding
Location: vllm_omni/diffusion/worker/diffusion_worker.py
load_format Parameter¶
The load_format parameter controls how diffusion models are loaded. It supports the following values:
"default": Standard model loading using the model registry (default behavior)"custom_pipeline": Load a custom pipeline class specified bycustom_pipeline_name"dummy": Skip model loading (useful for testing or when pipeline will be initialized separately)
Location: vllm_omni/diffusion/model_loader/diffusers_loader.py
CustomPipelineWorkerExtension¶
CustomPipelineWorkerExtension is a mixin class that extends DiffusionWorker with the ability to re-initialize the pipeline with a custom implementation.
Key method: - re_init_pipeline(custom_pipeline_args): Re-initializes the pipeline with custom arguments, properly cleaning up the old pipeline first
Location: vllm_omni/diffusion/worker/diffusion_worker.py
Usage Example¶
Step 1: Create a Custom Pipeline¶
Create a custom pipeline class that extends an existing pipeline. In this example, we extend QwenImageEditPipeline to add trajectory tracking:
# custom_pipeline.py
from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig
from vllm_omni.diffusion.models.qwen_image.pipeline_qwen_image_edit import QwenImageEditPipeline
import torch
class CustomPipeline(QwenImageEditPipeline):
def __init__(self, *, od_config: OmniDiffusionConfig, prefix: str = ""):
super().__init__(od_config=od_config, prefix=prefix)
def forward(self, req, prompt=None, negative_prompt=None, **kwargs):
# Call parent's forward to get normal output
output = super().forward(req=req, prompt=prompt, negative_prompt=negative_prompt, **kwargs)
# Add custom trajectory data
actual_num_steps = req.sampling_params.num_inference_steps or kwargs.get('num_inference_steps', 50)
output.trajectory_timesteps = torch.linspace(1000, 0, actual_num_steps, dtype=torch.float32)
output.trajectory_latents = torch.randn(actual_num_steps, 1, 16, 64, 64, dtype=torch.float32)
return output
Step 2: Use the Custom Pipeline with Omni¶
Initialize the Omni engine with custom pipeline configuration:
from vllm_omni.entrypoints.omni import Omni
from vllm_omni.inputs.data import OmniDiffusionSamplingParams
# Initialize with custom pipeline
omni = Omni(
model="Qwen/Qwen-Image-Edit",
diffusion_load_format="dummy", # Skip initial loading
custom_pipeline_args={
"pipeline_class": "custom_pipeline.CustomPipeline"
},
)
# Generate with the custom pipeline
outputs = omni.generate(
...
)
# Access custom trajectory data
output = outputs[0].request_output
print(f"Trajectory timesteps shape: {output.metrics['trajectory_timesteps'].shape}")
print(f"Trajectory latents shape: {output.latents.shape}")
Step 3: Run the Example¶
The example provided in this directory demonstrates the complete workflow:
cd examples/offline_inference/custom_pipeline/image_to_image
# Run with custom pipeline
python image_edit.py \
--model Qwen/Qwen-Image-Edit-2511 \
--image cherry_blossom.jpg \
--prompt "Let this mascot dance under the moon, surrounded by floating stars" \
--output output_image_edit.png \
--num-inference-steps 10
Advanced Usage¶
Custom Worker Extension¶
You can create custom worker extensions to add new methods beyond pipeline re-initialization:
from typing import Any
from vllm_omni.diffusion.worker.diffusion_worker import CustomPipelineWorkerExtension
class MyCustomExtension(CustomPipelineWorkerExtension):
def custom_method(self):
"""Your custom worker method."""
return "custom_result"
def another_method(self, data: Any):
"""Another custom method."""
# Access worker internals via self
return self.model_runner.some_operation(data)
omni = Omni(
model="Qwen/Qwen-Image-Edit",
diffusion_load_format="dummy",
custom_pipeline_args={
"pipeline_class": "custom_pipeline.CustomPipeline"
},
worker_extension_cls=MyCustomExtension,
# Note: worker_extension_cls is an internal parameter
# CustomPipelineWorkerExtension will automatically init pipeline when custom_pipeline_args is provided
)