SpringBoot 3/4 与 Kafka 的简单使用

yufei       19 小时, 58 分钟 前       16

  1. 首先在 https://start.spring.io/ 里选择 Spring for Apache Kafka 和 Spring Web(Spring Web 仅做测试用)

  2. 然后添加 com.fasterxml.jackson 相关依赖。完成后配置 pom.xml 里的 dependencies 类似

        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.20.1</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.20.1</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.20</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webmvc</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-webmvc-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
  3. 配置 application.yml 添加 Kafka 相关配置

    spring:
      kafka:
        consumer:
          bootstrap-servers: localhost:9092
          # 当消费者组没有已提交的 offset(或该 offset 已失效)时,从最早的消息开始消费
          auto-offset-reset: earliest
        producer:
          bootstrap-servers: localhost:9092
          # 发送的对象信息变为json格式
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    kafka:
      topic:
        my-topic: my-topic
        my-topic2: my-topic2
    
  4. 添加配置类 KafkaConfig.java

    package me.try4.hss.kafka.config;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.converter.RecordMessageConverter;
    import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter;
    
    /**
     * Kafka-related Spring configuration: exposes a JSON record message converter and
     * declares the two topics used by this demo as {@link NewTopic} beans.
     */
    @Configuration
    public class KafkaConfig {

        /** Name of the first topic, injected from {@code kafka.topic.my-topic}. */
        @Value("${kafka.topic.my-topic}")
        String myTopic;

        /** Name of the second topic, injected from {@code kafka.topic.my-topic2}. */
        @Value("${kafka.topic.my-topic2}")
        String myTopic2;

        /**
         * JSON message converter.
         *
         * @return a new {@link StringJacksonJsonMessageConverter}
         */
        @Bean
        public RecordMessageConverter jsonConverter() {
            RecordMessageConverter converter = new StringJacksonJsonMessageConverter();
            return converter;
        }

        /**
         * Creates the first topic by registering a {@link NewTopic} bean; if the topic
         * already exists, the declaration is ignored.
         *
         * @return topic definition with 2 partitions and a replication factor of 1
         */
        @Bean
        public NewTopic myTopic() {
            final int partitions = 2;
            final short replicationFactor = 1;
            return new NewTopic(myTopic, partitions, replicationFactor);
        }

        /**
         * Creates the second topic the same way as {@link #myTopic()}.
         *
         * @return topic definition with 1 partition and a replication factor of 1
         */
        @Bean
        public NewTopic myTopic2() {
            final int partitions = 1;
            final short replicationFactor = 1;
            return new NewTopic(myTopic2, partitions, replicationFactor);
        }
    }
    
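  5. 添加消息实体 Book(注意:下面贴出的代码重复了第 4 步的 KafkaConfig 类,应为误贴。根据后文生产者、消费者和控制器的用法,实体类应位于 me.try4.hss.kafka.entity 包,至少包含 id(long 类型)和 name(String 类型)两个字段,并通过 Lombok 的 @Builder 提供 Book.builder();为了让 Jackson 能够反序列化,通常还需要无参构造方法,例如同时加上 @Data、@NoArgsConstructor、@AllArgsConstructor)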

    package me.try4.hss.kafka.config;
    
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.support.converter.RecordMessageConverter;
    import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter;
    
    /**
     * Spring configuration that provides a JSON record message converter and declares two
     * Kafka topics as {@link NewTopic} beans.
     *
     * <p>NOTE(review): this listing sits under the "message entity" step of the tutorial but
     * defines {@code KafkaConfig}, not the {@code Book} entity used by the other classes.
     */
    @Configuration
    public class KafkaConfig {

        /** Topic name resolved from the {@code kafka.topic.my-topic} property. */
        @Value("${kafka.topic.my-topic}")
        String myTopic;

        /** Topic name resolved from the {@code kafka.topic.my-topic2} property. */
        @Value("${kafka.topic.my-topic2}")
        String myTopic2;

        /**
         * JSON message converter.
         *
         * @return a freshly constructed {@link StringJacksonJsonMessageConverter}
         */
        @Bean
        public RecordMessageConverter jsonConverter() {
            RecordMessageConverter jsonMessageConverter = new StringJacksonJsonMessageConverter();
            return jsonMessageConverter;
        }

        /**
         * Registers a {@link NewTopic} bean so that the topic gets created; an already
         * existing topic is ignored.
         *
         * @return definition of the first topic: 2 partitions, replication factor 1
         */
        @Bean
        public NewTopic myTopic() {
            final int numPartitions = 2;
            final short replication = 1;
            return new NewTopic(myTopic, numPartitions, replication);
        }

        /**
         * Registers the {@link NewTopic} bean for the second topic.
         *
         * @return definition of the second topic: 1 partition, replication factor 1
         */
        @Bean
        public NewTopic myTopic2() {
            final int numPartitions = 1;
            final short replication = 1;
            return new NewTopic(myTopic2, numPartitions, replication);
        }
    }
    
  6. 添加生产者

    package me.try4.hss.kafka.service;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.CompletableFuture;
    
    @Service
    @Slf4j
    public class BookProducerService {
    
        private final KafkaTemplate<String, Object> kafkaTemplate;
    
        public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public void sendMessage(String topic, Object o) {
            // 分区编号最好为 null,交给 kafka 自己去分配
            ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);
    
            CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(producerRecord);
    
            future.thenApply(
                    result -> {
                        log.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition());
                        return "";
                    })
                    .exceptionally(
                    ex -> {
                        log.error("生产者发送消失败,原因:{}", ex.getMessage());
                        return null;
                    }
                    );
        }
    }
    
  7. 添加消费者

    package me.try4.hss.kafka.service;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import lombok.extern.slf4j.Slf4j;
    import me.try4.hss.kafka.entity.Book;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    
    @Service
    @Slf4j
    public class BookConsumerService {
    
        @Value("${kafka.topic.my-topic}")
        private String myTopic;
    
        @Value("${kafka.topic.my-topic2}")
        private String myTopic2;
    
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        @KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")
        public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {
            try {
                Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
                log.info("消费者消费topic:{} partition:{}的消息 -> {}", bookConsumerRecord.topic(), bookConsumerRecord.partition(), book.toString());
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
        @KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")
        public void consumeMessage2(Book book) {
            log.info("消费者消费{}的消息 -> {}", myTopic2, book.toString());
        }
    }
    
  8. 添加控制器

    package me.try4.hss.kafka.controller;
    
    import lombok.RequiredArgsConstructor;
    import me.try4.hss.kafka.entity.Book;
    import me.try4.hss.kafka.service.BookProducerService;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.util.StringUtils;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * Test-only REST endpoint that wraps a request parameter in a {@link Book} and hands it
     * to {@link BookProducerService} for publishing.
     */
    @RestController
    @RequiredArgsConstructor
    @RequestMapping("message")
    public class MessageController {

        /** Destination topic, injected from {@code kafka.topic.my-topic}. */
        @Value("${kafka.topic.my-topic}")
        String myTopic;

        private final BookProducerService bookProducerService;

        // Source of book ids. It is final because it is never reassigned; since it is
        // initialized here, the Lombok-generated constructor does not take it as a parameter.
        private final AtomicLong atomicLong = new AtomicLong();

        /**
         * Builds a {@link Book} with the next id and {@code msg} as its name, then sends it
         * to the configured topic.
         *
         * @param msg text used as the book name
         * @return the literal string {@code "ok"}; it does not reflect the send result
         */
        @GetMapping("send")
        public String send(@RequestParam String msg) {
            Book newBook = Book.builder()
                    .id(atomicLong.incrementAndGet())
                    .name(msg)
                    .build();
            bookProducerService.sendMessage(myTopic, newBook);
            return "ok";
        }
    }
    
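  9. 程序起来后,访问下面的地址(接口本身只返回 ok),就可以在应用日志中看到生产者和消费者输出的消息了。注意:上面的 application.yml 没有配置 server.port,Spring Boot 默认端口是 8080;下面的地址假设已经把端口设置为 9090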

    http://localhost:9090/message/send?msg=oka1

目前尚无回复
简单教程 = 简单教程,简单编程
简单教程 是一个关于技术和学习的地方
现在注册
已注册用户请 登入
关于   |   FAQ   |   我们的愿景   |   广告投放   |  博客

  简单教程,简单编程 - IT 入门首选站

Copyright © 2013-2022 简单教程 twle.cn All Rights Reserved.