        world_size,
        rank,
        local_rank,
        distributed_init_method,
        backend,
    )
    # These are just some argument checks
    if not torch.distributed.is_initialized():
        assert distributed_init_method is not None, (
            "distributed_init_method must be provided when initializing "
            "distributed environment"
        )
        if timeout is not None:
            assert isinstance(timeout, (int)), "timeout must be a number"
            assert timeout > 0, "timeout must be positive"
            timeout = timedelta(seconds=timeout)

        # this backend is used for WORLD
        ## most important: initialization of the global communication process group
        torch.distributed.init_process_group(
            backend=backend,
            init_method=distributed_init_method,
            world_size=world_size,
            rank=rank,
            timeout=timeout,
        )
    # set the local rank
    # local_rank is not available in torch ProcessGroup,
    # see https://github.com/pytorch/pytorch/issues/122816
    if local_rank == -1:
        # local rank not set, this usually happens in single-node
        # setting, where we can use rank as local rank
        if distributed_init_method == "env://":
            local_rank = int(os.environ.get("LOCAL_RANK", "0"))
        else:
            local_rank = rank
    global _WORLD
    if _WORLD is None:
        ranks = list(range(torch.distributed.get_world_size()))
        _WORLD = init_world_group(ranks, local_rank, backend)
    else:
        assert (
            _WORLD.world_size == torch.distributed.get_world_size()
        ), "world group already initialized with a different world size"
def initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    backend: Optional[str] = None,
) -> None:
    """
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    # Get world size and rank. Ensure some consistencies.
    # First do a round of checks and collect metadata, e.g. whether world_size
    # matches; the backend is taken from the world group's device_group, which
    # in practice is just the default process group.
    assert torch.distributed.is_initialized()
    world_size: int = torch.distributed.get_world_size()
    backend = backend or torch.distributed.get_backend(get_world_group().device_group)

    if world_size != tensor_model_parallel_size * pipeline_model_parallel_size:
        raise RuntimeError(
            f"world_size ({world_size}) is not equal to "
            f"tensor_model_parallel_size ({tensor_model_parallel_size}) x "
            f"pipeline_model_parallel_size ({pipeline_model_parallel_size})"
        )

    # Build the tensor model-parallel groups.
    # This splits the ranks into groups by TP size: e.g. with 8 GPUs and tp=4,
    # the groups are [0-3] and [4-7].
    num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size
    global _TP
    assert _TP is None, "tensor model parallel group is already initialized"
    group_ranks = []
    for i in range(num_tensor_model_parallel_groups):
        ranks = list(
            range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)
        )
        group_ranks.append(ranks)

    # This is essentially a direct call into the GroupCoordinator constructor and
    # differs little from init_world_group, except that many arguments are now
    # True (e.g. use pynccl, use the message queue, and so on).
    # message queue broadcaster is only used in tensor model parallel group
    _TP = init_model_parallel_group(
        group_ranks,
        get_world_group().local_rank,
        backend,
        use_message_queue_broadcaster=True,
        group_name="tp",
    )

    # PP works the same way, but the groups are sliced differently: with
    # world_size 8 and pp=2 the groups become [0, 4], [1, 5], [2, 6], [3, 7].
    # PP is not fully implemented yet and may still change, so just get a feel
    # for it; combined with the TP split above, you can see that the two splits
    # are orthogonal.
    # Build the pipeline model-parallel groups.
    num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size
    global _PP
    assert _PP is None, "pipeline model parallel group is already initialized"
    group_ranks = []
    for i in range(num_pipeline_model_parallel_groups):
        ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
        group_ranks.append(ranks)

    # pipeline parallel does not need custom allreduce
    _PP = init_model_parallel_group(
        group_ranks,
        get_world_group().local_rank,
        backend,
        use_custom_allreduce=False,
        group_name="pp",
    )
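The group arithmetic above is easy to check in isolation. The sketch below (plain Python, no torch; `tp_pp_group_ranks` is a hypothetical helper, not part of the codebase) reproduces the TP and PP splits for world_size=8, tp=4, pp=2 and confirms that every rank lands in exactly one TP group and one PP group, which is what makes the two splits orthogonal.

```python
def tp_pp_group_ranks(world_size: int, tp: int, pp: int):
    """Return (tp_groups, pp_groups) using the same arithmetic as above."""
    assert world_size == tp * pp, "world_size must equal tp * pp"
    # TP groups are contiguous blocks of tp ranks.
    tp_groups = [list(range(i * tp, (i + 1) * tp)) for i in range(world_size // tp)]
    # PP groups stride across the TP groups with step world_size // pp.
    pp_groups = [list(range(i, world_size, world_size // pp)) for i in range(world_size // pp)]
    return tp_groups, pp_groups


tp_groups, pp_groups = tp_pp_group_ranks(world_size=8, tp=4, pp=2)
print(tp_groups)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(pp_groups)  # [[0, 4], [1, 5], [2, 6], [3, 7]]

# Every rank belongs to exactly one TP group and exactly one PP group.
for rank in range(8):
    assert sum(rank in g for g in tp_groups) == 1
    assert sum(rank in g for g in pp_groups) == 1
```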
class ReplicatedLinear(LinearBase):
    """Replicated linear layer.

    Args:
        input_size: input dimension of the linear layer.
        output_size: output dimension of the linear layer.
        bias: If true, add bias.
        skip_bias_add: If true, skip adding bias but instead return it.
        params_dtype: Data type for the parameters.
        quant_config: Quantization configure.
        prefix: The name of the layer in the state dict, including all
            parents (e.g. model.layers.0.qkv_proj)
    """
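For contrast with the parallel layers below, a minimal sketch of what "replicated" means in practice: every rank keeps the full weight, so the forward pass is an ordinary matmul with no tensor-parallel communication. `TinyReplicatedLinear` is an illustrative stand-in, not the real `ReplicatedLinear` (which also handles quantization, `skip_bias_add`, and weight loading).

```python
import torch
import torch.nn as nn


class TinyReplicatedLinear(nn.Module):
    def __init__(self, input_size: int, output_size: int, bias: bool = True):
        super().__init__()
        # Full [output_size, input_size] weight on every rank -- nothing is sharded.
        self.weight = nn.Parameter(torch.empty(output_size, input_size))
        self.bias = nn.Parameter(torch.zeros(output_size)) if bias else None
        nn.init.xavier_uniform_(self.weight)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Plain dense matmul; no all-reduce or all-gather is needed.
        return nn.functional.linear(x, self.weight, self.bias)
```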
# The docstring below is already fairly clear: RowParallelLinear parallelizes by
# splitting the weight A along its first dimension (the input dimension).
class RowParallelLinear(LinearBase):
    """Linear layer with row parallelism.

    The linear layer is defined as Y = XA + b. A is parallelized along
    its first dimension and X along its second dimension as:
               -   -
              | A_1 |
              | .   |
          A = | .   |        X = [X_1, ..., X_p]
              | .   |
              | A_p |
               -   -
    Arguments:
        input_size: first dimension of matrix A.
        output_size: second dimension of matrix A.
        bias: If true, add bias. Note that bias is not parallelized.
        input_is_parallel: If true, we assume that the input is already
            split across the GPUs and we do not split again.
        skip_bias_add: This was added to enable performance optimization where
            bias can be fused with other element-wise operations. We skip adding
            bias but instead return it.
        params_dtype: Data type for the parameters.
        quant_config: Quantization configure.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        bias: bool = True,
        input_is_parallel: bool = True,  # defaults to True; the default is almost always used
        skip_bias_add: bool = False,
        params_dtype: Optional[torch.dtype] = None,
        # Whether to aggregate the partial results with an all-reduce depends on the
        # layer: e.g. the linear layers inside DeepSeek's MoE skip the reduce, but the
        # MLP needs it.
        reduce_results: bool = True,
        quant_config: Optional[QuantizationConfig] = None,
        prefix: str = "",
        tp_rank: Optional[int] = None,
        tp_size: Optional[int] = None,
        use_presharded_weights: bool = False,
    ):
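The block-matrix picture in the docstring can be verified numerically: with A split along its first (input) dimension and X split along its last dimension, the per-shard partial products sum to the full XA, which is exactly the sum the all-reduce performs when reduce_results=True. A quick single-process check (illustrative only, simulating tp_size=4 shards on one device):

```python
import torch

torch.manual_seed(0)
tp_size = 4
X = torch.randn(2, 16)   # [batch, input_size]
A = torch.randn(16, 8)   # [input_size, output_size]

X_shards = torch.chunk(X, tp_size, dim=1)   # X = [X_1, ..., X_p]
A_shards = torch.chunk(A, tp_size, dim=0)   # A stacked as [A_1; ...; A_p]

# Each rank would compute one X_i @ A_i; the all-reduce sums them across ranks.
partial_sum = sum(x_i @ a_i for x_i, a_i in zip(X_shards, A_shards))
assert torch.allclose(partial_sum, X @ A, atol=1e-5)
```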