跳到主要内容

Pytorch Fsdp

使用 PyTorch FSDP 进行全分片数据并行训练的专家指导——参数分片、混合精度、CPU 卸载、FSDP2

技能元数据

来源可选 — 通过 hermes skills install official/mlops/pytorch-fsdp 安装
路径optional-skills/mlops/pytorch-fsdp
版本1.0.0
作者Orchestra Research
许可证MIT
依赖torch>=2.0, transformers
标签分布式训练, PyTorch, FSDP, 数据并行, 分片, 混合精度, CPU 卸载, FSDP2, 大规模训练

参考:完整 SKILL.md

信息

以下是 Hermes 在触发此技能时加载的完整技能定义。这是 Agent 在技能激活时看到的指令。

Pytorch-Fsdp 技能

基于官方文档,为 pytorch-fsdp 开发提供全面协助。

何时使用此技能

以下情况应触发此技能:

  • 使用 pytorch-fsdp 时
  • 询问 pytorch-fsdp 特性或 API 时
  • 实现 pytorch-fsdp 解决方案时
  • 调试 pytorch-fsdp 代码时
  • 学习 pytorch-fsdp 最佳实践时

快速参考

常见模式

模式 1: 通用 Join 上下文管理器# 创建时间:2025 年 6 月 6 日 | 最后更新:2025 年 6 月 6 日 通用 join 上下文管理器有助于在不均匀输入上进行分布式训练。本页概述了相关类的 API:Join、Joinable 和 JoinHook。教程请参见《使用 Join 上下文管理器进行不均匀输入分布式训练》。 class torch.distributed.algorithms.Join(joinables, enable=True, throw_on_early_termination=False, **kwargs)[source]# 此类定义了通用 join 上下文管理器,允许在进程加入后调用自定义钩子。这些钩子应遮蔽未加入进程的集体通信,以防止挂起和报错,并确保算法正确性。有关钩子定义的详细信息,请参阅 JoinHook。 警告 上下文管理器要求每个参与的 Joinable 在自身的每次迭代集体通信之前调用 notify_join_context() 方法,以确保正确性。 警告 上下文管理器要求所有 JoinHook 对象中的 process_group 属性相同。如果有多个 JoinHook 对象,则使用第一个对象的设备。进程组和设备信息用于检查未加入的进程,以及在启用 throw_on_early_termination 时通知进程抛出异常,两者均使用 all-reduce。 参数 joinables (List[Joinable]) – 参与的 Joinable 列表;它们的钩子按给定顺序迭代。 enable (bool) – 启用不均匀输入检测的标志;设置为 False 会禁用上下文管理器的功能,仅当用户知道输入不会不均匀时才应设置(默认值:True)。 throw_on_early_termination (bool) – 控制是否在检测到不均匀输入时抛出异常的标志(默认值:False)。 示例: >>> import os >>> import torch >>> import torch.distributed as dist >>> import torch.multiprocessing as mp >>> import torch.nn.parallel.DistributedDataParallel as DDP >>> import torch.distributed.optim.ZeroRedundancyOptimizer as ZeRO >>> from torch.distributed.algorithms.join import Join >>> >>> # 在每个生成的 worker 上 >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> model = DDP(torch.nn.Linear(1, 1).to(rank), device_ids=[rank]) >>> optim = ZeRO(model.parameters(), torch.optim.Adam, lr=0.01) >>> # rank 1 比 rank 0 多一个输入 >>> inputs = [torch.tensor([1.]).to(rank) for _ in range(10 + rank)] >>> with Join([model, optim]): >>> for input in inputs: >>> loss = model(input).sum() >>> loss.backward() >>> optim.step() >>> # 所有 rank 都能到达这里,不会挂起或报错 static notify_join_context(joinable)[source]# 通知 join 上下文管理器调用进程尚未加入。然后,如果 throw_on_early_termination=True,则检查是否检测到不均匀输入(即某个进程已加入),如果是则抛出异常。Joinable 对象应在自身的每次迭代集体通信之前调用此方法。例如,在 DistributedDataParallel 的前向传播开始时调用。只有传入上下文管理器的第一个 Joinable 对象在此方法中执行集体通信,其他 Joinable 对象调用此方法无实际作用。 参数 joinable (Joinable) – 调用此方法的 Joinable 对象。 返回 如果 joinable 是传入上下文管理器的第一个对象,则返回用于通知上下文管理器进程尚未加入的 all-reduce 的异步工作句柄;否则返回 None。 class torch.distributed.algorithms.Joinable[source]# 这定义了可 join 类的抽象基类。可 join 类(继承自 Joinable)应实现 join_hook()(返回 JoinHook 实例),以及 join_device() 和 join_process_group()(分别返回设备和进程组信息)。 abstract property join_device: device# 返回用于执行 join 上下文管理器所需集体通信的设备。 abstract join_hook(**kwargs)[source]# 返回给定 Joinable 的 JoinHook 实例。 参数 kwargs (dict) – 包含在运行时修改 join 钩子行为的任何关键字参数的字典;共享同一 join 上下文管理器的所有 Joinable 实例会收到相同的 kwargs 值。 返回类型 JoinHook abstract property join_process_group: Any# 返回 join 上下文管理器自身所需集体通信的进程组。 class torch.distributed.algorithms.JoinHook[source]# 这定义了 join 钩子,它在 join 上下文管理器中提供两个入口点。 入口点:主钩子,在存在未加入进程时重复调用;后钩子,在所有进程都加入后调用一次。 要为通用 join 上下文管理器实现 join 钩子,请定义一个继承自 JoinHook 的类,并适当重写 main_hook() 和 post_hook()。 main_hook()[source]# 在存在未加入进程时调用此钩子,以遮蔽训练迭代中的集体通信。训练迭代即一次前向传播、反向传播和优化器步骤。 post_hook(is_last_joiner)[source]# 在所有进程都加入后调用钩子。它接收一个额外的布尔参数 is_last_joiner,指示该 rank 是否为最后加入的 rank 之一。 参数 is_last_joiner (bool) – 如果该 rank 是最后加入的 rank 之一,则为 True;否则为 False。

Join

