Kafka Producer - 分区机制实战

12 篇文章 3 订阅
订阅专栏

Kafka Producer - 分区机制实战

上一篇介绍了kafka Producer 生产者发送数据的程序代码,以及对生产者分区机制的相关介绍,今天继续深入的了解下分区机制的原理、测试验证、自定义分区。

在学习之前先在本地机器搭建一个单机版的双节点集群环境,方便后面做测试,另外本机使用的软件版本信息如下:

  • JDK17
  • kafka_2.13-3.3.1

搭建集群

修改配置

# 1. 在kafka根目录 创建cluster目录
mkdir cluster
# 2. 复制配置文件 模板
cp config/server.properties cluster/server_n1.properties
cp config/server.properties cluster/server_n2.properties
cp config/server.properties cluster/server_n3.properties
# 修改 server_n1.properties 并保存
vim cluster/server_n1.properties
# 更改相关内容如下
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/node1/kafka-logs 
# 修改 server_n2.properties 并保存
vim cluster/server_n2.properties
# 更改相关内容如下
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/node2/kafka-logs 
# 修改 server_n3.properties 并保存
vim cluster/server_n3.properties
# 更改相关内容如下
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/node3/kafka-logs 

启动集群

# 1. 启动zk
./bin/zookeeper-server-start.sh config/zookeeper.properties
# 2. 启动Node1
./bin/kafka-server-start.sh cluster/server_n1.properties
# 3. 启动Node2 
./bin/kafka-server-start.sh cluster/server_n2.properties
# 3. 启动Node3
./bin/kafka-server-start.sh cluster/server_n3.properties

创建topic

# 1. 创建一个分区为3的topic
./bin/kafka-topics.sh --create --topic topic_t3 --bootstrap-server localhost:9092 --partitions 3

# 2. 创建完成后,查看主题信息
./bin/kafka-topics.sh --describe --topic topic_t3 --bootstrap-server localhost:9092

在这里插入图片描述

测试代码

public class SimpleProducer {

    public static void main(String[] args) throws Exception{
        String topicName = "topic_t3";
        Properties props = new Properties();
        //指定kafka 服务器连接地址
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        // 发送失败 重试次数
        props.put("retries", 0);
        // 消息发送延迟时间 默认为0 表示消息立即发送,单位为毫秒
        props.put("linger.ms", 0);
        // 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata metadata = send.get();
                System.out.println(String.format("sent record(key=%s value=%s) 分区数: %d, 偏移量: %d, 时间戳: %d",
                        record.key(), record.value(),
                        metadata.partition(),metadata.offset(), metadata.timestamp()
                ));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

无分区无Key

ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);

注意上面代码,没有指定分区,Record对象也没有指定key值, Kakfa为了内部的性能考虑,会选取其中一个节点进行发送(避免多节点发送数据造成性能损耗),该机制被称为黏性分区

在这里插入图片描述

无分区指定Key

// ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "message : " + i);
// 替换成以下代码
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, String.valueOf(i) ,"message : " + i);

重新执行,测试结果

在这里插入图片描述

当key不为空时,Kafka默认使用key的hash值,来计算待发送的分区值,核心代码如下

org.apache.kafka.clients.producer.internals.BuiltInPartitioner#partitionForKey

/*
     * Default hashing function to choose a partition from the serialized key bytes
     */
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
  return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

Round-Robin 机制

// 指定轮训分区算法,将数据均匀的打散在不同的节点
props.put("partitioner.class","org.apache.kafka.clients.producer.RoundRobinPartitioner");

在这里插入图片描述

测试结果如上,整体的效果图如下

在这里插入图片描述

相关配置参数

Kafka 默认给生产者提供了许多的参数,进行分区策略的配置

  • partitioner.class - 默认值为null, 显示指定分区的策略,可以是自定义分区机制
  • partitioner.ignore.keys - 默认为false, 设置为true时,生产者不会使用键来计算分区,注意:如使用自定义分区,则此设置无效
  • partitioner.adaptive.partitioning.enable - 默认为true, 当设置为“true”时,生产者将会想Broker性能好的服务发送更多的消息,。如果为“false”,生产者将尝试统一分发消息。注意:如果使用自定义分区器,则此设置无效
  • partitioner.availability.timeout.ms - 默认值为0,如果Broker 在partitioner.availability.timeout时间内无法处理请求,将会视为该分区无效不可用。如果值为0,则禁用此逻辑。注意:如果使用自定义分区器或分区er.adaptive.partitioning,则此设置无效。enable设置为“false”

自定义分区

程序代码

