SpringCloud使用Kafka消费者

目录

POM文件配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.donwait</groupId>
  <artifactId>my-kafka-demon</artifactId>
  <version>0.0.1-SNAPSHOT</version>
 
  <!-- spring boot项目 -->
  <parent>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-parent</artifactId>
      <version>2.0.4.RELEASE</version>
      <relativePath/> <!-- lookup parent from repository -->
  </parent>
 
  <!-- 项目属性:子模块不能引用父项目的properties变量 -->
  <properties>
       <!-- 系统全局版本号信息: 所有服务会继承 -->
      <dys.global.version>1.0.0.1</dys.global.version>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
      <java.version>1.8</java.version>
      <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
      <!-- <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>-->
      <lombok.version>1.16.20</lombok.version>
  </properties>
 
  <!-- 项目依赖:特殊强制依赖,其他继承父亲 -->
  <dependencies>
     <!--spring boot测试-->
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
     </dependency>
     <!-- 监控系统健康情况的工具 -->
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>
     <!--Lombok:消除模板代码-->
     <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
     </dependency>
     <!-- logback日志包 -->
     <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
    </dependency>
     <!-- SpringMVC -->
     <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <!-- kafka客户端 -->
     <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
  </dependencies>
 
  <!-- 编译插件 -->
  <build>
      <plugins>
            <!--spring boot maven插件-->
          <plugin>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
      </plugins>
  </build>
 
</project>

注意:
(1)引入kafka客户端

<!-- kafka客户端 -->
     <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
  <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

2)添加springboot-maven插件

<!-- 编译插件 -->
  <build>
      <plugins>
            <!--spring boot maven插件-->
          <plugin>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-maven-plugin</artifactId>
          </plugin>
      </plugins>
  </build>

创建kafka配置

package com.donwait.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
@EnableKafka
public class KafkaConfig {
     @Value("${kafka.broker-list}")
    private String brokers;
     @Value("${kafka.producer.retries}")
    private Integer producer_retries;
     @Value("${kafka.producer.batch-size}")
    private Integer producer_batch_size;
     @Value("${kafka.producer.linger-ms}")
    private Integer producer_linger_ms;
     @Value("${kafka.producer.buffer-memory}")
    private Integer producer_buffer_memory;
     @Value("${kafka.producer.key-serializer}")
    private String producer_key_serializer;
     @Value("${kafka.producer.value-serializer}")
    private String producer_value_serializer;
     
     @Value("${kafka.consumer.topic}")
    private String consumer_topic;
     @Value("${kafka.consumer.gourp-id}")
    private String consumer_gourp_id;
     @Value("${kafka.consumer.enable-auto-commit}")
    private boolean consumer_enable_auto_commit;
     @Value("${kafka.consumer.auto-commit-ms}")
    private String consumer_auto_commit_ms;
     @Value("${kafka.consumer.session-timeout-ms}")
    private String consumer_session_timeout_ms;
     @Value("${kafka.consumer.key-deserializer}")
    private String consumer_key_deserializer;
     @Value("${kafka.consumer.value-deserializer}")
    private String consumer_value_deserializer;
     
     @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
     
     @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
     
     /**
      * 消费者参数配置
      * @return
      */
     @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_gourp_id);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer_enable_auto_commit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumer_auto_commit_ms);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumer_session_timeout_ms);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer_key_deserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer_value_deserializer);
        return props;
    }
     
     @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
     
     /**
      * 生产者参数配置
      * @return
      */
     @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.RETRIES_CONFIG, producer_retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, producer_batch_size);
        props.put(ProducerConfig.LINGER_MS_CONFIG, producer_linger_ms);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producer_buffer_memory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer_key_serializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer_value_serializer);
        return props;
    }
     
     @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        System.out.println("init");
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

系统配置信息

系统的application.yml配置内容如下:

#服务配置
server:
  port: 7002
spring:
  application:
    name: kafka
#日志信息配置
logging:
  level:
    org.springframework.cloud.gateway: TRACE
    org.springframework.http.server.reactive: DEBUG
    org.springframework.web.reactive: DEBUG
    reactor.ipc.netty: DEBUG
#Spring Boot Actuator:监控系统配置
endpoints:
  shutdown:
    enabled: true
    path: /shutdown
    sensitive: true
management:
  security:
    enabled: false
kafka:
   broker-list: 192.168.12.150:9092,192.168.12.151:9092,192.168.12.152:9092
   producer:
      #发送失败后的重试次数,默认0
      retries: 1
      #以字节为单位控制默认的批量大小
      batch-size: 0
      #延迟时间
      linger-ms: 1
      #缓冲等待发送到服务器的记录的总内存字节数
      buffer-memory: 33554432
      #实现Serializer接口的序列化类键
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #实现Serializer接口的序列化类值
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
   consumer:
      #消费的主题
      topic: test-topic
      #消费者组id
      gourp-id: test-group
      #是否自动提交偏移量
      enable-auto-commit: true
      #提交偏移量的间隔-毫秒
      auto-commit-ms: 1000
      #客户端消费的会话超时时间-毫秒
      session-timeout-ms: 10000
      #实现DeSerializer接口的反序列化类键
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #实现DeSerializer接口的反序列化类值
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

启动入口

如果不需要测试生产者入口为:

package com.donwait;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import lombok.extern.slf4j.Slf4j;
/**
 * 实现命令行接口,从命令行读取参数发送
 * @author Administrator
 *
 */
@SpringBootApplication
@Slf4j
public class KafkaApp {
     /**
      * kafka消费
      * @param cr
      * @throws Exception
      */
     @KafkaListener(topics = "test-topic")
    public void listen(ConsumerRecord<String, String> cr) throws Exception {
        log.info("我是消费者:{}:{}", cr.key(), cr.value());
        //latch.countDown();
    }
     
     public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args).close();
    }
}

如果需要测试生产者,则实现命令中发送数据即可:

package com.donwait;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
importorg.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import lombok.extern.slf4j.Slf4j;
/**
 * 实现命令行接口,从命令行读取参数发送
 * @author Administrator
 *
 */
@SpringBootApplication
@Slf4j
public class KafkaApp implements CommandLineRunner {
     // kafka模板
     @Autowired
    private KafkaTemplate<String, String> template;
     // 计数器-等待3个消息接收完成
     private final CountDownLatch latch = new CountDownLatch(3);
     @Override
     public void run(String... args) throws Exception {
          System.out.println("发送信息...");
        this.template.send("test-topic", "foo1");
        this.template.send("test-topic", "foo2");
        this.template.send("test-topic", "foo3");
       
        // 等待60秒接收完成退出
        latch.await(60, TimeUnit.SECONDS);
        log.info("接收完成");
     }
     
     /**
      * kafka消费
      * @param cr
      * @throws Exception
      */
     @KafkaListener(topics = "test-topic")
    public void listen(ConsumerRecord<String, String> cr) throws Exception {
        log.info("我是消费者:{}:{}", cr.key(), cr.value());
        //latch.countDown();
    }
     
     public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args).close();
    }
}
上一篇:Spring整合ActiveMQ


下一篇:Tensorflow踩坑系列---数据读取文件队列