模式 2: 分布式通信包 - torch.distributed# 创建日期:2017年7月12日 | 最后更新日期:2025年9月4日 注意 请参考 PyTorch 分布式概述,了解与分布式训练相关的所有功能的简要介绍。 后端# torch.distributed 支持四个内置后端,每个后端具有不同的能力。下表显示了每个后端在 CPU 或 GPU 上可用的功能。对于 NCCL,GPU 指 CUDA GPU,而对于 XCCL,GPU 指 XPU GPU。MPI 仅在用于构建 PyTorch 的实现支持 CUDA 时才支持 CUDA。 后端 gloo mpi nccl xccl 设备 CPU GPU CPU GPU CPU GPU CPU GPU send ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ recv ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ broadcast ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ all_gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ gather ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ scatter ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ reduce_scatter ✓ ✓ ✘ ✘ ✘ ✓ ✘ ✓ all_to_all ✓ ✓ ✓ ? ✘ ✓ ✘ ✓ barrier ✓ ✘ ✓ ? ✘ ✓ ✘ ✓ PyTorch 自带的后端# PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型版)。默认情况下,对于 Linux,Gloo 和 NCCL 后端会被构建并包含在 PyTorch 分布式包中(仅当使用 CUDA 构建时才包含 NCCL)。MPI 是一个可选后端,只有从源码构建 PyTorch 时才能包含(例如,在安装了 MPI 的主机上构建 PyTorch)。 注意 从 PyTorch v1.8 开始,Windows 支持除 NCCL 之外的所有集合通信后端。如果 init_process_group() 的 init_method 参数指向一个文件,则必须遵循以下模式: 本地文件系统,init_method="file:///d:/tmp/some_file" 共享文件系统,init_method="file://////{machine_name}/{share_folder_name}/some_file" 与 Linux 平台相同,你可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。 应该使用哪个后端?# 过去,我们经常被问到:“我应该使用哪个后端?”。经验法则 对于使用 CUDA GPU 的分布式训练,使用 NCCL 后端。 对于使用 XPU GPU 的分布式训练,使用 XCCL 后端。 对于使用 CPU 的分布式训练,使用 Gloo 后端。 具有 InfiniBand 互连的 GPU 主机 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。 具有以太网互连的 GPU 主机 使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果你遇到任何 NCCL 问题,请使用 Gloo 作为后备选项。(注意,Gloo 目前在 GPU 上运行速度比 NCCL 慢。) 具有 InfiniBand 互连的 CPU 主机 如果你的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则使用 MPI。我们计划在未来的版本中为 Gloo 添加 InfiniBand 支持。 具有以太网互连的 CPU 主机 使用 Gloo,除非你有特定理由使用 MPI。 常见环境变量# 选择要使用的网络接口# 默认情况下,NCCL 和 Gloo 后端都会尝试找到正确的网络接口。如果自动检测到的接口不正确,你可以使用以下环境变量覆盖它(适用于各自的后端): NCCL_SOCKET_IFNAME,例如 export NCCL_SOCKET_IFNAME=eth0 GLOO_SOCKET_IFNAME,例如 export GLOO_SOCKET_IFNAME=eth0 如果你使用 Gloo 后端,可以通过逗号分隔指定多个接口,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将以轮询方式在这些接口之间分配操作。所有进程必须在此变量中指定相同数量的接口。 其他 NCCL 环境变量# 调试 - 如果 NCCL 失败,你可以设置 NCCL_DEBUG=INFO 来打印显式警告消息以及基本的 NCCL 初始化信息。你也可以使用 NCCL_DEBUG_SUBSYS 来获取 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 会打印集合调用的日志,这可能有助于调试挂起,特别是由集合类型或消息大小不匹配引起的挂起。如果发生拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH 来检查详细的检测结果并保存为参考,以便在需要 NCCL 团队的进一步帮助时使用。 性能调优 - NCCL 基于其拓扑检测自动进行调优,以节省用户的调优工作。在一些基于套接字的系统上,用户可能仍然可以尝试调整 NCCL_SOCKET_NTHREADS 和 NCCL_NSOCKS_PERTHREAD 来增加套接字网络带宽。这两个环境变量已被 NCCL 为某些云提供商(如 AWS 或 GCP)预先调优。 有关 NCCL 环境变量的完整列表,请参考 NVIDIA NCCL 的官方文档 你可以使用 torch.distributed.ProcessGroupNCCL.NCCLConfig 和 torch.distributed.ProcessGroupNCCL.Options 进一步调整 NCCL 通信器。在解释器中使用 help(例如 help(torch.distributed.ProcessGroupNCCL.NCCLConfig))了解更多信息。 基础# torch.distributed 包为跨一个或多个机器上的多个计算节点的多进程并行提供了 PyTorch 支持和通信原语。类 torch.nn.parallel.DistributedDataParallel() 建立在此功能之上,作为任何 PyTorch 模型的包装器提供同步分布式训练。这与多进程包 - torch.multiprocessing 和 torch.nn.DataParallel() 提供的并行类型不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel() 包装器可能仍然比其他数据并行方法(包括 torch.nn.DataParallel())具有优势: 每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这看起来是冗余的,因为梯度已经跨进程收集并平均,因此每个进程的梯度相同,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量的时间。 每个进程包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL 抖动”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。 初始化# 在调用任何其他方法之前,需要使用 torch.distributed.init_process_group() 或 torch.distributed.device_mesh.init_device_mesh() 函数初始化包。两者都会阻塞,直到所有进程都加入。 警告 初始化不是线程安全的。进程组创建应在单个线程中执行,以防止跨 rank 的 'UUID' 分配不一致,并防止初始化期间的竞争导致挂起。 torch.distributed.is_available()[source]# 如果分布式包可用,则返回 True。否则,torch.distributed 不会公开任何其他 API。目前,torch.distributed 在 Linux、MacOS 和 Windows 上可用。从源码构建 PyTorch 时,设置 USE_DISTRIBUTED=1 以启用它。目前,Linux 和 Windows 的默认值为 USE_DISTRIBUTED=1,MacOS 的默认值为 USE_DISTRIBUTED=0。 返回类型 bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# 初始化默认的分布式进程组。这也会初始化分布式包。 有两种主要方式初始化进程组: 显式指定 store、rank 和 world_size。 指定 init_method(一个 URL 字符串),指示如何/在哪里发现对等节点。可选地指定 rank 和 world_size,或者在 URL 中编码所有必需参数并省略它们。 如果两者都未指定,则 init_method 假定为 "env://"。 参数 backend (str 或 Backend, 可选) – 要使用的后端。根据构建时的配置,有效值包括 mpi、gloo、nccl、ucc、xccl 或由第三方插件注册的后端。从 2.6 开始,如果未提供 backend,c10d 将使用为 device_id 关键字参数指示的设备类型注册的后端(如果提供了)。目前已知的默认注册是:nccl 用于 cuda,gloo 用于 cpu,xccl 用于 xpu。如果既未提供 backend 也未提供 device_id,c10d 将检测运行时机器上的加速器,并使用为该检测到的加速器(或 cpu)注册的后端。此字段可以以小写字符串形式给出(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果使用 nccl 后端每台机器有多个进程,则每个进程必须独占访问其使用的每个 GPU,因为在进程之间共享 GPU 可能导致死锁或 NCCL 无效使用。ucc 后端是实验性的。可以使用 get_default_backend_for_device() 查询设备的默认后端。 init_method (str, 可选) – 指定如何初始化进程组的 URL。如果未指定 init_method 或 store,则默认为 "env://"。与 store 互斥。 world_size (int, 可选) – 参与作业的进程数。如果指定了 store,则为必需。 rank (int, 可选) – 当前进程的 rank(应为 0 到 world_size-1 之间的数字)。如果指定了 store,则为必需。 store (Store, 可选) – 所有工作进程可访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。 timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端为 30 分钟。这是集合操作将被异步中止并且进程将崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行。当设置了 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。 group_name (str, 可选, 已弃用) – 组名。此参数被忽略。 pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传入的附加选项。目前,我们唯一支持的选项是用于 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便 nccl 后端在有计算内核等待时选择高优先级的 cuda 流。有关配置 nccl 的其他可用选项,请参见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, 可选) – 此进程将工作的单个特定设备,允许后端特定的优化。目前,仅在 NCCL 下有两个效果:立即形成通信器(立即调用 ncclCommInit* 而不是正常的惰性调用),并且子组将尽可能使用 ncclCommSplit 以避免组创建的不必要开销。如果你想尽早知道 NCCL 初始化错误,也可以使用此字段。如果提供了 int,API 假定将使用编译时的加速器类型。 注意 要启用 backend == Backend.MPI,需要在支持 MPI 的系统上从源码构建 PyTorch。 注意 对多个后端的支持是实验性的。目前,当未指定后端时,将同时创建 gloo 和 nccl 后端。gloo 后端将用于 CPU 张量的集合操作,nccl 后端将用于 CUDA 张量的集合操作。可以通过传入格式为 "<device_type>:<backend_name>,<device_type>:<backend_name>" 的字符串来指定自定义后端,例如 "cpu:gloo,cuda:custom_backend"。 torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# 基于 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度被标记为 mesh_dim_names[i]。 注意 init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有 rank 上相同。不一致的 mesh_shape 可能导致挂起。 注意 如果未找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组/组。 参数 device_type (str) – 网格的设备类型。目前支持:"cpu"、"cuda/cuda-like"、"xpu"。不允许传入带有 GPU 索引的设备类型,例如 "cuda:0"。 mesh_shape (Tuple[int]) – 定义描述设备布局的多维数组维度的元组。 mesh_dim_names (Tuple[str], 可选) – 网格维度名称的元组,用于分配给描述设备布局的多维数组的每个维度。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须唯一。 backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 对将为每个网格维度创建的部分或全部 ProcessGroup 的覆盖。每个键可以是维度的索引或其名称(如果提供了 mesh_dim_names)。每个值可以是一个包含后端名称及其选项的元组,或者只是这两个组件之一(在这种情况下,另一个将设置为其默认值)。 返回 表示设备布局的 DeviceMesh 对象。 返回类型 DeviceMesh 示例: >>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# 检查默认进程组是否已初始化。 返回类型 bool torch.distributed.is_mpi_available()[source]# 检查 MPI 后端是否可用。 返回类型 bool torch.distributed.is_nccl_available()[source]# 检查 NCCL 后端是否可用。 返回类型 bool torch.distributed.is_gloo_available()[source]# 检查 Gloo 后端是否可用。 返回类型 bool torch.distributed.distributed_c10d.is_xccl_available()[source]# 检查 XCCL 后端是否可用。 返回类型 bool torch.distributed.is_torchelastic_launched()[source]# 检查此进程是否由 torch.distributed.elastic(又名 torchelastic)启动。TORCHELASTIC_RUN_ID 环境变量的存在被用作代理,以确定当前进程是否由 torchelastic 启动。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,该 id 始终是非空值,表示用于对等发现目的的作业 id。 返回类型 bool torch.distributed.get_default_backend_for_device(device)[source]# 返回给定设备的默认后端。 参数 device (Union[str, torch.device]) – 要获取默认后端的设备。 返回 给定设备的默认后端,以小写字符串形式返回。 返回类型 str 目前支持三种初始化方法: TCP 初始化# 有两种使用 TCP 初始化的方法,都需要一个所有进程可达的网络地址和一个期望的 world_size。第一种方法需要指定属于 rank 0 进程的地址。此初始化方法要求所有进程手动指定 rank。请注意,最新的分布式包中不再支持多播地址。group_name 也已弃用。 import torch.distributed as dist # 使用其中一台机器的地址 dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) 共享文件系统初始化# 另一种初始化方法利用组中所有机器共享且可见的文件系统,以及期望的 world_size。URL 应以 file:// 开头,并包含共享文件系统上不存在的文件(在现有目录中)的路径。文件系统初始化将自动创建该文件(如果不存在),但不会删除该文件。因此,你有责任确保在下次对同一文件路径/名称调用 init_process_group() 之前清理该文件。请注意,最新的分布式包中不再支持自动 rank 分配,并且 group_name 也已弃用。 警告 此方法假定文件系统支持使用 fcntl 进行锁定 - 大多数本地系统和 NFS 都支持。 警告 此方法将始终创建文件,并尽力在程序结束时清理和删除该文件。换句话说,每次使用文件 init 方法进行初始化都需要一个全新的空文件才能成功。如果再次使用前一次初始化(恰好未被清理)使用的同一文件,则会出现意外行为,并且通常会导致死锁和失败。因此,即使此方法会尽力清理文件,如果自动删除不成功,你有责任确保在训练结束时删除该文件,以防止下次再次使用同一文件。如果你计划在同一文件名上多次调用 init_process_group(),这一点尤其重要。换句话说,如果文件未被删除/清理,并且你再次对该文件调用 init_process_group(),则预期会失败。这里的经验法则是,确保每次调用 init_process_group() 时文件不存在或为空。 import torch.distributed as dist # 应始终指定 rank dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) 环境变量初始化# 此方法将从环境变量读取配置,允许完全自定义如何获取信息。要设置的变量是: MASTER_PORT - 必需;必须是 rank 0 机器上的空闲端口 MASTER_ADDR - 必需(rank 0 除外);rank 0 节点的地址 WORLD_SIZE - 必需;可以在此处设置,也可以在调用 init 函数时设置 RANK - 必需;可以在此处设置,也可以在调用 init 函数时设置 rank 0 的机器将用于建立所有连接。这是默认方法,意味着不必指定 init_method(或者可以指定为 env://)。 改进初始化时间# TORCH_GLOO_LAZY_INIT - 按需建立连接,而不是使用全网格,这可以大大改善非 all2all 操作的初始化时间。 初始化后# 一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()。 class torch.distributed.Backend(name)[source]# 一个类似枚举的类,用于后端。可用的后端:GLOO、NCCL、UCC、MPI、XCCL 以及其他注册的后端。此类的值是小写字符串,例如 "gloo"。它们可以作为属性访问,例如 Backend.NCCL。此类可以直接调用来解析字符串,例如 Backend(backend_str) 将检查 backend_str 是否有效,如果有效则返回解析后的小写字符串。它也接受大写字符串,例如 Backend("GLOO") 返回 "gloo"。 注意 Backend.UNDEFINED 条目存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假定其存在。 classmethod register_backend(name, func, extended_api=False, devices=None)[source]# 使用给定的名称和实例化函数注册一个新的后端。此 classmethod 由第三方 ProcessGroup 扩展用于注册新后端。 参数 name (str) – ProcessGroup 扩展的后端名称。它应与 init_process_group() 中的名称匹配。 func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括 store、rank、world_size 和 timeout。 extended_api (bool, 可选) – 后端是否支持扩展参数结构。默认值:False。如果设置为 True,后端将获得一个 c10d::DistributedBackendOptions 实例,以及一个由后端实现定义的进程组选项对象。 device (str 或 list of str, 可选) – 此后端支持的设备类型,例如 "cpu"、"cuda" 等。如果为 None,则假定同时支持 "cpu" 和 "cuda"。 注意 对第三方后端的支持是实验性的,可能会发生变化。 torch.distributed.get_backend(group=None)[source]# 返回给定进程组的后端。 参数 group (ProcessGroup, 可选) – 要操作的进程组。默认是通用的主进程组。如果指定了另一个特定组,则调用进程必须是该组的一部分。 返回 给定进程组的后端,以小写字符串形式返回。 返回类型 Backend torch.distributed.get_rank(group=None)[source]# 返回当前进程在提供的组中的 rank,否则返回默认值。Rank 是分配给分布式进程组中每个进程的唯一标识符。它们始终是从 0 到 world_size 的连续整数。 参数 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 返回 进程组的 rank -1,如果不是该组的一部分 返回类型 int torch.distributed.get_world_size(group=None)[source]# 返回当前进程组中的进程数。 参数 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 返回 进程组的 world size -1,如果不是该组的一部分 返回类型 int 关闭# 通过调用 destroy_process_group() 在退出时清理资源非常重要。最简单的模式是在训练脚本中不再需要通信时(通常接近 main() 的末尾),使用 group 参数的默认值 None 调用 destroy_process_group() 来销毁每个进程组和后端。每个训练器进程应调用一次,而不是在外部进程启动器级别调用。 如果进程组中的所有 rank 未在超时时间内调用 destroy_process_group(),特别是当应用程序中有多个进程组时(例如用于 N-D 并行),则可能发生退出挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,该函数必须集体调用,但如果由 Python 的 GC 调用 ProcessGroupNCCL 的析构函数,则调用顺序是不确定的。调用 destroy_process_group() 有助于确保跨 rank 以一致顺序调用 ncclCommAbort,并避免在 ProcessGroupNCCL 的析构函数期间调用 ncclCommAbort。 重新初始化# destroy_process_group 也可用于销毁单个进程组。一个用例可能是容错训练,其中进程组可能在运行时被销毁,然后初始化一个新的。在这种情况下,至关重要的是在调用 destroy 之后和随后初始化之前,使用 torch.distributed 原语以外的某种方式同步训练器进程。由于实现这种同步的困难,此行为目前不受支持/未经测试,并被视为已知问题。如果这个用例阻碍了你,请提交 github issue 或 RFC。 组# 默认情况下,集合操作在默认组(也称为 world)上执行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。new_group() 函数可用于创建新组,包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数传递给所有集合操作(集合操作是以某些众所周知的编程模式交换信息的分布式函数)。 torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]# 创建一个新的分布式组。此函数要求主组中的所有进程(即分布式作业的所有进程)都进入此函数,即使它们不打算成为该组的成员。此外,应在所有进程中以相同顺序创建组。 警告 安全并发使用:当使用 NCCL 后端使用多个进程组时,用户必须确保跨 rank 的集合操作执行顺序全局一致。如果进程内的多个线程发出集合操作,则需要显式同步以确保一致排序。当使用 torch.distributed 通信 API 的异步变体时,会返回一个 work 对象,并且通信内核被排队到单独的 CUDA 流上,允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,必须通过调用 work.wait() 将它们与其他 cuda 流同步,然后再使用另一个进程组。有关更多详细信息,请参见同时使用多个 NCCL 通信器 <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently>。 参数 ranks (list[int]) – 组成员的 rank 列表。如果为 None,将设置为所有 rank。默认值为 None。 timeout (timedelta, 可选) – 有关详细信息和默认值,请参见 init_process_group。 backend (str 或 Backend, 可选) – 要使用的后端。根据构建时的配置,有效值为 gloo 和 nccl。默认情况下,使用与全局组相同的后端。此字段应作为小写字符串给出(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果传入 None,将使用与默认进程组对应的后端。默认值为 None。 pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传入的附加选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以选择高优先级的 cuda 流。有关配置 nccl 的其他可用选项,请参见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t use_local_synchronization (bool, 可选): 在进程组创建结束时执行组本地屏障。不同之处在于非成员 rank 不需要调用 API 并且不加入屏障。 group_desc (str, 可选) – 描述进程组的字符串。 device_id (torch.device, 可选) – 将此进程“绑定”到的单个特定设备。如果提供了此字段,new_group 调用将尝试立即为设备初始化通信后端。 返回 分布式组的句柄,可以传递给集合调用,如果 rank 不是 ranks 的一部分,则返回 GroupMember.NON_GROUP_MEMBER。 N.B. use_local_synchronization 不适用于 MPI。 N.B. 虽然 use_local_synchronization=True 在较大集群和小型进程组中可以显著加快速度,但必须小心,因为它会改变集群行为,因为非成员 rank 不加入组 barrier()。 N.B. use_local_synchronization=True 可能导致死锁,当每个 rank 创建多个重叠的进程组时。为避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。 torch.distributed.get_group_rank(group, global_rank)[source]# 将全局 rank 转换为组 rank。global_rank 必须是 group 的一部分,否则会引发 RuntimeError。 参数 group (ProcessGroup) – 要查找相对 rank 的 ProcessGroup。 global_rank (int) – 要查询的全局 rank。 返回 global_rank 相对于 group 的组 rank 返回类型 int N.B. 在默认进程组上调用此函数返回恒等映射 torch.distributed.get_global_rank(group, group_rank)[source]# 将组 rank 转换为全局 rank。group_rank 必须是 group 的一部分,否则会引发 RuntimeError。 参数 group (ProcessGroup) – 要从中查找全局 rank 的 ProcessGroup。 group_rank (int) – 要查询的组 rank。 返回 group_rank 相对于 group 的全局 rank 返回类型 int N.B. 在默认进程组上调用此函数返回恒等映射 torch.distributed.get_process_group_ranks(group)[source]# 获取与 group 关联的所有 rank。 参数 group (Optional[ProcessGroup]) – 要从中获取所有 rank 的 ProcessGroup。如果为 None,将使用默认进程组。 返回 按组 rank 排序的全局 rank 列表。 返回类型 list[int] DeviceMesh# DeviceMesh 是一个更高级别的抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置 rank,并帮助轻松管理这些分布式进程组。init_device_mesh() 函数可用于创建新的 DeviceMesh,其中包含描述设备拓扑的网格形状。 class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, backend_override=None, _init_backend=True)[source]# DeviceMesh 表示设备的网格,其中设备的布局可以表示为 n 维数组,n 维数组的每个值是默认进程组 rank 的全局 id。DeviceMesh 可用于设置跨集群的 N 维设备连接,并管理用于 N 维并行的 ProcessGroup。通信可以在 DeviceMesh 的每个维度上分别发生。DeviceMesh 尊重用户已经选择的设备(即,如果用户在 DeviceMesh 初始化之前调用了 torch.cuda.set_device),并且如果用户事先没有设置设备,将选择/设置当前进程的设备。请注意,手动设备选择应在 DeviceMesh 初始化之前进行。DeviceMesh 也可以在与 DTensor API 一起使用时用作上下文管理器。 注意 DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。因此,用户需要确保网格数组(描述设备布局)在所有 rank 上相同。不一致的网格将导致静默挂起。 参数 device_type (str) – 网格的设备类型。目前支持:"cpu"、"cuda/cuda-like"。 mesh (ndarray) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。 返回 表示设备布局的 DeviceMesh 对象。 返回类型 DeviceMesh 以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在网格的第一个维度上进行归约将跨列 (0, 4), .. 和 (3, 7) 进行归约,在网格的第二个维度上进行归约将跨行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 进行归约。 示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 将设备网格初始化为 (2, 4) 以表示拓扑 >>> # 跨主机 (dim 0) 和主机内 (dim 1)。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source]# 从现有的 ProcessGroup 或 ProcessGroup 列表中使用 device_type 构造 DeviceMesh。构造的设备网格的维度数等于传入的组数。例如,如果传入单个进程组,则生成的 DeviceMesh 是 1D 网格。如果传入 2 个进程组的列表,则生成的 DeviceMesh 是 2D 网格。如果传入多个组,则 mesh 和 mesh_dim_names 参数是必需的。传入的进程组的顺序决定了网格的拓扑。例如,第一个进程组将是 DeviceMesh 的第 0 维。传入的网格张量必须具有与传入的进程组数量相同的维度数,并且网格张量中维度的顺序必须与传入的进程组中的顺序匹配。 参数 group (ProcessGroup 或 list[ProcessGroup]) – 现有的 ProcessGroup 或 ProcessGroup 列表。 device_type (str) – 网格的设备类型。目前支持:"cpu"、"cuda/cuda-like"。不允许传入带有 GPU 索引的设备类型,例如 "cuda:0"。 mesh (torch.Tensor 或 ArrayLike, 可选) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。默认值为 None。 mesh_dim_names (tuple[str], 可选) – 网格维度名称的元组,用于分配给描述设备布局的多维数组的每个维度。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须唯一。默认值为 None。 返回 表示设备布局的 DeviceMesh 对象。 返回类型 DeviceMesh get_all_groups()[source]# 返回所有网格维度的 ProcessGroup 列表。 返回 ProcessGroup 对象列表。 返回类型 list[torch.distributed.distributed_c10d.ProcessGroup] get_coordinate()[source]# 返回此 rank 相对于网格所有维度的相对索引。如果此 rank 不是网格的一部分,则返回 None。 返回类型 Optional[list[int]] get_group(mesh_dim=None)[source]# 返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是 1 维的,则返回网格中唯一的 ProcessGroup。 参数 mesh_dim (str/python:int, 可选) – 可以是网格维度的名称或索引 None。(网格维度的索引。默认值为) – 返回 ProcessGroup 对象。 返回类型 ProcessGroup get_local_rank(mesh_dim=None)[source]# 返回 DeviceMesh 的给定 mesh_dim 的本地 rank。 参数 mesh_dim (str/python:int, 可选) – 可以是网格维度的名称或索引 None。(网格维度的索引。默认值为) – 返回表示本地 rank 的整数。 返回类型 int 以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在 rank 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 0。在 rank 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 1。在 rank 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 0。在 rank 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 1。在 rank 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 2。在 rank 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 3。 示例: >>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 将设备网格初始化为 (2, 4) 以表示拓扑 >>> # 跨主机 (dim 0) 和主机内 (dim 1)。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]]) get_rank()[source]# 返回当前全局 rank。 返回类型 int 点对点通信# torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 同步发送张量。 警告 NCCL 后端不支持 tag。 参数 tensor (Tensor) – 要发送的张量。 dst (int) – 全局进程组上的目标 rank(无论 group 参数如何)。目标 rank 不应与当前进程的 rank 相同。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于匹配发送与远程接收的标签 group_dst (int, 可选) – 组上的目标 rank。不允许同时指定 dst 和 group_dst。 torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source]# 同步接收张量。 警告 NCCL 后端不支持 tag。 参数 tensor (Tensor) – 要用接收数据填充的张量。 src (int, 可选) – 全局进程组上的源 rank(无论 group 参数如何)。如果未指定,将从任何进程接收。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于匹配接收与远程发送的标签 group_src (int, 可选) – 组上的目标 rank。不允许同时指定 src 和 group_src。 返回 发送方 rank -1,如果不是该组的一部分 返回类型 int isend() 和 irecv() 在使用时返回分布式请求对象。通常,此对象的类型未指定,因为它们永远不应手动创建,但保证支持两种方法: is_completed() - 如果操作已完成,则返回 True wait() - 将阻塞进程,直到操作完成。 is_completed() 保证在返回后返回 True。 torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source]# 异步发送张量。 警告 在请求完成之前修改张量会导致未定义的行为。 警告 NCCL 后端不支持 tag。 与阻塞的 send 不同,isend 允许 src == dst rank,即发送给自己。 参数 tensor (Tensor) – 要发送的张量。 dst (int) – 全局进程组上的目标 rank(无论 group 参数如何) group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于匹配发送与远程接收的标签 group_dst (int, 可选) – 组上的目标 rank。不允许同时指定 dst 和 group_dst 返回 分布式请求对象。None,如果不是该组的一部分 返回类型 Optional[Work] torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source]# 异步接收张量。 警告 NCCL 后端不支持 tag。 与阻塞的 recv 不同,irecv 允许 src == dst rank,即从自己接收。 参数 tensor (Tensor) – 要用接收数据填充的张量。 src (int, 可选) – 全局进程组上的源 rank(无论 group 参数如何)。如果未指定,将从任何进程接收。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于匹配接收与远程发送的标签 group_src (int, 可选) – 组上的目标 rank。不允许同时指定 src 和 group_src。 返回 分布式请求对象。None,如果不是该组的一部分 返回类型 Optional[Work] torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None, use_batch=False)[source]# 同步发送 object_list 中的可 pickle 对象。类似于 send(),但可以传入 Python 对象。请注意,object_list 中的所有对象必须可 pickle 才能发送。 参数 object_list (List[Any]) – 要发送的输入对象列表。每个对象必须可 pickle。接收方必须提供相同大小的列表。 dst (int) – 要将 object_list 发送到的目标 rank。目标 rank 基于全局进程组(无论 group 参数如何) group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认进程组。默认值为 None。 device (torch.device, 可选) – 如果不为 None,则对象被序列化并转换为张量,然后在发送前移动到该设备。默认值为 None。 group_dst (int, 可选) – 组上的目标 rank。必须指定 dst 和 group_dst 之一,但不能同时指定 use_batch (bool, 可选) – 如果为 True,则使用批量 p2p 操作而不是常规发送操作。这避免了初始化 2-rank 通信器,并使用现有的整个组通信器。有关用法和假设,请参见 batch_isend_irecv。默认值为 False。 返回 None。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保设置此设备,以便每个 rank 都有一个单独的 GPU。 警告 对象集合操作有许多严重的性能和可扩展性限制。有关详细信息,请参见对象集合操作。 警告 send_object_list() 隐式使用 pickle 模块,该模块已知不安全。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。仅对您信任的数据调用此函数。 警告 使用 GPU 张量调用 send_object_list() 不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用 send()。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> # 假设后端不是 NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # 假设 world_size 为 2。 >>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象 >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None, use_batch=False)[source]# 同步接收 object_list 中的可 pickle 对象。类似于 recv(),但可以接收 Python 对象。 参数 object_list (List[Any]) – 要接收到的对象列表。必须提供与发送列表大小相等的列表。 src (int, 可选) – 要从中接收 object_list 的源 rank。源 rank 基于全局进程组(无论 group 参数如何)。如果设置为 None,将从任何 rank 接收。默认值为 None。 group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认进程组。默认值为 None。 device (torch.device, 可选) – 如果不为 None,则在此设备上接收。默认值为 None。 group_src (int, 可选) – 组上的目标 rank。不允许同时指定 src 和 group_src。 use_batch (bool, 可选) – 如果为 True,则使用批量 p2p 操作而不是常规发送操作。这避免了初始化 2-rank 通信器,并使用现有的整个组通信器。有关用法和假设,请参见 batch_isend_irecv。默认值为 False。 返回 发送方 rank。如果 rank 不是组的一部分,则为 -1。如果 rank 是组的一部分,则 object_list 将包含来自 src rank 的发送对象。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保设置此设备,以便每个 rank 都有一个单独的 GPU。 警告 对象集合操作有许多严重的性能和可扩展性限制。有关详细信息,请参见对象集合操作。 警告 recv_object_list() 隐式使用 pickle 模块,该模块已知不安全。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。仅对您信任的数据调用此函数。 警告 使用 GPU 张量调用 recv_object_list() 不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用 recv()。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> # 假设后端不是 NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # 假设 world_size 为 2。 >>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象 >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.batch_isend_irecv(p2p_op_list)[source]# 异步发送或接收一批张量,并返回请求列表。处理 p2p_op_list 中的每个操作,并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。 参数 p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 点对点操作列表(每个操作符的类型为 torch.distributed.P2POp)。列表中 isend/irecv 的顺序很重要,并且需要与远程端相应的 isend/irecv 匹配。 返回 通过调用 op_list 中相应操作返回的分布式请求对象列表。 返回类型 list[torch.distributed.distributed_c10d.Work] 示例 >>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1 注意 请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则会导致意外的挂起问题。此外,如果此 API 是传递给 dist.P2POp 的组中的第一个集合调用,则该组的所有 rank 都必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是该组中的第一个集合调用,则允许仅涉及该组 rank 子集的批量 P2P 操作。 class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source]# 一个用于为 batch_isend_irecv 构建点对点操作的类。此类构建 P2P 操作的类型、通信缓冲区、对等 rank、进程组和标签。此类的实例将传递给 batch_isend_irecv 进行点对点通信。 参数 op (Callable) – 向对等进程发送数据或从对等进程接收数据的函数。op 的类型是 torch.distributed.isend 或 torch.distributed.irecv。 tensor (Tensor) – 要发送或接收的张量。 peer (int, 可选) – 目标或源 rank。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 tag (int, 可选) – 用于匹配发送与接收的标签。 group_peer (int, 可选) – 目标或源 rank。 同步和异步集合操作# 每个集合操作函数都支持以下两种操作,具体取决于传递给集合的 async_op 标志的设置: 同步操作 - 默认模式,当 async_op 设置为 False 时。当函数返回时,保证集合操作已执行。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集合操作,任何进一步利用集合调用输出的函数调用都将按预期运行。对于 CUDA 集合操作,在同一 CUDA 流上利用输出的函数调用将按预期运行。用户必须注意在不同流下运行时的同步。有关 CUDA 语义(如流同步)的详细信息,请参见 CUDA 语义。请参阅下面的脚本,了解 CPU 和 CUDA 操作在这些语义上的差异示例。 异步操作 - 当 async_op 设置为 True 时。集合操作函数返回一个分布式请求对象。通常,你不需要手动创建它,它保证支持两种方法: is_completed() - 对于 CPU 集合操作,如果完成则返回 True。对于 CUDA 操作,如果操作已成功排队到 CUDA 流上,并且可以在默认流上使用输出而无需进一步同步,则返回 True。 wait() - 对于 CPU 集合操作,将阻塞进程直到操作完成。对于 CUDA 集合操作,将阻塞当前活动的 CUDA 流直到操作完成(但不会阻塞 CPU)。 get_future() - 返回 torch.C.Future 对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但点对点操作除外。注意:随着我们继续采用 Future 并合并 API,get_future() 调用可能会变得多余。 示例 以下代码可以作为使用分布式集合时 CUDA 操作语义的参考。它显示了在不同 CUDA 流上使用集合输出时需要显式同步: # 代码在每个 rank 上运行。 dist.init_process_group("nccl", rank=rank, world_size=2) output = torch.tensor([rank]).cuda(rank) s = torch.cuda.Stream() handle = dist.all_reduce(output, async_op=True) # Wait 确保操作已排队,但不一定完成。 handle.wait() # 在非默认流上使用结果。 with torch.cuda.stream(s): s.wait_stream(torch.cuda.default_stream()) output.add(100) if rank == 0: # 如果省略了对 wait_stream 的显式调用,则下面的输出将是 # 非确定性的 1 或 101,具体取决于 allreduce 是否在 add 完成后 # 覆盖了该值。 print(output) 集合函数# torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source]# 将张量广播到整个组。参与集合的所有进程中的 tensor 必须具有相同数量的元素。 参数 tensor (Tensor) – 如果 src 是当前进程的 rank,则为要发送的数据;否则为用于保存接收数据的张量。 src (int) – 全局进程组上的源 rank(无论 group 参数如何)。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 group_src (int) – 组上的源 rank。必须指定 group_src 和 src 之一,但不能同时指定。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source]# 将 object_list 中的可 pickle 对象广播到整个组。类似于 broadcast(),但可以传入 Python 对象。请注意,object_list 中的所有对象必须可 pickle 才能广播。 参数 object_list (List[Any]) – 要广播的输入对象列表。每个对象必须可 pickle。只有 src rank 上的对象会被广播,但每个 rank 必须提供相同大小的列表。 src (int) – 要从中广播 object_list 的源 rank。源 rank 基于全局进程组(无论 group 参数如何) group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认进程组。默认值为 None。 device (torch.device, 可选) – 如果不为 None,则对象被序列化并转换为张量,然后在广播前移动到该设备。默认值为 None。 group_src (int) – 组上的源 rank。必须指定 group_src 和 src 之一,但不能同时指定。 返回 None。如果 rank 是组的一部分,则 object_list 将包含来自 src rank 的广播对象。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保设置此设备,以便每个 rank 都有一个单独的 GPU。 注意 请注意,此 API 与 broadcast() 集合略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。 警告 对象集合操作有许多严重的性能和可扩展性限制。有关详细信息,请参见对象集合操作。 警告 broadcast_object_list() 隐式使用 pickle 模块,该模块已知不安全。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。仅对您信任的数据调用此函数。 警告 使用 GPU 张量调用 broadcast_object_list() 不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用 broadcast()。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # 假设 world_size 为 3。 >>> objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象 >>> else: >>> objects = [None, None, None] >>> # 假设后端不是 NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}] torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]# 以所有机器都获得最终结果的方式跨所有机器归约张量数据。调用后,tensor 在所有进程中按位相同。支持复数张量。 参数 tensor (Tensor) – 集合的输入和输出。该函数就地操作。 op (可选) – torch.distributed.ReduceOp 枚举中的值之一。指定用于逐元素归约的操作。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 示例 >>> # 以下所有张量均为 torch.int64 类型。 >>> # 我们有 2 个进程组,2 个 rank。 >>> device = torch.device(f"cuda:{rank}") >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # 以下所有张量均为 torch.cfloat 类型。 >>> # 我们有 2 个进程组,2 个 rank。 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1 torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source]# 跨所有机器归约张量数据。只有 rank dst 的进程将接收最终结果。 参数 tensor (Tensor) – 集合的输入和输出。该函数就地操作。 dst (int) – 全局进程组上的目标 rank(无论 group 参数如何) op (可选) – torch.distributed.ReduceOp 枚举中的值之一。指定用于逐元素归约的操作。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 group_dst (int) – 组上的目标 rank。必须指定 group_dst 和 dst 之一,但不能同时指定。 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]# 从整个组收集张量到列表中。支持复数和大小不等的张量。 参数 tensor_list (list[Tensor]) – 输出列表。它应包含正确大小的张量,用于集合的输出。支持大小不等的张量。 tensor (Tensor) – 要从当前进程广播的张量。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 示例 >>> # 以下所有张量均为 torch.int64 类型。 >>> # 我们有 2 个进程组,2 个 rank。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_list = [ ... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1 >>> # 以下所有张量均为 torch.cfloat 类型。 >>> # 我们有 2 个进程组,2 个 rank。 >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1 torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]# 从所有 rank 收集张量并将它们放入单个输出张量中。此函数要求每个进程上的所有张量大小相同。 参数 output_tensor (Tensor) – 用于容纳来自所有 rank 的张量元素的输出张量。它必须正确大小,具有以下形式之一:(i) 沿主维度连接所有输入张量;有关“连接”的定义,请参见 torch.cat();(ii) 沿主维度堆叠所有输入张量;有关“堆叠”的定义,请参见 torch.stack()。下面的示例可以更好地解释支持的输出形式。 input_tensor (Tensor) – 要从当前 rank 收集的张量。与 all_gather API 不同,此 API 中的输入张量在所有 rank 上必须具有相同的大小。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 示例 >>> # 以下所有张量均为 torch.int64 类型,位于 CUDA 设备上。 >>> # 我们有两个 rank。 >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # 连接形式的输出 >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # 堆叠形式的输出 >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1 torch.distributed.all_gather_object(object_list, obj, group=None)[source]# 从整个组收集可 pickle 对象到列表中。类似于 all_gather(),但可以传入 Python 对象。请注意,对象必须可 pickle 才能收集。 参数 object_list (list[Any]) – 输出列表。它的大小应正确,等于此集合的组大小,并将包含输出。 obj (Any) – 要从当前进程广播的可 pickle Python 对象。 group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。默认值为 None。 返回 None。如果调用 rank 是此组的一部分,则集合的输出将填充到输入的 object_list 中。如果调用 rank 不是组的一部分,则传入的 object_list 将保持不变。 注意 请注意,此 API 与 all_gather() 集合略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保设置此设备,以便每个 rank 都有一个单独的 GPU。 警告 对象集合操作有许多严重的性能和可扩展性限制。有关详细信息,请参见对象集合操作。 警告 all_gather_object() 隐式使用 pickle 模块,该模块已知不安全。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。仅对您信任的数据调用此函数。 警告 使用 GPU 张量调用 all_gather_object() 不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用 all_gather()。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> # 假设 world_size 为 3。 >>> gather_objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象 >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}] torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source]# 在单个进程中收集张量列表。此函数要求每个进程上的所有张量大小相同。 参数 tensor (Tensor) – 输入张量。 gather_list (list[Tensor], 可选) – 用于收集数据的适当大小且大小相同的张量列表(默认值为 None,必须在目标 rank 上指定) dst (int, 可选) – 全局进程组上的目标 rank(无论 group 参数如何)。(如果 dst 和 group_dst 都为 None,则默认为全局 rank 0) group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 group_dst (int, 可选) – 组上的目标 rank。不允许同时指定 dst 和 group_dst 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 注意 请注意,gather_list 中的所有张量必须具有相同的大小。 示例::>>> # 我们有 2 个进程组,2 个 rank。 >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.ones(tensor_size, device=device) + rank >>> if dist.get_rank() == 0: >>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)] >>> else: >>> gather_list = None >>> dist.gather(tensor, gather_list, dst=0) >>> # Rank 0 获取收集的数据。 >>> gather_list [tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0 None # Rank 1 torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source]# 在单个进程中从整个组收集可 pickle 对象。类似于 gather(),但可以传入 Python 对象。请注意,对象必须可 pickle 才能收集。 参数 obj (Any) – 输入对象。必须可 pickle。 object_gather_list (list[Any]) – 输出列表。在 dst rank 上,它的大小应正确,等于此集合的组大小,并将包含输出。在非 dst rank 上必须为 None。(默认值为 None) dst (int, 可选) – 全局进程组上的目标 rank(无论 group 参数如何)。(如果 dst 和 group_dst 都为 None,则默认为全局 rank 0) group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认进程组。默认值为 None。 group_dst (int, 可选) – 组上的目标 rank。不允许同时指定 dst 和 group_dst 返回 None。在 dst rank 上,object_gather_list 将包含集合的输出。 注意 请注意,此 API 与 gather 集合略有不同,因为它不提供 async_op 句柄,因此将是阻塞调用。 注意 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任通过 torch.cuda.set_device() 确保设置此设备,以便每个 rank 都有一个单独的 GPU。 警告 对象集合操作有许多严重的性能和可扩展性限制。有关详细信息,请参见对象集合操作。 警告 gather_object() 隐式使用 pickle 模块,该模块已知不安全。可以构造恶意的 pickle 数据,在反序列化时执行任意代码。仅对您信任的数据调用此函数。 警告 使用 GPU 张量调用 gather_object() 不受良好支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被 pickle。请考虑改用 gather()。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> # 假设 world_size 为 3。 >>> gather_objects = ["foo", 12, {1: 2}] # 任何可 pickle 的对象 >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # 在 rank 0 上 >>> output ['foo', 12, {1: 2}] torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source]# 将张量列表分散到组中的所有进程。每个进程将恰好接收一个张量,并将其数据存储在 tensor 参数中。支持复数张量。 参数 tensor (Tensor) – 输出张量。 scatter_list (list[Tensor]) – 要分散的张量列表(默认值为 None,必须在源 rank 上指定) src (int) – 全局进程组上的源 rank(无论 group 参数如何)。(如果 src 和 group_src 都为 None,则默认为全局 rank 0) group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,将使用默认进程组。 async_op (bool, 可选) – 此操作是否应为异步操作 group_src (int, 可选) – 组上的源 rank。不允许同时指定 src 和 group_src 返回 如果 async_op 设置为 True,则返回异步工作句柄。如果不是 async_op 或不是组的一部分,则返回 None。 注意 请注意,scatter_list 中的所有张量必须具有相同的大小。 示例::>>> # 注意:每个 rank 上省略了进程组初始化。 >>> import torch.distributed as dist >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> output_tensor = torch.zeros(tensor

