摘要:指标收集是每个机器学习项目不可或缺的组成部分,它使我们能够跟踪模型性能并监控训练进度。理想情况下,我们希望在不给训练过程带来额外开销的前提下收集和计算指标。与训练循环的其他部分一样,低效的指标计算可能会引入不必要的开销,延长训练步骤的耗时,并增加训练成本。
指标收集是每个机器学习项目不可或缺的组成部分,它使我们能够跟踪模型性能并监控训练进度。理想情况下,我们希望在不给训练过程带来额外开销的前提下收集和计算指标。与训练循环的其他部分一样,低效的指标计算可能会引入不必要的开销,延长训练步骤的耗时,并增加训练成本。
本文是将聚焦于指标收集,演示指标收集的一种简单实现如何对运行时性能产生负面影响,并探讨用于分析和优化它的工具与技术。
为了实现指标收集,我们将使用 TorchMetrics,这是一个标准化 PyTorch 中指标计算的常用库。我们的目标是:
为了便于讨论,我们将定义一个简单的 PyTorch 模型,并评估指标收集对运行时性能的影响。我们将在 NVIDIA A40 GPU 上运行实验,使用的 PyTorch 版本为 2.5.1(通过 Docker 镜像),TorchMetrics 版本为 1.6.1。
需要注意的是:指标收集的行为可能因硬件、运行时环境和模型架构而异。本文中提供的代码片段仅用于演示目的。
在下面的代码块中,我们定义了一个简单的图像分类模型,它使用 ResNet-18 作为主干网络。
import timeimport torchimport torchvisiondevice = "cuda"model = torchvision.models.resnet18.to(device)criterion = torch.nn.CrossEntropyLossoptimizer = torch.optim.SGD(model.parameters)定义一个合成数据集,用于训练这个模型。
from torch.utils.data import Dataset, DataLoader# A dataset with random images and labels# 具有随机图像和标签的数据集class FakeDataset(Dataset):def __len__(self):return 100000000def __getitem__(self, index):rand_image = torch.randn([3, 224, 224], dtype=torch.float32)label = torch.tensor(data=index % 1000, dtype=torch.int64)return rand_image, labeltrain_set = FakeDatasetbatch_size = 128num_workers = 12train_loader = DataLoader(dataset=train_set,batch_size=batch_size,num_workers=num_workers,pin_memory=True)定义了一组来自 TorchMetrics 的标准指标,以及一个控制标志,用于启用或禁用指标计算。
from torchmetrics import (MeanMetric,Accuracy,Precision,Recall,F1Score,)# toggle to enable/disable metric collection# 切换以启用/禁用指标收集capture_metrics = Falseif capture_metrics:metrics = {"avg_loss": MeanMetric,"accuracy": Accuracy(task="multiclass", num_classes=1000),"precision": Precision(task="multiclass", num_classes=1000),"recall": Recall(task="multiclass", num_classes=1000),"f1_score": F1Score(task="multiclass", num_classes=1000),}# Move all metrics to the device# 将所有指标移动到设备metrics = {name: metric.to(device) for name, metric in metrics.items}接下来,定义一个 PyTorch Profiler 实例,以及一个控制标志,用于启用或禁用性能分析。
from torch import profiler# toggle to enable/disable profiling# 切换以启用/禁用分析enable_profiler = Trueif enable_profiler:prof = profiler.profile(schedule=profiler.schedule(wait=10, warmup=2, active=3, repeat=1),on_trace_ready=profiler.tensorboard_trace_handler("./logs/"),profile_memory=True,with_stack=True)prof.start最后定义一个标准的训练步骤:
model.traint0 = time.perf_countertotal_time = 0count = 0for idx, (data, target) in enumerate(train_loader):data = data.to(device, non_blocking=True)target = target.to(device, non_blocking=True)optimizer.zero_gradoutput = model(data)loss = criterion(output, target)loss.backwardoptimizer.stepif capture_metrics:# update metrics# 更新指标metrics["avg_loss"].update(loss)for name, metric in metrics.items:if name != "avg_loss":metric.update(output, target)if (idx + 1) % 100 == 0:# compute metrics# 计算指标metric_results = {name: metric.compute.itemfor name, metric in metrics.items}# print metrics# 打印指标print(f"Step {idx + 1}: {metric_results}")# reset metrics# 重置指标for metric in metrics.values:metric.resetelif (idx + 1) % 100 == 0:# print last loss value# 打印上次损失值print(f"Step {idx + 1}: Loss = {loss.item:.4f}")batch_time = time.perf_counter - t0t0 = time.perf_counterif idx > 10: # skip first stepstotal_time += batch_timecount += 1if enable_profiler:prof.stepif idx > 200:breakif enable_profiler:prof.stopavg_time = total_time/countprint(f'Average step time: {avg_time}')print(f'Throughput: {batch_size/avg_time:.2f} images/sec')为了衡量指标收集对训练步骤耗时的影响,我们分别在启用和禁用指标计算的情况下运行了训练脚本。结果如下表所示。
简单的指标收集实现导致运行时性能下降了近 10%!虽然指标收集对于机器学习开发至关重要,但它通常只涉及相对简单的数学运算,不应造成如此显著的开销。问题出在哪里?
为了更好地理解性能下降的根源,我们重新运行了训练脚本,并启用了 PyTorch Profiler。结果轨迹如下图所示:
该轨迹揭示了重复出现的“cudaStreamSynchronize”操作,这些操作与 GPU 利用率的显著下降相吻合。在一个典型的训练步骤中,CPU 和 GPU 并行工作:CPU 负责诸如将数据传输到 GPU 和加载 CUDA Kernel 等任务,而 GPU 则在输入数据上执行模型计算并更新权重。理想情况下应该尽量减少 CPU 和 GPU 之间的同步点,以最大限度地提高性能。但是从上图可以看出,指标收集通过执行 CPU 到 GPU 的数据复制来触发同步事件。这需要 CPU 暂停其处理,直到 GPU 完成数据复制,进而导致 GPU 等待 CPU 恢复加载后续的 Kernel 操作。这些同步点导致 CPU 和 GPU 的利用率降低。上面的指标收集代码实现为每个训练步骤增加了八个这样的同步事件。
仔细检查轨迹表明,同步事件源自 MeanMetric TorchMetric 的 update 调用。为了演示我们将更进一步,使用 torch.profiler.record_function 工具来识别确切的违规代码行。
为了查明同步事件的确切来源,我们扩展了 MeanMetric 类,并使用 record_function 上下文块覆盖了 update 方法。这种方法允许我们分析方法中的各个操作,并识别性能瓶颈。
class ProfileMeanMetric(MeanMetric):def update(self, value, weight = 1.0):# broadcast weight to value shape# 将权重广播到值形状with profiler.record_function("process value"):if not isinstance(value, torch.Tensor):value = torch.as_tensor(value, dtype=self.dtype,device=self.device)with profiler.record_function("process weight"):if weight is not None and not isinstance(weight, torch.Tensor):weight = torch.as_tensor(weight, dtype=self.dtype,device=self.device)with profiler.record_function("broadcast weight"):weight = torch.broadcast_to(weight, value.shape)with profiler.record_function("cast_and_nan_check"):value, weight = self._cast_and_nan_check_input(value, weight)if value.numel == 0:returnwith profiler.record_function("update value"):self.mean_value += (value * weight).sumwith profiler.record_function("update weight"):self.weight += weight.sum然后更新 avg_loss 指标,使其使用新创建的 ProfileMeanMetric 类,并重新运行训练脚本。
更新后的轨迹显示,同步事件源自以下代码行:
weight = torch.as_tensor(weight, dtype=self.dtype, device=self.device)
此操作将默认的标量值 weight=1.0 转换为 PyTorch 张量,并将其放置在 GPU 上。同步事件的发生原因是,此操作会触发 CPU 到 GPU 的数据复制,这需要 CPU 等待 GPU 处理复制的值。
既然我们已经找到了问题的根源,就可以通过在 update 调用中显式指定 weight 值来轻松解决它。这将避免在运行时将默认标量 weight=1.0 转换为 GPU 上的张量,从而避免同步事件:
# update metrics# 更新指标if capture_metric:metrics["avg_loss"].update(loss, weight=torch.ones_like(loss))应用此更改后重新运行脚本表明,我们已成功消除了初始同步事件,但又发现了一个新的同步事件,这次来自 _cast_and_NaN_check_input 函数:
为了探究这个新的同步事件,我们使用额外的分析探针扩展了自定义指标,并重新运行了脚本。
class ProfileMeanMetric(MeanMetric):def update(self, value, weight = 1.0):# broadcast weight to value shape# 将权重广播到值形状with profiler.record_function("process value"):if not isinstance(value, torch.Tensor):value = torch.as_tensor(value, dtype=self.dtype,device=self.device)with profiler.record_function("process weight"):if weight is not None and not isinstance(weight, torch.Tensor):weight = torch.as_tensor(weight, dtype=self.dtype,device=self.device)with profiler.record_function("broadcast weight"):weight = torch.broadcast_to(weight, value.shape)with profiler.record_function("cast_and_nan_check"):value, weight = self._cast_and_nan_check_input(value, weight)if value.numel == 0:returnwith profiler.record_function("update value"):self.mean_value += (value * weight).sumwith profiler.record_function("update weight"):self.weight += weight.sumdef _cast_and_nan_check_input(self, x, weight = None):"""Convert input ``x`` to a tensor and check for Nans."""# 将输入“x”转换为张量并检查 Nans。with profiler.record_function("process x"):if not isinstance(x, torch.Tensor):x = torch.as_tensor(x, dtype=self.dtype,device=self.device)with profiler.record_function("process weight"):if weight is not None and not isinstance(weight, torch.Tensor):weight = torch.as_tensor(weight, dtype=self.dtype,device=self.device)nans = torch.isnan(x)if weight is not None:nans_weight = torch.isnan(weight)else:nans_weight = torch.zeros_like(nans).boolweight = torch.ones_like(x)with profiler.record_function("any nans"):anynans = nans.any or nans_weight.anywith profiler.record_function("process nans"):if anynans:if self.nan_strategy == "error":raise RuntimeError("Encountered `nan` values in tensor")if self.nan_strategy in ("ignore", "warn"):if self.nan_strategy == "warn":print("Encountered `nan` values in tensor."" Will be removed.")x = x[~(nans | nans_weight)]weight = weight[~(nans | nans_weight)]else:if not isinstance(self.nan_strategy, float):raise ValueError(f"`nan_strategy` shall be float"f" but you pass {self.nan_strategy}")x[nans | nans_weight] = self.nan_strategyweight[nans | nans_weight] = self.nan_strategywith profiler.record_function("return value"):retval = x.to(self.dtype), weight.to(self.dtype)return retval结果轨迹如下图所示:
使用 record_function 进行指标收集的轨迹 — 第 2 部分
该轨迹直接指向以下违规代码行:
anynans = nans.any or nans_weight.any此操作检查输入张量中是否存在 NaN 值,但它引入了代价高昂的 CPU-GPU 同步事件,因为该操作涉及将数据从 GPU 复制到 CPU。
在仔细检查 TorchMetric BaseAggregator 类后,我们发现有几个用于处理 NAN 值更新的选项,但所有这些选项都会执行上述违规代码行。对于我们的用例(计算平均损失指标),此检查是不必要的,并且由此造成的运行时性能损失是不合理的。
为了消除开销,我们建议通过覆盖 _cast_and_nan_check_input 函数来禁用 NaN 值检查。我们没有采用静态覆盖,而是实现了一种动态解决方案,它可以灵活地应用于 BaseAggregator 类的任何子类。
from torchmetrics.aggregation import BaseAggregatordef suppress_nan_check(MetricClass):assert issubclass(MetricClass, BaseAggregator), MetricClassclass DisableNanCheck(MetricClass):def _cast_and_nan_check_input(self, x, weight=None):if not isinstance(x, torch.Tensor):x = torch.as_tensor(x, dtype=self.dtype,device=self.device)if weight is not None and not isinstance(weight, torch.Tensor):weight = torch.as_tensor(weight, dtype=self.dtype,device=self.device)if weight is None:weight = torch.ones_like(x)return x.to(self.dtype), weight.to(self.dtype)return DisableNanCheckNoNanMeanMetric = suppress_nan_check(MeanMetric)metrics["avg_loss"] = NoNanMeanMetric.to(device)在实施了两个优化(指定权重值和禁用 NaN 检查)之后,我们发现训练步骤的耗时和 GPU 利用率与基线实验的结果相匹配。此外,PyTorch Profiler 的结果轨迹表明,与指标收集相关的所有新增“cudaStreamSynchronize”事件都已消除。通过一些小的更改,我们在不改变指标收集行为的前提下,将训练成本降低了约 10%。
在上一节中,指标值存储在 GPU 上,因此在 GPU 上存储和计算指标是合理的。但是在需要聚合的值存储在 CPU 上的情况下,最好将指标存储在 CPU 上,以避免不必要的数据传输。
在下面的代码块中,我们修改了脚本,以使用 CPU 上的 MeanMetric 计算平均步骤耗时。此更改对训练步骤的运行时性能没有影响:
avg_time = NoNanMeanMetrict0 = time.perf_counterfor idx, (data, target) in enumerate(train_loader):# move data to device# 将数据移动到设备data = data.to(device, non_blocking=True)target = target.to(device, non_blocking=True)optimizer.zero_gradoutput = model(data)loss = criterion(output, target)loss.backwardoptimizer.stepif capture_metrics:metrics["avg_loss"].update(loss)for name, metric in metrics.items:if name != "avg_loss":metric.update(output, target)if (idx + 1) % 100 == 0:# compute metrics# 计算指标metric_results = {name: metric.compute.itemfor name, metric in metrics.items}# print metrics# 打印指标print(f"Step {idx + 1}: {metric_results}")# reset metrics# 重置指标for metric in metrics.values:metric.resetelif (idx + 1) % 100 == 0:# print last loss value# 打印上次损失值print(f"Step {idx + 1}: Loss = {loss.item:.4f}")batch_time = time.perf_counter - t0t0 = time.perf_counterif idx > 10: # skip first stepsavg_time.update(batch_time)if enable_profiler:prof.stepif idx > 200:breakif enable_profiler:prof.stopavg_time = avg_time.compute.itemprint(f'Average step time: {avg_time}')print(f'Throughput: {batch_size/avg_time:.2f} images/sec')当尝试扩展脚本以支持分布式训练时,问题出现了。为了演示这个问题,我们修改了模型定义,以使用 DistributedDataParallel (DDP):
# toggle to enable/disable ddp# 切换以启用/禁用 ddpuse_ddp = Trueif use_ddp:import osimport torch.distributed as distfrom torch.nn.parallel import DistributedDataParallel as DDPos.environ["MASTER_ADDR"] = "127.0.0.1"os.environ["MASTER_PORT"] = "29500"dist.init_process_group("nccl", rank=0, world_size=1)torch.cuda.set_device(0)model = DDP(torchvision.models.resnet18.to(device))else:model = torchvision.models.resnet18.to(device)# insert training loop# 插入训练循环# append to end of the script:# 附加到脚本末尾:if use_ddp:# destroy the process group# 销毁进程组dist.destroy_process_group默认情况下,分布式训练中的指标被设置为在所有使用的设备上同步。但是,DDP 使用的同步后端不支持存储在 CPU 上的指标。
解决此问题的一种方法是禁用跨设备指标同步:
avg_time = NoNanMeanMetric(sync_on_compute=False)由于我们测量的是平均时间,因此这种解决方案是可以接受的。但是在某些情况下,指标同步是必不可少的,我们可能别无选择,只能将指标移动到 GPU 上:
avg_time = NoNanMeanMetric.to(device)这种情况会导致从 update 函数产生一个新的 CPU-GPU 同步事件。
这必然需要内存复制。但是对于标量指标,可以通过简单的优化来完全避免这种数据传输。
解决方案很简单:在调用 update 之前,不要使用浮点值更新指标,而是将其转换为张量。
batch_time = torch.as_tensor(batch_time)avg_time.update(batch_time, torch.ones_like(batch_time))这个小小的更改绕过了有问题的代码行,消除了同步事件,并将步骤耗时恢复到基线性能。
乍一看,这个结果可能令人惊讶:使用 CPU 张量更新 GPU 指标仍然需要内存复制。但是PyTorch 通过使用专用 CUDA Kernel 来优化标量张量的运算,该 Kernel 执行加法运算而无需显式的数据传输。这避免了原本会发生的昂贵同步事件。
在本文中,我们探讨了 TorchMetrics 的简单用法如何引入 CPU-GPU 同步事件,并显著降低 PyTorch 训练性能。通过使用 PyTorch Profiler,我们识别了导致这些同步事件的代码行,并应用了有针对性的优化来消除它们:
在调用 MeanMetric.update 函数时,显式指定权重张量,而不是依赖于默认值。禁用基本 Aggregator 类中的 NaN 检查,或者将其替换为更有效的替代方案。仔细管理每个指标的设备放置,以最大限度地减少不必要的数据传输。在不需要时禁用跨设备指标同步。当指标存储在 GPU 上时,在将浮点标量传递给 update 函数之前,将其转换为张量,以避免隐式同步。来源:deephub