Introduction

Why pytorch?

更加“Python化”,更易于使用

每一个Pytorch示例(CV和NLP)都有共同的结构:

data/
experiments/
model/
    net.py:指定神经网络架构、损失函数和评估指标。
    data_loader.py:指定数据应如何馈送到网络。
train.py:包含主训练循环。
evaluate.py:包含用于评估模型的主循环。
search_hyperparams.py
synthesize_results.py
evaluate.py
utils.py:用于处理超参数 / 日志 / 存储模型的实用功能。

有用的库

下面是一个综合列表,列出了计算机视觉、自然语言处理和生成库等不同领域的一些项目:

易于实现数据并行

Distributed Data-Parallel(分布式数据并行)是 PyTorch 的一项特性,你可以将其与 Data-Parallel(数据并行)结合使用来处理需要大型数据集和模型的用例。

为实现数据并行,使用了torch.nn.DataParallel

增加了对移动设备的支持

如Android和IOS设备部署的支持

PyTorch Mobile 入门:

易于调试

pytorch的动态计算图允许在代码执行时进行动态修改和快速调试

使用方式

torch 与 numpy

互相转换

np_data = np.arange(6).reshape((2, 3))
torch_data = torch.from_numpy(np_data)
tensor2array = torch_data.numpy()

运算形式

  1. 注意torch中的数据形式是Tensor
# abs 绝对值计算
data = [-1, -2, 1, 2]
tensor = torch.FloatTensor(data)  # 转换成32位浮点 tensor
print(
    '\nabs',
    '\nnumpy: ', np.abs(data),          # [1 2 1 2]
    '\ntorch: ', torch.abs(tensor)      # [1 2 1 2]
)
  1. Autograd automatically supports Tensors with requires_grad set to True. 计算图可以通过链式法则求导。如果variables中的任何一个variable是 非标量(non-scalar)的,且requires_grad=True

逻辑回归程序(关系拟合)

  1. Class里,__init__()是定义各层,forward()才是真正搭建网络

  2. unsqueeze(1)的使用是因为将数据从1维变2维,因为pytorch只能处理带patch的二维数据

  3. hiddenLinear()的一个实例,后面是它这一层的内容, 故可以对其传入参数

  4. 训练前对优化器和损失函数进行选择

  5. 优化过程

    1. 为什么要用optimizer.zero_grad()把梯度清零?

      1. 进来一个batch的数据, 就会计算一次梯度, 更新一次网络.
      2. 可以实现梯度累加等Trick: https://www.zhihu.com/question/303070254
    2. 每个batch的操作与基础代码对应关系 pytorch中:

      基础实现:

      1. optimizer.zero_grad()对应d_weights = [0] * n 梯度初始化为零, 一个batch的loss关于weight的导数是所有sample的loss关于weight的导数的累加和

      2. outputs = net(inputs)对应h = dot(input[k], weights)

        即前向传播求出预测的值

      3. loss = criterion(outputs, labels)对应loss += (label[k] - h) * (label[k] - h) / 2

        这一步很明显,就是求loss(其实我觉得这一步不用也可以,反向传播时用不到loss值,只是为了让我们知道当前的loss是多少)

      4. loss.backward()对应d_weights = [d_weights[j] + (label[k] - h) * input[k][j] for j in range(n)]

        即反向传播求梯度

      5. optimizer.step()对应weights = [weights[k] + alpha * d_weights[k] for k in range(n)]

        即更新所有参数

  6. 使用imageio进行GIF绘制。

LR_fitting
LR_fitting
import numpy as np

import torch.nn as nn
import os
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True'       # macOS系统原因需要加上这一句可正常运行

"""
建立数据集   
"""
import torch as t
import matplotlib.pyplot as plt
import imageio
from matplotlib.animation import FuncAnimation
plt.style.use('seaborn')  # 设置使用的样式
import seaborn
# y = a * x^2 + b
# shape=(100, 1),1维变2维,因为pytorch只能处理带patch的二维数据
x = t.linspace(-1,1,100).unsqueeze(1)
y  = x.pow(2) + 0.1 * t.randn(x.size())
x.sort()
plt.scatter(x, y)
plt.show()

"""
建立神经网络
"""
import torch.nn.functional as F  # 激励函数
class Net(t.nn.Module):
    """
    __init__()是定义各层
    forward()才是真正搭建网络
    """
    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()
        # 其实每个层就是一个带参数的广义函数映射
        self.hidden = nn.Linear(n_feature, n_hidden)  # hidden是属性,后面是它这一层的内容
        self.output = nn.Linear(n_hidden, n_output)

    def forward(self, x):  # module中的forward功能
        # 正向传播初入值, 用神经网络分析出输出值
        # hidden获得的值是torch.nn.Linear的一个实例,然后Linear又是Module的子类,
        # 所以hidden相当于is a Module,Module中实现了__call__()方法,Module类型的所有实例都是可以被当成一个方法来调用的。
        x = F.relu(self.hidden(x))
        x = self.output(x)
        return x


net = Net(1, 10, 1)  # 输入x,隐藏单元10个,输出y
print(net)  # 可以打印网络结构

"""
训练网络
"""
optimizer = t.optim.SGD(net.parameters(), lr=0.2)  # 传入net的参数 和 lr
loss_func = t.nn.MSELoss()      # 回归问题用均方差误差即可

# plt.ion()   # 设置为实时打印的画图

image_list = []
for _ in tqdm(range(200)):
    prediction = net(x)

    loss = loss_func(prediction, y)  # 计算误差

    """优化"""
    optimizer.zero_grad()   # 清空上一步残余更新参数值,要把梯度清零
    loss.backward()         # 误差反向传播,计算梯度作为参数更新值
    optimizer.step()        # step()进行单次优化,将参数重新施加到net的parameters上

    """可视化"""
    if _ % 10 == 0:
        # plot and show learning process
        plt.cla()
        plt.scatter(x.data.numpy(), y.data.numpy())
        line =plt.plot(x.data.numpy(), prediction.data.numpy(), 'r-',  lw=5)
        plt.text(0.5, 0, 'Loss=%.4f' % loss.data.numpy(), fontdict={'size': 20, 'color':  'red'})

        # 生成动态图
        plt.savefig("temp.jpg")
        plt.pause(0.1)
        image_list.append(imageio.imread("temp.jpg"))  # 可以不用循环i,直接用列表形式
        # print(image_list)
        # duration 是图像间的间隔
imageio.mimsave('LR_fitting.gif', image_list,'GIF', duration=0.2)

# plt.ioff()
# plt.show()

分类程序(区分类型)

  1. 注意其与回归网络的不同之处:

    1. 网络输入输出节点数不同
    1. 分类问题使用交叉熵损失函数, 回归问题使用MSE

    2. 经过一个F.softmax()才是概率值, 取最大作为预测输出

  2. torch.normal(_mean_, _std_, _*_, _generator=None_, _out=None_)Tensor

    返回从单独的正态分布中得出均值和标准差的随机数张量.

    mean是具有每个输出元素的正态分布平均值的张量

    std是一个张量,每个输出元素的正态分布的标准偏差

    meanstd的形状不需要匹配,但是每个张量中元素的总数必须相同.

    Example:

  3. torch.cat()合并数据

  4. 根据c的标签不同颜色染色

Classification
Classification
import torch.nn as nn
import os
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import torch as t
import matplotlib.pyplot as plt
import imageio
plt.style.use('seaborn')  # 设置使用的样式


"""
建立数据集   
"""
# 假数据, 中心点分别为(2,2)和(-2,-2)的两个正态分布的数据
n_data = t.ones(100, 2)         # 数据的基本形态
x0 = t.normal(2*n_data, 1)      # 类型0(label) x data (tensor), shape=(100, 2)
y0 = t.zeros(100)               # 类型0(label) y data (tensor), shape=(100, )
x1 = t.normal(-2*n_data, 1)     # 类型1 x data (tensor), shape=(100, 1)
y1 = t.ones(100)                # 类型1 y data (tensor), shape=(100, )

