基于Pytorch搭建潜水泵异常检测系统

基于视觉识别搭建潜水泵异常检测系统

[!] 源自于本人在Python与树莓派的选修课程报告,为原创内容


文章目录

# 个人介绍

  • 本人是自动化专业学生,这个专业会不可避免地接触到工业生产中各种生产装置的设计,考虑到本人的发展方向为计算机视觉方向,因此可以考虑使用Python设计一些工业质量检测模型。

# 设计目标

  • 针对于潜水泵叶轮模型工业产品1,来尝试构建一个模型来解决其质量(缺陷)检测问题,要求模型精度高,推理时间短,模型参数量少。

# 模型设计

1 数据集分析

  • 数据集为经典的二分类灰度图像数据,里面包含了训练样本6,633份(2,875正例与3,758反例)与测试样本715份(262正例与453反例),尺寸皆为(300,300)。
  • 由于图像数据的网格结构与具有尺度不变性与平移不变性特点,因此可以通过构建卷积神经网络来完成图像分类。

2 模型设计

  • 一款神经网络可以分为两个部分:特征提取模块与分类器。在接下来的部分中,我们将依次设计模型的特征提取模块和分类器,来构建我们最终的神经网络模型。

2 特征提取模块选定

目标要求模型精度高,且推理时间短

  • 自2012年ILSVRC中AlexNet到2021年Transformer的变体ViT,这些图像分类模型都可以作为我们设计的基准。经过初步筛选后,得到的选项为:

    • AlexNet
    • VGG
    • ResNet(或ResNext)
    • GoogleNet(Inception V1 至Inception Res-V2)
    • DenseNet
    • NIN
    • Vision Transformer (ViT)
  • 由于要求推理时间短,因此可以排除NIN与ViT;而对于参数量少的要求,AlexNet,VGG和GoogleNet系列显然不符合。因此剩下的选项为ResNet和DenseNet。

  • 考虑到ResNet中残差机制主要用于解决模型深度退化问题,在网络层数较少的情况下的提升极其有限;而DenseNet的特征多重利用特点使其适用于多种不同的情况之中。

  • 综上所述,我们这次建立的模型应该参考DenseNet来进行设计。

  • DenseNet的密集特征连接结构如下图所示:
    基于Pytorch搭建潜水泵异常检测系统

3 分类器选定

  • 我们可以通过最小化数据分布 p d a t a p_{data} pdata​与模型分布 p m o d e l ( y ∣ x ; θ ) p_{model}(\mathbf{y}|\mathbf{x};\theta) pmodel​(y∣x;θ)之间的KL散度来使得分类器能够得到最佳的函数近似 y = f ( x ; θ ) \mathbf{y}=f(\mathbf{x};\theta) y=f(x;θ),其中 θ \theta θ代表了模型的参数, x , y \mathbf{x},\mathbf{y} x,y分别对应了样本空间中的样本点及其监督标签。

  • 一般而言,绝大多数现代神经网络采用极大似然估计来训练神经网络,因此,最小化数据分布与模型分布之间KL散度在极大似然估计方法中的损失函数等价于: J ( θ ) = − E x , y ∼ p ^ d a t a log ⁡ p m o d e l ( y ∣ x ; θ ) J(\theta) = -\Epsilon_{\mathbf{x} ,\mathbf{y} \sim\hat{p}_{data}}\log p_{model}(\mathbf{y} |\mathbf{x} ;\mathbf{\theta} ) J(θ)=−Ex,y∼p^​data​​logpmodel​(y∣x;θ) 即模型分布的负对数似,其中 E \Epsilon E为期望算子, p ^ d a t a \hat{p}_{data} p^​data​为数据分布中的一次抽样。

  • 由于题目为二分类任务,此时模型分布退化为伯努利分布 P ( y = 1 ∣ x ) P(y=1|x) P(y=1∣x)。然而,神经网络输出单元的诱导局部域值域为全体实数 R R R,因此,我们需要使用激活函数来将其转化为一个有效的概率值 p ∈ [ 0 , 1 ] p\in[0,1] p∈[0,1],最常用的输出单元激活函数为Logistic Sigmoid函数 σ ( x ) = 1 1 + exp ⁡ { − x } \sigma(x)=\frac{1}{1+\exp\{-x\}} σ(x)=1+exp{−x}1​

  • 因此,分类器的输出单元如下所示: y ^ = σ ( w T h + b ) \hat{y} = \sigma(\mathbf{w}^T\mathbf{h} +b) y^​=σ(wTh+b)其中b为单元偏置值, h \mathbf{h} h为特征提取模块的输出特征。因此,有 J ( θ ) = − w T [ y log ⁡ x + ( 1 − y ) log ⁡ ( 1 − x ) ] J(\theta) = -\mathbf{w}^T[y\log{x}+(1-y)\log(1-x)] J(θ)=−wT[ylogx+(1−y)log(1−x)]即Binary Cross Entropy Loss(BCELoss)。