torch.distributed

模式 3:初始化# 在调用任何其他方法之前,需要使用 torch.distributed.init_process_group()torch.distributed.device_mesh.init_device_mesh() 函数来初始化该包。这两个函数都会阻塞,直到所有进程都加入。警告 初始化不是线程安全的。进程组的创建应该从单个线程执行,以防止跨 rank 的 'UUID' 分配不一致,并防止初始化期间可能导致挂起的竞态条件。torch.distributed.is_available()[source]# 如果分布式包可用,则返回 True。否则,torch.distributed 不会公开任何其他 API。目前,torch.distributed 在 Linux、MacOS 和 Windows 上可用。从源码构建 PyTorch 时,设置 USE_DISTRIBUTED=1 以启用它。目前,Linux 和 Windows 的默认值是 USE_DISTRIBUTED=1,MacOS 是 USE_DISTRIBUTED=0。返回类型 bool torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]# 初始化默认的分布式进程组。这也会初始化分布式包。初始化进程组主要有两种方式:显式指定 storerankworld_size。指定 init_method(一个 URL 字符串),它指示在哪里/如何发现对等节点。可以选择指定 rankworld_size,或者在 URL 中编码所有必需参数并省略它们。如果两者都未指定,则 init_method 默认为 "env://"。参数 backend (str 或 Backend, 可选) – 要使用的后端。取决于构建时的配置,有效值包括 mpigloonccluccxccl 或由第三方插件注册的后端。从 2.6 版本开始,如果未提供 backend,c10d 将使用为 device_id 参数(如果提供)指示的设备类型注册的后端。目前已知的默认注册是:cuda 使用 ncclcpu 使用 glooxpu 使用 xccl。如果既未提供 backend 也未提供 device_id,c10d 将检测运行时机器上的加速器,并使用为该检测到的加速器(或 cpu)注册的后端。此字段可以以小写字符串形式给出(例如 "gloo"),也可以通过 Backend 属性访问(例如 Backend.GLOO)。如果每台机器上使用多个进程且使用 nccl 后端,则每个进程必须独占访问其使用的每个 GPU,因为在进程之间共享 GPU 可能导致死锁或 NCCL 无效使用。ucc 后端是实验性的。可以使用 get_default_backend_for_device() 查询设备的默认后端。init_method (str, 可选) – 指定如何初始化进程组的 URL。如果未指定 init_methodstore,则默认为 "env://"。与 store 互斥。world_size (int, 可选) – 参与作业的进程数。如果指定了 store,则为必需。rank (int, 可选) – 当前进程的 rank(应为 0 到 world_size-1 之间的数字)。如果指定了 store,则为必需。store (Store, 可选) – 所有工作进程可访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端为 30 分钟。这是集合操作将被异步中止并且进程将崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能导致后续 CUDA 操作在损坏的数据上运行。当设置了 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。group_name (str, 可选, 已弃用) – 组名。此参数被忽略。pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组期间需要传入哪些额外选项。目前,我们唯一支持的选项是用于 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便在有计算内核等待时,nccl 后端可以拾取高优先级的 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t device_id (torch.device | int, 可选) – 此进程将使用的单个特定设备,允许进行后端特定的优化。目前,这仅在 NCCL 下有两个效果:通信器立即形成(立即调用 ncclCommInit* 而不是通常的惰性调用),并且子组将在可能的情况下使用 ncclCommSplit 以避免不必要的组创建开销。如果您想尽早了解 NCCL 初始化错误,也可以使用此字段。如果提供了 int,API 假定将使用编译时的加速器类型。注意 要启用 backend == Backend.MPI,PyTorch 需要在支持 MPI 的系统上从源码构建。注意 对多个后端的支持是实验性的。目前,当未指定后端时,将同时创建 gloonccl 后端。gloo 后端将用于 CPU 张量的集合操作,nccl 后端将用于 CUDA 张量的集合操作。可以通过传入格式为 "&lt;device_type&gt;:&lt;backend_name&gt;,&lt;device_type&gt;:&lt;backend_name&gt;" 的字符串来指定自定义后端,例如 "cpu:gloo,cuda:custom_backend"torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None, backend_override=None)[source]# 基于 device_typemesh_shapemesh_dim_names 参数初始化一个 DeviceMesh。这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度都标记为 mesh_dim_names[i]。注意 init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有 rank 上相同。不一致的 mesh_shape 可能导致挂起。注意 如果未找到进程组,init_device_mesh 将在后台初始化分布式通信所需的分布式进程组。参数 device_type (str) – 网格的设备类型。目前支持:"cpu""cuda/cuda-like""xpu"。不允许传入带有 GPU 索引的设备类型,例如 "cuda:0"mesh_shape (Tuple[int]) – 一个元组,定义描述设备布局的多维数组的维度。mesh_dim_names (Tuple[str], 可选) – 一个网格维度名称元组,用于分配给描述设备布局的多维数组的每个维度。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须是唯一的。backend_override (Dict[int | str, tuple[str, Options] | str | Options], 可选) – 为将为每个网格维度创建的部分或全部 ProcessGroup 提供的覆盖。每个键可以是维度的索引或其名称(如果提供了 mesh_dim_names)。每个值可以是一个包含后端名称及其选项的元组,或者只是这两个组件之一(在这种情况下,另一个将设置为其默认值)。返回 表示设备布局的 DeviceMesh 对象。返回类型 DeviceMesh 示例:>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp")) torch.distributed.is_initialized()[source]# 检查默认进程组是否已初始化。返回类型 bool torch.distributed.is_mpi_available()[source]# 检查 MPI 后端是否可用。返回类型 bool torch.distributed.is_nccl_available()[source]# 检查 NCCL 后端是否可用。返回类型 bool torch.distributed.is_gloo_available()[source]# 检查 Gloo 后端是否可用。返回类型 bool torch.distributed.distributed_c10d.is_xccl_available()[source]# 检查 XCCL 后端是否可用。返回类型 bool torch.distributed.is_torchelastic_launched()[source]# 检查此进程是否由 torch.distributed.elastic(又名 torchelastic)启动。使用 TORCHELASTIC_RUN_ID 环境变量的存在作为代理,来确定当前进程是否由 torchelastic 启动。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,该 id 始终是一个非空值,表示用于对等节点发现的作业 id。返回类型 bool torch.distributed.get_default_backend_for_device(device)[source]# 返回给定设备的默认后端。参数 device (Union[str, torch.device]) – 要获取默认后端的设备。返回 给定设备的默认后端,以小写字符串形式返回。返回类型 str 目前支持三种初始化方法:TCP 初始化# 有两种使用 TCP 初始化的方法,都需要一个所有进程可达的网络地址和一个期望的 world_size。第一种方法需要指定一个属于 rank 0 进程的地址。此初始化方法要求所有进程都手动指定了 rank。注意,最新的分布式包中不再支持多播地址。group_name 也已弃用。import torch.distributed as dist # 使用其中一台机器的地址 dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4) 共享文件系统初始化# 另一种初始化方法利用一个在组内所有机器上共享且可见的文件系统,以及一个期望的 world_size。URL 应以 file:// 开头,并包含共享文件系统上(现有目录中)一个不存在的文件的路径。如果文件不存在,文件系统初始化将自动创建该文件,但不会删除该文件。因此,您有责任确保在对同一文件路径/名称进行下一次 init_process_group() 调用之前清理该文件。注意,最新的分布式包中不再支持自动 rank 分配,并且 group_name 也已弃用。警告 此方法假设文件系统支持使用 fcntl 进行锁定——大多数本地系统和 NFS 都支持。警告 此方法将始终创建该文件,并尽力在程序结束时清理和删除该文件。换句话说,每次使用文件 init 方法进行初始化都需要一个全新的空文件才能成功初始化。如果再次使用前一次初始化(恰好未被清理)使用的同一个文件,这是意外行为,并且通常会导致死锁和失败。因此,即使此方法会尽力清理文件,如果自动删除不成功,您有责任确保在训练结束时删除该文件,以防止下次再次重用同一个文件。如果您计划对同一文件名多次调用 init_process_group(),这一点尤其重要。换句话说,如果文件未被删除/清理,并且您再次对该文件调用 init_process_group(),则预期会失败。这里的经验法则是,确保每次调用 init_process_group() 时该文件都不存在或为空。import torch.distributed as dist # 应始终指定 rank dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile', world_size=4, rank=args.rank) 环境变量初始化# 此方法将从环境变量读取配置,允许完全自定义信息的获取方式。需要设置的变量是:MASTER_PORT - 必需;必须是 rank 0 机器上的空闲端口 MASTER_ADDR - 必需(rank 0 除外);rank 0 节点的地址 WORLD_SIZE - 必需;可以在此处设置,也可以在调用 init 函数时设置 RANK - 必需;可以在此处设置,也可以在调用 init 函数时设置 rank 0 的机器将用于建立所有连接。这是默认方法,意味着不必指定 init_method(或者可以指定为 env://)。改善初始化时间# TORCH_GLOO_LAZY_INIT - 按需建立连接,而不是使用全网格,这可以显著改善非 all2all 操作的初始化时间。

