Kafka - 4 Spring Integration with Kafka
Spring Boot Integration with Kafka
Basic Usage
With Kafka, you should build up a complete model of how data flows through the system from multiple angles, use those models to revisit Kafka's key design decisions, and try to validate your own understanding. That is how you come to truly appreciate Kafka's strengths.
Once you have mastered Kafka's core message-flow model, it also helps you understand more of the Kafka application ecosystem. Spring Boot integration with Kafka, for example, is very simple and takes just three steps.
Add the Maven dependency to the Spring Boot project:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Configure the Kafka-related parameters in application.yml, for example:
kafka:
  enabled: true # enable Kafka
  bootstrap-servers: 127.0.0.1:9092 # Kafka broker address
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    acks: all
    retries: 3
    enable-idempotence: true
    transactional-id-prefix: file-upload-tx-
    properties:
      client.dns.lookup: use_all_dns_ips
  consumer:
    group-id: file-processing-group # consumer group ID
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    properties:
      spring.json.trusted.packages: "*" # packages allowed for deserialization
      client.dns.lookup: use_all_dns_ips
  topic:
    file-processing: file-processing-topic1 # added topic configuration
    dlt: file-processing-dlt # dead-letter topic
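The topic keys above are not standard spring.kafka properties, so the application has to bind them itself. A minimal sketch of such a binding class, assuming a kafka.topic prefix and setter-based binding (the class and field names are illustrative, not from the original project):

```java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Binds the custom kafka.topic.* keys from application.yml.
@Component
@ConfigurationProperties(prefix = "kafka.topic")
public class KafkaTopicProperties {

    private String fileProcessing; // bound from kafka.topic.file-processing
    private String dlt;            // bound from kafka.topic.dlt

    public String getFileProcessing() { return fileProcessing; }
    public void setFileProcessing(String fileProcessing) { this.fileProcessing = fileProcessing; }

    public String getDlt() { return dlt; }
    public void setDlt(String dlt) { this.dlt = dlt; }
}
```

Beans can then inject this class and read the topic names instead of hard-coding them.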
Or configure the equivalent in a properties file:
########### [Kafka cluster] ###########
spring.kafka.bootstrap-servers=worker1:9092,worker2:9093,worker3:9093
########### [Producer configuration] ###########
# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must hold the record before the broker acks to the producer (0, 1, or all/-1)
spring.kafka.producer.acks=1
# Batch size (bytes)
spring.kafka.producer.batch-size=16384
# How long to wait before sending a batch (ms)
spring.kafka.producer.properties.linger.ms=0
# Producer buffer memory (bytes)
spring.kafka.producer.buffer-memory=33554432
# Serializer classes provided by Kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
########### [Consumer configuration] ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after a message is received before its offset is committed)
spring.kafka.consumer.auto-commit-interval=1000
# What to do when Kafka has no initial offset, or the committed offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset (consume only data newly produced to the partition);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Session timeout (if the consumer sends no heartbeat within this window, a rebalance is triggered)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializer classes provided by Kafka
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Send messages in the application via the framework-injected KafkaTemplate:
@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // send a message
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
}
Declare a message consumer with the @KafkaListener annotation:
@Component
public class KafkaConsumer {

    // consume messages
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record) {
        // print which topic and partition the message came from, plus its content
        System.out.println("simple consume: " + record.topic() + "-" + record.partition() + "-" + record.value());
    }
}
This part of the integration really is simple, and under the hood it just builds a Producer and a Consumer inside the framework. Once you understand Kafka's core message flow, you can combine these application parameters sensibly and be up and running with the Spring Boot Kafka integration in minutes.
Sending Messages: KafkaTemplate
KafkaTemplate wraps a producer and provides convenience methods for sending data to Kafka topics.
Its constructors take a ProducerFactory object, which describes how to create a producer from the given configuration for message delivery:
public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
    this(producerFactory, false);
}

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
    this(producerFactory, autoFlush, (Map) null);
}

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides) {
    this.logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    this.producers = new ConcurrentHashMap();
    this.micrometerTags = new HashMap();
    this.clusterIdLock = new ReentrantLock();
    this.beanName = "kafkaTemplate";
    this.messageConverter = new MessagingMessageConverter();
    this.producerListener = new LoggingProducerListener();
    this.closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
    this.micrometerEnabled = true;
    this.observationRegistry = ObservationRegistry.NOOP;
    Assert.notNull(producerFactory, "'producerFactory' cannot be null");
    this.autoFlush = autoFlush;
    this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
    this.customProducerFactory = !CollectionUtils.isEmpty(configOverrides);
    if (this.customProducerFactory) {
        this.producerFactory = producerFactory.copyWithConfigurationOverride(configOverrides);
    } else {
        this.producerFactory = producerFactory;
    }
    this.transactional = this.producerFactory.transactionCapable();
}
Creating a KafkaTemplate
You need to provide a ProducerFactory object, the producer factory, which defines the base configuration and is then passed to the Template constructor.
ProducerConfig is the Kafka client class that enumerates all producer configuration keys.
Example:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/41/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Additionally, if we have already registered a ProducerFactory<String, Object> bean like the one above, then when creating another template we can override some of that factory's ProducerConfig properties, creating a template whose producer configuration differs from the same factory's defaults:
@Bean
public KafkaTemplate<String, Object> reliableTemplate(ProducerFactory<String, Object> pf) {
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(ProducerConfig.ACKS_CONFIG, "all");
    overrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    overrides.put(ProducerConfig.RETRIES_CONFIG, 3);
    return new KafkaTemplate<>(pf, overrides);
}
KafkaTemplate Methods
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

interface OperationsCallback<K, V, T> {
    T doInOperations(KafkaOperations<K, V> operations);
}
The main methods are send and sendDefault, all of which return a CompletableFuture.
The sendDefault API requires that a default topic has been provided to the template.
public CompletableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
    return this.send(this.defaultTopic, data);
}
That is, to use sendDefault you must have a defaultTopic set on the template, whereas send takes the topic explicitly.
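A small sketch of the difference (topic names are illustrative): set the default topic on the template once, after which sendDefault needs only the payload:

```java
// Give the template a default topic so sendDefault(...) works.
kafkaTemplate.setDefaultTopic("topic1");

kafkaTemplate.sendDefault("hello");         // published to "topic1"
kafkaTemplate.send("other-topic", "hello"); // topic passed explicitly
```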
Calls to kafkaTemplate.metrics() (which fetches the producer's monitoring metrics) and kafkaTemplate.partitionsFor(topic) (which looks up a topic's partition info) are not implemented by KafkaTemplate itself; it forwards them unchanged to the underlying org.apache.kafka.clients.producer.KafkaProducer (that is, the Producer instance).
When calling KafkaTemplate.send(Message<?> message), the message metadata (topic, partition, key, timestamp) is no longer passed as method arguments but carried in the Message headers.
Message
Spring defines its own generic message interface: org.springframework.messaging.Message<T>.
Its implementation is GenericMessage<T>.
It has just two parts: a payload and the headers.
Sending with a Message looks like this:
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;

Message<FileProcessingTask> message = MessageBuilder
        .withPayload(task)
        .setHeader(KafkaHeaders.TOPIC, kafkaConfig.getFileProcessingTopic())
        .setHeader(KafkaHeaders.KEY, request.fileMd5())
        .setHeader(KafkaHeaders.TIMESTAMP, System.currentTimeMillis())
        .build();
kafkaTemplate.send(message);
Sending to Kafka
The send methods return a CompletableFuture<SendResult>; register a callback with whenComplete(...).
Non-blocking:
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
Blocking:
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
Transactions
Conceptually, executeInTransaction in Spring Kafka does the following:
- obtains a transactional producer from the ProducerFactory (this requires transactionIdPrefix to be set)
- beginTransaction()
- runs your lambda: send(...)
- commitTransaction()
- abortTransaction() if the lambda throws or a send failure bubbles up
Transactions are for when a task message must be delivered reliably, ideally forming a consistent commit boundary with the preceding database operations (updating the file_upload status, inserting records, and so on).
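The steps above can be sketched like this (assuming a transaction-capable ProducerFactory, i.e. a transactionIdPrefix is configured; topic names and payloads are illustrative):

```java
// All sends inside the callback commit or abort together.
kafkaTemplate.executeInTransaction(ops -> {
    ops.send("file-processing-topic1", task.fileMd5(), task);
    ops.send("audit-topic", task.fileMd5(), "task-submitted");
    return true; // returning normally commits; a thrown exception aborts
});
```

For the database consistency mentioned above, the usual route is to let the template participate in Spring-managed transactions (e.g. a @Transactional method with a KafkaTransactionManager) rather than calling executeInTransaction directly.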
ProducerListener
You can configure a ProducerListener on the KafkaTemplate to get an asynchronous callback with the send result (success or failure), instead of waiting for the Future to complete:
public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
    }

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
    }
}
Example
You can create a listener configuration, e.g. in KafkaProducerListenerConfig.java:
@Slf4j // Lombok; alternatively declare an SLF4J Logger field
@Component
public class KafkaProducerLogListener implements ProducerListener<String, Object> {

    @Override
    public void onSuccess(ProducerRecord<String, Object> record, RecordMetadata metadata) {
        log.info("sent ok topic={}, partition={}, offset={}, key={}",
                metadata.topic(), metadata.partition(), metadata.offset(), record.key());
    }

    @Override
    public void onError(ProducerRecord<String, Object> record,
                        RecordMetadata metadata,
                        Exception exception) {
        String meta = (metadata == null)
                ? "meta=null"
                : String.format("partition=%d, offset=%d", metadata.partition(), metadata.offset());
        log.error("send failed topic={}, key={}, {}",
                record.topic(), record.key(), meta, exception);
    }
}
Then inject it into the KafkaTemplate:
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(
        ProducerFactory<String, Object> pf,
        ProducerListener<String, Object> producerListener) {
    KafkaTemplate<String, Object> template = new KafkaTemplate<>(pf);
    template.setProducerListener(producerListener);
    return template;
}
Receiving Messages
Example
Listening to Kafka's test topic takes just three steps: add the dependency → configure the consumer → write a @KafkaListener.
1. Add the dependency
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. Configure application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest # start from the earliest offset on first consumption (optional)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. Write the listener (Consumer)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestTopicListener {

    @KafkaListener(topics = "test", groupId = "test-group")
    public void onMessage(String msg) {
        System.out.println("received from test topic: " + msg);
    }
}
Message Listener Interfaces
When using a message listener container, you must provide a listener to receive data. There are currently eight supported message listener interfaces:
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