开发者可以选择实现 org.apache.kafka.clients.producer.Partitioner 接口来自定义分区机制,满足特殊的业务场景需求,接下来利用随机算法实现一个自定义分区的功能。该实现么有任何实际的作用,仅仅只是作为学习使用

public class RandomPartitioner implements Partitioner {
    private Random random = new Random();

    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return random.nextInt(numPartitions);
    }

    public void close() {}
}

测试验证

// 指定自定义的分区策略实现类
props.put("partitioner.class","org.kafka.example.RandomPartitioner");

在这里插入图片描述

关于Kafka Producer中的分区
Lestat.Z.的博客
05-13 1456
前言 在前面的示例中,我们创建的ProducerRecord对象包括主题名称,键和值。 Kafka消息是键值对,虽然可以仅使用主题和值创建ProducerRecord,但默认情况下键设置为null,大多数应用程序生成带键的记录。 Key的使用有两个目标: 它们是与消息一起存储的附加信息 它们还用于决定将消息写入哪个主题分区。 具有相同Key的所有消息将转到同一分区。 这意味着如果进程只读取主题...
【003】- Kafka技术内幕之Producer Partition(分区)
zhangxiongcolin的专栏
11-07 1447
我们在前面提到过,kafka的topic是个逻辑概念,实际处理消息处理的是topic的partition。本篇我们将介绍kafka消息发送时是如何分区的以及如何自定义分区。 关注微信公众号,获取更多内容 一. 默认分区 kafka在发送消息时,有两个参数,一个是key,一个是value,key是跟分区相关的,表示该消息应该发送到哪个分区上。当我们在发送消息时,如果不指定key,则kafka内部默...
kafka producer 分区
m0_46449152的博客
03-15 265
策略一: 如果发送消息的时候,没有指定key, 轮询达到负载均衡 //策略二:这个地方就是指定了key, hash取模,相同的key打到同一个分区上 int partition = partition(record, serializedKey, serializedValue, cluster); -> return partition != null ? partition : //使用分区器进行选择合适的分区
kafka系列文章二(Producer消息分区策略)
qq_16077549的博客
03-31 2865
前言 上一篇文章kafka系列文章一(kafka介绍)
kafka生产者分区策略
健康平安的活着的专栏
04-07 5269
kafka分区策略 1.1 分区的好处 分区的好处:便于将数据进行合理存储,将数据分割成一块一块的存储到不同的broker中。 提高并行度:生产者分区为单位进行生产,消费者按分区为单位进行消费。 1.2 分区策略 1.给定了分区号,直接将数据发送到指定的分区里面去。 2.没有给定分区号,通过key的hashcode,和 topic的分区数,进行取模。 例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那 么key1 对应的value
kafka-08-SpringBoot Kafka实战
qq_20633779的博客
01-13 1285
注意: 当前的 Kafka 版本无法保证每个消息“只被保存一次”。现实中的很多应用程序在消息里加入唯一标识符,用于检测重复消息,消费者在读取消息时可以对它们进行清理。应用程序需要可以做到消息的“幂等”,也就是说,即使出现了重复消息,也不会对处理结果的正确性造成负面影响。 整合SpringBoot kafka,加入依赖 <dependency> <groupId>org.springframework.boot</groupId&gt
Kafka --Kafka编程实战-java客户端开发例子
XiaodunLP的博客
02-06 510
本入门教程,涵盖Kafka核心内容,通过实例和大量图表,帮助学习者理解,任何问题欢迎留言。 目录: kafka简介 kafka安装和使用 kafka核心概念 kafka核心组件和流程--控制器 kafka核心组件和流程--协调器 kafka核心组件和流程--日志管理器 kafka核心组件和流程--副本管理器 kafka编程实战 本章通过实际例子,讲解...
python flink kafka_Flink-Kafka-Connector Flink结合Kafka实战
weixin_39799825的博客
12-19 653
戳更多文章:简介Flink-kafka-connector用来做什么?Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复Kafka可以作为Flink的source和sink任务失败,通过设置kafka的offset来恢复应用kafka简单介绍关于kafka,我们会有专题文章介绍,这里简单介绍几个必须知道的概念。1.生产者(Producer)顾名思义,生产者就是生产消息的...
Kafka-Vip-高性能消息中间件Kafka实战(1)1
08-08
Producer 可以选择将消息均匀地分配到各个 Partition 或者根据特定的键进行分区,以实现负载均衡或数据局部性。 **Consumer** 是订阅并处理 Topic 中消息的组件。Kafka 使用 **Consumer Group** 的概念,同一个 ...
Kafka 分区机制详解
jcc4261的博客
10-18 385
由于负责产品的性质原因,我需要大量接触 Kafka,因此对 Kafka 的使用和原理都有一定的了解!主题与分区之间的关系 ✅分区工作的原理流程 ✅如何创建一个多分区的主题 ✅。
#指定kafka_producer 按照表名分区
最新发布
m0_37759590的博客
02-21 135
指定kafka_producer 按照表名分区
kafka分区
qq_43460095的博客
08-15 636
我们也可以自己实现接口来自定义分区器通过配置来配置自定义分区器。
kafka生产者分区策略选择
hailishen的专栏
12-05 1060
一般情况下选择默认分区器(DefaultPartitioner),无论是否高并发,如果极致追求数据平均分布,选择轮询分区器(RoundRobinPartitioner).纯粹的粘性分区策略(UniformStickyPartitioner)基本没有使用场景.高并发下不需要,中低并发下有无batch无所谓.
关于kafka producer 分区策略的思考
热门推荐
Kevin.yang专栏
04-07 2万+
今天跑了一个简单的kafka produce程序,如下所示public class kafkaProducer extends Thread{ private String topic; public kafkaProducer(String topic){ super(); this.topic = topic; }
手撸kafka producer生产者分区器(partition)API
qq_43224174的博客
01-10 1241
本篇博客是对kafka produce 生产者分区器的API(Java) 包含以下内容:分区使用原则,分区器使用原则,分区器相关代码编写及pom.xml配置文件编写,到最后的运行结果。 使用kafka producer分区器的好处: 1、方便在集群中扩展 2、可以提高并发性 分区原则 1、 指明 partition 的情况下,直接将指明的值直接作为 partiton 值; 2、没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topi
kafka 生产端指定分区和消费端分区指派
yanerhao的专栏
05-19 2312
生产端指定分区 主要依靠分配器,对于kafka默认分配器,主要工作流程: 1 如果消息自带key则对key可以hash然后选择目标分区; 2 如果消息无key则采用RoundRobin轮询算法,这样可以最大限度确保消息在所有分区的均匀性; 3 特别的,生产者API赋予用户自行指定分区的权利,在发送消息时如果指定了分区则可以跳过以上分区法则。 消费端分区指派 在 kafka 中,存在着两种为消费组组内的消费者分区的分配策略。一种是 RangeAssignor 分配策略(范围分区),另一种是Rou.
Kafka ProducerRecord的分区路由策略
墨玉浮白的博客
04-29 789
分区路由策略 把消息、序列化的key、序列化的value、元数据都传参给partition()方法。 首先要获取消息的分区号: //获取消息的分区 Integer partition = record.partit
7 kafka分区器(自定义随机、hash、轮询分区
qq_36305027的博客
04-14 855
​ 每一条producerRecord有topic名称、可选的partition分区编号,以及一对可选的key和value组成。 消息是按照三种策略进入分区: 1、如果指定的partition,那么直接进入该partition; 2、如果没有指定partition,但是指定了key,使用key的hash选择partition; 3、如果既没有指定partition,也没有指定key,使用轮询的方式进入partition。 ​ 定义分区器要实现接口org.apache.kafka.c
Kafkaproducer生产者发送到Broker分区策略(中级篇一)
点滴记忆,分享成长之路
12-18 2250
导读:本博文重点介绍了生产者发送消息是怎么发得,发送得流程又是什么,生产者配置有哪些常见得配置,代码异步调用得时候怎么知道有没有异常,消息怎么顺序发送和kafka自定义分区规则,让你知其然再知其所以然。
Kafka Producer拦截器实战Kafka Streams应用
课程首先从实战导入,介绍如何在Java代码中通过Producer拦截器来发送数据,这有助于理解拦截器的实际使用场景和配置。生产者拦截器的核心接口是`org.apache.kafka.clients.producer.ProducerInterceptor`,它包含四...
写文章

热门文章

  • 程序正常启动 telnet端口不通问题处理 12404
  • ThingsBoard MQTT链接、掉线报警、数据转换规则引擎 5419
  • kafka多线程消费 4927
  • Grpc 高级特性之 超时机制 &状态码定义 4819
  • kafka 消息日志原理 & 指定偏移量消费 & 指定时间戳消费 3632

分类专栏

  • K8S 6篇
  • KAFKA 12篇
  • Redis 9篇
  • 工作随笔 7篇
  • MQTT 7篇
  • Thingsboard 3篇
  • MONGODB 1篇
  • java 基础 24篇
  • Dubbo 13篇
  • Golang 21篇

最新评论

  • ThingsBoard 规则引擎-邮件通知

    无聊啊无聊啊啊啊啊啊啊: 温度是一个持续性的,所以温度在一段时间内会告警,每采集一次就发一次邮件?要知道采集的频率是很高的。这个官网的案例离实际运用还差很远

  • kafka多线程消费

    weixin_54643964: 这里没有 终止条件岂不是一直执行 try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if(!records.isEmpty()){ executor.execute(new MessageHandler(records)); } } }finally { consumer.close(); }

  • Golang - 操作Redis

    代码界的小姑娘: 大佬有没有遇到过NewFailoverClusterClient耗时过长的问题 v8版本使用NewFailoverClusterClient redis:M-S-S结构 M在无锡 S在深圳部署 测试客户端起在深圳的机器 初始化NewFailoverClusterClient配置RouteByLatency 理论上应该会路由到深圳从节点耗时应该在1ms之内 实际上耗时100ms+ 生产机器排除网络等问题

  • ThingsBoard MQTT链接、掉线报警、数据转换规则引擎

    梦想有一天能胖起来: 如果你的产品经理经常提一下天马星空的想法,或者你的嵌入式同事经常找事,我个人是建议你直接跟你领导反馈,用阿里云的物联网吧。

  • ThingsBoard MQTT链接、掉线报警、数据转换规则引擎

    backup88: 目前自建物联网平台是不是就这个最好用?

