AsyncOmni Architecture (Qwen3-Omni Example)¶
1. System Architecture¶
• ┌─────────────────────────────────────────────────────────────────────────────────┐
│ API Layer │
│ ┌─────────────────────────────────────┐ ┌──────────────────────────────────┐ │
│ │ AsyncOmni (EngineClient) │ │ Omni │ │
│ │ • generate() / abort() / shutdown() │ │ • generate() │ │
│ │ • _final_output_handler() │ │ | │
│ └─────────────────────────────────────┘ └──────────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────────────────────┤
│ Engine Layer (Proxy) │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ AsyncOmniEngine │ │
│ │ • _bootstrap_orchestrator() & _initialize_stages() │ │
│ │ • add_request() / add_request_async() -> input_processor.process_inputs() │ │
│ │ • try_get_output() / try_get_output_async() │ │
│ └───────────────────┬─────────────────────────────────▲─────────────────────┘ │
│ request_queue (janus.Queue) output_queue (janus.Queue) │
├──────────────────────┼─────────────────────────────────┼────────────────────────┤
│ ▼ Orchestration Layer │ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ Orchestrator [background thread] │ │
│ │ • _request_handler() │ │
│ │ - stage_client.add_request_async() & _prewarm_async_chunk_stages() │ │
│ │ • _orchestration_output_handler() │ │
│ │ - _process_stage_outputs() -> output_processors[i].process_outputs() │ │
│ │ - _route_output() & _forward_to_next_stage() │ │
│ └──────────┬─────────────────────────┬────────────────────────┬─────────────┘ │
├─────────────┼─────────────────────────┼────────────────────────┼────────────────┤
│ │ Communication Layer │ │
│ ┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ StageEngineCoreClient │ │ StageEngineCoreClient │ │ StageDiffusionClient │ │
│ │ • ZMQ ROUTER / PULL │ │ • ZMQ ROUTER / PULL │ │ • ZMQ ROUTER / PULL │ │
│ │ • Msgpack codec │ │ • Msgpack codec │ │ • Msgpack codec │ │
│ └──────────┬────────────┘ └──────────┬────────────┘ └──────────┬────────────┘ │
│ ▼ ZMQ IPC ▼ ZMQ IPC ▼ ZMQ IPC │
├─────────────────────────────────────────────────────────────────────────────────┤
│ Execution Layer │
│ ┌───────────────────────┐ ┌───────────────────────┐ ┌───────────────────────┐ │
│ │ StageCoreProc │ │ StageCoreProc │ │ DiffusionEngine │ │
│ │ [background process] │ │ [background process] │ │ [background process] │ │
│ └───────────────────────┘ └───────────────────────┘ └───────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────────┘
2. Execution Flow (Arrow Steps, one generate request)¶
[1] App
-> AsyncOmni.generate(prompt, request_id)
[2] AsyncOmni
-> _final_output_handler() (started on first request)
-> AsyncOmniEngine.add_request(stage_id=0, ...)
[3] AsyncOmniEngine.add_request
-> (if stage-0 is llm and input is not EngineCoreRequest)
InputProcessor.process_inputs()
OutputProcessor[0].add_request()
-> request_queue.put(add_request_msg)
[4] Orchestrator._request_handler
-> _handle_add_request(msg)
-> stage_clients[0].add_request_async(...)
[5] Orchestrator._orchestration_loop (loop)
-> poll stage output
- llm stage: await get_output_async()
- diffusion stage: get_diffusion_output_nowait()
-> (llm stage) output_processors[i].process_outputs(...)
-> _route_output(...)
-> if finished and not final_stage and non-async-chunk:
_forward_to_next_stage(...)
-> next_stage.add_request_async(...)
-> output_queue.put(output)
[6] AsyncOmni._final_output_loop (background coroutine)
-> AsyncOmniEngine.try_get_output_async()
-> route by request_id to ClientRequestState.queue
[7] AsyncOmni._process_orchestrator_results
-> read from ClientRequestState.queue
-> _process_single_result(...)
-> yield OmniRequestOutput
[8] Exit condition
-> receive result["finished"] == True
-> generate() ends
3. Runtime Sequence (one generate request)¶
sequenceDiagram
participant APP as App
participant AO as AsyncOmni
participant ENG as AsyncOmniEngine
participant ORCH as Orchestrator
participant S0 as Stage-0 Client
participant SN as Next Stage Client
APP->>AO: generate
AO->>AO: start output_handler once
AO->>ENG: add_request(stage_id=0, ...)
ENG->>ENG: input_processor.process_inputs()
ENG->>ORCH: request_queue.put(add_request)
ORCH->>ORCH: _handle_add_request
ORCH->>S0: add_request_async
loop poll route forward
ORCH->>S0: get_output_async / get_diffusion_output_nowait
ORCH->>ORCH: _route_output
alt need forward to next stage
ORCH->>SN: add_request_async
end
ORCH-->>ENG: output_queue.put
end
AO->>ENG: try_get_output_async
ENG-->>AO: message
AO-->>APP: yield OmniRequestOutput 4. Comparison¶
Previous topology (reference):
┌────────────────────────────────────────────────────────────────────────────┐
│ Main Process │
│ ┌──────────────────────┐ ┌────────────────────────────────────────────┐ │
│ │ generate() │ │ final_output_handler() │ │
│ └──────────────────────┘ └────────────────────────────────────────────┘ │
└──────────┬─────────────────────────┬─────────────────────────┬─────────────┘
mp.Queue (in_q/out_q) mp.Queue (in_q/out_q) mp.Queue (in_q/out_q)
▼▲ ▼▲ ▼▲
┌───────────────────────┐ ┌───────────────────────┐ ┌──────────────────────┐
│ Worker Proc-0 │ │ Worker Proc-1 │ │ Worker Proc-2 │
│ (Thinker LLM) │ │ (Talker LLM) │ │ (Vocoder) │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │_stage_worker │ │ │ │_stage_worker │ │ │ │_stage_worker │ │
│ │_async() │ │ │ │_async() │ │ │ │_async() │ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
│ ┌────────────────┐ │ │ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │output_handler()│ │ │ │output_handler()│ │ │ │output_handler()│ │
│ └────────────────┘ │ │ └────────────────┘ │ │ └────────────────┘ │
└──────────┬────────────┘ └──────────┬────────────┘ └──────────┬───────────┘
ZMQ ▼ ▲ ZMQ ZMQ ▼ ▲ ZMQ ZMQ ▼ ▲ ZMQ
┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐
│ EngineCore Proc-0 │ │ EngineCore Proc-1 │ │ EngineCore Proc-2 │
│ (Thinker) │ │ (Talker) │ │ (Vocoder) │
└──────────────────────┘ └──────────────────────┘ └──────────────────────┘
Current topology:
┌────────────────────────────────────────────────────────────────────────────┐
│ Main Process │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Main Thread │ │
│ │ ┌──────────────────────┐ ┌─────────────────────────────────────┐ │ │
│ │ │ generate() │ │ final_output_handler() │ │ │
│ │ └──────────────────────┘ └─────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ janus.Queue (request_queue) ▼ ▲ janus.Queue (output_queue) │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ Orchestrator Thread │ │
│ │ ┌──────────────────────┐ ┌──────────────────────────────────────┐ │ │
│ │ │ _request_handler() │ │ _orchestration_output_handler() │ │ │
│ │ └──────────────────────┘ └──────────────────────────────────────┘ │ │
│ │ ┌────────────────────────────────────────────────────────────────┐ │ │
│ │ │ _orchestration_loop(): poll/process/route outputs for all stages│ │ │
│ │ └────────────────────────────────────────────────────────────────┘ │ │
│ └───────┬─────────────────────────┬─────────────────────────┬──────────┘ │
└──────────┬─────────────────────────┬─────────────────────────┬─────────────┘
ZMQ ▼ ▲ ZMQ ZMQ ▼ ▲ ZMQ ZMQ ▼ ▲ ZMQ
┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────────┐
│ EngineCore Proc-0 │ │ EngineCore Proc-1 │ │ EngineCore Proc-2 │
│ (Thinker) │ │ (Talker) │ │ (Vocoder) │
└──────────────────────┘ └──────────────────────┘ └──────────────────────┘
Test scripts:
# enter offline inference folder.
cd examples/offline_inference/qwen2_5_omni
python end2end.py --output-dir output_audio --query-type use_mixed_modalities
cd ../qwen3_omni
python end2end.py --output-dir output_audio --query-type text --async-chunk --enable-stats
cd ../bagel
python end2end.py --prompts "A cute cat"
cd ../text_to_image
python text_to_image.py --prompt "a cup of coffee on the table" --output output.png