# 注意 x, y 数据的数据形式是一定要像下面一样 (torch.cat 是在合并数据)
# 合并x当做数据, y当做标签,注意标签的类型应该是LongTensor
x = t.cat((x0, x1), 0).type(t.FloatTensor)  # FloatTensor = 32-bit floating
y = t.cat((y0, y1), ).type(t.LongTensor)    # LongTensor = 64-bit integer

# 根据c的标签不同颜色染色
plt.scatter(x.data.numpy()[:, 0], x.data.numpy()[:, 1], c=y.data.numpy(), s=100, lw=0, cmap='RdYlGn')
plt.show()


"""
建立神经网络
"""
import torch.nn.functional as F  # 激励函数
class Net(t.nn.Module):
    """
    __init__()是定义各层
    forward()才是真正搭建网络
    """
    def __init__(self, n_feature, n_hidden, n_output):
        super(Net, self).__init__()
        self.hidden = nn.Linear(n_feature, n_hidden)  # hidden是属性,后面是它这一层的内容
        self.output = nn.Linear(n_hidden, n_output)

    def forward(self, x):  
        # module中的forward功能,正向传播初入值, 用神经网络分析出输出值
        x = F.relu(self.hidden(x))
        x = self.output(x)
        return x


net = Net(2, 10, 2)  # 输入输出类别需要因任务而异
print(net)  


"""
训练网络
"""
optimizer = t.optim.SGD(net.parameters(), lr=0.01)  # 传入net的参数 和 lr, 为了演示将lr调小
loss_func = t.nn.CrossEntropyLoss()      # 注意多分类用CrossEntropyLoss  [0, 0, 1] -> [0.2, 0.1, 0.7]

# plt.ion()   # 设置为实时打印的画图

image_list = []
for _ in tqdm(range(100)):
    out = net(x)    # 分类是out, [-2, -0.12, 20], 需要再加入激活层转化为概率 :F.softmax(out)

    loss = loss_func(out, y)  # 计算误差

    """优化"""
    optimizer.zero_grad()   # 清空上一步残余更新参数值,要把梯度清零
    loss.backward()         # 误差反向传播,计算梯度作为参数更新值
    optimizer.step()        # step()进行单次优化,将参数重新施加到net的parameters上

    """可视化"""
    if _ % 2 == 0:
        plt.cla()
        # 过了一道 softmax 的激励函数后的最大概率才是预测值
        prediction = t.max(F.softmax(out), 1)[1]    # max()[1]取索引(label)
        pred_y = prediction.data.numpy().squeeze()
        target_y = y.data.numpy()
        plt.scatter(x.data.numpy()[:, 0], x.data.numpy()[:, 1], c=pred_y, s=100, lw=0, cmap='RdYlGn')
        accuracy = sum(pred_y == target_y) / 200.  # 预测中有多少和真实值一样
        plt.text(1.5, -4, 'Accuracy=%.2f' % accuracy, fontdict={'size': 20, 'color': 'red'})

        # 生成动态图
        plt.savefig("temp.jpg")
        plt.pause(0.1)
        image_list.append(imageio.imread("temp.jpg"))  # 可以不用循环i,直接用列表形式

imageio.mimsave('Classification.gif', image_list,'GIF', duration=0.05)   # duration 是图像间的间隔

# plt.ioff()
# plt.show()

更快的搭建神经网络

由输出可以发现, net2将激活函数也纳入网络中了, 但net中激活函数是在forward()中才被调用, 说明:

net 的好处就是你可以根据你的个人需要更加个性化你自己的前向传播过程, 比如(RNN).

net2的好处是更加简单

F.relu() 是一个function

nn.ReLU() 是一个class

net2 = nn.Sequential(
    nn.Linear(1, 10),
    nn.ReLU(),
    nn.Linear(10,1)
)
print(net)
print(net2)

输出:

Net(        # 正常搭建
  (hidden): Linear(in_features=1, out_features=10, bias=True)
  (output): Linear(in_features=10, out_features=1, bias=True)
)
Sequential(
  (0): Linear(in_features=1, out_features=10, bias=True)
  (1): ReLU()
  (2): Linear(in_features=10, out_features=1, bias=True)
)

保存和恢复模型

  1. 保存网络

  2. 恢复整个网络

  3. 恢复网络参数

由图可以看出3个net得到的回归曲线完全相同, 因为它们的内部参数都是完全相同的.

ModelSaveUse_10hidden
ModelSaveUse_10hidden

hidden layer nodes == 10

ModelSaveUse_100hidden
ModelSaveUse_100hidden

hidden layer nodes == 100, 增加宽度后拟合效果明显好与hidden layer nodes == 10的网络.

完整代码:

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import matplotlib.pyplot as plt
plt.style.use('seaborn')  # 设置使用的样式

t.manual_seed(1)    # reproducible

# 假数据
x = t.unsqueeze(t.linspace(-1, 1, 100), dim=1)  # x data (tensor), shape=(100, 1)
y = x.pow(2) + 0.2*t.rand(x.size())  # noisy y data (tensor), shape=(100, 1)


def savemodel():
    """
    保存网络
    """
    # Build Net
    net1 = nn.Sequential(
        nn.Linear(1, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
    optimizer = t.optim.SGD(net1.parameters(), lr=0.5)
    loss_func = t.nn.MSELoss()

    # Train
    for _ in range(100):
        prediction = net1(x)
        loss = loss_func(prediction, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    """Save model"""
    t.save(net1, './Try_ModelSaved/net.pkl')  # 保存整个网络
    t.save(net1.state_dict(), './Try_ModelSaved/net_params.pkl')  # 只保存参数,速度快


def restore_net():
    """
    提取整个网络
        网络大时可能会慢
    """
    net2 = t.load('./Try_ModelSaved/net.pkl')  # restore entire net1 to net2
    prediction = net2(x)

def restore_params():
    """
    提取网络参数
        提取参数, 放入新建的网络中
    """
    """新建 net3"""
    net3 = t.nn.Sequential(
        nn.Linear(1, 10),
        nn.ReLU(),
        nn.Linear(10, 1)
    )
    """将参数复制到net3中"""
    net3.load_state_dict(t.load('./Try_ModelSaved/net_params.pkl'))
    prediction = net3(x)


savemodel()
restore_net()
restore_params()

批训练 — DataLoader

  1. 批训练可以有很多种途径, 详情请见 我制作的 训练优化器 动画简介.
  2. Torch 中提供了一种帮你整理你的数据结构的好东西, DataLoader 是 torch 给你用来包装你的数据的工具.
    1. 作用: 包装自己的数据, 进行批训练.
    2. 过程
      1. 将自己的 (numpy array 或其他) 数据形式装换成 Tensor
      2. 然后再放进这个包装器中.
    3. 好处: 他们帮你有效地迭代数据
  3. DataLoader核心代码

  4. 细节:

    1. 如果最后的step里数据不够一个batch_size时, 最后一个step会输出剩下的数据

完整示例代码:

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt
plt.style.use('seaborn')  # 设置使用的样式

t.manual_seed(1)    # reproducible

BATCH_SIZE = 5      # 批训练的数据个数

x = t.linspace(1, 10, 10)       # x data (torch tensor)
y = t.linspace(10, 1, 10)       # y data (torch tensor)

# 先转换成 torch 能识别的 Dataset
torch_dataset = Data.TensorDataset(x, y)

# 把 dataset 放入 DataLoader
loader = Data.DataLoader(
    dataset=torch_dataset,      # torch TensorDataset format
    batch_size=BATCH_SIZE,      # mini batch size
    shuffle=True,               # 要不要打乱数据 (打乱比较好)
    num_workers=2,              # 多线程来读数据
)

for epoch in range(3):
    for step, (batch_x, batch_y) in enumerate(loader):
        # 假设这里就是你训练的地方...

        # 打出来一些数据
        print('Epoch:', epoch, '| Step:', step, '| batch x:',
              batch_x.numpy(), '| batch y:', batch_y.numpy())

输出:

Epoch: 0 | Step: 0 | batch x: [ 5.  7. 10.  3.  4.] | batch y: [6. 4. 1. 8. 7.]
Epoch: 0 | Step: 1 | batch x: [2. 1. 8. 9. 6.] | batch y: [ 9. 10.  3.  2.  5.]
Epoch: 1 | Step: 0 | batch x: [ 4.  6.  7. 10.  8.] | batch y: [7. 5. 4. 1. 3.]
Epoch: 1 | Step: 1 | batch x: [5. 3. 2. 1. 9.] | batch y: [ 6.  8.  9. 10.  2.]
Epoch: 2 | Step: 0 | batch x: [ 4.  2.  5.  6. 10.] | batch y: [7. 9. 6. 5. 1.]
Epoch: 2 | Step: 1 | batch x: [3. 9. 1. 8. 7.] | batch y: [ 8.  2. 10.  3.  4.]

不同的优化器 — 加速神经网络训练

  1. 主要有以下几种模式

    1. Stochastic Gradient Descent(SGD)
    2. Momentum
    3. AdaGrad
    4. RMSProp
    5. Adam
  2. 图示:

    speedup3

    原始形式:

    加速神经网络训练 (Speed Up Training)

    Momentum:

    加速神经网络训练 (Speed Up Training)

    Adam:

    加速神经网络训练 (Speed Up Training)

    SGD 是最普通的优化器, 也可以说没有加速效果, 而 MomentumSGD 的改良版, 它加入了动量原则. 后面的 RMSprop 又是 Momentum 的升级版. 而 Adam 又是 RMSprop 的升级版. 不过从这个结果中我们看到, Adam 的效果似乎比 RMSprop 要差一点. 所以说并不是越先进的优化器, 结果越佳. 我们在自己的试验中可以尝试不同的优化器, 找到那个最适合你数据/网络的优化器.

  3. 注意画不同的图的时候要建立不同的plt.figure()

  4. 待分类数据:

    Optimizer_data 不同优化器对比结果: Optimizer

完整代码:

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt
import torch.nn.functional as F
plt.style.use('seaborn')  # 设置使用的样式

t.manual_seed(1)    # reproducible

LR = 0.01
BATCH_SIZE = 32      # 批训练的数据个数
EPOCH = 12

# fake dataset
x = t.unsqueeze(t.linspace(-1, 1, 1000), dim=1)
y = x.pow(2) + 0.1*t.normal(t.zeros(*x.size()))

# 注意画不同的图的时候要建立不同的`plt.figure()`
plt.figure()
plt.scatter(x.numpy(), y.numpy())
plt.show()

# 使用上节内容提到的 data loader
torch_dataset = Data.TensorDataset(x, y)
loader = Data.DataLoader(dataset=torch_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2,)


# 为了对比每一种优化器, 我们给他们各自创建一个神经网络, 但这个神经网络都来自同一个 Net 形式.
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.hidden = nn.Linear(1, 10)
        self.predict = nn.Linear(10, 1)

    def forward(self, x):
        x = self.hidden(x)
        x = F.relu(x)
        x = self.predict(x)
        return x

net_SGD         = Net()
net_Momentum    = Net()
net_RMSprop     = Net()
net_Adam        = Net()
nets = [net_SGD, net_Momentum, net_RMSprop, net_Adam]

opt_SGD         = t.optim.SGD(net_SGD.parameters(), lr=LR)
opt_Momentum    = t.optim.SGD(net_Momentum.parameters(), lr=LR, momentum=0.8)
opt_RMSprop     = t.optim.RMSprop(net_RMSprop.parameters(), lr=LR, alpha=0.9)
opt_Adam        = t.optim.Adam(net_Adam.parameters(), lr=LR, betas=(0.9, 0.99))
optimizers = [opt_SGD, opt_Momentum, opt_RMSprop, opt_Adam]

loss_func = nn.MSELoss()
losses_his = [[], [], [], []]   # record the loss of 4 net

for epoch in range(EPOCH):
    print('Epoch:', epoch)
    for step, (b_x, b_y) in enumerate(loader):

        # 通过数组的形式, 迭代4次
        for net, opt, l_his in zip(nets, optimizers, losses_his):
            output = net(b_x)
            loss = loss_func(output, b_y)

            opt.zero_grad()
            loss.backward()
            opt.step()      # 为每个节点重设梯度
            l_his.append(loss.data.numpy())

# 注意画不同的图的时候要建立不同的`plt.figure()`
plt.figure()       
labels = ['SGD', 'Momentum', 'RMSprop', 'Adam']
for i, l_his in enumerate(losses_his):
    plt.plot(l_his, label=labels[i])
plt.legend(loc='best')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.ylim((0, 0.2))
plt.show()

CNN — MNIDT分类

  1. 加载数据
    1. 选一个样本进行输出 image-20200804154520873
  2. 下载数据集方法

  3. t-SNE聚类及可视化

    low_dim_embs = tsne.fit_transform(last_layer.data.numpy()[:plot_only, :]), ==通过最后一层的pixel进行t-SNE分类可视化==

  4. CNN模型

    1. 通过padding, 经过卷积层图片大小不变
    2. 注意全连接层输入是batch_size个一维向量 — (32*7*7, 10)
    3. 可视化时需要 return x 来返回

    CNN_MNISTCNN结构输出:

    CNN(
      (conv1): Sequential(
        (0): Conv2d(1, 16, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
        (1): ReLU()
        (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      )
      (conv2): Sequential(
        (0): Conv2d(16, 32, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
        (1): ReLU()
        (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      )
      (out): Linear(in_features=1568, out_features=10, bias=True)
    )

    测试集前10个样本输出预测

    [7 2 1 0 4 1 4 9 5 9] prediction number
    [7 2 1 0 4 1 4 9 5 9] real number

    完整代码:

    import os
    
    os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
    import imageio
    from tqdm import tqdm
    import torch as t
    import torch.nn as nn
    import torch.utils.data as Data
    import matplotlib.pyplot as plt
    import torch.nn.functional as F
    import torchvision
    
    plt.style.use('seaborn')  # 设置使用的样式
    
    # for visualization
    from matplotlib import cm
    try:
        from sklearn.manifold import TSNE; HAS_SK = True
    except:
        HAS_SK = False; print('Please install sklearn for layer visualization')
    
    LR = 0.001
    BATCH_SIZE = 50  # 批训练的数据个数
    EPOCH = 8
    DOWNLOAD_MNIST = False
    
    """Mnist digits dataset"""
    if not(os.path.exists('Volumes/ArcFile/mnist/')) or not os.listdir('Volumes/ArcFile/mnist/'):
        DOWNLOAD_MNIST = True
    
    train_data = torchvision.datasets.MNIST(
        root='Volumes/ArcFile/mnist/',
        train=True,  # this is training data
        # PIL.Image or numpy.ndarray to torch.FloatTensor(C * H * W), and normalize in the range [0.0, 1.0]
        transform=torchvision.transforms.ToTensor(),
    
        download=DOWNLOAD_MNIST
    )
    # 批训练 50samples, 1 channel, 28x28 (50, 1, 28, 28)
    train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
    # plot one example
    print(train_data.train_data.size())                 # (60000, 28, 28)
    print(train_data.train_labels.size())               # (60000)
    plt.imshow(train_data.train_data[0].numpy(), cmap='gray')
    plt.title('%i' % train_data.train_labels[0])
    plt.show()
    
    # pick 2000 samples to speed up testing
    test_data = torchvision.datasets.MNIST(root='Volumes/ArcFile/mnist/', train=False)
    # shape from (2000, 28, 28) to (2000, 1, 28, 28), value in range(0,1)
    test_x = t.unsqueeze(test_data.test_data, dim=1).type(t.FloatTensor)[:2000] / 255.
    test_y = test_data.test_labels[:2000]
    
    
    class CNN(t.nn.Module):
        def __init__(self):
            super(CNN, self).__init__()
            self.conv1 = nn.Sequential(
                # input shape (1, 28, 28) -> (16, 28, 28) -> (16, 14, 14)
                nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5, stride=1, padding=2),  # conv后的图片大小不变
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2))
            self.conv2 = nn.Sequential(
                # input shape (16, 14, 14) -> (32, 14, 14) -> (32, 7, 7)
                nn.Conv2d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2))
            self.out = nn.Linear(32 * 7 * 7, 10)  # 注意:(32*7*7, 10)
    
        def forward(self, x):
            x = self.conv1(x)
            x = self.conv2(x)
            x = x.view(x.size(0), -1)  # 注意要将输入展开成一维向量(batch_size, 32*7*7)
            output = self.out(x)
            return output, x        # 一定注意: 可视化时 return x for visualization
    
    
    cnn = CNN()
    print(cnn)
    
    optimizer = t.optim.Adam(cnn.parameters(), lr=LR, betas=(0.9, 0.99))
    loss_func = nn.CrossEntropyLoss()
    
    
    def plot_with_labels(lowDWeights, labels, image_list, *param):
        plt.cla()
        X, Y = lowDWeights[:, 0], lowDWeights[:, 1]
        epoch, loss, accuracy = param
        for x, y, s in zip(X, Y, labels):
            c = cm.rainbow(int(255 * s / 9))
            plt.text(x, y, s, backgroundcolor=c, fontsize=9)
        # plt.xlim(X.min(), X.max())
        # plt.ylim(Y.min(), Y.max())
        plt.xlim(-50, 50)
        plt.ylim(-40, 40)
        plt.title('Visualize last layer')
        text = ('Epoch: %d,' % epoch + ' train loss: %.4f,' % loss.data.numpy() + ' test accuracy: %.3f' % accuracy)
        plt.text(-30, -35, text, fontdict={'size': 13, 'color': 'red'})
        plt.show()
        plt.savefig("temp.jpg")
        plt.pause(0.02)
        image_list.append(imageio.imread("temp.jpg"))  # 可以不用循环i,直接用列表形式
    
    
    def Train():
        plt.ion()
        image_list = []
        for epoch in tqdm(range(EPOCH)):
            for step, (b_x, b_y) in enumerate(train_loader):
                output = cnn(b_x)[0]  # 注意如果需要visualization时会有2个返回值,故要选择
    
                loss = loss_func(output, b_y)
                optimizer.zero_grad()  # 计算完损失就可以清除梯度了
                loss.backward()
                optimizer.step()
    
                # visualization use t-SNE
                if step % 100 == 0:
                    test_output, last_layer = cnn(test_x)
                    pred_y = t.max(test_output, 1)[1].data.numpy()
                    accuracy = float((pred_y == test_y.data.numpy()).astype(int).sum()) / float(test_y.size(0))
                    print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy(), '| test accuracy: %.3f' % accuracy)
                    if HAS_SK:
                        # Visualization of trained flatten layer (T-SNE)
                        tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
                        plot_only = 500
                        low_dim_embs = tsne.fit_transform(last_layer.data.numpy()[:plot_only, :])
                        labels = test_y.numpy()[:plot_only]
                        plot_with_labels(low_dim_embs, labels, image_list, epoch, loss, accuracy)  # lowDWeights, labels
        plt.ioff()
        imageio.mimsave('CNN_MNIST.gif', image_list, 'GIF', duration=0.2)
    
        t.save(cnn, './Try_ModelSaved/cnn_mnist.pkl')  # 保存整个网络
    
    
    def Test():
        net2 = t.load('./Try_ModelSaved/cnn_mnist.pkl')  # restore entire net1 to net2
        test_output = net2(test_x[:10])
        pred_y = t.max(test_output[0], 1)[1].data.numpy().squeeze()
        print(pred_y, 'prediction number')
        print(test_y[:10].numpy(), 'real number')
    
    
    Train()
    Test()

RNN — 序列化分析

介绍

  1. 具有记忆之前知识的能力

rnn4.png

  1. 梯度消失或者梯度爆炸的问题(取决于w大于1还是小于1)

    lstm3.png

    lstm4.png

LSTM分类 — 延缓记忆衰退

带控制器 — 输入控制,输出控制,忘记控制

lstm5.png

  1. 参数相比CNN增加了2个, 横着读, 每次读一行(28个point), 读28次.
TIME_STEP = 28      # rnn 时间步数 / 图片高度
INPUT_SIZE = 28     # rnn 每步输入值 / 图片每行像素

4-1-1.png

  1. RNN模型: 和以前一样, 我们用一个 class 来建立 RNN 模型. 这个 RNN 整体流程是

  2. (input0, state0) -> LSTM -> (output0, state1);
  3. (input1, state1) -> LSTM -> (output1, state2);
  4. (inputN, stateN)-> LSTM -> (outputN, stateN+1);
  5. outputN -> Linear -> prediction. 通过LSTM分析每一时刻的值, 并且将这一时刻和前面时刻的理解合并在一起, 生成当前时刻对前面数据的理解或记忆. 传递这种理解给下一时刻分析.

输出:

torch.Size([60000, 28, 28])
torch.Size([60000])
RNN(
  (rnn): LSTM(28, 64, batch_first=True)
  (out): Linear(in_features=64, out_features=10, bias=True)
)
Epoch:  0 | train loss: 2.2925 | test accuracy: 0.10
Epoch:  0 | train loss: 1.1347 | test accuracy: 0.55
Epoch:  0 | train loss: 1.0238 | test accuracy: 0.68
Epoch:  0 | train loss: 0.4887 | test accuracy: 0.81
Epoch:  0 | train loss: 0.4760 | test accuracy: 0.85
Epoch:  0 | train loss: 0.2999 | test accuracy: 0.86
Epoch:  0 | train loss: 0.3342 | test accuracy: 0.89
Epoch:  0 | train loss: 0.4210 | test accuracy: 0.93
Epoch:  0 | train loss: 0.1264 | test accuracy: 0.93
Epoch:  0 | train loss: 0.3789 | test accuracy: 0.93
Epoch:  0 | train loss: 0.2171 | test accuracy: 0.92
Epoch:  0 | train loss: 0.2132 | test accuracy: 0.94
Epoch:  0 | train loss: 0.2326 | test accuracy: 0.93
Epoch:  0 | train loss: 0.0600 | test accuracy: 0.94
Epoch:  0 | train loss: 0.2595 | test accuracy: 0.94
Epoch:  0 | train loss: 0.0987 | test accuracy: 0.95
Epoch:  0 | train loss: 0.1818 | test accuracy: 0.94
Epoch:  0 | train loss: 0.3106 | test accuracy: 0.96
Epoch:  0 | train loss: 0.1822 | test accuracy: 0.95
[7 2 1 0 4 1 4 9 5 9] prediction number
[7 2 1 0 4 1 4 9 5 9] real number

完整程序:

import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

plt.style.use('seaborn')  # 设置使用的样式

# for visualization
from matplotlib import cm
try:
    from sklearn.manifold import TSNE; HAS_SK = True
except:
    HAS_SK = False; print('Please install sklearn for layer visualization')

LR = 0.01
BATCH_SIZE = 64
EPOCH = 1
TIME_STEP = 28      # rnn 时间步数 / 图片高度
INPUT_SIZE = 28     # rnn 每步输入值 / 图片每行像素

DOWNLOAD_MNIST = False

"""Mnist digits dataset"""
if not(os.path.exists('/Volumes/ArcFile/mnist/')) or not os.listdir('/Volumes/ArcFile/mnist/'):
    DOWNLOAD_MNIST = True

train_data = torchvision.datasets.MNIST(
    root='/Volumes/ArcFile/mnist/',
    train=True,  # this is training data
    # PIL.Image or numpy.ndarray to torch.FloatTensor(C * H * W), and normalize in the range [0.0, 1.0]
    transform=transforms.ToTensor(),

    download=DOWNLOAD_MNIST
)
# 批训练 50samples, 1 channel, 28x28 (50, 1, 28, 28)
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
# plot one example
print(train_data.train_data.size())                 # (60000, 28, 28)
print(train_data.train_labels.size())               # (60000)
plt.imshow(train_data.train_data[0].numpy(), cmap='gray')
plt.title('%i' % train_data.train_labels[0])
plt.show()

# pick 2000 samples to speed up testing
test_data = torchvision.datasets.MNIST(root='/Volumes/ArcFile/mnist/', train=False,  transform=transforms.ToTensor())
# shape from (2000, 28, 28) to (2000, 1, 28, 28), value in range(0,1)
test_x = test_data.test_data.type(t.FloatTensor)[:2000] / 255.      # 注意这里不需要unsqueeze()数据了,因为RNN只需要3个??
test_y = test_data.test_labels.numpy()[:2000]


class RNN(t.nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.rnn = nn.LSTM(  # LSTM 效果要比 nn.RNN() 好多了
            input_size=28,  # 图片每行的数据像素点
            hidden_size=64,  # rnn hidden unit
            num_layers=1,  # 有几层 RNN layers
            batch_first=True,  # input & output 会是以 batch size 为第一维度的特征集 e.g. (batch, time_step, input_size)
        )

        self.out = nn.Linear(64, 10)  # 输出层

    def forward(self, x):
        # x shape (batch, time_step, input_size)
        # r_out shape (batch, time_step, output_size)
        # h_n shape (n_layers, batch, hidden_size)   LSTM 有两个 hidden states, h_n 是分线, h_c 是主线
        # h_c shape (n_layers, batch, hidden_size)
        r_out, (h_n, h_c) = self.rnn(x, None)  # None 表示 hidden state 会用全0的 state

        # 选取最后一个时间点的 r_out 输出
        # 这里 r_out[:, -1, :] 的值也是 h_n 的值
        out = self.out(r_out[:, -1, :])
        return out

rnn = RNN()
print(rnn)
"""
RNN (
  (rnn): LSTM(28, 64, batch_first=True)
  (out): Linear (64 -> 10)
)
"""

optimizer = t.optim.Adam(rnn.parameters())
loss_func = nn.CrossEntropyLoss()


def plot_with_labels(lowDWeights, labels, image_list, *param):
    plt.cla()
    X, Y = lowDWeights[:, 0], lowDWeights[:, 1]
    epoch, loss, accuracy = param
    for x, y, s in zip(X, Y, labels):
        c = cm.rainbow(int(255 * s / 9))
        plt.text(x, y, s, backgroundcolor=c, fontsize=9)
    # plt.xlim(X.min(), X.max())
    # plt.ylim(Y.min(), Y.max())
    plt.xlim(-50, 50)
    plt.ylim(-40, 40)
    plt.title('Visualize last layer')
    text = ('Epoch: %d,' % epoch + ' train loss: %.4f,' % loss.data.numpy() + ' test accuracy: %.3f' % accuracy)
    plt.text(-30, -35, text, fontdict={'size': 13, 'color': 'red'})
    plt.show()
    plt.savefig("temp.jpg")
    plt.pause(0.02)
    image_list.append(imageio.imread("temp.jpg"))  # 可以不用循环i,直接用列表形式


def Train():
    plt.ion()
    # plt.figure(figsize=(5,5))
    # image_list = []
    for epoch in tqdm(range(EPOCH)):
        for step, (b_x, b_y) in enumerate(train_loader):
            b_x = b_x.view(-1, 28, 28)   # reshape x to (batch, time_step, input_size)

            output = rnn(b_x)  # 注意如果需要visualization时会有2个返回值,故要选择

            loss = loss_func(output, b_y)
            optimizer.zero_grad()  # 计算完损失就可以清除梯度了
            loss.backward()
            optimizer.step()

            # visualization use t-SNE
            if step % 100 == 0:
                test_output = rnn(test_x)  # x 赋值给 last_layer
                pred_y = t.max(test_output, 1)[1].data.numpy()
                accuracy = float((pred_y == test_y).astype(int).sum()) / float(test_y.size)
                print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy(), '| test accuracy: %.3f' % accuracy)
    #             if HAS_SK:
    #                 # Visualization of trained flatten layer (T-SNE)
    #                 tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000)
    #                 plot_only = 500
    #                 low_dim_embs = tsne.fit_transform(last_layer.data.numpy()[:plot_only, :])  # 通过最后一层的pixel进行t-SNE分类可视化
    #                 labels = test_y.numpy()[:plot_only]
    #                 plot_with_labels(low_dim_embs, labels, image_list, epoch, loss, accuracy)  # lowDWeights, labels
    # plt.ioff()
    # imageio.mimsave('RNN_MNIST.gif', image_list, 'GIF', duration=0.2)

    t.save(rnn, './Try_ModelSaved/rnn_mnist.pkl')  # 保存整个网络


def Test():
    net2 = t.load('./Try_ModelSaved/rnn_mnist.pkl')  # restore entire net1 to net2
    test_output = net2(test_x[:10])
    pred_y = t.max(test_output, 1)[1].data.numpy().squeeze()
    print(pred_y, 'prediction number')
    print(test_y[:10], 'real number')


Train()
Test()


import torch
from torch import nn
import torchvision.datasets as dsets
import torchvision.transforms as transforms
import matplotlib.pyplot as plt


# torch.manual_seed(1)    # reproducible

# Hyper Parameters
EPOCH = 1               # train the training data n times, to save time, we just train 1 epoch
BATCH_SIZE = 64
TIME_STEP = 28          # rnn time step / image height
INPUT_SIZE = 28         # rnn input size / image width
LR = 0.01               # learning rate
DOWNLOAD_MNIST = True   # set to True if haven't download the data


# Mnist digital dataset
train_data = dsets.MNIST(
    root='./mnist/',
    train=True,                         # this is training data
    transform=transforms.ToTensor(),    # Converts a PIL.Image or numpy.ndarray to
                                        # torch.FloatTensor of shape (C x H x W) and normalize in the range [0.0, 1.0]
    download=DOWNLOAD_MNIST,            # download it if you don't have it
)

# plot one example
print(train_data.train_data.size())     # (60000, 28, 28)
print(train_data.train_labels.size())   # (60000)
plt.imshow(train_data.train_data[0].numpy(), cmap='gray')
plt.title('%i' % train_data.train_labels[0])
plt.show()

# Data Loader for easy mini-batch return in training
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)

# convert test data into Variable, pick 2000 samples to speed up testing
test_data = dsets.MNIST(root='./mnist/', train=False, transform=transforms.ToTensor())
test_x = test_data.test_data.type(torch.FloatTensor)[:2000]/255.   # shape (2000, 28, 28) value in range(0,1)
test_y = test_data.test_labels.numpy()[:2000]    # covert to numpy array


class RNN(nn.Module):
    def __init__(self):
        super(RNN, self).__init__()

        self.rnn = nn.LSTM(         # if use nn.RNN(), it hardly learns
            input_size=INPUT_SIZE,
            hidden_size=64,         # rnn hidden unit
            num_layers=1,           # number of rnn layer
            batch_first=True,       # input & output will has batch size as 1s dimension. e.g. (batch, time_step, input_size)
        )

        self.out = nn.Linear(64, 10)

    def forward(self, x):
        # x shape (batch, time_step, input_size)
        # r_out shape (batch, time_step, output_size)
        # h_n shape (n_layers, batch, hidden_size)
        # h_c shape (n_layers, batch, hidden_size)
        r_out, (h_n, h_c) = self.rnn(x, None)   # None represents zero initial hidden state

        # choose r_out at the last time step
        out = self.out(r_out[:, -1, :])
        return out


rnn = RNN()
print(rnn)

optimizer = torch.optim.Adam(rnn.parameters(), lr=LR)   # optimize all cnn parameters
loss_func = nn.CrossEntropyLoss()                       # the target label is not one-hotted

# training and testing
for epoch in range(EPOCH):
    for step, (b_x, b_y) in enumerate(train_loader):        # gives batch data
        b_x = b_x.view(-1, 28, 28)              # reshape x to (batch, time_step, input_size)

        output = rnn(b_x)                               # rnn output
        loss = loss_func(output, b_y)                   # cross entropy loss
        optimizer.zero_grad()                           # clear gradients for this training step
        loss.backward()                                 # backpropagation, compute gradients
        optimizer.step()                                # apply gradients

        if step % 50 == 0:
            test_output = rnn(test_x)                   # (samples, time_step, input_size)
            pred_y = torch.max(test_output, 1)[1].data.numpy()
            accuracy = float((pred_y == test_y).astype(int).sum()) / float(test_y.size)
            print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy(), '| test accuracy: %.2f' % accuracy)

# print 10 predictions from test data
test_output = rnn(test_x[:10].view(-1, 28, 28))
pred_y = torch.max(test_output, 1)[1].data.numpy()
print(pred_y, 'prediction number')
print(test_y[:10], 'real number')

RNN回归

RNN_LR

注意

  1. 前向传播过程

  2. 预测过程

完整代码:

import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
import numpy as np

plt.style.use('seaborn')

LR = 0.02
BATCH_SIZE = 64
TIME_STEP = 10      # rnn 时间步数 / 图片高度
INPUT_SIZE = 1     # rnn 每步输入值 / 图片每行像素

# show data
steps = np.linspace(0, np.pi * 2, 100, dtype=np.float32)  # float32 for converting torch FloatTensor
x_np = np.sin(steps)
y_np = np.cos(steps)
plt.plot(steps, y_np, 'r-', label='target (cos)')
plt.plot(steps, x_np, 'b-', label='input (sin)')
plt.legend(loc='best')
plt.show()


class RNN(t.nn.Module):
    def __init__(self):
        super(RNN, self).__init__()
        self.rnn = nn.RNN(      # 注意这里是RNN就够了
            input_size=INPUT_SIZE,  # 函数幅度是1
            hidden_size=32,  # rnn hidden unit
            num_layers=1,  # 有几层 RNN layers
            batch_first=True,  # input & output 会是以 batch size 为第一维度的特征集 e.g. (batch, time_step, input_size)
        )
        self.out = nn.Linear(32, 1)  # 输出层

    def forward(self, x, h_state):
        # 因为 hidden state 是连续的, 所以我们要一直传递这一个 state
        # x (batch, time_step, input_size)
        # h_state (n_layers, batch, hidden_size)
        # r_out (batch, time_step, output_size)
        # h_state是之前事情的记忆, 也要作为 RNN 的一个输入
        r_out, h_state = self.rnn(x, h_state)

        outs = []    # 保存所有时间点的预测值, 利用动态图
        for time_step in range(r_out.size(1)):    # 对每一个时间点计算 output
            outs.append(self.out(r_out[:, time_step, :]))  # 每个time_step计算
        return t.stack(outs, dim=1), h_state    # t.stack(): list -> tensor
#
rnn = RNN()
print(rnn)
"""
RNN (
  (rnn): RNN(1, 32, batch_first=True)
  (out): Linear (32 -> 1)
)
"""

optimizer = t.optim.Adam(rnn.parameters(), lr=LR)
loss_func = nn.MSELoss()


def Train():
    plt.ion()
    h_state = None  # for initial hidden state, 因为最初没有前面的数据
    plt.figure(1, figsize=(12, 5))
    image_list = []
    for step in tqdm(range(100)):
        start, end = step * np.pi, (step + 1) * np.pi  # time steps, 截取一小段起始位置
        # sin 预测 cos
        steps = np.linspace(start, end, 10, dtype=np.float32, endpoint=False)
        x_np = np.sin(steps)  # float32 for converting torch FloatTensor
        y_np = np.cos(steps)

        x = t.from_numpy(x_np[np.newaxis, :, np.newaxis])  # shape (batch, time_step, input_size) 加1个维度
        y = t.from_numpy(y_np[np.newaxis, :, np.newaxis])

        prediction, h_state = rnn(x, h_state)  # rnn 对于每个 step 的 prediction, 还有最后一个 step 的 h_state
        # !!  下一步十分重要 !! 否则无法反向传播
        h_state = h_state.data  # 要把 h_state 重新包装一下才能放入下一个 iteration, 不然会报错


        loss = loss_func(prediction, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        plt.plot(steps, y_np.flatten(), 'r-')
        plt.plot(steps, prediction.data.numpy().flatten(), 'b-')
        plt.draw()
        plt.savefig('temp.jpg')
        plt.pause(0.05)
        if step % 4 == 0:
            image_list.append(imageio.imread('temp.jpg'))

    imageio.mimsave('RNN_MNIST.gif', image_list, 'GIF', duration=0.2)
    plt.ioff()
    plt.show()


Train()

自编码(Autoencoder) — 用NN进行非监督学习

自编码是一种神经网络的形式

压缩与解压

对自编码器的训练, 相当于将图片经过了压缩,再解压的这一道工序. 当压缩的时候, 原有的图片质量被缩减, 解压时用信息量小却包含了所有关键信息的文件恢复出原本的图片.

编码器: 进而可以训练编码器可以提取足够的有用信息以够恢复原始数据.实际使用时只使用前半部分, 目的是经过编码器网络可以极大减少数据量却不损失关键信息(提取关键特征)

==自编码器能自动分类数据, 也可以其那套在半监督上, 用少量有标签样本和大量无标签样本进行学习.==

auto2.png

auto3.png

编码器部分(encoder)

编码器能得到原数据的精髓, 然后我们只需要再创建一个小的神经网络学习这个精髓的数据,不仅减少了神经网络的负担, 而且同样能达到很好的效果.

auto4.png

自编码是类似于PCA可以对特征属性降维, 以用来区分原数据. 效果甚至超越PCA

解码器(decoder)

解码器训练时是用来将精髓信息解压成原始信息, 所以可以认为是类似于解压器或者生成器的作用(类似GAN), 那做这件事的一种特殊自编码叫做 variational autoencoders,

VAE解释: Variational Autoencoders Explained

有一个例子就是让它能模仿并生成手写数字.

auto6.jpg

自编码器无监督分类MNIST代码

  1. 只需要使用training data, 且不需要labels

  2. 训练过程(上面是输入图片, 下面是网络选择的图片)

    可以看出5和3,4和9比较难分辨

AE_MNIST
AE_MNIST
  1. 最终分类图 AE_Classification_3D

  2. 注意

完整代码:

import os

os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
import imageio
from tqdm import tqdm
import torch as t
import torch.nn as nn
import torch.utils.data as Data
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torchvision
from mpl_toolkits.mplot3d import Axes3D
import numpy as np
import torchvision.transforms as transforms

plt.style.use('seaborn')  # 设置使用的样式

# for visualization
from matplotlib import cm
from sklearn.manifold import TSNE; HAS_SK = True

LR = 0.005
BATCH_SIZE = 64  # 批训练的数据个数
EPOCH = 10
DOWNLOAD_MNIST = False
N_TEST_IMG = 5  # 到时候显示 5张图片看效果, 如上图一

"""Mnist digits dataset"""
if not(os.path.exists('/Volumes/ArcFile/mnist/')) or not os.listdir('/Volumes/ArcFile/mnist/'):
    DOWNLOAD_MNIST = True

train_data = torchvision.datasets.MNIST(
    root='/Volumes/ArcFile/mnist/',
    train=True,
    # PIL.Image or numpy.ndarray to torch.FloatTensor(C * H * W), and normalize in the range [0.0, 1.0]
    transform=transforms.ToTensor(),

    download=DOWNLOAD_MNIST
)
# Data Loader for easy mini-batch return in training, the image batch shape will be (50, 1, 28, 28)
train_loader = Data.DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)


class AutoEncoder(nn.Module):
    def __init__(self):
        super(AutoEncoder, self).__init__()
        # 压缩
        self.encoder = nn.Sequential(
            nn.Linear(28*28, 128),
            nn.Tanh(),
            nn.Linear(128, 64),
            nn.Tanh(),
            nn.Linear(64, 12),
            nn.Tanh(),
            nn.Linear(12, 3),   # 压缩成3个feature, 进行3D图像可视化
        )
        # 解压
        self.decoder = nn.Sequential(
            nn.Linear(3, 12),
            nn.Tanh(),
            nn.Linear(12, 64),
            nn.Tanh(),
            nn.Linear(64, 128),
            nn.Tanh(),
            nn.Linear(128, 28*28),
            nn.Sigmoid(),       # 激励函数是为了让输出值在 (0,1)
        )

    def forward(self, x):
        encoder = self.encoder(x)
        decoder = self.decoder(encoder)
        return encoder, decoder     # 注意要把encoder和decoder都输出


autoencoder = AutoEncoder()
print(autoencoder)

optimizer = t.optim.Adam(autoencoder.parameters(), lr=LR)
loss_func = nn.MSELoss()   # why??


def plot_with_labels(lowDWeights, labels, image_list, *param):
    # 要观看的数据
    view_data = train_data.train_data[:200].view(-1, 28 * 28).type(torch.FloatTensor) / 255.
    encoded_data, _ = autoencoder(view_data)  # 提取压缩的特征值
    fig = plt.figure(2)
    ax = Axes3D(fig)  # 3D 图
    # x, y, z 的数据值
    X = encoded_data.data[:, 0].numpy()
    Y = encoded_data.data[:, 1].numpy()
    Z = encoded_data.data[:, 2].numpy()
    values = train_data.train_labels[:200].numpy()  # 标签值
    for x, y, z, s in zip(X, Y, Z, values):
        c = cm.rainbow(int(255 * s / 9))  # 上色
        ax.text(x, y, z, s, backgroundcolor=c)  # 标位子
    ax.set_xlim(X.min(), X.max())
    ax.set_ylim(Y.min(), Y.max())
    ax.set_zlim(Z.min(), Z.max())
    epoch, loss, accuracy = param
    text = ('Epoch: %d,' % epoch + ' train loss: %.4f,' % loss.data.numpy() + ' test accuracy: %.3f' % accuracy)
    plt.text(-30, -35, text, fontdict={'size': 13, 'color': 'red'})
    plt.show()
    plt.savefig("temp.jpg")
    plt.pause(0.02)
    image_list.append(imageio.imread("temp.jpg"))  # 可以不用循环i,直接用列表形式


def Train():
    # initialize figure
    f, a = plt.subplots(2, N_TEST_IMG, figsize=(5, 2))
    plt.ion()  # continuously plot

    # original data (first row) for viewing
    view_data = train_data.train_data[:N_TEST_IMG].view(-1, 28 * 28).type(t.FloatTensor) / 255.
    for i in range(N_TEST_IMG):
        a[0][i].imshow(np.reshape(view_data.data.numpy()[i], (28, 28)), cmap='gray')
        a[0][i].set_xticks(())
        a[0][i].set_yticks(())

    image_list = []
    for epoch in range(EPOCH):
        for step, (x, b_label) in tqdm(enumerate(train_loader)):
            # 输出的目标图像是和输入完全相同的, 想要恢复的目标
            b_x = x.view(-1, 28*28)  # batch x, shape (batch, 28*28)
            b_y = x.view(-1, 28*28)   # batch y, shape (batch, 28*28)

            encoder, decoder = autoencoder(b_x)

            loss = loss_func(decoder, b_y)  # 入图像向量和出图像向量差别为loss
            optimizer.zero_grad()  # 计算完损失就可以清除梯度了
            loss.backward()
            optimizer.step()

            # visualization use t-SNE
            if step % 50 == 0:
                print('Epoch: ', epoch, '| train loss: %.4f' % loss.data.numpy())

                # plotting decoded image (second row)
                _, decoded_data = autoencoder(view_data)
                for i in range(N_TEST_IMG):
                    a[1][i].clear()
                    a[1][i].imshow(np.reshape(decoded_data.data.numpy()[i], (28, 28)), cmap='gray')
                    a[1][i].set_xticks(())
                    a[1][i].set_yticks(())
                plt.draw()
                plt.savefig('temp.jpg')
                plt.pause(0.05)
                if step % 200 == 0:
                    image_list.append(imageio.imread('temp.jpg'))

    plt.ioff()
    imageio.mimsave('AE_MNIST.gif', image_list, 'GIF', duration=0.2)

    t.save(autoencoder, './Try_ModelSaved/autoencoder_mnist.pkl')  # 保存整个网络


Train()

# visualize in 3D plot
view_data = train_data.train_data[:200].view(-1, 28*28).type(t.FloatTensor)/255.
encoded_data, _ = autoencoder(view_data)
fig = plt.figure(2); ax = Axes3D(fig)
X, Y, Z = encoded_data.data[:, 0].numpy(), encoded_data.data[:, 1].numpy(), encoded_data.data[:, 2].numpy()
values = train_data.train_labels[:200].numpy()
for x, y, z, s in zip(X, Y, Z, values):
    c = cm.rainbow(int(255*s/9)); ax.text(x, y, z, s, backgroundcolor=c)
ax.set_xlim(X.min(), X.max()); ax.set_ylim(Y.min(), Y.max()); ax.set_zlim(Z.min(), Z.max())
plt.show()

DQN(Deep Q Network ) 强化学习

DQN对比Q-Learning

RL中用表格村春每个状态state 和 在这个状态下每个行为action所拥有的Q值. 但当今问题太复杂, 状态非常多, 其存储和搜索都很难. 所以引入神经网络.

  1. 我们可以将状态和动作当成神经网络的输入, 然后经过神经网络分析后得到动作的 Q 值, 这样我们就没必要在表格中记录 Q 值, 而是直接使用神经网络生成 Q 值.
  2. 还有一种形式的是这样, 我们也能只输入状态值, 输出所有的动作值, 然后按照 Q learning 的原则, 直接选择拥有最大值的动作当做下一步要做的动作.
DQN2.png
DQN2.png

更新神经网络

DQN3.png
DQN3.png
DQN4.png
DQN4.png

我们通过 NN 预测出Q(s2, a1) 和 Q(s2,a2) 的值, 这就是 Q 估计.

然后我们选取 Q 估计中最大值的动作来换取环境中的奖励 reward.

而 Q 现实中也包含从神经网络分析出来的两个 Q 估计值, 不过这个 Q 估计是针对于下一步在 s’ 的估计.

最后再通过算法更新神经网络中的参数

  1. 首先, 我们需要 a1, a2 正确的Q值, 这个 Q 值我们就用之前在 Q learning 中的 Q 现实来代替. 同样我们还需要一个 Q 估计 来实现神经网络的更新.
  2. 所以神经网络的的参数就是老的 NN 参数 加学习率 alpha 乘以 Q 现实 和 Q 估计 的差距. 我们整理一下.

还有两大因素支撑着 DQN 使得它变得无比强大. 这两大因素就是 Experience replay 和 Fixed Q-targets.

Q learning 是一种 off-policy 离线学习法, 它能学习当前经历着的, 也能学习过去经历过的, 甚至是学习别人的经历. 所以每次 DQN 更新的时候, 我们都可以随机抽取一些之前的经历进行学习. 随机抽取这种做法打乱了经历之间的相关性, 也使得神经网络更新更有效率. Fixed Q-targets 也是一种打乱相关性的机理, 如果使用 fixed Q-targets, 我们就会在 DQN 中使用到两个结构相同但参数不同的神经网络, 预测 Q 估计 的神经网络具备最新的参数, 而预测 Q 现实 的神经网络使用的参数则是很久以前的. 有了这两种提升手段, DQN 才能在一些游戏中超越人类.

DQN5.png
DQN5.png
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym

# 超参数
BATCH_SIZE = 32
LR = 0.01
EPSILON = 0.9               # 最优选择动作百分比
GAMMA = 0.9                 # 奖励递减参数
TARGET_REPLACE_ITER = 100   # Q 现实网络的更新频率
MEMORY_CAPACITY = 2000      # 记忆库大小
env = gym.make('CartPole-v0')   # 立杆子游戏
env = env.unwrapped             # ??????
N_ACTIONS = env.action_space.n  # 杆子能做的动作
N_STATES = env.observation_space.shape[0]   # 杆子能获取的环境信息数



class Net(nn.Module):
    def __init__(self, ):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(N_STATES, 10)
        self.fc1.weight.data.normal_(0, 0.1)   # initialization, 正态分布随机生成起始参数值
        self.out = nn.Linear(10, N_ACTIONS)     # 输出价值
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value


class DQN(object):
    def __init__(self):
        # 建立 target net 和 eval net 还有 memory
        self.eval_net, self.target_net = Net(), Net()   # 相同的网络,参数延迟更新

        self.learn_step_counter = 0     # 用于 target 更新计时
        self.memory_counter = 0         # 记忆库记数
        self.memory = np.zeros((MEMORY_CAPACITY, N_STATES * 2 + 2))     # 初始化记忆库
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)    # torch 的优化器
        self.loss_func = nn.MSELoss()   # 误差公式

    def choose_action(self, x):
        # 根据环境观测值选择动作的机制
        x = torch.unsqueeze(torch.FloatTensor(x), 0)
        # 这里只输入一个 sample
        if np.random.uniform() < EPSILON:   # 选最优动作
            actions_value = self.eval_net.forward(x)
            action = torch.max(actions_value, 1)[1].data.numpy()[0]     # return the argmax
        else:   # 选随机动作
            action = np.random.randint(0, N_ACTIONS)
        return action

    def store_transition(self, s, a, r, s_):
        # 存储记忆
        transition = np.hstack((s, [a, r], s_))
        # 如果记忆库满了, 就覆盖老数据
        index = self.memory_counter % MEMORY_CAPACITY
        self.memory[index, :] = transition
        self.memory_counter += 1

    def learn(self):
        # target 网络更新
        # 学习记忆库中的记忆
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # 抽取记忆库中的批数据
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)
        b_memory = self.memory[sample_index, :]
        b_s = torch.FloatTensor(b_memory[:, :N_STATES])
        b_a = torch.LongTensor(b_memory[:, N_STATES:N_STATES + 1].astype(int))
        b_r = torch.FloatTensor(b_memory[:, N_STATES + 1:N_STATES + 2])
        b_s_ = torch.FloatTensor(b_memory[:, -N_STATES:])

        # 针对做过的动作b_a, 来选 q_eval 的值, (q_eval 原本有所有动作的值)
        q_eval = self.eval_net(b_s).gather(1, b_a)  # shape (batch, 1)
        q_next = self.target_net(b_s_).detach()  # q_next 不进行反向传递误差, 所以 detach
        q_target = b_r + GAMMA * q_next.max(1)[0]  # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)

        # 计算, 更新 eval net
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

dqn = DQN() # 定义 DQN 系统

for i_episode in range(400):
    s = env.reset()
    while True:
        env.render()    # 显示实验动画
        a = dqn.choose_action(s)    # 行为

        # 选动作, 得到环境反馈
        s_, r, done, info = env.step(a)     # 根据行为得到的反馈

        # 修改 reward, 使 DQN 快速学习
        x, x_dot, theta, theta_dot = s_
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        r = r1 + r2

        # 存记忆
        dqn.store_transition(s, a, r, s_)        # 状态 动作 奖励 下一个状态 存储

        if dqn.memory_counter > MEMORY_CAPACITY:
            dqn.learn() # 记忆库满了就进行学习

        if done:    # 如果回合结束, 进入下回合
            break

        s = s_

GAN

import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

# torch.manual_seed(1)    # reproducible
# np.random.seed(1)

# Hyper Parameters
BATCH_SIZE = 64
LR_G = 0.0001  # learning rate for generator
LR_D = 0.0001  # learning rate for discriminator
N_IDEAS = 5  # 灵感(random noise)的个数
ART_COMPONENTS = 15  # it could be total point G can draw in the canvas
PAINT_POINTS = np.vstack([np.linspace(-1, 1, ART_COMPONENTS) for _ in range(BATCH_SIZE)])


# show our beautiful painting range
# plt.plot(PAINT_POINTS[0], 2 * np.power(PAINT_POINTS[0], 2) + 1, c='#74BCFF', lw=3, label='upper bound')
# plt.plot(PAINT_POINTS[0], 1 * np.power(PAINT_POINTS[0], 2) + 0, c='#FF9359', lw=3, label='lower bound')
# plt.legend(loc='upper right')
# plt.show()


def artist_works():  # painting from the famous artist (real target)
    a = np.random.uniform(1, 2, size=BATCH_SIZE)[:, np.newaxis]
    paintings = a * np.power(PAINT_POINTS, 2) + (a - 1)     # 15个x, a*x^2+a-1
    paintings = torch.from_numpy(paintings).float()     # 转化成torch 的形式
    return paintings


G = nn.Sequential(  # Generator
    # N_IDEAS=5 -> ART_COMPONENTS=15
    nn.Linear(N_IDEAS, 128),  # random ideas (could from normal distribution)
    nn.ReLU(),
    nn.Linear(128, ART_COMPONENTS),  # making a painting from these random ideas
)

D = nn.Sequential(  # Discriminator
    nn.Linear(ART_COMPONENTS, 128),  # receive art work either from the famous artist or a newbie like G
    nn.ReLU(),
    nn.Linear(128, 1),
    nn.Sigmoid(),  # 转化为判断为真的概率 [0,1]
)

opt_D = torch.optim.Adam(D.parameters(), lr=LR_D)
opt_G = torch.optim.Adam(G.parameters(), lr=LR_G)

plt.ion()  # something about continuous plotting
for step in range(10000):
    artist_paintings = artist_works()  # real painting from artist
    G_ideas = torch.randn(BATCH_SIZE, N_IDEAS, requires_grad=True)  # random ideas\n
    G_paintings = G(G_ideas)  # fake painting from G (random ideas)

    prob_artist1 = D(G_paintings)  # D try to reduce this prob
    G_loss = torch.mean(torch.log(1. - prob_artist1))

    opt_G.zero_grad()
    G_loss.backward()
    opt_G.step()

    prob_artist0 = D(artist_paintings)  # 真判真概率 D try to increase this prob
    prob_artist1 = D(G_paintings.detach())  # 假判真概率 D try to reduce this prob
    D_loss = - torch.mean(torch.log(prob_artist0) + torch.log(1. - prob_artist1))

    opt_D.zero_grad()
    D_loss.backward(retain_graph=True)  # 保留计算图 reusing computational graph
    opt_D.step()

    if step % 50 == 0:  # plotting
        plt.cla()
        plt.plot(PAINT_POINTS[0], G_paintings.data.numpy()[0], c='#4AD631', lw=3, label='Generated painting', )
        plt.plot(PAINT_POINTS[0], 2 * np.power(PAINT_POINTS[0], 2) + 1, c='#74BCFF', lw=3, label='upper bound')
        plt.plot(PAINT_POINTS[0], 1 * np.power(PAINT_POINTS[0], 2) + 0, c='#FF9359', lw=3, label='lower bound')
        plt.text(-.5, 2.3, 'D accuracy=%.2f (0.5 for D to converge)' % prob_artist0.data.numpy().mean(),
                 fontdict={'size': 13})
        plt.text(-.5, 2, 'D score= %.2f (-1.38 for G to converge)' % -D_loss.data.numpy(), fontdict={'size': 13})
        plt.ylim((0, 3));
        plt.legend(loc='upper right', fontsize=10);
        plt.draw();
        plt.pause(0.01)

plt.ioff()
plt.show()

Reference

  1. 莫烦Python