def install_engine_core_patch() -> None:
"""Install a Gaudi-only EngineCore reconfigure hook."""
from vllm.v1.engine.core import EngineCore
if hasattr(EngineCore, "gaudi_reconfigure_engine"):
return
def gaudi_reconfigure_engine(
self: Any,
vllm_config_bytes: bytes,
quant_config_path: str | None | object = _QUANT_CONFIG_UNCHANGED,
) -> dict[str, float | None]:
"""Reconfigure EngineCore for a new model/config in-process.
This rebuilds KV cache configs, scheduler, and related runtime state
after reloading model weights on workers.
Args:
vllm_config_bytes: cloudpickle-serialised VllmConfig for the new model.
quant_config_path: Optional path to the INC FP8 calibration JSON
(``QUANT_CONFIG`` env var value).
"""
start = time.perf_counter()
previous_config = self.vllm_config
new_config = _deserialize_reconfigure_config(vllm_config_bytes)
logger.info("[gaudi_reconfigure] start: target_model=%s", new_config.model_config.model)
memory_before_mb = _collect_total_hpu_used_memory_mb(self)
unload_result = None
memory_after_unload_mb = None
# Pause scheduling and clear caches to avoid mixed state.
try:
self.pause_scheduler(mode="abort", clear_cache=True)
logger.info("[gaudi_reconfigure] scheduler paused and caches reset")
except Exception as exc: # pragma: no cover - best effort
logger.warning("Failed to pause scheduler before reconfigure: %s", exc)
# Sleep to release device memory before reloading weights.
try:
if getattr(self.model_executor, "is_sleeping", False):
logger.warning("[gaudi_reconfigure] executor already marked sleeping before reconfigure")
else:
self.model_executor.sleep(level=1)
logger.info("[gaudi_reconfigure] executor slept (level=1)")
except Exception as exc: # pragma: no cover - best effort
logger.warning("Failed to sleep executor before reconfigure: %s", exc)
try:
# Unload model put to sleep, reload new model on worker
unload_result = self.collective_rpc("unload_model")
# Validate unload_result: collective_rpc returns a list of per-worker results.
if not isinstance(unload_result, (list, tuple)) or len(unload_result) == 0:
logger.warning(
"[gaudi_reconfigure] unexpected unload_model result type: %s (expected non-empty list)",
type(unload_result).__name__,
)
else:
for i, worker_result in enumerate(unload_result):
if not isinstance(worker_result, dict):
logger.warning(
"[gaudi_reconfigure] worker %d returned non-dict from unload_model: %s",
i,
type(worker_result).__name__,
)
memory_after_unload_mb = _collect_total_hpu_used_memory_mb(self)
load_kwargs: dict[str, Any] = {"vllm_config": new_config}
if quant_config_path is not _QUANT_CONFIG_UNCHANGED:
load_kwargs["quant_config_path"] = quant_config_path
self.collective_rpc("load_model", kwargs=load_kwargs)
logger.info("[gaudi_reconfigure] worker model reload complete")
# Update config and reinitialize KV caches.
self.vllm_config = new_config
self.available_gpu_memory_for_kv_cache = -1
kv_cache_config = self._initialize_kv_caches(new_config)
num_gpu_blocks = new_config.cache_config.num_gpu_blocks
logger.info(
"[gaudi_reconfigure] kv cache reinitialized: num_gpu_blocks=%d",
num_gpu_blocks,
)
# Rebuild structured output manager.
self.structured_output_manager = StructuredOutputManager(new_config)
logger.info("[gaudi_reconfigure] structured output manager rebuilt")
# Rebuild scheduler.
Scheduler = new_config.scheduler_config.get_scheduler_cls()
if len(kv_cache_config.kv_cache_groups) == 0 and new_config.scheduler_config.enable_chunked_prefill:
logger.warning("Disabling chunked prefill for model without KVCache")
new_config.scheduler_config.enable_chunked_prefill = False
scheduler_block_size = (new_config.cache_config.block_size *
new_config.parallel_config.decode_context_parallel_size *
new_config.parallel_config.prefill_context_parallel_size)
self.scheduler = Scheduler(
vllm_config=new_config,
kv_cache_config=kv_cache_config,
structured_output_manager=self.structured_output_manager,
include_finished_set=False,
log_stats=self.log_stats,
block_size=scheduler_block_size,
)
logger.info("[gaudi_reconfigure] scheduler rebuilt")
self.use_spec_decode = new_config.speculative_config is not None
if self.scheduler.connector is not None: # type: ignore[has-type]
self.model_executor.init_kv_output_aggregator(self.scheduler.connector) # type: ignore[arg-type]
logger.info("[gaudi_reconfigure] kv output aggregator initialized")
# Rebuild multimodal receiver cache.
self.mm_registry = mm_registry = MULTIMODAL_REGISTRY
self.mm_receiver_cache = mm_registry.engine_receiver_cache_from_config(new_config)
logger.info("[gaudi_reconfigure] multimodal receiver cache rebuilt")
kv_connector = self.scheduler.get_kv_connector()
if kv_connector is not None:
xfer_handshake_metadata = self.model_executor.get_kv_connector_handshake_metadata()
if xfer_handshake_metadata:
content: dict[int, Any] = {}
for worker_dict in xfer_handshake_metadata:
if worker_dict is not None:
content.update(worker_dict)
kv_connector.set_xfer_handshake_metadata(content)
logger.info("[gaudi_reconfigure] kv connector handshake metadata refreshed")
# Rebuild batch queue and scheduling helpers.
self.batch_queue_size = self.model_executor.max_concurrent_batches
self.batch_queue = deque(maxlen=self.batch_queue_size) if self.batch_queue_size > 1 else None
self.is_ec_consumer = (new_config.ec_transfer_config is None
or new_config.ec_transfer_config.is_ec_consumer)
self.is_pooling_model = new_config.model_config.runner_type == "pooling"
self.request_block_hasher = None
if new_config.cache_config.enable_prefix_caching or kv_connector is not None:
caching_hash_fn = get_hash_fn_by_name(new_config.cache_config.prefix_caching_hash_algo)
init_none_hash(caching_hash_fn)
self.request_block_hasher = get_request_block_hasher(scheduler_block_size, caching_hash_fn)
self.step_fn = self.step if self.batch_queue is None else self.step_with_batch_queue
self.async_scheduling = new_config.scheduler_config.async_scheduling
self.aborts_queue = queue.Queue()
logger.info("[gaudi_reconfigure] execution state rebuilt")
_reset_executor_sleep_state(self.model_executor)
# Resume scheduler after reconfigure.
self.resume_scheduler()
elapsed = time.perf_counter() - start
logger.info("[gaudi_reconfigure] completed in %.2fs", elapsed)
except Exception as exc:
logger.error("[gaudi_reconfigure] failed: %s: %s", exc.__class__.__name__, exc)
try:
restore_result = self.collective_rpc(
"restore_stashed_model",
kwargs={
"vllm_config": previous_config,
"restore_kv_cache": True,
},
)
logger.warning("[gaudi_reconfigure] rollback restore_stashed_model result=%s", restore_result)
except Exception as restore_exc:
logger.error("[gaudi_reconfigure] rollback restore_stashed_model failed: %s: %s",
restore_exc.__class__.__name__, restore_exc)
self.vllm_config = previous_config
try:
self.resume_scheduler()
except Exception as resume_exc: # pragma: no cover - best effort
logger.error("[gaudi_reconfigure] rollback resume_scheduler failed: %s: %s",
resume_exc.__class__.__name__, resume_exc)
raise
freed_memory_mb = None
if memory_before_mb is not None and memory_after_unload_mb is not None:
freed_memory_mb = max(memory_before_mb - memory_after_unload_mb, 0.0)
stash_memory_after_mb = _sum_named_numeric_values(unload_result, "stash_memory_after_mb")
return {
"memory_before_mb": memory_before_mb,
"memory_after_unload_mb": memory_after_unload_mb,
"freed_memory_mb": freed_memory_mb,
"stash_memory_after_mb": stash_memory_after_mb,
}
EngineCore.gaudi_reconfigure_engine = gaudi_reconfigure_engine