# 实践

1 平台环境

  • 本项目在Windows 10上运行,GPU为GeForce RTX 3090
  • 本项目基于Pytorch1.9.1运行,CUDA为11.1,Python版本为3.8

2 代码实践

2.1 导入代码所需库

  • 首先,我们需要导入这次代码所涉及到的库
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset,DataLoader,SubsetRandomSampler
import torch.nn as nn
import torch.optim as optim
import torchvision

import matplotlib.pyplot as plt
import cv2
import random as ra
import os
import numpy as np
from tqdm import tqdm

2.2 数据加载器

  • Pytorch通过数据加载器来得到每一批的数据,而数据集加载器需要一个数据集实例来作为参数,因此,我们首先构建数据集类PreprocessDataset

  • PreprocessDataset类的作用为读取并缓存数据集,并根据数据加载器的需要按索引获取每个样本

  • PreprocessDataset类需要构建如下方法:构造方法__init__、长度符重载方法__len__,索引重载方法__getitem__

    class PreprocessDataset(Dataset):
        def __init__(self,path,imgSize = 224):
            self.path = path
            self.posPath = os.path.join(path,'ok_front')
            self.negPath = os.path.join(path,'def_front')
            self.imgSize = imgSize
            
            self.datas = list()   #Pos: 1 Neg: 0
            for root,_,files in os.walk(self.posPath):
                for file in files:
                    self.datas.append((os.path.join(root,file),1.0))
    
            for root,_,files in os.walk(self.negPath):
                for file in files:
                    self.datas.append((os.path.join(root,file),0.0))
            
            print("[INFO] Successfully loaded the dataset with %d samples!" % len(self.datas))
            
        def __len__(self):
            return len(self.datas)
        
        def __getitem__(self,index):
            imgPath,label = self.datas[index]
            img = cv2.imread(imgPath,0)
            resizeSize = int(self.imgSize * 1.05)
            img = cv2.resize(img,(resizeSize,resizeSize))
            
            img = self._randomCrop(img)
            img = torch.tensor(img / 255.0).float()
            img = self._normalization(img)
            
            img = img.unsqueeze(0)
            return img,label
    
  • 我们使用OpenCV来读取图像对象, 并将其进行预处理,为了增加模型的泛化性能,我们在数据集类中增加了两个私有方法,分别用于随即裁剪和图像标准化

        #...接上
        def _randomCrop(self,img):
            height,width = img.shape[:2]
            cropHeight = ra.randint(0,height - self.imgSize - 1)
            cropWidth = ra.randint(0,width - self.imgSize - 1)
            img = img[cropHeight:cropHeight + self.imgSize,cropWidth:cropWidth + self.imgSize]
            return img
            
        def _normalization(self,img,std = 0.5,mean = 0.5):
            img = (img - mean) / std
            return img
    
  • 至此,数据集类已经构造完毕,根据Pytorch中提供的函数即可构建数据集加载器。为了提高我们方法的说服力,我们将训练集划分为训练集和验证集,比例为7:3。

      batch = 32
      epochs = 150
      imgSize = 224
      device = torch.device("cuda" if torch.cuda.is_available() else " cpu")
    
      path = './casting_data/'
      trainDataPath = os.path.join(path,'train')
      testDataPath = os.path.join(path,'test')
    
      trainDataset = PreprocessDataset(trainDataPath,imgSize)
      testDataset = PreprocessDataset(testDataPath,imgSize)
    
      # Get validation Dataset
      length = len(trainDataset)
      indices = list(range(length))
      ra.shuffle(indices)
      trainSampler = SubsetRandomSampler(indices[:int(0.7 * length)])
      valSampler = SubsetRandomSampler(indices[int(0.7 * length):])
    
    
      trainData = DataLoader(trainDataset,batch_size = batch,sampler = trainSampler)
      valData = DataLoader(trainDataset,batch_size = batch,sampler = valSampler)
      testData = DataLoader(testDataset,batch_size = batch)
    