torch.distributed.init_process_group()

模式 4: 示例:

>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))

模式 5: 进程组

默认情况下,集合通信操作在默认组(也称为 world)上执行,并且要求所有进程都进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这时就需要用到分布式组。new_group() 函数可用于创建新组,这些组可以包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group 参数传递给所有集合通信操作(集合通信操作是用于在特定编程模式中交换信息的分布式函数)。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source]

创建一个新的分布式组。此函数要求主组(即所有参与分布式任务的进程)中的所有进程都调用此函数,即使它们不打算成为该组的成员。此外,在所有进程中,组应该以相同的顺序创建。

警告

安全并发使用:当使用 NCCL 后端与多个进程组时,用户必须确保跨所有 rank 的集合通信操作具有全局一致的执行顺序。如果进程内的多个线程发出集合通信操作,则需要显式同步以确保一致的顺序。当使用 torch.distributed 通信 API 的异步变体时,会返回一个 work 对象,并且通信内核会在一个单独的 CUDA 流上排队,从而实现通信和计算的重叠。一旦在一个进程组上发起了一个或多个异步操作,在切换到另一个进程组之前,必须通过调用 work.wait() 与其它 CUDA 流进行同步。更多详情,请参见 同时使用多个 NCCL 通信器

参数

  • ranks (list[int]) – 组成员的 rank 列表。如果为 None,则设置为所有 rank。默认为 None
  • timeout (timedelta, 可选) – 有关详细信息和默认值,请参见 init_process_group
  • backend (str 或 Backend, 可选) – 要使用的后端。取决于构建时的配置,有效值为 gloonccl。默认使用与全局组相同的后端。此字段应作为小写字符串提供(例如 "gloo"),也可以通过 Backend 属性访问(例如 Backend.GLOO)。如果传入 None,将使用默认进程组对应的后端。默认为 None
  • pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组时需要传入哪些额外选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以选择高优先级的 CUDA 流。有关配置 nccl 的其他可用选项,请参见 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
  • use_local_synchronization (bool, 可选) – 在进程组创建结束时执行一个组内屏障。不同之处在于,非成员 rank 不需要调用此 API,也不会加入屏障。
  • group_desc (str, 可选) – 描述进程组的字符串。
  • device_id (torch.device, 可选) – 一个单一的、特定的设备,用于将此进程“绑定”到该设备。如果提供了此字段,new_group 调用将尝试立即为该设备初始化通信后端。

