- First, on https://start.spring.io/ select Spring for Apache Kafka and Spring Web (the latter is only used for testing).
- Then add the com.fasterxml.jackson dependencies. Afterwards the `dependencies` section of pom.xml looks something like this:

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.20.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.20.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webmvc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webmvc-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
- Configure application.yml with the Kafka-related settings:

```yaml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      # Start from the earliest record when there is no committed offset
      # (so a newly connected consumer group receives messages from the very beginning)
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: localhost:9092
      # Serialize message values as JSON
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

kafka:
  topic:
    my-topic: my-topic
    my-topic2: my-topic2
```
- Add the configuration class KafkaConfig.java:

```java
package me.try4.hss.kafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJacksonJsonMessageConverter;

@Configuration
public class KafkaConfig {

    @Value("${kafka.topic.my-topic}")
    String myTopic;

    @Value("${kafka.topic.my-topic2}")
    String myTopic2;

    /**
     * JSON message converter.
     */
    @Bean
    public RecordMessageConverter jsonConverter() {
        return new StringJacksonJsonMessageConverter();
    }

    /**
     * Registering a NewTopic bean creates the topic; if the topic already exists, it is ignored.
     */
    @Bean
    public NewTopic myTopic() {
        return new NewTopic(myTopic, 2, (short) 1);
    }

    @Bean
    public NewTopic myTopic2() {
        return new NewTopic(myTopic2, 1, (short) 1);
    }
}
```
- Add the message entity `Book` (a sketch follows below).
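The controller below builds the payload with `Book.builder().id(...).name(msg).build()` and the consumer deserializes it with Jackson, so a minimal `Book` along those lines could look like this (the exact fields and Lombok annotations here are assumptions):

```java
package me.try4.hss.kafka.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Message payload that gets serialized to JSON and sent to Kafka.
 */
@Data
@Builder
@NoArgsConstructor  // needed so Jackson can instantiate the object when deserializing
@AllArgsConstructor
public class Book {

    private Long id;

    private String name;
}
```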
- Add the producer:

```java
package me.try4.hss.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
@Slf4j
public class BookProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public BookProducerService(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, Object o) {
        // Leave the partition null so Kafka picks the partition itself
        ProducerRecord<String, Object> producerRecord =
                new ProducerRecord<>(topic, null, System.currentTimeMillis(), String.valueOf(o.hashCode()), o);
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(producerRecord);
        future.thenApply(result -> {
                    log.info("Producer sent message to topic: {}, partition: {}",
                            result.getRecordMetadata().topic(), result.getRecordMetadata().partition());
                    return "";
                })
                .exceptionally(ex -> {
                    log.error("Producer failed to send message, cause: {}", ex.getMessage());
                    return null;
                });
    }
}
```
- Add the consumer:

```java
package me.try4.hss.kafka.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import me.try4.hss.kafka.entity.Book;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class BookConsumerService {

    @Value("${kafka.topic.my-topic}")
    private String myTopic;

    @Value("${kafka.topic.my-topic2}")
    private String myTopic2;

    private final ObjectMapper objectMapper = new ObjectMapper();

    @KafkaListener(topics = {"${kafka.topic.my-topic}"}, groupId = "group1")
    public void consumeMessage(ConsumerRecord<String, String> bookConsumerRecord) {
        try {
            // The raw record value is a JSON string, so deserialize it manually
            Book book = objectMapper.readValue(bookConsumerRecord.value(), Book.class);
            log.info("Consumer received message from topic: {}, partition: {} -> {}",
                    bookConsumerRecord.topic(), bookConsumerRecord.partition(), book);
        } catch (JsonProcessingException e) {
            log.error("Failed to deserialize message", e);
        }
    }

    @KafkaListener(topics = {"${kafka.topic.my-topic2}"}, groupId = "group2")
    public void consumeMessage2(Book book) {
        log.info("Consumer received message from {} -> {}", myTopic2, book);
    }
}
```
- Add the controller:

```java
package me.try4.hss.kafka.controller;

import lombok.RequiredArgsConstructor;
import me.try4.hss.kafka.entity.Book;
import me.try4.hss.kafka.service.BookProducerService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.atomic.AtomicLong;

@RestController
@RequiredArgsConstructor
@RequestMapping("message")
public class MessageController {

    @Value("${kafka.topic.my-topic}")
    String myTopic;

    private final BookProducerService bookProducerService;

    private final AtomicLong atomicLong = new AtomicLong();

    @GetMapping("send")
    public String send(@RequestParam String msg) {
        Book newBook = Book.builder().id(atomicLong.addAndGet(1)).name(msg).build();
        bookProducerService.sendMessage(myTopic, newBook);
        return "ok";
    }
}
```
- Once the application is running, hit the send endpoint (with the default port that would be http://localhost:8080/message/send?msg=hello) and the consumed messages show up in the application log.
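With the test dependencies above on the classpath, the send path can also be exercised without a locally running broker. The following is only a rough sketch, assuming Spring Kafka's `@EmbeddedKafka` test support and its `spring.embedded.kafka.brokers` property are available; the test class name is made up:

```java
package me.try4.hss.kafka;

import me.try4.hss.kafka.entity.Book;
import me.try4.hss.kafka.service.BookProducerService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

@SpringBootTest
// Start an in-memory broker and point the admin/producer/consumer at it instead of localhost:9092
@EmbeddedKafka(partitions = 1, topics = {"my-topic", "my-topic2"})
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}"
})
class BookProducerServiceTest {

    @Autowired
    private BookProducerService bookProducerService;

    @Test
    void sendMessageDoesNotThrow() {
        // Fire-and-forget send; the listener output can be checked in the test log
        bookProducerService.sendMessage("my-topic", Book.builder().id(1L).name("embedded-test").build());
    }
}
```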