2.3 模型构建

  • 根据上述建模方法,使用Pytorch设计了一款模型

    class Block(nn.Sequential):
      def __init__(self,inChannals,outChannals):
          """DenseBlock中的非线性组合函数"""
          super(Block,self).__init__(
              nn.BatchNorm2d(inChannals),
              nn.ReLU(inplace = True),
              nn.Conv2d(inChannals,outChannals,kernel_size = 1,stride = 1,bias = False),
              nn.BatchNorm2d(outChannals),
              nn.ReLU(inplace = True),
              nn.Conv2d(outChannals,outChannals,kernel_size = 3,padding = 1,stride = 1,bias = False,groups = outChannals)
          )
    
          
    class DenseBlock(nn.Module):
        def __init__(self,inChannals,blockNum,k = 24):
            """网络中的密集连接模块,k为每个模块的输出通道"""
            super(DenseBlock,self).__init__()
            self.blocks = nn.ModuleList([Block(inChannals + k * i,k) for i in range(blockNum)])
            
        def forward(self,input):
            outputs = [input]
            outputs.append(self.blocks[0](outputs[-1])) 
            for i in range(1,len(self.blocks)):
                temp = torch.cat(outputs,dim = 1)  
                outputs.append(self.blocks[i](temp))  
            
            output = torch.cat(outputs,dim = 1) 
            return output
        
    class Transition(nn.Sequential):
        def __init__(self,inChannals,outChannals):
            """降采样模块"""
            super(Transition,self).__init__(
                nn.BatchNorm2d(inChannals),
                nn.ReLU(inplace = True),
                nn.Conv2d(inChannals,outChannals,kernel_size = 1,bias = False),
                nn.AvgPool2d(2)
            )
    
    
    class Model(nn.Module):
        def __init__(self,channalNum = 64,compressionRate = 0.5,k = 24):
            super(LDN_S,self).__init__()
            self.features = nn.Sequential(
                nn.Conv2d(1,channalNum,kernel_size=7,stride=2,bias=False),
                nn.BatchNorm2d(channalNum),
                nn.ReLU(inplace = True),
                nn.MaxPool2d(kernel_size = 3,stride = 2,padding = 1, dilation=1, ceil_mode=False)
            )
            
            self.blockConfig = [4]
            self.blocks = list()
            for blockNum in self.blockConfig:
                self.blocks.append(DenseBlock(channalNum,blockNum,k))
                channalNum = channalNum + blockNum * k
                self.blocks.append(Transition(channalNum,int(channalNum * compressionRate)))
                channalNum = int(channalNum * compressionRate)
            
            self.blocks = nn.ModuleList(self.blocks)
            self.classifier = nn.Sequential(
                nn.BatchNorm2d(channalNum),
                nn.AdaptiveAvgPool2d((1,1)),
                nn.Flatten(),
                nn.Dropout(),
                nn.Linear(channalNum,1)
            )
            
        def forward(self,input):
            x = self.features(input)
            for block in self.blocks:
                x = block(x)
            
            x = self.classifier(x)
            return x
    
  • 至此,我们可以通过简单的类定义函数来构建神经网络,以及优化器和损失函数。优化器我们使用的是Adam

    net = Model().to(device)
    
    #这个损失函数自带Sigmoid单元,因此神经网络输出层不用加
    lossF = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(net.parameters(),lr=1e-5)
    

