Skip to content

vllm_omni.metrics.stats

E2E_EXCLUDE module-attribute

E2E_EXCLUDE = {'request_id'}

E2E_FIELDS module-attribute

E2E_FIELDS = _build_field_defs(
    RequestE2EStats, E2E_EXCLUDE, FIELD_TRANSFORMS
)

FIELD_TRANSFORMS module-attribute

FIELD_TRANSFORMS: dict[
    str, tuple[str, Callable[[Any], Any]]
] = {
    "rx_transfer_bytes": (
        "rx_transfer_kbytes",
        lambda v: v / 1024.0,
    ),
    "size_bytes": ("size_kbytes", lambda v: v / 1024.0),
    "transfers_total_bytes": (
        "transfers_total_kbytes",
        lambda v: v / 1024.0,
    ),
}

OVERALL_FIELDS module-attribute

OVERALL_FIELDS: list[str] | None = None

STAGE_EXCLUDE module-attribute

STAGE_EXCLUDE = {
    "stage_stats",
    "stage_id",
    "request_id",
    "rx_transfer_bytes",
    "rx_decode_time_ms",
    "rx_in_flight_time_ms",
    "final_output_type",
    "pipeline_timings",
}

STAGE_FIELDS module-attribute

STAGE_FIELDS = _build_field_defs(
    StageRequestStats, STAGE_EXCLUDE, FIELD_TRANSFORMS
)

TRANSFER_EXCLUDE module-attribute

TRANSFER_EXCLUDE = {
    "from_stage",
    "to_stage",
    "request_id",
    "used_shm",
}

TRANSFER_FIELDS module-attribute

TRANSFER_FIELDS = _build_field_defs(
    TransferEdgeStats, TRANSFER_EXCLUDE, FIELD_TRANSFORMS
)

logger module-attribute

logger = init_logger(__name__)

OrchestratorAggregator

e2e_events instance-attribute

e2e_events: list[RequestE2EStats] = []

final_stage_id_for_e2e instance-attribute

final_stage_id_for_e2e = final_stage_id_for_e2e

log_stats instance-attribute

log_stats = bool(log_stats)

num_stages instance-attribute

num_stages = int(num_stages)

stage_events instance-attribute

stage_events: dict[str, list[StageRequestStats]] = {}

transfer_events instance-attribute

transfer_events: dict[
    tuple[int, int, str], TransferEdgeStats
] = {}

accumulate_diffusion_metrics

accumulate_diffusion_metrics(
    stage_type: str, req_id: Any, engine_outputs: Any
) -> None

Accumulate diffusion metrics for a request.

Handles extraction and accumulation of diffusion stage metrics.

Parameters:

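| Name | Type | Description | Default |
|------|------|-------------|---------|
| stage_type | str | Type of the stage (e.g., 'llm', 'diffusion') | required |
| req_id | Any | Request ID | required |
| engine_outputs | Any | Engine output object containing metrics | required |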

build_and_log_summary

build_and_log_summary() -> dict[str, Any]

init_run_state

init_run_state(wall_start_ts: float) -> None

on_finalize_request

on_finalize_request(
    stage_id: int, req_id: Any, req_start_ts: float
) -> None

on_forward

on_forward(
    from_stage: int,
    to_stage: int,
    req_id: Any,
    size_bytes: int,
    tx_ms: float,
    used_shm: bool,
) -> None

on_stage_metrics

on_stage_metrics(
    stage_id: int,
    req_id: Any,
    metrics: StageRequestStats,
    final_output_type: str | None = None,
) -> None

process_stage_metrics

process_stage_metrics(
    *,
    result: dict[str, Any],
    stage_type: str,
    stage_id: int,
    req_id: str,
    engine_outputs: Any,
    finished: bool,
    final_output_type: str | None,
    output_to_yield: Any | None,
) -> None

Process and record stage metrics.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| result | dict[str, Any] | Result dict containing metrics from stage | required |
| stage_type | str | Type of the stage (e.g., 'llm', 'diffusion') | required |
| stage_id | int | Stage identifier | required |
| req_id | str | Request identifier | required |
| engine_outputs | Any | Engine output object | required |
| finished | bool | Whether stage processing is finished | required |
| final_output_type | str \| None | Type of final output (e.g., 'text', 'audio') | required |
| output_to_yield | Any \| None | Output object to attach metrics to | required |

record_audio_generated_frames

record_audio_generated_frames(
    output_to_yield: Any, stage_id: int, request_id: str
) -> None

record_stage_postprocess_time

record_stage_postprocess_time(
    stage_id: int, req_id: Any, postproc_time_ms: float
) -> None

record_transfer_rx

record_transfer_rx(
    stats: StageRequestStats,
) -> TransferEdgeStats | None

record_transfer_tx

record_transfer_tx(
    from_stage: int,
    to_stage: int,
    request_id: Any,
    size_bytes: int,
    tx_time_ms: float,
    used_shm: bool,
) -> TransferEdgeStats | None

stage_postprocess_timer

stage_postprocess_timer(stage_id: int, req_id: Any)

Context manager for measuring and recording stage postprocessing time.

Usage:

    with metrics.stage_postprocess_timer(stage_id, request_id):
        next_inputs = next_stage.process_engine_inputs(...)

RequestE2EStats dataclass

e2e_total_ms instance-attribute

e2e_total_ms: float

e2e_total_tokens instance-attribute

e2e_total_tokens: int

e2e_tpt property

e2e_tpt: float

request_id instance-attribute

request_id: str

transfers_total_bytes instance-attribute

transfers_total_bytes: int

transfers_total_time_ms instance-attribute

transfers_total_time_ms: float

StageRequestStats dataclass

audio_duration_s class-attribute instance-attribute

audio_duration_s: float = 0.0

audio_generated_frames class-attribute instance-attribute

audio_generated_frames: int = 0

audio_rtf class-attribute instance-attribute

audio_rtf: float = 0.0

audio_sample_rate class-attribute instance-attribute

audio_sample_rate: int = 0

batch_id instance-attribute

batch_id: int

batch_size instance-attribute

batch_size: int

denoise_step_latency_ms class-attribute instance-attribute

denoise_step_latency_ms: float = 0.0

diffusion_metrics class-attribute instance-attribute

diffusion_metrics: dict[str, int] = None

final_output_type class-attribute instance-attribute

final_output_type: str | None = None

image_pixels class-attribute instance-attribute

image_pixels: int = 0

inter_output_latencies_ms class-attribute instance-attribute

inter_output_latencies_ms: list[float] | None = None

inter_output_latency_ms class-attribute instance-attribute

inter_output_latency_ms: float = 0.0

num_tokens_in instance-attribute

num_tokens_in: int

num_tokens_out instance-attribute

num_tokens_out: int

output_unit_count class-attribute instance-attribute

output_unit_count: int = 0

output_unit_type class-attribute instance-attribute

output_unit_type: str | None = None

pipeline_timings class-attribute instance-attribute

pipeline_timings: dict[str, float] | None = None

postprocess_time_ms class-attribute instance-attribute

postprocess_time_ms: float = 0.0

replica_id class-attribute instance-attribute

replica_id: int | None = None

request_id class-attribute instance-attribute

request_id: str | None = None

rx_decode_time_ms instance-attribute

rx_decode_time_ms: float

rx_in_flight_time_ms instance-attribute

rx_in_flight_time_ms: float

rx_mbps property

rx_mbps: float

rx_transfer_bytes instance-attribute

rx_transfer_bytes: int

serving_time_to_first_output_ms class-attribute instance-attribute

serving_time_to_first_output_ms: float = 0.0

stage_gen_time_ms instance-attribute

stage_gen_time_ms: float

stage_id class-attribute instance-attribute

stage_id: int | None = None

stage_stats instance-attribute

stage_stats: StageStats

time_per_output_unit_ms class-attribute instance-attribute

time_per_output_unit_ms: float = 0.0

tokens_per_s property

tokens_per_s: float

vllm_itl_ms class-attribute instance-attribute

vllm_itl_ms: float = 0.0

vllm_itls_ms class-attribute instance-attribute

vllm_itls_ms: list[float] | None = None

vllm_tpot_ms class-attribute instance-attribute

vllm_tpot_ms: float = 0.0

vllm_ttft_ms class-attribute instance-attribute

vllm_ttft_ms: float = 0.0

StageStats dataclass

avg_tokens_per_s property

avg_tokens_per_s: float

total_gen_time_ms class-attribute instance-attribute

total_gen_time_ms: float = 0.0

total_token class-attribute instance-attribute

total_token: int = 0

TransferEdgeStats dataclass

from_stage instance-attribute

from_stage: int

in_flight_time_ms class-attribute instance-attribute

in_flight_time_ms: float = 0.0

request_id instance-attribute

request_id: str

rx_decode_time_ms class-attribute instance-attribute

rx_decode_time_ms: float = 0.0

size_bytes instance-attribute

size_bytes: int

to_stage instance-attribute

to_stage: int

total_time_ms property

total_time_ms: float

tx_time_ms instance-attribute

tx_time_ms: float

used_shm class-attribute instance-attribute

used_shm: bool = False