❒ 使用Kafka在消息的收发过程都会出现消息丢失,Kafka分别给出了解决方案
✔ 生产者发送消息到Brocker丢失
✔ 消息在Brocker中存储丢失
✔ 消费者从Brocker接收消息丢失
producer -> broker(topic) -> consumer
❒ 设置异步发送
// 同步发送
RecordMetadata recordMetadatÃs kafkaProducer.send(record).get();
// 异步发送
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null){
System.out.println("消息发送失败|记录日志");
}
long offset = recordMetadata.offset();
int partition = recordMetadata.partition();
String topic = recordMetadata.topic();
});
❒ 消息重试
// 设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
❒ 消息在Brocker中存储丢失
✔ acks = 0 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
✔ acks = 1(默认值) 只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
✔ acks = all 只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
❒ 消费者从Brocker接收消息丢失
✔ Kafka中的分区机制指的是将每个主题划分成多个分区(Partition)
✔ topic分区中消息只能由消费者组中的唯一一个消费者处理,不同的分区分配给不同的消费者(同一个消费者组)
❒ 禁用自动提交偏移量,改为手动
✔ 同步提交:如果一切正常,我们使用consumer.commitAsync()方法来提交。这样速度更快,而且即使这次提交失败,下次提交也可能会成功。
✔ 异步提交:如果直接关闭消费者,那就没有所谓的“下一次提交”了。使用consumer.commitSync()方法会一直重试,直到提交成功或者发生无法恢复的错误。
✔ 同步(在finally中写入同步提交逻辑) + 异步(每处理一条便异步提交一条)组合提交