2.4 训练框架

  • 首先,我们构建准确率函数来查看模型的训练情况

    def accuracy(outputs,labels):
        predictions = torch.where(outputs > 0.5, torch.ones_like(outputs), torch.zeros_like(outputs))
        acc = torch.sum(predictions == labels)/ labels.shape[0]
        return acc * 100
    
  • 随后,我们便可以开始构建模型训练函数和验证函数

    def train(epoch):
        net.train(True)
        totalAcc,totalLoss = 0.0,0.0
        processBar = tqdm(trainData,ncols = 100)
        for step,(imgs,labels) in enumerate(processBar,1):
            imgs = imgs.to(device)
            labels = labels.to(device).view([-1,1])
            
            net.zero_grad()
            outputs = net(imgs)
            loss = lossF(outputs,labels)
            loss.backward()
            
            acc = accuracy(outputs,labels)
            optimizer.step()
            
            totalAcc += acc.item()
            totalLoss += loss.item()
            processBar.set_description("[%d/%d] Loss-M: %.4f Acc-M: %.2f" % (epoch,epochs,totalLoss/step, 
                                    totalAcc/step))
        
        processBar.close()
        return totalLoss/step,totalAcc/step
    
    def validation(epoch):
        net.train(False)
        totalAcc,totalLoss = 0.0,0.0
        for step,(imgs,labels) in enumerate(valData,1):
            imgs = imgs.to(device)
            labels = labels.to(device).view([-1,1])
            
            outputs = net(imgs)
            loss = lossF(outputs,labels)
            acc = accuracy(outputs,labels)
            
            totalAcc += acc.item()
            totalLoss += loss.item()
        print("[%d/%d] Val Loss: %.4f Val Acc: %.2f" % (epoch,epochs,totalLoss/step,
                                                                            totalAcc/step))
        
        return totalLoss/step,totalAcc/step
    
  • 同理,可以构建测试函数

    def test():
        net.train(False)
        totalAcc,totalLoss = 0.0,0.0
        for step,(imgs,labels) in enumerate(testData,1):
            imgs = imgs.to(device)
            labels = labels.to(device).view([-1,1])
            
            outputs = net(imgs)
            loss = lossF(outputs,labels)
            acc = accuracy(outputs,labels)
            
            totalAcc += acc.item()
            totalLoss += loss.item()
        print("Test Loss: %.4f Test Acc: %.2f" % (totalLoss/step, totalAcc/step))
        
        return totalLoss/step,totalAcc/step
    
  • 最后,我们构建训练总循环

    history = {
        'trainLoss': list(),
        'trainAcc':  list(),
        'valLoss': list(),
        'valAcc': list()
    }
    
    for epoch in range(epochs):
        trainLoss,trainAcc = train(epoch)
        valLoss,valAcc = validation(epoch)
        
        #保存最佳模型
        if epoch == 0 or valAcc > max(history['valAcc']):
            print("[INFO] Successfully saved the Neural Network (Validation Accuracy %.2f)" % (valAcc))
            saveDict = {
              'net': net.state_dict(),
              'optimizer': optimizer.state_dict(),
              'epoch': epoch
            }
            torch.save(saveDict, './checkpoints/Faster_LDN_%d_Acc%.2f.pth' % (epoch,valAcc)) 
            
        history['trainAcc'].append(trainAcc)
        history['trainLoss'].append(trainLoss)
        history['valAcc'].append(valAcc)
        history['valLoss'].append(valLoss)
    
    test()
    

# 结果

  • 输出显示为
      [143/150] Loss-M: 0.0402 Acc-M: 99.25: 100%|██████████████████████| 146/146 [00:05<00:00, 24.42it/s]
      [143/150] Val Loss: 0.0274 Val Acc: 99.60
      [144/150] Loss-M: 0.0373 Acc-M: 99.51: 100%|██████████████████████| 146/146 [00:05<00:00, 24.54it/s]
      [144/150] Val Loss: 0.0271 Val Acc: 99.80
      [145/150] Loss-M: 0.0381 Acc-M: 99.38: 100%|██████████████████████| 146/146 [00:05<00:00, 24.75it/s]
      [145/150] Val Loss: 0.0227 Val Acc: 99.80
      [146/150] Loss-M: 0.0394 Acc-M: 99.34: 100%|██████████████████████| 146/146 [00:05<00:00, 24.49it/s]
      [146/150] Val Loss: 0.0261 Val Acc: 99.75
      [147/150] Loss-M: 0.0376 Acc-M: 99.34: 100%|██████████████████████| 146/146 [00:05<00:00, 24.37it/s]
      [147/150] Val Loss: 0.0290 Val Acc: 99.80
      [148/150] Loss-M: 0.0415 Acc-M: 99.15: 100%|██████████████████████| 146/146 [00:05<00:00, 24.62it/s]
      [148/150] Val Loss: 0.0268 Val Acc: 99.65
      [149/150] Loss-M: 0.0451 Acc-M: 99.11: 100%|██████████████████████| 146/146 [00:05<00:00, 24.74it/s]
      [149/150] Val Loss: 0.0324 Val Acc: 99.55
      Test Loss: 0.0300 Test Acc: 99.73
    
  • 即在测试集上 99.73% 准确率,通过专业工具2可得其参数量仅有23,289,浮点计算量为0.10G,符合要求。
  • 使用Matplotlib对训练结果进行可视化,结果如下所示:
    基于Pytorch搭建潜水泵异常检测系统

  1. 数据集源自Kaggle:https://www.kaggle.com/ravirajsinh45/real-life-industrial-dataset-of-casting-product ↩︎

  2. CSDN上找的计算工具 ↩︎

上一篇:《Pytorch模型推理及多任务通用范式》第四节作业


下一篇:Java-se之I/O流