
Commit 0dcf274

robertgshaw2-redhat authored and Isotr0py committed

[V1][Core][1/n] Logging and Metrics (vllm-project#11962)

Signed-off-by: [email protected] <[email protected]>
Signed-off-by: Isotr0py <[email protected]>
1 parent bd4b2d0 commit 0dcf274

11 files changed: +129 −84 lines

tests/v1/engine/test_engine_core.py

Lines changed: 2 additions & 2 deletions
@@ -80,7 +80,7 @@ def test_engine_core(monkeypatch):
     assert len(engine_core.scheduler.running) == 4

     # Loop through until they are all done.
-    while len(engine_core.step()) > 0:
+    while len(engine_core.step().outputs) > 0:
        pass

     assert len(engine_core.scheduler.waiting) == 0
@@ -170,7 +170,7 @@ def test_engine_core_advanced_sampling(monkeypatch):
     assert len(engine_core.scheduler.waiting) == 1
     assert len(engine_core.scheduler.running) == 0
     # Loop through until they are all done.
-    while len(engine_core.step()) > 0:
+    while len(engine_core.step().outputs) > 0:
        pass

     assert len(engine_core.scheduler.waiting) == 0

tests/v1/engine/test_engine_core_client.py

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@ def make_request(params: SamplingParams) -> EngineCoreRequest:
 def loop_until_done(client: EngineCoreClient, outputs: Dict):

     while True:
-        engine_core_outputs = client.get_output()
+        engine_core_outputs = client.get_output().outputs

         if len(engine_core_outputs) == 0:
             break
@@ -61,7 +61,7 @@ def loop_until_done(client: EngineCoreClient, outputs: Dict):
 async def loop_until_done_async(client: EngineCoreClient, outputs: Dict):

     while True:
-        engine_core_outputs = await client.get_output_async()
+        engine_core_outputs = (await client.get_output_async()).outputs

         if len(engine_core_outputs) == 0:
             break

vllm/v1/core/scheduler.py

Lines changed: 15 additions & 5 deletions
@@ -8,7 +8,8 @@
 from vllm.sampling_params import SamplingParams
 from vllm.v1.core.encoder_cache_manager import EncoderCacheManager
 from vllm.v1.core.kv_cache_manager import KVCacheManager
-from vllm.v1.engine import EngineCoreOutput
+from vllm.v1.engine import EngineCoreOutput, EngineCoreOutputs
+from vllm.v1.metrics.stats import SchedulerStats
 from vllm.v1.outputs import ModelRunnerOutput
 from vllm.v1.request import Request, RequestStatus

@@ -394,12 +395,12 @@ def update_from_output(
         self,
         scheduler_output: "SchedulerOutput",
         model_runner_output: "ModelRunnerOutput",
-    ) -> List[EngineCoreOutput]:
+    ) -> EngineCoreOutputs:
         # NOTE(woosuk): This method doesn't consider speculative decoding.
         sampled_token_ids = model_runner_output.sampled_token_ids
         num_scheduled_tokens = scheduler_output.num_scheduled_tokens
         new_running: List[Request] = []
-        engine_core_outputs: List[EngineCoreOutput] = []
+        outputs: List[EngineCoreOutput] = []
         for request in self.running:
             req_id = request.request_id
             request.num_computed_tokens += num_scheduled_tokens[req_id]
@@ -438,15 +439,18 @@ def update_from_output(
                     finished=request.is_finished(),
                     finish_reason=request.get_finished_reason(),
                     stop_reason=request.stop_reason)
-                engine_core_outputs.append(output)
+                outputs.append(output)

                 # Breakout of the loop.
                 if stopped:
                     continue

             new_running.append(request)
         self.running = new_running
-        return engine_core_outputs
+        return EngineCoreOutputs(
+            outputs=outputs,
+            scheduler_stats=self.make_stats(),
+        )

     def _check_stop(self, request: Request) -> bool:
         if (request.num_tokens >= self.max_model_len
@@ -515,6 +519,12 @@ def get_num_unfinished_requests(self) -> int:
     def has_unfinished_requests(self) -> bool:
         return self.get_num_unfinished_requests() > 0

+    def make_stats(self) -> SchedulerStats:
+        return SchedulerStats(
+            num_running_reqs=len(self.running),
+            num_waiting_reqs=len(self.waiting),
+        )
+

 @dataclass
 class NewRequestData:
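
Both the new import and make_stats() refer to vllm/v1/metrics/stats.py, a file added by this commit but not shown in this section. A minimal sketch of what it plausibly contains, inferred from the make_stats() call site above (the field names come from the diff; the dataclass form and zero defaults are assumptions):

from dataclasses import dataclass


@dataclass
class SchedulerStats:
    # Snapshot of the scheduler's queues, taken once per engine step.
    # Field names match the make_stats() call site; defaults are assumed.
    num_running_reqs: int = 0
    num_waiting_reqs: int = 0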

vllm/v1/engine/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -4,6 +4,8 @@

 import msgspec

+from vllm.v1.metrics.stats import SchedulerStats
+
 if TYPE_CHECKING:
     from vllm.lora.request import LoRARequest
     from vllm.multimodal import MultiModalKwargs
@@ -56,6 +58,7 @@ class EngineCoreOutputs(

     # [num_reqs]
     outputs: List[EngineCoreOutput]
+    scheduler_stats: SchedulerStats


 @dataclass
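
With scheduler_stats added, every message the engine core emits pairs per-request outputs with a scheduler snapshot. A toy construction for illustration (the keyword arguments mirror how the scheduler builds the struct in the diff above; the zero-valued stats are made up):

from vllm.v1.engine import EngineCoreOutputs
from vllm.v1.metrics.stats import SchedulerStats

# An idle step still yields a message: no outputs, but fresh stats.
msg = EngineCoreOutputs(
    outputs=[],
    scheduler_stats=SchedulerStats(num_running_reqs=0, num_waiting_reqs=0),
)
assert len(msg.outputs) == 0  # callers unwrap .outputs, as in the tests above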

vllm/v1/engine/async_llm.py

Lines changed: 19 additions & 7 deletions
@@ -4,7 +4,6 @@

 from vllm.config import ModelConfig, VllmConfig
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.engine.metrics_types import StatLoggerBase
 from vllm.engine.protocol import EngineClient
 from vllm.inputs import INPUT_REGISTRY, InputRegistry, PromptType
 from vllm.inputs.preprocess import InputPreprocessor
@@ -22,6 +21,8 @@
 from vllm.v1.engine.detokenizer import Detokenizer
 from vllm.v1.engine.processor import Processor
 from vllm.v1.executor.abstract import Executor
+from vllm.v1.metrics.loggers import LoggingStatLogger, StatLoggerBase
+from vllm.v1.metrics.stats import SchedulerStats

 logger = init_logger(__name__)

@@ -34,7 +35,6 @@ def __init__(
         executor_class: Type[Executor],
         log_stats: bool,
         usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
-        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
         input_registry: InputRegistry = INPUT_REGISTRY,
         use_cached_outputs: bool = False,
         log_requests: bool = True,
@@ -45,7 +45,10 @@ def __init__(

         self.log_requests = log_requests
         self.log_stats = log_stats
-        self.stat_loggers = stat_loggers
+        self.stat_loggers: List[StatLoggerBase] = [
+            LoggingStatLogger(),
+            # TODO(rob): PrometheusStatLogger(),
+        ]
         self.model_config = vllm_config.model_config

         # Tokenizer (+ ensure liveness if running in another process).
@@ -82,7 +85,6 @@ def __init__(
             asyncio_mode=True,
             vllm_config=vllm_config,
             executor_class=executor_class,
-            log_stats=self.log_stats,
         )

         self.output_handler: Optional[asyncio.Task] = None
@@ -94,7 +96,6 @@ def from_engine_args(
         engine_config: Optional[VllmConfig] = None,
         start_engine_loop: bool = True,
         usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
-        stat_loggers: Optional[Dict[str, StatLoggerBase]] = None,
     ) -> "AsyncLLM":
         """Create an AsyncLLM from the EngineArgs."""

@@ -114,7 +115,6 @@ def from_engine_args(
             log_stats=not engine_args.disable_log_stats,
             start_engine_loop=start_engine_loop,
             usage_context=usage_context,
-            stat_loggers=stat_loggers,
         )

     def shutdown(self):
@@ -254,14 +254,18 @@ async def _run_output_handler(self):
                 outputs = await self.engine_core.get_output_async()

                 # 2) Detokenize based on the output.
-                request_outputs, reqs_to_abort = self.detokenizer.step(outputs)
+                request_outputs, reqs_to_abort = self.detokenizer.step(
+                    outputs.outputs)

                 # 3) Put the RequestOutputs into the per-request queues.
                 self._process_request_outputs(request_outputs)

                 # 4) Abort any requests that finished due to stop strings.
                 await self.engine_core.abort_requests_async(reqs_to_abort)

+                # 5) Log any stats.
+                await self._log_stats(scheduler_stats=outputs.scheduler_stats)
+
         except Exception as e:
             logger.exception("EngineCore output handler hit an error: %s", e)
             kill_process_tree(os.getpid())
@@ -278,6 +282,14 @@ async def abort(self, request_id: str) -> None:
         if request_id in self.rid_to_queue:
             del self.rid_to_queue[request_id]

+    async def _log_stats(self, scheduler_stats: SchedulerStats):
+        """Log stats to the stat loggers."""
+        if not self.log_stats:
+            return
+
+        for logger in self.stat_loggers:
+            logger.log(scheduler_stats=scheduler_stats)
+
     def encode(
         self,
         prompt: PromptType,
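
LoggingStatLogger and StatLoggerBase live in vllm/v1/metrics/loggers.py, also added by this commit but not shown here. A minimal sketch consistent with the _log_stats() call above; only the log(scheduler_stats=...) signature is confirmed by this diff, the class bodies are assumptions:

from abc import ABC, abstractmethod

from vllm.logger import init_logger
from vllm.v1.metrics.stats import SchedulerStats

logger = init_logger(__name__)


class StatLoggerBase(ABC):

    @abstractmethod
    def log(self, scheduler_stats: SchedulerStats):
        ...


class LoggingStatLogger(StatLoggerBase):

    def log(self, scheduler_stats: SchedulerStats):
        # Assumed format; plays the role of the RUNNING/WAITING log line
        # that this commit removes from core.py below.
        logger.info(
            "Running: %d reqs, Waiting: %d reqs",
            scheduler_stats.num_running_reqs,
            scheduler_stats.num_waiting_reqs,
        )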

vllm/v1/engine/core.py

Lines changed: 16 additions & 37 deletions
@@ -17,9 +17,9 @@
     maybe_register_config_serialize_by_value)
 from vllm.utils import get_exception_traceback, zmq_socket_ctx
 from vllm.v1.core.scheduler import Scheduler
-from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs,
-                            EngineCoreProfile, EngineCoreRequest,
-                            EngineCoreRequestType, EngineCoreRequestUnion)
+from vllm.v1.engine import (EngineCoreOutputs, EngineCoreProfile,
+                            EngineCoreRequest, EngineCoreRequestType,
+                            EngineCoreRequestUnion)
 from vllm.v1.engine.mm_input_mapper import MMInputMapperServer
 from vllm.v1.executor.abstract import Executor
 from vllm.v1.request import Request, RequestStatus
@@ -28,9 +28,7 @@

 logger = init_logger(__name__)

-POLLING_TIMEOUT_MS = 5000
-POLLING_TIMEOUT_S = POLLING_TIMEOUT_MS // 1000
-LOGGING_TIME_S = 5
+POLLING_TIMEOUT_S = 2.5


 class EngineCore:
@@ -40,10 +38,8 @@ def __init__(
         self,
         vllm_config: VllmConfig,
         executor_class: Type[Executor],
-        log_stats: bool = False,
     ):
         assert vllm_config.model_config.runner_type != "pooling"
-        self.log_stats = log_stats

         logger.info("Initializing an LLM engine (v%s) with config: %s",
                     VLLM_VERSION, vllm_config)
@@ -62,8 +58,6 @@ def __init__(
             vllm_config.cache_config,
             vllm_config.lora_config)

-        self._last_logging_time = time.time()
-
         self.mm_input_mapper_server = MMInputMapperServer(
             vllm_config.model_config)

@@ -114,11 +108,12 @@ def abort_requests(self, request_ids: List[str]):
         self.scheduler.finish_requests(request_ids,
                                        RequestStatus.FINISHED_ABORTED)

-    def step(self) -> List[EngineCoreOutput]:
+    def step(self) -> EngineCoreOutputs:
         """Schedule, execute, and make output."""

         if not self.scheduler.has_unfinished_requests():
-            return []
+            return EngineCoreOutputs(
+                outputs=[], scheduler_stats=self.scheduler.make_stats())

         scheduler_output = self.scheduler.schedule()
         output = self.model_executor.execute_model(scheduler_output)
@@ -145,15 +140,17 @@ def __init__(
         executor_class: Type[Executor],
         log_stats: bool = False,
     ):
-        super().__init__(vllm_config, executor_class, log_stats)
+        super().__init__(vllm_config, executor_class)
+
+        self.log_stats = log_stats

         # Background Threads and Queues for IO. These enable us to
         # overlap ZMQ socket IO with GPU since they release the GIL,
         # and to overlap some serialization/deserialization with the
         # model forward pass.
         # Threads handle Socket <-> Queues and core_busy_loop uses Queue.
         self.input_queue: queue.Queue[EngineCoreRequestUnion] = queue.Queue()
-        self.output_queue: queue.Queue[List[EngineCoreOutput]] = queue.Queue()
+        self.output_queue: queue.Queue[EngineCoreOutputs] = queue.Queue()
         threading.Thread(target=self.process_input_socket,
                          args=(input_path, ),
                          daemon=True).start()
@@ -217,8 +214,10 @@ def run_busy_loop(self):
                         self._handle_client_request(req)
                         break
                     except queue.Empty:
-                        self._log_stats()
                         logger.debug("EngineCore busy loop waiting.")
+                        # Break out the loop so we can log_stats in step().
+                        if self.log_stats:
+                            break
                     except BaseException:
                         raise

@@ -230,28 +229,9 @@ def run_busy_loop(self):
             # 3) Step the engine core.
             outputs = self.step()

-            # 4) Put EngineCoreOutputs into the output queue.
+            # 5) Put EngineCoreOutputs into the output queue.
             self.output_queue.put_nowait(outputs)

-            self._log_stats()
-
-    def _log_stats(self):
-        """Log basic stats every LOGGING_TIME_S"""
-
-        if not self.log_stats:
-            return
-
-        now = time.time()
-
-        if now - self._last_logging_time > LOGGING_TIME_S:
-            logger.info(
-                "RUNNING: %s | WAITING: %s",
-                len(self.scheduler.running),
-                len(self.scheduler.waiting),
-            )
-
-            self._last_logging_time = now
-
     def _handle_client_request(self, request: EngineCoreRequestUnion) -> None:
         """Handle EngineCoreRequest or EngineCoreABORT from Client."""

@@ -301,7 +281,6 @@ def process_output_socket(self, output_path: str):

         with zmq_socket_ctx(output_path, zmq.constants.PUSH) as socket:
             while True:
-                engine_core_outputs = self.output_queue.get()
-                outputs = EngineCoreOutputs(outputs=engine_core_outputs)
+                outputs = self.output_queue.get()
                 encoder.encode_into(outputs, buffer)
                 socket.send_multipart((buffer, ), copy=False)
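
The run_busy_loop() change is the subtle part of this file: previously an idle core called _log_stats() on a timer; now, when stats are enabled, it breaks out of the input-queue wait so that step() runs and returns an empty EngineCoreOutputs that still carries a fresh SchedulerStats snapshot. A self-contained toy model of that wake-up pattern (simplified names, not vLLM code):

import queue

POLLING_TIMEOUT_S = 2.5


def wait_for_request(input_queue: queue.Queue, log_stats: bool) -> None:
    # Block for new work, but wake up after the polling timeout when
    # stats are enabled, so the caller can still produce a stats-bearing
    # (possibly empty) batch of outputs while idle.
    while True:
        try:
            req = input_queue.get(timeout=POLLING_TIMEOUT_S)
            print("handling request:", req)
            break
        except queue.Empty:
            if log_stats:
                break


# With log_stats=True, an empty queue returns after ~2.5 s instead of
# blocking forever.
wait_for_request(queue.Queue(), log_stats=True)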