返回

一个分布式组的句柄,可以传递给集合通信调用;如果该 rank 不是 ranks 的一部分,则返回 GroupMember.NON_GROUP_MEMBER

注意

  • use_local_synchronization 不适用于 MPI。
  • 虽然 use_local_synchronization=True 在集群较大且进程组较小时可以显著提高速度,但必须小心使用,因为它会改变集群行为,因为非成员 rank 不会加入组的 barrier()
  • use_local_synchronization=True 可能导致死锁,当每个 rank 创建多个重叠的进程组时。为避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。

torch.distributed.get_group_rank(group, global_rank)[source]

将全局 rank 转换为组内 rank。global_rank 必须是组的一部分,否则会引发 RuntimeError

参数

  • group (ProcessGroup) – 要查找相对 rank 的 ProcessGroup。
  • global_rank (int) – 要查询的全局 rank。

返回

global_rank 相对于 group 的组内 rank。

返回类型

int

注意

在默认进程组上调用此函数将返回其自身。

torch.distributed.get_global_rank(group, group_rank)[source]

将组内 rank 转换为全局 rank。group_rank 必须是组的一部分,否则会引发 RuntimeError

参数

  • group (ProcessGroup) – 要从中查找全局 rank 的 ProcessGroup。
  • group_rank (int) – 要查询的组内 rank。

