Kafka(7):生产者详解
1 消息发送
1.1 Kafka Java客户端数据生产流程解析
1 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic、分区Partition、键 Key以及值 Value,主题和值是必须要声明的,分区和键可以不用指定。
2 调用send() 方法进行消息发送。
3 因为消息要到网络上进行传输,所以必须进行序列化,序列化器的作用就是把消息的 key value对象序列化成字节数组。
4 接下来数据传到分区器,如果之间的 ProducerRecord 对象指定了分区,那么分区器将不再做任何事,直接把指定的分区返回;如果没有,那么分区器会根据 Key 来选择一个分区,选择好分区之后,生产者就知道该往哪个主题和分区发送记录了。
5 接着这条记录会被添加到一个记录批次里面,这个批次里所有的消息会被发送到相同的主题和分区。会有一个独立的线程来把这些记录批次发送到相应的 Broker 上。
6 Broker成功接收到消息,表示发送成功,返回消息的元数据(包括主题和分区信息以及记录在分区里的偏移量)。发送失败,可以选择重试或者直接抛出异常。
依赖的包 <kafka.version>2.0.0</kafka.version>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
1.2 必要参数配置
package com.example.demo.demo2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Kafka消息生产者进一步分析
*/
public class KafkaProducerAnalysis {
private static final String brokerList = "192.168.222.130:9092,192.168.222.131:9092,192.168.222.132:9092";
private static final String topic = "heima";
public static Properties initConfig() {
Properties props = new Properties();
// 该属性指定 brokers 的地址清单,格式为 host:port。清单里不需要包含所有的 broker 地址,
// 生产者会从给定的 broker 里查找到其它 broker 的信息。——建议至少提供两个 broker 的信息,因为一旦其中一个宕机,生产者仍然能够连接到集群上。
props.put("bootstrap.servers", brokerList);
// 将 key 转换为字节数组的配置,必须设定为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,
// 生产者会用这个类把键对象序列化为字节数组。
// ——kafka 默认提供了 StringSerializer和 IntegerSerializer、ByteArraySerializer。当然也可以自定义序列化器。
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 和 key.serializer 一样,用于 value 的序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 用来设定KafkaProducer对应的客户端ID,默认为空,如果不设置KafkaProducer会自动生成一个非空字符串。
// 内容形式如:"producer-1"
props.put("client.id", "producer.client.id.demo");
return props;
}
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
// 自定义分区器的使用
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());
// 自定义拦截器使用
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,ProducerInterceptorPrefix.class.getName());
props.put(ProducerConfig.ACKS_CONFIG,0);
return props;
}
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initNewConfig();
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// KafkaProducer<String, String> producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
//生成 ProducerRecord 对象,并制定 Topic,key 以及 value
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
// 1、发送消息
producer.send(record);
// 2、同步发送
//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
// 如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
//producer.send(record).get();
// 3、异步发送
// producer.send(record, new Callback() {
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// if (exception == null) {
// System.out.println(metadata.partition() + ":" + metadata.offset());
// }
// }
// });
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
// TimeUnit.SECONDS.sleep(5);
}
}
1.3 发送类型
发送即忘记
producer.send(record)
同步发送
//通过send()发送完消息后返回一个Future对象,然后调用Future对象的get()方法等待kafka响应
//如果kafka正常响应,返回一个RecordMetadata对象,该对象存储消息的偏移量
// 如果kafka发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理
producer.send(record).get();
异步发送
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata.partition() + ":" + metadata.offset());
}
}
});
1.4 序列化器
消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。
Kafka提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)基本上能够满足大部分场景的需求。
1.5 自定义序列化器
未完待续
阿J~: 大佬,一给我嘞giao
穷苦书生_万事愁: 博主的文章《Java:实现RSA加密与验证的方法详解》深入浅出地介绍了RSA加密与验证的方法,让我对这个主题有了全新的认识。文章中的细节描写非常到位,让我感受到了博主的深厚功底和对技术的热爱。期待博主未来能够持续分享更多这样有价值、有深度的好文,同时也希望能够得到博主的指导,共同进步。非常感谢博主的分享和支持!
穷苦书生_万事愁: 博主的这篇文章真的让我眼前一亮,对java面试题:简化URL的讲解非常深入,让我对这个主题的理解有了全新的认识。文章中的细节描述非常到位,让我感受到了博主深厚的技术功底和丰富的经验。期待博主未来能够持续分享更多类似的好文,同时也希望能够得到博主的指导,共同进步。非常感谢博主的无私分享和支持!
夜晚的猿猴: 8.0以上的 TRUNCATE TABLE performance_schema.host_cache
sunsheng1985: 这玩意支持Redis集群吗