记一次kafka消息丢失问题排查

news/2024/9/20 5:58:28 标签: kafka

背景

我写了一个 自定义分区器 ,自测发送了一些简单的如Hello world 之类的消息成功了,并且日志现实确实调用了我自己的分区器,然后我自认为已经完美了。

后来我发现很多消息消费者没有消费,  且发送完成回调(CallBack)没有被调用。 消息就这么石沉大海,也没有错误日志。

排查过程

一开始我从消息丢失方向排查,我前面文章也提到 消息发送几种方式 ,其中异步发送确实可能丢失,但我发送代码如下,我处理了异常,如果发送失败应该有异常打印出来的。

我一时也陷入迷茫,于是我网上查阅资料,关于生产者几个重要配置参数

acks 分区中有多少副本收到消息,生产者才认为消息发送成功 ,默认值1 ,并且我本地使用单节点,因此排除该配置

有些网上说 batch.size 和 request.timeout.ms 不对,我半信半疑,改了参数试试,依旧有问题

producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(content + " send sucesss  " + recordMetadata.toString());
        }
    }
});

定位问题

肯定是我修改了什么导致的问题, 于是我先把所有配置修改为默认的,发现能正常发送消息,消息也被消费。最终发现一旦使用我自定义分区器,消息发送就有问题。代码如下,聪明的你看出问题在哪了吗?

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //获取分区总个数
        int numPartitions = partitions.size();

        int partition = key != null ? key.hashCode() % numPartitions : value.hashCode() % numPartitions;
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

答案揭晓

问题在于 HashCode 返回的可能是负数 ,负数再对 分区数 取模,得到的分区序号 就为负数。分区序号一旦为负,分区序号有问题了当然消息就发不出去,只是没想到也没报错。定位问题修复也很简单,给HashCode 取绝对值就解决了。

 int partition = key != null ? Math.abs(key.hashCode()) % numPartitions : Math.abs(value.hashCode()) % numPartitions;

举一反三

如果返回的分区序号大于分区数 消息发送也同负数情况一样,石沉大海。

分区数配置为 10 ,分区器返回序号固定11 测试结果同预期一样,和负数情况一样,消息丢失。

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 11;
    }

反思

  • 没意识到 默认HashCode 可能为负数(当时误任务默认的不会为负数,除非恶意重写了HashCod方法)

  • 没有参考默认的分区器 DefaultPartitioner使用了 Utils.toPositive 将HashCode 转成了正数

  • 排查过程也熟悉了不少 kafka重要参数
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
        return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

结语

限于篇幅一些代码没有给出,欢迎私下交流学习。


http://www.niftyadmin.cn/n/5666674.html

相关文章

2024 go-zero社交项目实战

背景 一位商业大亨&#xff0c;他非常看好国内的社交产品赛道&#xff0c;想要造一款属于的社交产品&#xff0c;于是他找到了负责软件研发的小明。 小明跟张三一拍即合&#xff0c;小明决定跟张三大干一番。 社交产品MVP版本需求 MVP指&#xff1a;Minimum Viable Product&…

网络编程:掌握TCP Socket和UDP Socket

IP地址&#xff1a; 两台计算机通信&#xff0c;双方都必须有IP地址。 IPV4地址有32位&#xff0c;由四个8位二进制组成&#xff0c;因为不好记所以我们把二进制转化为十进制&#xff0c;比如192.168.0.20&#xff0c;这称为点分十进制。 IPV6有128位&#xff0c;由8个16位的…

Java项目实战II基于Java+Spring Boot+MySQL的房屋租赁管理系统的设计与实现

目录 一、前言 二、技术介绍 三、系统实现 四、论文参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着城市租…

1网络安全的基本概念

文章目录 网络安全的基本概念可以总结为以下几个方面&#xff1a; 网络安全的需求&#xff1a; 信息安全的重要性&#xff1a;信息安全是计算机、通信、物理、数学等领域的交叉学科&#xff0c;对于社会的发展至关重要。信息安全的目标&#xff1a;主要包括保密性、完整性、可用…

数据库系统原理(第一章 数据库概述)

文章目录 1.数据1.1数据的概念1.2数据与信息的关系1.3数据使用 2.数据管理3.数据库与数据库管理系统3.1数据库3.2数据库管理系统 4.数据库系统4.1数据库系统组成4.2 数据库系统的特点4.3数据库系统体系结构4.3.1内部体系结构4.3.2外部体系结构 本文首先从数据讲起&#xff0c;然…

深度解析ElasticSearch:构建高效搜索与分析的基石原创

引言 在数据爆炸的时代&#xff0c;如何快速、准确地从海量数据中检索出有价值的信息成为了企业面临的重要挑战。ElasticSearch&#xff0c;作为一款基于Lucene的开源分布式搜索和分析引擎&#xff0c;凭借其强大的实时搜索、分析和扩展能力&#xff0c;成为了众多企业的首选。…

tomcat,el表达式执行带参数命令,字符串数组,String[],el表达式注入

准备环境: docker pull tomcat:8;docker run --name tomcat8 -p 808:8080 -v /tmp/CC:/usr/local/tomcat/webapps/ -d tomcat:8;如下为 /tmp/CC/app/index.jsp <% page language"java" contentType"text/html; charsetUTF-8" pageEncoding"UTF-8…

Java 数据类型转换详解:隐式转换(自动转换)与强制转换(手动转换)

目录 前言 取值范围从小到大的关系&#xff1a; 隐式转换&#xff08;自动转换&#xff09; &#x1f4dc;示例 1&#xff1a;基本类型隐式转换 &#x1f4dc;示例 2&#xff1a;算术运算中的类型提升 &#x1f4dc;示例 3&#xff1a;byte、short 和 char 的自动转换 隐…