返回

group_rank 相对于 group 的全局 rank。

返回类型

int

注意

在默认进程组上调用此函数将返回其自身。

torch.distributed.get_process_group_ranks(group)[source]

获取与组关联的所有 rank。

参数

  • group (Optional[ProcessGroup]) – 要从中获取所有 rank 的 ProcessGroup。如果为 None,将使用默认进程组。

返回

按组内 rank 排序的全局 rank 列表。

返回类型

list[int]

new_group()

模式 6: 警告 安全并发使用:当使用 NCCL 后端并涉及多个进程组时,用户必须确保各 rank 之间集合通信的全局执行顺序一致。如果进程内的多个线程发起集合通信,则需要显式同步以保证顺序一致。使用 torch.distributed 通信 API 的异步变体时,会返回一个 work 对象,通信内核会在独立的 CUDA 流上排队,从而实现通信与计算的重叠。一旦在一个进程组上发起一个或多个异步操作,在切换到另一个进程组之前,必须通过调用 work.wait() 将这些操作与其他 CUDA 流同步。更多详情请参见 同时使用多个 NCCL 通信器 <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html#using-multiple-nccl-communicators-concurrently>。

NCCL

模式 7: 注意 如果你将 DistributedDataParallel 与 Distributed RPC 框架结合使用,应始终使用 torch.distributed.autograd.backward() 计算梯度,并使用 torch.distributed.optim.DistributedOptimizer 优化参数。示例:>>> import torch.distributed.autograd as dist_autograd >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> import torch >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) >>> >>> # 设置优化器 >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) >>> >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) >>> >>> with dist_autograd.context() as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, target) >>> dist_autograd.backward(context_id, [loss]) >>> dist_optim.step(context_id)