最新文章

  • Go ETCD 安装 & 使用
  • Go 基础Interface
  • Golang - 操作Redis
2023年15篇
2022年70篇
2021年1篇
2020年1篇
2019年4篇

目录

目录

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43元 前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值

玻璃钢生产厂家丽水玻璃钢雕塑制作厂家山东商场美陈销售厂家湖南城市景观雕塑玻璃钢河南创意玻璃钢雕塑制作3d商场美陈陇南玻璃钢雕塑价格玻璃钢仿铜人物雕塑定制价格江苏超市商场美陈山西玻璃钢青椒雕塑商场玩具区域美陈图片郑州玻璃钢艺术雕塑定制定做玻璃钢花盆有什么好处驻马店玻璃钢浮雕泡沫雕塑公司玻璃钢花盆厂家直销玻璃钢雕塑建模葫芦岛玻璃钢雕塑工艺天津景区玻璃钢雕塑制作青海彩色玻璃钢雕塑设计玻璃钢雕塑怎么摆放哈尔滨景观雕塑玻璃钢宁夏玻璃钢仿铜雕塑没有面部的玻璃钢雕塑叫什么欧式玻璃钢卡通雕塑价格合理厂家玻璃钢雕塑大型玻璃钢雕塑企业排名商场美陈生产制作项目经验_铜陵定制玻璃钢雕塑市场上海兵马俑跪姿玻璃钢雕塑商场地面520美陈装饰衡东玻璃钢花盆花器香港通过《维护国家安全条例》两大学生合买彩票中奖一人不认账让美丽中国“从细节出发”19岁小伙救下5人后溺亡 多方发声单亲妈妈陷入热恋 14岁儿子报警汪小菲曝离婚始末遭遇山火的松茸之乡雅江山火三名扑火人员牺牲系谣言何赛飞追着代拍打萧美琴窜访捷克 外交部回应卫健委通报少年有偿捐血浆16次猝死手机成瘾是影响睡眠质量重要因素高校汽车撞人致3死16伤 司机系学生315晚会后胖东来又人满为患了小米汽车超级工厂正式揭幕中国拥有亿元资产的家庭达13.3万户周杰伦一审败诉网易男孩8年未见母亲被告知被遗忘许家印被限制高消费饲养员用铁锨驱打大熊猫被辞退男子被猫抓伤后确诊“猫抓病”特朗普无法缴纳4.54亿美元罚金倪萍分享减重40斤方法联合利华开始重组张家界的山上“长”满了韩国人?张立群任西安交通大学校长杨倩无缘巴黎奥运“重生之我在北大当嫡校长”黑马情侣提车了专访95后高颜值猪保姆考生莫言也上北大硕士复试名单了网友洛杉矶偶遇贾玲专家建议不必谈骨泥色变沉迷短剧的人就像掉进了杀猪盘奥巴马现身唐宁街 黑色着装引猜测七年后宇文玥被薅头发捞上岸事业单位女子向同事水杯投不明物质凯特王妃现身!外出购物视频曝光河南驻马店通报西平中学跳楼事件王树国卸任西安交大校长 师生送别恒大被罚41.75亿到底怎么缴男子被流浪猫绊倒 投喂者赔24万房客欠租失踪 房东直发愁西双版纳热带植物园回应蜉蝣大爆发钱人豪晒法院裁定实锤抄袭外国人感慨凌晨的中国很安全胖东来员工每周单休无小长假白宫:哈马斯三号人物被杀测试车高速逃费 小米:已补缴老人退休金被冒领16年 金额超20万

玻璃钢生产厂家 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化