SGLang Source Code Study Notes (3) - Distributed and Parallel (Using DeepSeek as an Example) (WIP)

        world_size,
        rank,
        local_rank,
        distributed_init_method,
        backend,
    )
    # Just some argument checks here
    if not torch.distributed.is_initialized():
        assert distributed_init_method is not None, (
            "distributed_init_method must be provided when initializing "
            "distributed environment"
        )
        if timeout is not None:
            assert isinstance(timeout, (int)), "timeout must be a number"
            assert timeout > 0, "timeout must be positive"
            timeout = timedelta(seconds=timeout)

        # this backend is used for WORLD
        ## most important: this initializes the global (default) communication process group
        torch.distributed.init_process_group(
            backend=backend,
            init_method=distributed_init_method,
            world_size=world_size,
            rank=rank,
            timeout=timeout,
        )

    # set the local rank
    # local_rank is not available in torch ProcessGroup,
    # see https://github.com/pytorch/pytorch/issues/122816
    if local_rank == -1:
        # local rank not set, this usually happens in single-node
        # setting, where we can use rank as local rank
        if distributed_init_method == "env://":
            local_rank = int(os.environ.get("LOCAL_RANK", "0"))
        else:
            local_rank = rank
    global _WORLD
    if _WORLD is None:
        ranks = list(range(torch.distributed.get_world_size()))
        _WORLD = init_world_group(ranks, local_rank, backend)
    else:
        assert (
            _WORLD.world_size == torch.distributed.get_world_size()
        ), "world group already initialized with a different world size"
The implementation of init_world_group is quite plain: its flag arguments are basically all False, so this group hardly participates in actual communication work. Its main job is to create two big groups on top of the default process group above: a device_group and a cpu_group, whose world_size matches the default process group (presumably for ease of management, so that this group sits in the same dict as the TP/PP groups).

def init_world_group(
    ranks: List[int], local_rank: int, backend: str
) -> GroupCoordinator:
    return GroupCoordinator(
        group_ranks=[ranks],
        local_rank=local_rank,
        torch_distributed_backend=backend,
        use_pynccl=False,
        use_custom_allreduce=False,
        use_hpu_communicator=False,
        use_xpu_communicator=False,
        group_name="world",
    )

## In class GroupCoordinator.__init__():
## Note that the cpu_group uses gloo, while device_group uses the default group's backend (nccl under CUDA).
## Also note that although this is a loop, from init_world_group's arguments we can see that group_ranks has size 1,
## and ranks is simply range(world_size).
        for ranks in group_ranks:
            device_group = torch.distributed.new_group(
                ranks, backend=torch_distributed_backend
            )
            # a group with `gloo` backend, to allow direct coordination between
            # processes through the CPU.
            cpu_group = torch.distributed.new_group(ranks, backend="gloo")
            if self.rank in ranks:
                self.ranks = ranks
                self.world_size = len(ranks)
                self.rank_in_group = ranks.index(self.rank)
                self.device_group = device_group
                self.cpu_group = cpu_group
initialize_model_parallel

With the global environment in place, we can look at TP initialization. First the arguments: both tp and pp default to 1. In SGLang's current usage only tp is actually passed; pp is not yet used, but PP support is planned.

def initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    backend: Optional[str] = None,
) -> None:
    """
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    # Get world size and rank. Ensure some consistencies.
    # First gather some checks and metadata, e.g. whether world_size matches; the backend comes from device_group, which in practice is just the default process group's backend
    assert torch.distributed.is_initialized()
    world_size: int = torch.distributed.get_world_size()
    backend = backend or torch.distributed.get_backend(get_world_group().device_group)

    if world_size != tensor_model_parallel_size * pipeline_model_parallel_size:
        raise RuntimeError(
            f"world_size ({world_size}) is not equal to "
            f"tensor_model_parallel_size ({tensor_model_parallel_size}) x "
            f"pipeline_model_parallel_size ({pipeline_model_parallel_size})"
        )

    # Build the tensor model-parallel groups.
    # Split the ranks into TP groups here: e.g. with 8 GPUs and tp=4, the groups are [0-3] and [4-7]
    num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size
    global _TP
    assert _TP is None, "tensor model parallel group is already initialized"
    group_ranks = []
    for i in range(num_tensor_model_parallel_groups):
        ranks = list(
            range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)
        )
        group_ranks.append(ranks)

    # Essentially a direct call into GroupCoordinator, not much different from init_world_group,
    # except that many of the flags are now True (e.g. use pynccl, use the message queue broadcaster, etc.)
    # message queue broadcaster is only used in tensor model parallel group
    _TP = init_model_parallel_group(
        group_ranks,
        get_world_group().local_rank,
        backend,
        use_message_queue_broadcaster=True,
        group_name="tp",
    )

    # PP works the same way, but the groups are partitioned differently: e.g. with world_size 8 and pp=2,
    # the groups are [0, 4], [1, 5], [2, 6], [3, 7].
    # PP is not fully implemented yet and may change, so just get a feel for it; together with the TP split
    # above, you can see that the two partitions are orthogonal.
    # Build the pipeline model-parallel groups.
    num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size
    global _PP
    assert _PP is None, "pipeline model parallel group is already initialized"
    group_ranks = []
    for i in range(num_pipeline_model_parallel_groups):
        ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
        group_ranks.append(ranks)
    # pipeline parallel does not need custom allreduce
    _PP = init_model_parallel_group(
        group_ranks,
        get_world_group().local_rank,
        backend,
        use_custom_allreduce=False,
        group_name="pp",
    )

A diagram summarizes the relationship between these groups. PP is not drawn (it would not fit in the figure); with pp = 1, all 8 GPUs likewise form a single pp group.
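For intuition, here is a minimal sketch that reproduces the grouping logic above; world_size = 8, tp = 4 and pp = 2 are made-up values chosen so that the TP and PP splits can be compared:

import pprint

# Minimal sketch of the grouping logic in initialize_model_parallel (made-up sizes).
world_size, tp, pp = 8, 4, 2

# TP groups: consecutive blocks of `tp` ranks.
tp_groups = [list(range(i * tp, (i + 1) * tp)) for i in range(world_size // tp)]

# PP groups: strided by the number of PP groups (world_size // pp).
pp_groups = [list(range(i, world_size, world_size // pp)) for i in range(world_size // pp)]

pprint.pprint(tp_groups)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
pprint.pprint(pp_groups)  # [[0, 4], [1, 5], [2, 6], [3, 7]]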

TP in the Scheduler (the model-agnostic part)

We covered the scheduler before; see 进击的Bruce: SGLang Source Code Study Notes (1) - Cache, Req and Scheduler (https://zhuanlan.zhihu.com/p/17186885141). Back then, however, we mostly covered the tp=1 case; with tp=2 the scheduler picks up a few extra details, which we briefly walk through here. The data structures involved are mainly Scheduler and LogitsProcessor.

class Scheduler(...):
    ...
    def recv_requests(self) -> List[Req]:
        ...
        if self.server_args.enable_dp_attention:
            ...
            if self.attn_tp_size != 1:
                attn_tp_rank_0 = self.dp_rank * self.attn_tp_size
                work_reqs = broadcast_pyobj(
                    work_reqs,
                    self.attn_tp_rank,
                    self.attn_tp_cpu_group,
                    src=attn_tp_rank_0,
                )
            if self.tp_size != 1:
                control_reqs = broadcast_pyobj(
                    control_reqs, self.tp_rank, self.tp_cpu_group
                )
            recv_reqs = work_reqs + control_reqs
        elif self.tp_size != 1:
            recv_reqs = broadcast_pyobj(recv_reqs, self.tp_rank, self.tp_cpu_group)
        return recv_reqs
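broadcast_pyobj here broadcasts arbitrary Python objects (the request lists) from the source rank to the rest of the group over the gloo-backed cpu group. As a rough sketch of what such a pickle-based object broadcast boils down to (illustrative only, not sglang's exact implementation; the function name is made up):

import pickle
import torch
import torch.distributed as dist

def broadcast_pyobj_sketch(obj, rank: int, cpu_group, src: int = 0):
    """Illustrative sketch only: serialize a Python object on the source rank and
    broadcast the bytes over a gloo-backed CPU group. Not sglang's broadcast_pyobj."""
    if rank == src:
        data = bytearray(pickle.dumps(obj))
        size = torch.tensor([len(data)], dtype=torch.long)
        dist.broadcast(size, src=src, group=cpu_group)
        dist.broadcast(torch.frombuffer(data, dtype=torch.uint8), src=src, group=cpu_group)
        return obj
    size = torch.tensor([0], dtype=torch.long)
    dist.broadcast(size, src=src, group=cpu_group)
    buf = torch.empty(int(size.item()), dtype=torch.uint8)
    dist.broadcast(buf, src=src, group=cpu_group)
    return pickle.loads(buf.numpy().tobytes())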

class LogitsProcessor(nn.Module):
    ....
    def _get_logits(
        self,
        hidden_states: torch.Tensor,
        lm_head: VocabParallelEmbedding,
        logits_metadata: LogitsMetadata,
        embedding_bias: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        ....
        if self.do_tensor_parallel_all_gather:
            logits = tensor_model_parallel_all_gather(logits)
        ....

Let us explain these two snippets and, along the way, briefly contrast them with vLLM's approach.

sglang:
1. A new request arrives at rank 0; rank 0 broadcasts the whole request to the other ranks, so every rank gets the initial prompt.
2. On each forward pass, the logits are all-gathered, so every rank holds the exact, complete logits; each rank then samples by itself to obtain the token, after which all ranks have the same new input for the next forward pass.

vllm:
1. A new request arrives and the input is broadcast to the other ranks, so all ranks see the same input.
2. After the forward pass, rank 0 gathers the logits from the other ranks, samples the new token, and concatenates it with the previous tokens to form the complete new input.
3. Before the next forward pass starts, rank 0 broadcasts its input to the other ranks.
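To make the sglang flow concrete, here is a toy single-process simulation of the all-gather-then-sample step (tp = 2 and the vocabulary size are made-up values; tensor_model_parallel_all_gather is emulated with torch.cat):

import torch

# Toy single-process simulation of the sglang flow (made-up sizes, tp = 2).
torch.manual_seed(0)
vocab_size, tp_size = 8, 2
full_logits = torch.randn(1, vocab_size)
# Each "rank" computes a vocab-parallel shard of the logits locally.
shards = list(full_logits.chunk(tp_size, dim=-1))

# tensor_model_parallel_all_gather concatenates the shards along the vocab dim,
# so every rank ends up holding the full logits.
gathered = torch.cat(shards, dim=-1)
assert torch.equal(gathered, full_logits)

# Every rank samples on its own; greedy sampling here for determinism
# (with random sampling, the ranks must share the RNG state to stay in sync).
next_token = torch.softmax(gathered, dim=-1).argmax(dim=-1)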

As for whether SGLang should adopt logic closer to vLLM's, I am currently experimenting with a PR to see whether it brings any gains.

TP inside the Linear layers (common to essentially all models) -- WIP

The core code of this part lives in python/sglang/srt/layers/linear.py. In the current architecture, the linear layers carry a large share of the model-parallelism work: most of the allreduce/allgather calls happen at this level, and SGLang provides some dedicated abstractions for it.

Within linear.py we mainly study three classes: ReplicatedLinear, ColumnParallelLinear, and RowParallelLinear. They serve different purposes and are used by essentially every model.

ReplicatedLinear

This is the simplest linear layer, with almost nothing special going on; it is basically identical to LinearBase (its base class). We introduce it mainly to set off the differences from the other two classes. Below is ReplicatedLinear's forward implementation: the key is quant_method. If a quantization strategy is configured, the corresponding quantized kernel is applied; otherwise the default is UnquantizedLinearMethod.

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        bias = self.bias if not self.skip_bias_add else None
        assert self.quant_method is not None
        output = self.quant_method.apply(self, x, bias)
        output_bias = self.bias if self.skip_bias_add else None
        return output, output_bias
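For reference, the unquantized default path boils down to an ordinary matmul. A minimal sketch of what quant_method.apply amounts to in that case (the function name and sizes here are illustrative, not sglang's exact class):

import torch
import torch.nn.functional as F

# Illustrative sketch: the unquantized apply is essentially a plain F.linear call.
def unquantized_apply_sketch(weight: torch.Tensor, x: torch.Tensor, bias=None):
    # weight has shape (output_size, input_size), as in nn.Linear
    return F.linear(x, weight, bias)

w = torch.randn(1024, 4096)  # made-up (output_size, input_size)
x = torch.randn(2, 4096)
y = unquantized_apply_sketch(w, x)
assert y.shape == (2, 1024)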

Being a Linear layer, it naturally reshapes the tensor (input_size -> output_size), which is handled mainly by quant_method; let us also take a quick look at the constructor interface.

class ReplicatedLinear(LinearBase):
    """Replicated linear layer.

    Args:
        input_size: input dimension of the linear layer.
        output_size: output dimension of the linear layer.
        bias: If true, add bias.
        skip_bias_add: If true, skip adding bias but instead return it.
        params_dtype: Data type for the parameters.
        quant_config: Quantization configure.
        prefix: The name of the layer in the state dict, including all parents
                        (e.g. model.layers.0.qkv_proj)
    """

RowParallelLinear & ColumnParallelLinear

Now we can talk about the Linear variants tied to TP behavior, starting with the simpler RowParallelLinear.

class RowParallelLinear(LinearBase):
    # As the docstring below already makes clear: RowParallelLinear splits the weight A along its first (input) dimension, and correspondingly the input X along its last dimension.
    """Linear layer with row parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
                           split across the GPUs and we do not split
                           again.
        skip_bias_add: This was added to enable performance optimization where
                       bias can be fused with other element-wise operations.
                       We skip adding bias but instead return it.
        params_dtype: Data type for the parameters.
        quant_config: Quantization configure.
    """
    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = True,
        input_is_parallel: bool = True,  # defaults to True and is almost always left at the default
        skip_bias_add: bool = False,
        params_dtype: Optional[torch.dtype] = None,
        # whether to aggregate the partial outputs with an all-reduce; depends on the layer:
        # e.g. in DeepSeek, the linear layers inside the MoE experts skip the reduce, while the dense MLP needs it
        reduce_results: bool = True,
        quant_config: Optional[QuantizationConfig] = None,
        prefix: str = "",
        tp_rank: Optional[int] = None,           
        tp_size: Optional[int] = None,
        use_presharded_weights: bool = False,
    ):
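To see why the all-reduce in RowParallelLinear recovers the full result, here is a small single-process check (tp = 2 and the sizes are made up): splitting A along its rows and X along its columns gives partial products whose sum equals the full matmul.

import torch

# Single-process check of row parallelism: Y = X @ A, with A split along its
# input (row) dimension and X split along its last dimension.
torch.manual_seed(0)
input_size, output_size, tp = 8, 4, 2
X = torch.randn(3, input_size)
A = torch.randn(input_size, output_size)

shard = input_size // tp
partials = [
    X[:, i * shard:(i + 1) * shard] @ A[i * shard:(i + 1) * shard, :]
    for i in range(tp)
]
# The all-reduce in RowParallelLinear corresponds to summing these partial results.
Y_parallel = sum(partials)
assert torch.allclose(Y_parallel, X @ A, atol=1e-5)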





