MindSpore训练模型

本文将从如下4个部分进行介绍。

  1. 定义数据集
  2. 定义神经网络
  3. 定义损失函数
  4. 训练

一. 定义数据集

  • mindspore提供了多种使用数据集的方式。

    • 使用已经封装好的现有数据集(以coco数据集为例):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    import mindspore.dataset as ds
    coco_dataset_dir = "/path/to/coco_dataset_directory/images"
    coco_annotation_file = "/path/to/coco_dataset_directory/annotation_file"

    # 1) Read COCO data for Detection task
    dataset = ds.CocoDataset(dataset_dir=coco_dataset_dir,
    annotation_file=coco_annotation_file,
    task='Detection')

    • 自定义数据集(这个应该是最为重要也是我们正常训练时可能最常用的方式)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    import mindspore.dataset as ds
    import numpy as np
    # 4) Random accessible dataset as random accessible input.
    class MyDataset:
    def __init__(self, data,label):
    self.x = data
    self.label = label

    def __getitem__(self, index):
    return self.x[index], self.label[index]

    def __len__(self):
    return len(self.x)

    上述代码完成的是自定义数据集的构建,这里使用的是支持随机存取的数据集。

    下面的代码展示了实例化该类的效果。

    1
    2
    3
    4
    5
    x = [1,2,3,4]
    label = [7,10,13,16]
    dataset = MyDataset(x,label)
    print(dataset[2])
    (3, 13)
    • 接下来就是重点也就是如何使用batch方式提取数据。(MindSpore与Pytorch不同没有DataLoader)
    1
    2
    3
    4
    5
    6
    7
    # 先使用自定义数据集生成函数再使用dataset的自带函数即可
    dataset = ds.GeneratorDataset(source=dataset, column_names=["image", "caption"], shuffle=True)
    dataset = dataset.batch(batch_size=3,drop_remainder=True)
    for i in dataset:
    print(i)
    [Tensor(shape=[3], dtype=Int64, value= [1, 4, 3]), Tensor(shape=[3], dtype=Int64, value= [ 7, 16, 13])]
    [Tensor(shape=[1], dtype=Int64, value= [2]), Tensor(shape=[1], dtype=Int64, value= [10])]

    上面的代码可以发现我们将batch_size设置为3时,它就会从全部数据集中随机抽取3条并合在一起,一轮完成一次遍历也就是一个epoch。batch函数可以选择如下参数舍去不足batch的数据:

    drop_remainder (bool, 可选) - 当最后一个批处理数据包含的数据条目小于 batch_size 时,是否将该批处理丢弃,不传递给下一个操作。默认值: False ,不丢弃。

    num_shards (int, 可选) - 指定分布式训练时将数据集进行划分的分片数。默认值: None 。指定此参数后, num_samples 表示每个分片的最大样本数。

    shard_id (int, 可选) - 指定分布式训练时使用的分片ID号。默认值: None 。只有当指定了 num_shards 时才能指定此参数。

    • 在迭代时可以选择使用迭代器进行:
    1
    iterator = dataset.create_tuple_iterator(num_epochs=epochs)# 这个经实验和直接.batch没啥区别,唯一的区别是设置完epochs后循环epochs次之后会报错

    num_epochs参数的默认值为-1即可以无限迭代,但如果出现自定义后,一旦迭代次数超过就会报错,因此强烈建议不要修改因为改了除了给自己加一个坑别的什么用也没有(它跟DataLoader完全不一样,这里设置完epochs不会有任何用)

二. 定义网络

接下来这个网络实现的其实就是一个一次函数 $ y=a*x+b$ ,网络里封装了两个可学习的常数。

1
2
3
4
5
6
7
8
9
10
import mindspore.nn as nn
from mindspore import Parameter, Tensor, ops, nn
class test(nn.Cell):
def __init__(self):
super(test, self).__init__()
self.w = Parameter(Tensor(6).astype(ms.float32))
self.b = Parameter(Tensor(6).astype(ms.float32))
def construct(self, x):
return self.w*x + self.b

这里的construct和Pytorch里的forward一样。

3. 定义损失函数

本节将分为两个部分进行:

  1. 对于正常使用来说最普遍的还是直接调用已经封装好的损失函数。
  2. 如果有特殊需求需要自定义损失函数即可按照如下方式:
1
2
3
4
5
6
7
8
9
# 定义自定义损失函数
class CustomLoss(nn.loss.LossBase):
def __init__(self):
super(CustomLoss, self).__init__()

def construct(self, output, label):
# 自定义损失计算逻辑
loss = ms.ops.square((output-label)) # 自定义损失计算
return loss

4. 训练

  • WithLossCell封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class WithLossCell(Cell):
def __init__(self, backbone, loss_fn):
super(WithLossCell, self).__init__(auto_prefix=False)
self._backbone = backbone
self._loss_fn = loss_fn
if isinstance(backbone, Cell) and backbone.jit_config_dict:
self._jit_config_dict = backbone.jit_config_dict

def construct(self, data, label):
out = self._backbone(data)
return self._loss_fn(out, label)

def backbone_network(self):# 返回backbone网络
return self._backbone

根据源码我们可以发现这个封装其实就是将loss函数和原始的骨干网络结合成一个网络。

