
Kafka - 4 Spring Integration with Kafka


Spring Boot Integration with Kafka

Basic Usage

For Kafka, you should build a complete mental model of how data flows through the system from multiple angles, use that model to revisit Kafka's key design decisions, and try to validate your own understanding against it. That is how you come to truly appreciate Kafka's strengths.

Once you have mastered Kafka's core message-flow model, it also helps you understand the broader Kafka application ecosystem. For example, integrating Kafka into Spring Boot is actually very simple and takes just three steps.

Add the Maven dependency to the Spring Boot project:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Configure the Kafka-related parameters in application.yml, for example:

kafka:
  enabled: true  # enable Kafka
  bootstrap-servers: 127.0.0.1:9092 # Kafka broker address
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    acks: all
    retries: 3
    enable-idempotence: true
    transactional-id-prefix: file-upload-tx-
    properties:
      client.dns.lookup: use_all_dns_ips
  consumer:
    group-id: file-processing-group # consumer group ID
    auto-offset-reset: earliest
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    properties:
      spring.json.trusted.packages: "*" # packages allowed for deserialization
      client.dns.lookup: use_all_dns_ips
  topic:
    file-processing: file-processing-topic1 # topic for file-processing messages
    dlt: file-processing-dlt # dead-letter topic

Or configure the equivalent in a properties file:

########### [Kafka cluster] ###########
spring.kafka.bootstrap-servers=worker1:9092,worker2:9093,worker3:9093
########### [Producer configuration] ###########
# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must confirm before the broker acks the producer (0, 1, or all/-1)
spring.kafka.producer.acks=1
# Batch size
spring.kafka.producer.batch-size=16384
# Send delay (how long to wait for batching before sending)
spring.kafka.producer.properties.linger.ms=0
# Producer-side buffer size
spring.kafka.producer.buffer-memory=33554432
# Serializer classes provided by Kafka
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
########### [Consumer configuration] ###########
# Default consumer group ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after a message is received the offset is committed)
spring.kafka.consumer.auto-commit-interval=1000
# What to do when Kafka has no initial offset, or the offset is out of range:
# earliest: reset to the smallest offset in the partition;
# latest: reset to the latest offset (consume only data newly produced to the partition);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Session timeout (if the consumer sends no heartbeat within this window, a rebalance is triggered)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Deserializer classes provided by Kafka
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

In the application, send messages with the framework-injected KafkaTemplate:

@RestController
public class KafkaProducer {
  @Autowired
  private KafkaTemplate<String, Object> kafkaTemplate;
  // send a message
  @GetMapping("/kafka/normal/{message}")
  public void sendMessage1(@PathVariable("message") String normalMessage) {
      kafkaTemplate.send("topic1", normalMessage);
  }
}

Declare a message consumer with the @KafkaListener annotation:

@Component
public class KafkaConsumer {
  // consumer listener
  @KafkaListener(topics = {"topic1"})
  public void onMessage1(ConsumerRecord<?, ?> record){
      // print which topic and partition the message came from, plus its content
      System.out.println("Simple consume: "+record.topic()+"-"+record.partition()+"-"+record.value());
  }
}

This part is genuinely simple, and under the hood it still amounts to the framework building a Producer and a Consumer. Once you understand Kafka's core message flow, you can combine these configuration parameters sensibly, and Spring Boot integration with Kafka takes only minutes to get working.

Sending Messages: KafkaTemplate

KafkaTemplate wraps a producer and provides convenience methods for sending data to Kafka topics.

The class's constructors take a ProducerFactory object, which describes how to create a producer from the given configuration for message delivery:

public KafkaTemplate(ProducerFactory<K, V> producerFactory) {
  this(producerFactory, false);
}

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
  this(producerFactory, autoFlush, (Map)null);
}

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush, @Nullable Map<String, Object> configOverrides) {
  this.logger = new LogAccessor(LogFactory.getLog(this.getClass()));
  this.producers = new ConcurrentHashMap();
  this.micrometerTags = new HashMap();
  this.clusterIdLock = new ReentrantLock();
  this.beanName = "kafkaTemplate";
  this.messageConverter = new MessagingMessageConverter();
  this.producerListener = new LoggingProducerListener();
  this.closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT;
  this.micrometerEnabled = true;
  this.observationRegistry = ObservationRegistry.NOOP;
  Assert.notNull(producerFactory, "'producerFactory' cannot be null");
  this.autoFlush = autoFlush;
  this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
  this.customProducerFactory = !CollectionUtils.isEmpty(configOverrides);
  if (this.customProducerFactory) {
      this.producerFactory = producerFactory.copyWithConfigurationOverride(configOverrides);
  } else {
      this.producerFactory = producerFactory;
  }

  this.transactional = this.producerFactory.transactionCapable();
}

Creating a KafkaTemplate

To create a KafkaTemplate, you must supply a ProducerFactory: a producer factory that defines the base configuration, which the template references at construction time.

ProducerConfig is the Kafka client class that catalogs all producer configuration keys.

Example:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/41/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

Also, if we have already registered a ProducerFactory<String, Object> bean as above, we can override that factory's ProducerConfig properties when creating another template, so that templates with different producer configurations are built from the same factory:

@Bean
public KafkaTemplate<String, Object> reliableTemplate(ProducerFactory<String, Object> pf) {
  Map<String, Object> overrides = new HashMap<>();
  overrides.put(ProducerConfig.ACKS_CONFIG, "all");
  overrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
  overrides.put(ProducerConfig.RETRIES_CONFIG, 3);

  return new KafkaTemplate<>(pf, overrides);
}

KafkaTemplate Methods

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

The main methods are send and sendDefault, and both return a CompletableFuture.

The sendDefault API requires that a default topic has been provided to the template.

public CompletableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
  return this.send(this.defaultTopic, data);
}

That is, the sendDefault methods require the template's defaultTopic to have been set.

With send, you specify the topic yourself.
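For instance, a minimal sketch of the difference; it assumes a ProducerFactory<String, String> bean named stringProducerFactory (not defined above), and the topic names are illustrative:

```java
// Configure a default topic so the sendDefault(...) overloads can be used.
KafkaTemplate<String, String> template = new KafkaTemplate<>(stringProducerFactory);
template.setDefaultTopic("topic1");

template.sendDefault("hello");       // goes to the default topic, "topic1"
template.send("topic2", "hello");    // send(...) always names the topic explicitly
```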

When you call kafkaTemplate.metrics() (to fetch the producer's monitoring metrics) or kafkaTemplate.partitionsFor(topic) (to look up a topic's partition information), KafkaTemplate does not implement these itself: it forwards the call as-is to the underlying org.apache.kafka.clients.producer.KafkaProducer (that is, the Producer instance).
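A short sketch of that delegation, assuming an injected kafkaTemplate bean and an existing topic named topic1:

```java
// Both calls below are forwarded to the underlying KafkaProducer.
List<PartitionInfo> partitions = kafkaTemplate.partitionsFor("topic1");
partitions.forEach(p ->
        System.out.println("partition " + p.partition() + ", leader " + p.leader()));

Map<MetricName, ? extends Metric> metrics = kafkaTemplate.metrics();
System.out.println("producer exposes " + metrics.size() + " metrics");
```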

When you call the KafkaTemplate.send(Message<?> message) variant, the message metadata (topic, partition, key, timestamp) is no longer passed as arguments; it travels in the Message headers instead.

Message

Spring defines its own generic message interface, org.springframework.messaging.Message<T>, whose standard implementation is GenericMessage<T>.

It has two main properties: payload and headers.

To carry the metadata in a Message, you can write:

import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;

Message<FileProcessingTask> message = MessageBuilder
        .withPayload(task)
        .setHeader(KafkaHeaders.TOPIC, kafkaConfig.getFileProcessingTopic())
        .setHeader(KafkaHeaders.KEY, request.fileMd5())
        .setHeader(KafkaHeaders.TIMESTAMP, System.currentTimeMillis())
        .build();

kafkaTemplate.send(message);

Sending a Message to Kafka

The send methods return a CompletableFuture<SendResult>, so register a callback with whenComplete(...).

Non-blocking:

public void sendToKafka(final MyOutputData data) {
  final ProducerRecord<String, String> record = createRecord(data);

  CompletableFuture<SendResult<String, String>> future = template.send(record);
  future.whenComplete((result, ex) -> {
      if (ex == null) {
          handleSuccess(data);
      }
      else {
          handleFailure(data, record, ex);
      }
  });
}

Blocking:

public void sendToKafka(final MyOutputData data) {
  final ProducerRecord<String, String> record = createRecord(data);

  try {
      template.send(record).get(10, TimeUnit.SECONDS);
      handleSuccess(data);
  }
  catch (ExecutionException e) {
      handleFailure(data, record, e.getCause());
  }
  catch (TimeoutException | InterruptedException e) {
      handleFailure(data, record, e);
  }
}

Transactions

Conceptually, Spring Kafka does the following inside executeInTransaction:

  • Obtains a transactional producer from the ProducerFactory (provided you have configured a transactionIdPrefix)
  • beginTransaction()
  • Runs your lambda: send(...)
  • commitTransaction()
    • If the lambda throws, or a send failure causes an exception to propagate: abortTransaction()

Use a transaction when the task message must go out reliably, ideally forming a consistent commit boundary with the preceding database operations (updating the file_upload status, inserting records, and so on).
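The steps above can be sketched with executeInTransaction. This is a minimal sketch: it assumes the ProducerFactory was configured with a transaction-id prefix (as in the earlier transactional-id-prefix: file-upload-tx- YAML), and the second topic plus the task/auditEvent objects are hypothetical:

```java
// Runs inside one Kafka transaction: both sends commit together,
// and any exception thrown in the lambda aborts the transaction.
Boolean result = kafkaTemplate.executeInTransaction(ops -> {
    ops.send("file-processing-topic1", "file-md5-key", task);
    ops.send("file-processing-dlt", "file-md5-key", auditEvent);
    return true;
});
```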

ProducerListener

You can configure a ProducerListener on the KafkaTemplate to receive an asynchronous callback with the send result (success or failure), rather than waiting for the Future to complete:

public interface ProducerListener<K, V> {
  default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
  }

  default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
  }
}

Example

You can create a listener configuration in a file such as KafkaProducerListenerConfig.java:

@Component
public class KafkaProducerLogListener implements ProducerListener<String, Object> {

    // SLF4J logger; the original snippet referenced log without declaring it
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerLogListener.class);

    @Override
    public void onSuccess(ProducerRecord<String, Object> record, RecordMetadata metadata) {
        log.info("sent ok topic={}, partition={}, offset={}, key={}",
                metadata.topic(), metadata.partition(), metadata.offset(), record.key());
    }

    @Override
    public void onError(ProducerRecord<String, Object> record,
                        RecordMetadata metadata,
                        Exception exception) {
        String meta = (metadata == null)
                ? "meta=null"
                : String.format("partition=%d, offset=%d", metadata.partition(), metadata.offset());

        log.error("send failed topic={}, key={}, {}",
                record.topic(), record.key(), meta, exception);
    }
}

Then wire it into the KafkaTemplate:

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(
  ProducerFactory<String, Object> pf,
  ProducerListener<String, Object> producerListener) {
  KafkaTemplate<String, Object> template = new KafkaTemplate<>(pf);
  template.setProducerListener(producerListener);
  return template;
}

Receiving Messages

Example

Listening to Kafka's test topic takes just three steps: add the dependency → configure the consumer → write a @KafkaListener.

1. Add the dependency

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

2. Configure application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest   # on first consumption, start from the earliest offset (optional)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. Write the consumer listener

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TestTopicListener {
    @KafkaListener(topics = "test", groupId = "test-group")
    public void onMessage(String msg) {
        System.out.println("Received message from test topic: " + msg);
    }
}

Message Listener Interfaces

When using a message listener container, you must provide a listener to receive data. Eight message listener interfaces are currently supported:

public interface MessageListener<K, V> {
  void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
  void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
  void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
  void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
  void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
  void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
  void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
  void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}