kafka接口(c++怎么调用java的kafka接口)
本文目录
c++怎么调用java的kafka接口
《pre t="code" l="java"》看Java源代码,发现有些方法是空实现,在方法前面有一个修饰词 native ; 查阅得知 该方法就是Java程序调用底层的C接口,你可以向这方面入手找些资料做参考!
kafka如果有多个节点,客户端连接地址
每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响c***umers
broker.id =1
kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
提供给客户端响应的端口
port =6667
消息体的最大大小,单位是字节
message.max.bytes =1000000
broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads =3
broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads =8
一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
background.threads =4
等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,算是一种自我保护机制
queued.max.requests =500
broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
host.name
打广告的地址,若是设置的话,会提供给producers, c***umers,其他broker连接,具体如何使用还未深究
advertised.host.name
广告地址端口,必须不同于port中的设置
advertised.port
socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes =100*1024
socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
3、Kafka生产者-向Kafka写入数据
发送消息的主要步骤
格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key
1:序列化 ProducerRecord
2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送
3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上
4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误
有序场景:不建议retries 0。可max.in.flight.requests.per.connection 1, 影响生产者吞吐量,但保证有序 ps: 同partition消息有序
三个 必选 的属性:
(1) bootstrap.servers ,broker地址清单
(2) key.serializer: 实现org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key
(3)value.serializer, value序列化成字节数组
同步发送消息
异步发送消息
(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功
0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道
1,leader partition收到消息,一个即成功
all,所有partition都收到,才成功,leader和follower共同应答
(2)buffer.memory, 生产者内 缓存区域大小
(3)compression.type ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩
(4)retries, 重发消息的次数
(5)batch.size, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送
(6)linger.ms ,批次时间,batch被填满或者linger.ms达到上限,就把batch中的消息发送出去
(7)max.in.flight.requests.per.connection, 生产者在收到服务器响应之前可以发送的消息个数
创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等
用 Avro 之前,先定义schema(通常用 JSON 写)
(1)创建一个类代表客户,作为消息的value
(2)定义schema
(3)生成Avro对象发送到Kafka
ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息 2)被写到Topic的哪个partition
key null ,默认partitioner, RoundRobin均衡分布
key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。
自定义paritioner 需实现Partitioner接口
deliver与kafka直接关联么
直接。在Kafka中,用户在页面点击从而产生一个HTTP请求,这个请求发送到业务生产进程,就会启动一个投递线程(Deliver)调用Kafka的SDK接口,直接关联其本地数据块读取。
Kafka的特性
Kafka的特性:
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, c***umer group 对partition进行c***ume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
1.2 Kafka的使用场景:
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种c***umer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
更多文章:
equals键(为什么重写equals方法,一定要重写HashCode方法)
2026年4月4日 05:40
resp***etext中的值(怎么获得由Ajax得到的Json的resp***eText中以字符串为键所对应的值)
2026年4月4日 05:00
paddington包(职场女性到商场看包包,一看就是小半天,但又不会买这是为什么呢)
2026年4月4日 03:20
mysql工厂sqlquery(mysql_query除了能执行sql语句,还能设置字符编码集那会不会混淆)
2026年4月4日 02:40







