Springboot整合SpringBatch完成基本案例--从数据库读取数据并写入文件

本案例旨在让新手从0开始完成一个批量任务的开发

 

 

第一步:建表,入参

CREATE TABLE `music_info` (

  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT ‘主键id‘,

  `singer_name` varchar(100) NOT NULL COMMENT ‘歌手名‘,

  `music_size` varchar(100) NOT NULL COMMENT ‘歌曲大小‘,

  `music_name` varchar(100) NOT NULL COMMENT ‘歌曲名‘,

  PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8;

 

insert  into `music_info`(`id`,`singer_name`,`music_size`,`music_name`) values (1,‘小三‘,‘3.2M‘,‘起风了‘),(2,‘刘德华‘,‘3.0M‘,‘忘情水‘),(3,‘猪点点‘,‘5.0M‘,‘会写程序的小猪‘);

 

第二步:搭建SpringBoot项目,使用IDEA或者Spring官网均可

 

第三步:导入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>

第四步:启动类上加注解@EnableBatchProcessing

@SpringBootApplication
@EnableBatchProcessing
public class SpringbatchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbatchApplication.class, args);
    }
}

第五步:添加配置文件内容

#开发配置
#数据库连接参数的配置
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/test?serverTimezone=UTC
spring.datasource.username = root
spring.datasource.password = 123456
#项目启动时的建表sql脚本,该脚本由Spring Batch提供
spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
#在项目启动时进行执行建表sql
#
是否生成执行状态记录的表结构
spring.batch.initialize-schema=always
#禁止Spring Batch自动执行,既需要用户触发才能执行
spring.batch.job.enabled=true

第六步:创建实体类

public class Music {
    // 主键id
   
private Integer id;

    // 歌手名
   
private String singer_name;

    // 歌曲大小
   
private String music_size;

    // 歌曲名
   
private String music_name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getSinger_name() {
        return singer_name;
    }

    public void setSinger_name(String singer_name) {
        this.singer_name = singer_name;
    }

    public String getMusic_size() {
        return music_size;
    }

    public void setMusic_size(String music_size) {
        this.music_size = music_size;
    }

    public String getMusic_name() {
        return music_name;
    }

    public void setMusic_name(String music_name) {
        this.music_name = music_name;
    }

    @Override
    public String toString() {
        return "Music{" +
                "id=" + id +
                ", singer_name=‘" + singer_name + \‘+
                ", music_size=‘" + music_size + \‘+
                ", music_name=‘" + music_name + \‘+
                ‘}‘;
    }
}

第七步:创建数据持久层代码

@Mapper
public interface MusicDao {
    //通过id查询数据库记录
   
@Select("select id , singer_name , music_size , music_name from music_info where id = #{id};")
    public List<Music> queryInfoById(Map<String , Integer> map);
}

第八步:编写批量代码

@Configuration
public class BatchJobDemo {
    @Autowired
    JobBuilderFactory jobBuilderFactory;
    @Autowired
    StepBuilderFactory stepBuilderFactory;
    @Autowired
    SqlSessionFactory sqlSessionFactory;

    private static final String JOB = "job";

    private static final String STEP = "step";

    //配置一个Job
   
@Bean(name = JOB)
    Job job() {
        return jobBuilderFactory.get(JOB)
                .start(step())
                .build();
    }


    //配置一个Step
   
@Bean(name = STEP)
    Step step() {
        return stepBuilderFactory.get(STEP)
                .<Music, Music>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .build();
    }


    //配置itemReader
   
@Bean("itemReader")
    @StepScope
    MyBatisCursorItemReader<Music> itemReader() {
        System.out.println("开始查询数据库");
        MyBatisCursorItemReader<Music> reader = new MyBatisCursorItemReader<>();
        Map<String , Object> map = new HashMap<>();
        map.put("id" , 2);
        reader.setQueryId("com.example.springbatch.dao.MusicDao.queryInfoById");
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setParameterValues(map);
        return reader;
    }

    //配置itemWriter
   
@Bean("itemWriter")
    @StepScope
    FlatFileItemWriter<Music> itemWriter() {
        System.out.println("开始写入文件中");
        FlatFileItemWriter<Music> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("F:\\music.txt"));//系统目录
       
//Music对象转换成字符串,并输出到文件
       
writer.setLineAggregator(new LineAggregator<Music>() {
            @Override
            public String aggregate(Music music) {
                ObjectMapper mapper = new ObjectMapper();
                String str = null;
                try {
                    str = mapper.writeValueAsString(music);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return str;
            }
        });
        return writer;
    }
}

第八步:在磁盘根目录创建文件music.txt

第九步:启动服务

第十步:文件中的结果

{"id":2,"singer_name":"刘德华","music_size":"3.0M","music_name":"忘情水"}

 

 

技术总结

1、 什么是SpringBatch?

是Spring提供的一个数据处理框架,是一个轻量级,全面的批处理框架,旨在开发对企业系统日常运营至关重要的强大批处理应用程序

2、 批处理应用程序大致流程

从数据库、文件、队列中读取大量记录

以某种方式处理数据

以修改之后的形式写回数据

3、 SpringBatch总体架构

在SpringBatch中一个Job可以定义很多的步骤Step,在每一个Step中可以定义其专属的ItemReader用于读取数据,ItemProcesseor用于处理数据,ItemWriter用于写入数据,而每一个定义的Job则都在JobRepository中,我们可以通过JobLauncher来启动某个Job

4、 什么是Job?

Job是一个封装整个批处理过程的概念,在SpringBatch体系中是一个最顶层的抽象概念,体现在代码中则是一个最上层的接口

5、 什么是JobLauncher?

该接口主要用于启动指定了JobParameters的Job,JobParameters和Job一起才能组成一次Job的执行

6、 chunk处理流程

由于一次batch任务可能会有很多数据读写操作,一条一条的处理并向数据库提交的话效率不会很高,因此SpringBatch提出了chunk的概念,设定一个chunk size,SpraingBatch将会一条条处理数据,但是不会提交到数据库,只有当处理的数据达到了chunk size设定值,才会一起commit

 

例如:在一个Step中,chunk size设置为10,当ItemReader读的数据达到10的时候,这一批次就一起传到ItemWriter,同时Transaction被提交

7、 skip策略和失败处理

skipLimit():该方法的作用是我们可以设定一个我们允许的Step跳过异常的数量,加入设定为10,那么整个Step运行时,只要出现异常数量不超过10,整个Step就不会失败

skip():用来指定跳过的异常,因为有些异常的出现, 我们可以忽略

noSkip():指定某些异常出现时,无需跳过,一旦出现,计数器就会累加一次,直到达到上限

8、JobLauncher何时使用?

 

 

 

问题:

1、 如果一个类实现了Job接口,会怎样,实际开发中没有出现实现Job接口的情况?

2、 什么是JobInstance、什么是JobParameters、什么是JobExecution?Springboot整合SpringBatch完成基本案例--从数据库读取数据并写入文件

Springboot整合SpringBatch完成基本案例--从数据库读取数据并写入文件

上一篇:自己设计大学排名-数据库实践


下一篇:Mac 安装flask-sqlacodegen后使用提示 -bash: flask-sqlacodegen: command not found