并且这个网络的输入只接受两个,一个是传入骨干网络的data,一个是传入loss函数的label。

如果你的网络特殊或者说想要更灵活可以使用如下两个方式进行:

  1. 将多输入或者多输出封装成tuple形式。
  2. 舍弃这一个封装,直接在自己网络里封装上loss函数。
  • TrainOneStepCell封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class TrainOneStepCell(Cell):
def __init__(self, network, optimizer, sens=None, return_grad=False):
super(TrainOneStepCell, self).__init__(auto_prefix=False)
self.network = network
self.network.set_grad()
self.optimizer = optimizer
self.weights = self.optimizer.parameters
self.grad = C.GradOperation(get_by_list=True, sens_param=True)
self.grad_no_sens = C.GradOperation(get_by_list=True)
self.sens = sens
if self.sens == 0:
raise ValueError("The input argument of 'sens' can not be 0.")
self.sense_flag = True
if self.sens is None:
self.sense_flag = False
self.sens = 1.0
self.return_grad = return_grad
if return_grad:
self.weights_name = [i.name for i in self.optimizer.parameters]
self.reducer_flag = False
self.grad_reducer = nn.Identity()
self.parallel_mode = _get_parallel_mode()
self.reducer_flag = self.parallel_mode in (ParallelMode.DATA_PARALLEL, ParallelMode.HYBRID_PARALLEL) or \
_is_pynative_parallel()
if self.reducer_flag:
self.mean = _get_gradients_mean()
self.degree = _get_device_num()
from mindspore.communication.management import GlobalComm
group = GlobalComm.WORLD_COMM_GROUP
if isinstance(self.optimizer, (nn.AdaSumByGradWrapCell, nn.AdaSumByDeltaWeightWrapCell)):
from mindspore.communication.management import get_group_size, create_group, get_rank
group_number = get_group_size() // 8
self.degree = int(self.degree / group_number)
group_list = [list(range(x * self.degree, (x + 1) * self.degree)) for x in range(group_number)]
current_index = get_rank() // 8
server_group_name = "allreduce_" + str(current_index)
create_group(server_group_name, group_list[current_index])
group = server_group_name
self.grad_reducer = DistributedGradReducer(self.weights, self.mean, self.degree, group=group)
if isinstance(network, Cell) and network.jit_config_dict:
self._jit_config_dict = network.jit_config_dict

def construct(self, *inputs):
if not self.sense_flag:
return self._no_sens_impl(*inputs)
loss = self.network(*inputs)
sens = F.fill(loss.dtype, loss.shape, self.sens)
grads = self.grad(self.network, self.weights)(*inputs, sens)
grads = self.grad_reducer(grads)
loss = F.depend(loss, self.optimizer(grads))
if self.return_grad:
grad_with_param_name = {}
for index, value in enumerate(grads):
grad_with_param_name[self.weights_name[index]] = value
return loss, grad_with_param_name
return loss

def _no_sens_impl(self, *inputs):
"""construct implementation when the 'sens' parameter is passed in."""
loss = self.network(*inputs)
grads = self.grad_no_sens(self.network, self.weights)(*inputs)
grads = self.grad_reducer(grads)
loss = F.depend(loss, self.optimizer(grads))
if self.return_grad:
grad_with_param_name = {}
for index, value in enumerate(grads):
grad_with_param_name[self.weights_name[index]] = value
return loss, grad_with_param_name
return loss

这个类实现将网络和优化器封装在一起,并且进行梯度更新。返回值是loss,输入和带有loss函数的网络输入一样。

  • 实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 实例化前向网络
t = test()
# 设定损失函数并连接前向网络与损失函数
loss = CustomLoss()
net_with_criterion = nn.WithLossCell(t, loss)
# 设定优化器
opt = nn.Adam(t.trainable_params(), learning_rate=0.001)
# 定义训练网络
train_net = nn.TrainOneStepCell(net_with_criterion, opt)
# 设置网络为训练模式
train_net.set_train()
for i in range(10000):
for i1 in iterator:
res = train_net(i1[0],i1[1])

print(loss(t(1),7))
0.0

主要知道两点:

  1. MindSpore训练不需要像Pytorch训练一样梯度清零,梯度更新
  2. 使用下面的几个代码
1
2
3
net_with_criterion = nn.WithLossCell(t, loss)
train_net = nn.TrainOneStepCell(net_with_criterion, opt)
train_net.set_train()

并行训练

如果你用的是modelarts平台那么你可能会有多个npu,但是使用notebook里的代码是无法做到并行训练的。

  • 方法:

将代码写成python文件,和正常pytorch的一样。

执行下面这条语句:

1
2
mpirun --allow-run-as-root -n 8 python train.py --is_parallel True
# 8代表使用多少块NPU进行计算。

指定数据存储

1
mindspore.set_context(mode=0, device_id=0)

并行设置

1
2
3
4
5
6
7
import os
import mindspore as ms
from mindspore.communication import init

ms.set_context(mode=ms.GRAPH_MODE)
ms.set_auto_parallel_context(parallel_mode=ms.ParallelMode.AUTO_PARALLEL, gradients_mean=True)
init()

官方链接