正文
req = self.input_queue.get()
# =================================================================================
# 处理请求,具体包括:
# 1、将请求包装成Request形式(EngineCoreRequest -> Request)
# 2、将请求添加进Scheduler的waiting队列中
# =================================================================================
self._handle_client_request(*req)
# =================================================================================
# 2) Handle any new client requests.
# 当scheduler中有request,且input_queue也有request时
# =================================================================================
while
not
self.input_queue.empty():
# =================================================================================
# 从input_queue中获取request数据(非阻塞式取出)。
# 非阻塞式取出:若input_queue为空,不会暂停线程,而是会直接抛出 queue.Empty 异常
# (所以要通过not empty()确保队列非空)
# =================================================================================
req = self.input_queue.get_nowait()
# =================================================================================
# 处理请求
# =================================================================================
self._handle_client_request(*req)
# =================================================================================
# 3) Step the engine core.
# EngineCore执行单步推理,返回单步推理的结果,包括:
# - Scheduler确定在本步调度中,哪些请求要被送去推理(SchedulerOutputs)
# - Executor->Workers->ModelRunners架构执行实际推理(SchedulerOutputs -> ModelRunnerOutputs)
# - Scheduler处理推理结果:ModelRunnerOutputs-> EngineCoreOutputs
# =================================================================================
outputs = step_fn()
# =================================================================================
# 4) Put EngineCoreOutputs into the output queue.
# 将EngineCoreOutputs装入output_queue中
# =================================================================================
if
outputs
is
not
None
:
self.output_queue.put_nowait(outputs)
详细的内容都在注释中,这里额外提一些点。
-
当Scheduler中没有请求的时候,说明此时整个Scheduler都在空转
(例如先前的数据已经做完了推理,而此刻又没有新来的请求)。所以此时才使用阻塞式的取数方式,要求至少可以从input_queue中等到一条新请求,这样Scheduler才能继续干活。
-
当Scheduler中有请求时,说明Scheduler是有活干的,那么如果input_queue中有新请求,那就去处理它们,把它们add进Scheduler的waiting队列中
;如果没有也没关系,所以采用的是非阻塞式取数。
-
满足以下任意情况,说明Scheduler中有请求:
-
(a) 当前scheduler的waiting或running队列不为空
-
(b) 当前scheduler中存在已经推理完毕的请求(finished_req_ids这个集合不为空)
这里详细看(b):
-
当一轮推理结束后,我们会得到 ModelRunnerOutputs,Scheduler需要对其处理,转为EngineCoreOutputs
-
在处理的过程中,Scheduler会检查哪些请求已经完成推理,将其add进finished_req_ids中,并释放掉这些请求相关的资源(kv cache block等)
-
等到在下一轮调度结束后,Scheduler会强制重置finished_req_ids为空set
-
而在任意时刻,只要检测到finished_req_ids不为空,就认为调度器中还有请求
2.2 step_fn
由2.1中代码第9行可知,vllm中提供了2种step_fn函数:
# =================================================================================
# - pp=1,batch_queue = None,step_fn = self.step
# - pp>1, batch_queue的长度为pp的stage数,step_fn = self.step_with_batch_queue
# =================================================================================
step_fn = (self.step
if self.batch_queue is None else self.step_with_batch_queue)
整体来说,step_fn函数主要做了3件事情:
我们先来看较为简单的
step_fn = self.step
场景(这3件事串行执行);再来看更为复杂一些的
step_fn = self.step_with_batch_queue
(这3件事可以并行执行)。在后面细节的讲解中,你会更深入感受到这两者间的区别以及为什么要这么做。
(1)普通step:self.step
https://github.com/vllm-project/vllm/blob/refs/tags/v0.8.2/vllm/v1/engine/core.py#L184
当我们不使用pp做推理时(pp=1),vllm选择的是
self.step
,它的具体代码如下,这里3个步骤是串行执行的,不难理解: