【Pix2Pix】当生成式对抗神经网络遇到车道线检测

当生成式对抗神经网络遇到车道线检测

目前,卷积神经网络已经成功地应用于语义分割任务。然而,有许多问题本质上不是像素分类问题,但仍然经常被表述为语义分割,将像素概率图转换为最终所需的输出。

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

以车道线检测为例,目前车道线检测的难点为寻找语义上的线,而不是局限于表观存在的线。

但是生成对抗网络 (GAN) 可用于使语义分割网络的输出更真实或更好地保留结构。

一、数据集简介

本项目使用的是21年新出的车道线检测数据集VIL-100,这是一个包含100个视频,10000帧图像,涵盖10种车道线类型、各种驾驶场景、光照条件和多条车道线实体,同时对视频中的所有车道线提供了高质量的实体级标注。

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

# 解压数据集
!unzip -q /home/aistudio/data/data115234/VIL100.zip -d data/
# 数据可视化
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

root = "data/VIL100"
with open("data/VIL100/data/train.txt", "r") as trainList: 
    trainDatas = trainList.readlines()
    print('训练集数据量: {}'.format(len(trainDatas)))

with open("data/VIL100/data/test.txt", "r") as testList: 
    testDatas = testList.readlines()
    print('测试集数据量: {}'.format(len(testDatas)))

# 从训练集中随机抽取一张图像进行可视化
index = random.randint(0, len(trainDatas))
traindata = trainDatas[index].split(" ")
image = cv2.imread(root + traindata[0])
label = cv2.imread(root + traindata[1])
plt.figure(figsize=(10, 10))
plt.imshow(np.hstack([image, label])[:,:,::-1])
plt.show()
训练集数据量: 8000
测试集数据量: 2000

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

二、数据预处理

数据预处理部分与图像分割的数据处理类似,需要对输入图像进行归一化,并基于飞桨提供的paddle.io.Dataset基类,实现自定义数据集。

from paddle.io import Dataset

class VILData(Dataset):
    def __init__(self, mode='train'):
        super(VILData, self).__init__() 
        self.train_data_paths = self.load_train_data()# 获取训练集
        self.test_data_paths = self.load_test_data()  # 获取训练集
        self.mode = mode
        self.root = "data/VIL100"

    def __getitem__(self, idx):
        if self.mode == 'test':
            data_paths = self.test_data_paths
        else:
            data_paths = self.train_data_paths
        image = cv2.imread(self.root + data_paths[idx].split(" ")[0])
        image = (image / 255. * 2. - 1.).astype('float32')
        image = np.transpose(image, (2, 0, 1))
        label = cv2.imread(self.root + data_paths[idx].split(" ")[1])
        label = label.astype('float32')
        label = np.transpose(label, (2, 0, 1))

        return image, label

    def __len__(self):
        if self.mode == 'test':
            return len(self.test_data_paths)
        else:
            return len(self.train_data_paths)

    @staticmethod
    def load_train_data():
        data_path = 'data/VIL100/data/train.txt'
        with open(data_path, "r") as trainList: 
            return trainList.readlines()

    @staticmethod
    def load_test_data():
        data_path = 'data/VIL100/data/test.txt'
        with open(data_path, "r") as testList: 
            return testList.readlines()

traindataset = VILData('train')
testdataset = VILData('test')
# 从训练集中随机抽取一张图像进行可视化
index = random.randint(0, len(traindataset))
plt.imshow(np.transpose(traindataset[index][0], (1,2,0)))
plt.show()
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

# 可视化图像对应的标签
plt.imshow(np.transpose(traindataset[index][1], (1,2,0)))
plt.show()
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

三、模型组网

Pix2Pix,通过随机向量z和图像x生成需要图像y,即{z,x} -> y

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

生成器G用于生成尽可能愚弄判别器D的图像,判别器D尽可能分辨出生成器G生成的假图以及真实图像。

1.生成器的搭建

生成器G的结构采用的是U-Net。

在车道线检测任务中,需要从一个图像到另一个图像,输入输出虽然在表面细节不同,但是底层大致的结构是相同的,所以输入与输出需要粗略的对齐。对于图像的任务而言,需要输入输出之间除了共享高层语义信息之外,还需要能够共享底层的语义信息。这样,U-Net就可以被运用起来。

特别的,为了更好的传递信息,Pix2Pix的作者加了跳连, 直接从i层传输到n-i层,n是总得网络层数,i是与n-i层之间有相应的通道。

import paddle
import paddle.nn as nn

# 下采样
class Downsample(nn.Layer):
    # LeakyReLU => conv => batch norm
    def __init__(self, in_dim, out_dim, kernel_size=5, stride=1, padding=1):
        super(Downsample, self).__init__()

        self.layers = nn.Sequential(
            nn.LeakyReLU(.2),
            nn.Conv2D(in_dim, out_dim, kernel_size, stride, padding),
            nn.BatchNorm2D(out_dim),
        )

    def forward(self, x):
        x = self.layers(x)
        return x

# 上采样
class Upsample(nn.Layer):
    # ReLU => deconv => batch norm => dropout
    def __init__(self, in_dim, out_dim, kernel_size=5, stride=1, padding=1, use_dropout=False):
        super(Upsample, self).__init__()

        sequence = [
            nn.ReLU(),
            nn.Conv2DTranspose(in_dim, out_dim, kernel_size, stride, padding),
            nn.BatchNorm2D(out_dim),
        ]

        if use_dropout:
            sequence.append(nn.Dropout(p=0.5))

        self.layers = nn.Sequential(*sequence)

    def forward(self, x, skip):
        x = self.layers(x)
        x = paddle.concat([x, skip], axis=1)
        return x

# 生成器
class UnetGenerator(nn.Layer):
    def __init__(self, input_nc=3, output_nc=3, ngf=4):
        super(UnetGenerator, self).__init__()
        self.down1 = nn.Conv2D(input_nc, ngf, kernel_size=5, stride=1, padding=1)
        self.down2 = Downsample(ngf, ngf*2)
        self.down3 = Downsample(ngf*2, ngf*4)
        self.down4 = Downsample(ngf*4, ngf*8)
        self.down5 = Downsample(ngf*8, ngf*8)
        self.down6 = Downsample(ngf*8, ngf*8)
        self.down7 = Downsample(ngf*8, ngf*8)
        self.center = Downsample(ngf*8, ngf*8)
        self.up7 = Upsample(ngf*8, ngf*8, use_dropout=True)
        self.up6 = Upsample(ngf*8*2, ngf*8, use_dropout=True)
        self.up5 = Upsample(ngf*8*2, ngf*8, use_dropout=True)
        self.up4 = Upsample(ngf*8*2, ngf*8)
        self.up3 = Upsample(ngf*8*2, ngf*4)
        self.up2 = Upsample(ngf*4*2, ngf*2)
        self.up1 = Upsample(ngf*2*2, ngf)
        self.output_block = nn.Sequential(
            nn.ReLU(),
            nn.Conv2DTranspose(ngf*2, output_nc, kernel_size=5, stride=1, padding=1),
            nn.Tanh()
        )

    def forward(self, x):
        d1 = self.down1(x)
        d2 = self.down2(d1)
        d3 = self.down3(d2)
        d4 = self.down4(d3)
        d5 = self.down5(d4)
        d6 = self.down6(d5)
        d7 = self.down7(d6)
        c = self.center(d7)
        x = self.up7(c, d7)
        x = self.up6(x, d6)
        x = self.up5(x, d5)
        x = self.up4(x, d4)
        x = self.up3(x, d3)
        x = self.up2(x, d2)
        x = self.up1(x, d1)
        x = self.output_block(x)
        return x

generator = UnetGenerator()
W1106 16:03:34.220363 20736 device_context.cc:404] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.1, Runtime API Version: 10.1
W1106 16:03:34.226832 20736 device_context.cc:422] device: 0, cuDNN Version: 7.6.

2.判别器的搭建

在Pix2Pix中,判别器输入两张图像,一张是输入给生成器G的图像,另一张是生成器G的输出的图像。也就是说,对于判别器而言,只有高质量的输出图像是不能骗过判别器的,有两张图像的对应关系能使判别器更好地工作。

class NLayerDiscriminator(nn.Layer):
    def __init__(self, input_nc=6, ndf=4):
        super(NLayerDiscriminator, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2D(input_nc, ndf, kernel_size=3, stride=1, padding=1), 
            nn.LeakyReLU(0.2),
            ConvBlock(ndf, ndf*2),
            ConvBlock(ndf*2, ndf*4),
            ConvBlock(ndf*4, ndf*8, stride=1),
            nn.Conv2D(ndf*8, 1, kernel_size=3, stride=1, padding=1),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.layers(input)


class ConvBlock(nn.Layer):
    # conv => batch norm => LeakyReLU
    def __init__(self, in_dim, out_dim, kernel_size=3, stride=1, padding=1):
        super(ConvBlock, self).__init__()
        self.layers = nn.Sequential(
            nn.Conv2D(in_dim, out_dim, kernel_size, stride, padding),
            nn.BatchNorm2D(out_dim), 
            nn.LeakyReLU(.2),
        )

    def forward(self, x):
        x = self.layers(x)
        return x

discriminator = NLayerDiscriminator()

3.测试生成器与判别器的输出

完成生成器和判别器的组网后,测试一下两者的输出是否符合我们的预期

out = generator(paddle.ones([1, 3, 1080, 1920]))
print('生成器输出尺寸:', out.shape)  # 应为[1, 3, 1080, 1920]

out = discriminator(paddle.ones([1, 6, 1080, 1920]))
print('鉴别器输出尺寸:', out.shape)  # 应为[1, 1, 1080, 1920]
生成器输出尺寸: [1, 3, 1080, 1920]
鉴别器输出尺寸: [1, 1, 1080, 1920]

四、模型训练

Pix2Pix的优化目标包含2个部分。一部分是cGAN的优化目标。cGAN的优化目标如下所示:

L c G A N ( G , D ) = E x , y [ log ⁡ D ( x , y ) ] + E x , z [ log ⁡ ( 1 − D ( x , G ( x , z ) ) ] , \begin{aligned} \mathcal{L}_{c G A N}(G, D)=& \mathbb{E}_{x, y}[\log D(x, y)]+\\ & \mathbb{E}_{x, z}[\log (1-D(x, G(x, z))], \end{aligned} LcGAN​(G,D)=​Ex,y​[logD(x,y)]+Ex,z​[log(1−D(x,G(x,z))],​

z表示随机噪声,判别器D的优化目标是使得 L c G A N ( G , D ) {L}_{c G A N}(G, D) LcGAN​(G,D)的值越大越好,而生成器G的优化目标是使得 l o g ( 1 − D ( x , G ( x , z ) ) log(1-D(x,G(x,z)) log(1−D(x,G(x,z))越小越好。

这里需要注意的是正如GAN论文提到的,这样的计算方法在训练时容易出现饱和现象,也就是判别器D很强大,但是生成器G很弱小,导致生成器G基本上训练不起来,因此可以将生成器G的优化目标从最小化 l o g ( 1 − D ( x , G ( x , z ) ) log(1-D(x,G(x,z)) log(1−D(x,G(x,z))修改为最大化 l o g ( D ( x , G ( x , z ) ) ) log(D(x,G(x,z))) log(D(x,G(x,z))),Pix2Pix算法采用修改后的优化目标。

另一部分是L1距离,用来约束生成图像和真实图像之间的差异,这部分借鉴了其他基于GAN做图像翻译的思想,只不过这里用L1而不是L2,目的是减少生成图像的模糊。

定义优化器

from paddle.io import DataLoader

# 超参数
lr = 1e-4
BATCH_SIZE = 1
EPOCHS = 10
schedulerG = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=lr, T_max=EPOCHS*BATCH_SIZE*8000, verbose=False)
schedulerD = paddle.optimizer.lr.CosineAnnealingDecay(learning_rate=lr, T_max=EPOCHS*BATCH_SIZE*8000, verbose=False)

# 优化器
optimizerG = paddle.optimizer.Adam(
    learning_rate=schedulerG,
    parameters=generator.parameters(),
    beta1=0.5,
    beta2=0.999)

optimizerD = paddle.optimizer.Adam(
    learning_rate=schedulerD,
    parameters=discriminator.parameters(), 
    beta1=0.5,
    beta2=0.999)
    
# 损失函数
bce_loss = nn.BCELoss()
l1_loss = nn.SmoothL1Loss()

# dataloader
data_loader_train = DataLoader(
    traindataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=False
    )

data_loader_test = DataLoader(
    testdataset,
    batch_size=BATCH_SIZE
    )
# 读取用于测试实时训练效果的图片
image = cv2.imread('data/VIL100/JPEGImages/0_Road001_Trim003_frames/00000.jpg')
label = cv2.imread('data/VIL100/Annotations/0_Road001_Trim003_frames/00000.png')

g_input = image.astype('float32') / 127.5 - 1             # 归一化
g_input = g_input[np.newaxis, ...].transpose(0, 3, 1, 2)  # NHWC -> NCHW
g_input = paddle.to_tensor(g_input)                       # numpy -> tensor

开始训练

import os

results_save_path = 'work/results'
os.makedirs(results_save_path, exist_ok=True)  # 保存每个epoch的测试结果

weights_save_path = 'work/weights'
os.makedirs(weights_save_path, exist_ok=True)  # 保存模型

for epoch in range(EPOCHS):
    num = 0
    for data in data_loader_train:
        num += 1
        real_A, real_B = data
        optimizerD.clear_grad()
        # D([real_A, real_B])
        real_AB = paddle.concat((real_A, real_B), 1)
        d_real_predict = discriminator(real_AB)
        d_real_loss = bce_loss(d_real_predict, paddle.ones_like(d_real_predict))

        # D([real_A, fake_B])
        fake_B = generator(real_A.detach())
        fake_AB = paddle.concat((real_A, fake_B), 1)
        d_fake_predict = discriminator(fake_AB)
        d_fake_loss = bce_loss(d_fake_predict, paddle.zeros_like(d_real_predict))
        
        # train D
        d_loss = (d_real_loss + d_fake_loss) / 2.
        d_loss.backward()
        optimizerD.step()

        optimizerG.clear_grad()
        # D([real_A, fake_B])
        fake_B = generator(real_A)
        fake_AB = paddle.concat((real_A, fake_B), 1)
        g_fake_predict = discriminator(fake_AB)
        g_bce_loss = bce_loss(g_fake_predict, paddle.ones_like(d_real_predict))
        g_l1_loss = l1_loss(real_B, fake_B)
        g_loss = g_bce_loss + g_l1_loss * 100.
        
        # train G
        g_loss.backward()
        optimizerG.step()

        if num % 200 == 0:
            # 查看训练效果
            g_output = generator(g_input)
            g_output = g_output.detach().numpy()                      # tensor -> numpy
            g_output = g_output.transpose(0, 2, 3, 1)[0]              # NCHW -> NHWC
            g_output = g_output * 127.5 + 127.5                       # 反归一化
            g_output = g_output.astype(np.uint8)

            img_show = np.hstack([image, g_output, label])[:,:,::-1]
            cv2.imwrite(os.path.join(results_save_path, 'epoch'+str(epoch+1).zfill(3)+ '_' + str(num).zfill(3)+'.png'), img_show)

            print(f'Epoch [{epoch+1}/{EPOCHS}] Loss D: {d_loss.numpy()}, Loss G: {g_loss.numpy()}')

    if (epoch+1) % 1 == 0:
        paddle.save(generator.state_dict(), os.path.join(weights_save_path, 'epoch'+str(epoch+1).zfill(3)+'.pdparams'))

部分输出结果如下所示:

Epoch [9/10] Loss D: [0.00078643], Loss G: [280.1502]
Epoch [9/10] Loss D: [0.00098845], Loss G: [148.82457]
Epoch [9/10] Loss D: [0.00126031], Loss G: [86.36062]
Epoch [9/10] Loss D: [0.00081538], Loss G: [275.2824]
Epoch [9/10] Loss D: [0.00070759], Loss G: [173.00641]
Epoch [9/10] Loss D: [0.00073985], Loss G: [227.09679]
Epoch [9/10] Loss D: [0.00076476], Loss G: [381.15402]
Epoch [9/10] Loss D: [2.8043735], Loss G: [4.8939624]
Epoch [9/10] Loss D: [0.00117071], Loss G: [85.810814]

可视化训练过程

b站视频链接:https://www.bilibili.com/video/BV13F411Y7jh/

五、效果展示

# 为生成器加载权重
last_weights_path = os.path.join(weights_save_path, sorted(os.listdir(weights_save_path))[-1])
print('加载权重:', last_weights_path)

model_state_dict = paddle.load(last_weights_path)
generator.load_dict(model_state_dict)
generator.eval()
加载权重: work/weights/epoch010.pdparams
# 读取数据
image = cv2.imread('data/VIL100/JPEGImages/0_Road015_Trim008_frames/00000.jpg')

g_input = image.astype('float32') / 127.5 - 1             # 归一化
g_input = g_input[np.newaxis, ...].transpose(0, 3, 1, 2)  # NHWC -> NCHW
g_input = paddle.to_tensor(g_input)                       # numpy -> tensor

g_output = generator(g_input)
g_output = g_output.detach().numpy()                      # tensor -> numpy
g_output = g_output.transpose(0, 2, 3, 1)[0]              # NCHW -> NHWC
g_output = g_output * 127.5 + 127.5                       # 反归一化
g_output = g_output.astype(np.uint8)

img_show = np.hstack([image, g_output])[:,:,::-1]
plt.figure(figsize=(10, 10))
plt.imshow(img_show)
            # tensor -> numpy
g_output = g_output.transpose(0, 2, 3, 1)[0]              # NCHW -> NHWC
g_output = g_output * 127.5 + 127.5                       # 反归一化
g_output = g_output.astype(np.uint8)

img_show = np.hstack([image, g_output])[:,:,::-1]
plt.figure(figsize=(10, 10))
plt.imshow(img_show)
plt.show()

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

六、总结与升华

本项目基于经典的生成式对抗神经网络Pix2Pix实现了车道线检测任务,与以往基于分割的车道线检测不同,基于GAN方法的车道线检测不仅能识别出表观上的车道线,还能识别出语义上的车道线。

作者简介

北京联合大学 机器人学院 自动化专业 2018级 本科生 郑博培

中国科学院自动化研究所复杂系统管理与控制国家重点实验室实习生

百度飞桨开发者技术专家 PPDE

百度飞桨北京领航团团长

百度飞桨官方帮帮团、答疑团成员

深圳柴火创客空间 认证会员

百度大脑 智能对话训练师

阿里云人工智能、DevOps助理工程师

我在AI Studio上获得至尊等级,点亮10个徽章,来互关呀!!!

https://aistudio.baidu.com/aistudio/personalcenter/thirdview/147378

【Pix2Pix】当生成式对抗神经网络遇到车道线检测

上一篇:深度学习环境安装-docker


下一篇:Nvidia-Docker安装、使用基础