torch.distributed.autograd.backward()

模式 8: static_graph(布尔值)– 当设置为 True 时,DDP 知道训练图是静态的。静态图意味着:1)在整个训练循环中,使用和未使用的参数集合不会改变;在这种情况下,用户是否设置 find_unused_parameters = True 无关紧要。2)在整个训练循环中,图的训练方式不会改变(即没有依赖于迭代的控制流)。当 static_graph 设置为 True 时,DDP 将支持过去无法支持的情况:1)可重入反向传播。2)多次激活检查点。3)模型存在未使用参数时的激活检查点。4)存在不在前向函数中的模型参数。5)当存在未使用参数时,可能提升性能,因为当 static_graph 设置为 True 时,DDP 不会在每次迭代中搜索图来检测未使用参数。要检查是否可以设置 static_graph = True,一种方法是在之前模型训练结束时检查 ddp 日志数据,如果 ddp_logging_data.get("can_set_static_graph") == True,则通常可以设置 static_graph = True。示例:>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model) >>> # 训练循环 >>> ... >>> ddp_logging_data = model_DDP._get_ddp_logging_data() >>> static_graph = ddp_logging_data.get("can_set_static_graph")

True

参考文件

本技能包含 references/ 中的完整文档:

  • other.md - 其他文档

当需要详细信息时,使用 view 读取特定的参考文件。

使用此技能

初学者

getting_startedtutorials 参考文件开始,了解基础概念。

特定功能

使用相应的分类参考文件(如 apiguides 等)获取详细信息。

代码示例

上面的快速参考部分包含了从官方文档中提取的常见模式。

资源

references/

从官方来源整理的组织化文档。这些文件包含:

  • 详细说明
  • 带有语言标注的代码示例
  • 指向原始文档的链接
  • 便于快速导航的目录

scripts/

在此添加用于常见自动化任务的辅助脚本。

assets/

在此添加模板、样板代码或示例项目。

备注

  • 此技能是从官方文档自动生成的
  • 参考文件保留了源文档的结构和示例
  • 代码示例包含语言检测,以实现更好的语法高亮
  • 快速参考模式是从文档中的常见用法示例中提取的

更新

要使用更新的文档刷新此技能:

  1. 使用相同的配置重新运行爬取工具
  2. 技能将使用最新信息重新构建