kafka创建topic(ApacheKafka开源消息系统_kafka源码分析)
本文目录
- ApacheKafka开源消息系统_kafka源码分析
- 5-kafka(分布式消息队列)
- 三、Kafaka的基本操作
- windows 下远程连接kafka服务器并创建topic 部署服务
- 怎么设置kafka topic数据存储时间
- Kafka 源码解析之 Topic 的新建/扩容/删除
- Kafka的Topic配置详解
- Kafka之主题创建与修改
- kafka极简入门(三)--创建topic
- kafka 常见命令以及增加topic的分区数
ApacheKafka开源消息系统_kafka源码分析
消息中间价,首选Kafka,大厂开源,稳定更新,性能优越,顺便介绍kafka的相关知识。
一、kafka是什么?
ApacheKafka是一套开源的消息系统,它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式,分区化,可复制的提交日志服务。现在,LinkedIn公司有三个同事离职创业,继续开发kafka。
二、关键配置项解读
出于性能和实际集群部署情况,我们还是需要讲解一些重要的配置项。除此之外,如果对某个默认参数存在质疑,在详细了解改参数的作用前,建议采用默认配置。
advertised.host.name
注册到zk供用户使用的主机名。内网环境通常无需配置,而IaaS一般需要配置为公网地址。默认为“host.name”,可以通过java.net.InetAddress.()接口获取该值。
advertised.port
注册到zk供用户使用的服务端口,通常在IaaS环境需要额外配置。
num.partiti***
自动创建topic的默认partition数量。默认是1,为了获得更好的性能,建议修改为更大。最优取值参考后文。
default.replication.factor
自动创建topic的默认副本数量,官方建议修改为2;但通常一个副本就足够了。
min.insync.replicas
ISR提交生成者请求的最小副本数。
unclean.leader.election.enable
是否允许不具备ISR资格的replicas选举为leader作为不得已的措施,甚至不惜牺牲部分数据。默认允许。建议允许。数据异常重要的情况例外。
controlled.shutdown.enable
在kafka收到stop命令或者异常终止时,允许自动同步数据。建议开启。
三、调优考量
配置合适的partit***数量。
这似乎是kafka新手必问得问题。partiton是kafka的并行单元。从procer和broker的视角看,向不同的partition写入是完全并行的;而对于c***umer,并发数完全取决于partition的数量,即,如果c***umer数量大于partition数量,则必有c***umer闲置。所以,我们可以认为kafka的吞吐与partition时线性关系。partition的数量要根据吞吐来推断,假定p代表生产者写入单个partition的最大吞吐,c代表消费者从单个partition消费的最大吞吐,我们的目标吞吐是t,那么partition的数量应该是t/p和t/c中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,然而,多次benchmark的结果表明,单个partition的最大写入吞吐在10MB/sec左右;c的影响因素是逻辑算法,需要在不同场景下实测得出。
这个结论似乎太书生气和不实用。我们通常建议partition的数量一定要大于等于消费者的数量来实现最大并发。官方曾测试过1万个partition的情况,所以不需要太担心partition过多的问题。我建议的做法是,如果是3个broker的集群,有5个消费者,那么建议partition的数量是15,也就是broker和c***umer数量的最小公倍数。当然,也可以是一个大于消费者的broker数量的倍数,比如6或者9,还请读者自行根据实际环境裁定。
5-kafka(分布式消息队列)
Kafka-分布式发布-订阅消息系统,最初是由LinkedIn公司所开发,Scala语言编写,之后加入Apache旗下,成为了Apache的子项目。
1. 消息持久化:采用时间复杂度O(1)的磁盘存储结构,即使TB级以上数据也能保证常数时间的访问速度。
2. 高吞吐:Kafka拥有很高的吞吐量,即使是在单节点性能比较低下的商用集群中,也能保证单节点每秒10万条消息的传输。
3. 高容错:Kafka在设计上支持多分区、多副本的策略,拥有很强的容错性。
4. 易扩展:在进行集群扩展时,集群无需停机,就可以轻松完成对集群规模的扩展,新增加的节点自动感知,加入集群。
5. 多种处理模式:支持离线、实时的处理模式,应对各种生产环境。
Broker :一个Broker就是Kafka集群中的一个节点,多个Broker组成了Kafka集群。
Topic :Topic只是一个逻辑上的概念。Kafka把同一类数据进行汇总,每一类数据的集合就是一个Topic。生产者Producer将同一类型的数据写入同一个Topic,消费者C***umer从同一个Topic中消费该同类数据。
Partition :分区是一个物理概念,每一个Topic都可以包含很多个Partition。是一个有序的不可修改的消息队列,每个分区内消息是有序的,并且每个分区对应一个文件夹,用来存储分区的数据以及索引。
Replication :分区的副本,每个副本存储在不同的Broker中。
Producer :消息生产者,是Kafka中向Broker发布消息的客户端。
C***umer :消息消费者,是消费Broker中信息的客户端。
C***umer Group :Kafka中支持将多个消费者作为一个群体,就是C***umer Group消费者组。每一个C***umer都隶属于一个特定的消费者组,并且一条消息可以发送给多个不同的消费者组,但是每一个消费者组中只能有一个消费者消费该消息。
Zookeeper :Zookeeper负责保存Kafka的元数据,同时也负责Kafka的集群管理。
4 C***umer Group消费者组是为了加快消费的读取速度的一个模型,一个消费者组中的多个C***umer可以并行消费同一个Topic中的数据。
并且多个C***umer Group可以消费同一个Topic,这些消费者组之间是平等的,即同一条消息可同时被多个消费者组消费。
同一个C***umer Group消费者组中的多个C***umer消费者之间是竞争关系,也就是说同一条消息在一个消费者组中只能被一个消费者所消费。
Partition是一个物理结构,它的实际存储在一个文件夹目录内,目录中包含若干个Segment文件。
Segment文件是Kafka中的最小存储单元,它是由以Message在Partition中的起始偏移量命名的数据文件( .log)和索引文件( .index, *.timeindex)组成,如图所示。
早期的Kafka版本里并没有副本的概念,这样一旦某个节点宕机,那么这台节点上存储的所有数据都有可能丢失,为了解决这个问题,在之后的版本更新中提出了Replication副本的概念。
之前讲到过,一个Topic主题中可以有多个Partition分区,并且每个分区都可以有多个副本,这是Kafka的一个数据冗余机制。除此之外,Kafka为了保证多个副本的数据一致性,从同一个分区的多个副本中选举出一个Partition Leader,由这个Leader来负责读写,其他的副本作为Follower从Leader中同步消息,通过这样一个副本同步机制,保证了多副本的数据一致性。
同样的Kafka中的Broker也会有这样的一个选举机制。每个Broker在启动时都会创建一个Kafka Controller进程。由这个Kafka Controller以及Zookeeper来选举出一个Kafka Controller Leader。Kafka Controller Leader负责管理Kafka集群的分区和副本状态,避免了分区副本直接在Zookeeper上注册Watcher和竞争创建临时Znode,导致Zookeeper集群的负载过重。
Kafka的命令操作以对Topic为主,通过PPT中的例子可以创建一个带有Partition以及制定了Replication的Topic,注意,在创建Topic时要制定Topic的名字以及Kafka对应的节点以及端口。
三、Kafaka的基本操作
在启动Kafka之前,需要启动zookeeper,否则会报错!相关的启动指令如下:
在此配置中,只有一个 ZooKeeper 和代理 id 实例。 配置步骤如下:(注意,以下过程中的topicName表示创建主题的名称,可以自己定义。)
(1)创建Kafka主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partiti*** 1 --topic topicName
创建主题后,会在 Kafka 代理终端窗口中获取通知,并在 config / server.properties 文件中的“/ tmp / kafka-logs /"中指定创建主题的日志。
(2)启动生产者以发送消息
bin/kafka-c***ole-producer.sh --broker-list localhost:9092 --topic topicName
生产者命令行客户端需要两个主要参数:
1.代理列表(broker-list): 要发送邮件的代理列表。 这种情况下,只有一个代理。
2.**端口: Config / server.properties 文件包含代理端口 ID,可以查到代理正在侦听端口 9092,因此直接指定它。
生产者在 config / producer.properties 文件中指定默认生产者属性。
(3)启动消费者以接收消息
bin/kafka-c***ole-c***umer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning
消费者在config / c***umer.properties 文件中指定了默认消费者属性。 打开一个新终端并键入以下消息消息语法。
(4)在生产者终端输入数据测试
生产者将等待消息的输入并发布到 Kafka 集群。 默认情况下,每行数据都作为新消息发布。在生产者终端输入数据,这些数据都会在消费者终端显示。
windows 下远程连接kafka服务器并创建topic 部署服务
一.打包项目镜像:
利用Dockerfile 来打包项目的镜像
本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像)
本次直接将Dockerfile写好后,用shell脚本build.sh启动打包:
然后切换到项目的目录下找到build.sh,运行即可打包项目镜像
若
报错:"failed to dial gRPC: cannot connect to the Docker daemon. Is ’docker daemon’ running on this host?: dial unix /var/run/docker.sock: connect: permission denied
"
就用
出现以下说明打包成功,接下来可以开始部署:
***隐藏网址***
注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件
在hosts文件末尾加上kafka服务器《 !外网! 39. 0.25...》地址,修改后的格式如下:
1.1注意: 修改阿里云服务器的hosts 文件来配置 kafka的服务器地址:
在hosts 文件最后加入:
添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,
**远程kafka:新建消费者:
远程创建topic的实例:
查看远程已创建的topc:
本地:
远程修改后的kafka topic:
2.通过git Bash 切换到kafka客户端的bin目录:
桌面打开 gitBash,切换到本地kafka软件目录:
这里一定要切换为windows
3.查看已经有的topic
--topic 指定topic名字
--replication-factor 指定副本数,因为我的是集群环境,这里副本数就为3
--partiti*** 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好
注意:服务器部署时候一定要用内网172. .开头的,外部访问设为外网ip
不然会导致Kafka写入数据的时候报错 : TImeout
4.1本地docker创建topic:
4.2 本地windows 创建topic
进入本地软件路径KAFKA/BIN/WIONDOWS
创建topic
5.修改服务器的host:
一定要注意加sudo 不然会导致readonly 无法修改
在host 文件的末尾加上以下:
6.切换到工程部署的目录
7.清理redis,不然数据有残留:
7.1服务器上的redis挂载清除:
在 docker-compose.yml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据.
这个是用来记录redis中假如上次指定的是1到100万块,没有执行完.下次接着执行没执行完的任务,测试时暂时关闭
7.2删除volume:
7.3 如果volume文件被占用时,先删除占用容器:
7.4 清除redis中的数据
进入redis容器中:
8.部署命令:
8.1开启docker可视化web上监控docker:
***隐藏网址***
宿主机IP + 9000端口
8.2执行部署命令,启动服务:
9.部署时报错: yaml: line 46: did not find expected key
原因: docker-compose.yml文件中第46行 报错
解决:将所有数据对齐,不要有多余的空格.
怎么设置kafka topic数据存储时间
1、Kafka创建topic命令很简单,一条命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partiti*** 3 --topic test 。
2.此命令将创建一个名为test的topic,其中有三个分区,每个分区需要分配三个副本。
三。topic创建主要分为两部分:命令行controller逻辑部分。
四。后台逻辑将**zookeeper下对应的目录节点。一旦启动topic创建命令,它将创建一个新的数据节点并触发后台创建逻辑。
五个。确定分区副本分配方案(即,将每个分区副本分配给哪个代理);创建zookeeper节点并将此方案写入/brokers/topics/《topic》节点。
五个。确定分区副本分配方案(即每个分区的副本分配给哪个分区)broker上);创建zookeeper节点,把这个方案写入/brokers/topics/《topic》节点下。
6、Kafka controller这一部分的主要任务是:创建分区;创建副本;为每个分区选择leaderISR;;更新各种缓存。
Kafka 源码解析之 Topic 的新建/扩容/删除
本篇接着讲述 Controller 的功能方面的内容,在 Kafka 中,一个 Topic 的新建、扩容或者删除都是由 Controller 来操作的,本篇文章也是主要聚焦在 Topic 的操作处理上(新建、扩容、删除),实际上 Topic 的创建在 Kafka 源码解析之 topic 创建过程(三) 中已经讲述过了,本篇与前面不同的是,本篇主要是从 Controller 角度来讲述,而且是把新建、扩容、删除这三个 Topic 级别的操作放在一起做一个总结。
这里把 Topic 新建与扩容放在一起讲解,主要是因为无论 Topic 是新建还是扩容,在 Kafka 内部其实都是 Partition 的新建,底层的实现机制是一样的,Topic 的新建与扩容的整体流程如下图所示:
Topic 新建与扩容触发条件的不同如下所示:
下面开始详细讲述这两种情况。
Topic 扩容
Kafka 提供了 Topic 扩容工具,假设一个 Topic(topic_test)只有一个 partition,这时候我们想把它扩容到两个 Partition,可以通过下面两个命令来实现:
这两种方法的区别是:第二种方法直接指定了要扩容的 Partition 2 的副本需要分配到哪台机器上,这样的话我们可以精确控制到哪些 Topic 放下哪些机器上。
无论是使用哪种方案,上面两条命令产生的结果只有一个,将 Topic 各个 Partition 的副本写入到 ZK 对应的节点上,这样的话 /brokers/topics/topic_test 节点的内容就会发生变化,PartitionModificati***Listener **器就会被触发 ,该**器的处理流程如下:
其 doHandleDataChange() 方法的处理流程如下:
下面我们看下 onNewPartitionCreation() 方法,其实现如下:
关于 Partition 的新建,总共分了以下四步:
经过上面几个阶段,一个 Partition 算是真正创建出来,可以正常进行读写工作了,当然上面只是讲述了 Controller 端做的内容,Partition 副本所在节点对 LeaderAndIsr 请求会做更多的工作,这部分会在后面关于 LeaderAndIsr 请求的处理中只能够详细讲述。
Topic 新建
Kafka 也提供了 Topic 创建的工具,假设我们要创建一个名叫 topic_test,Partition 数为2的 Topic,创建的命令如下:
跟前面的类似,方法二是可以精确控制新建 Topic 每个 Partition 副本所在位置,Topic 创建的本质上是在 /brokers/topics 下新建一个节点信息,并将 Topic 的分区详情写入进去,当 /brokers/topics 有了新增的 Topic 节点后,会触发 TopicChangeListener **器,其实现如下:
只要 /brokers/topics 下子节点信息有变化(topic 新增或者删除),TopicChangeListener 都会被触发,其 doHandleChildChange() 方法的处理流程如下:
接着看下 onNewTopicCreation() 方法实现
上述方法主要做了两件事:
onNewPartitionCreation() 的实现在前面 Topic 扩容部分已经讲述过,这里不再重复,最好参考前面流程图来梳理 Topic 扩容和新建的整个过程。
Kafka Topic 删除这部分的逻辑是一个单独线程去做的,这个线程是在 Controller 启动时初始化和启动的。
TopicDeletionManager 初始化
TopicDeletionManager 启动实现如下所示:
TopicDeletionManager 启动时只是初始化了一个 DeleteTopicsThread 线程,并启动该线程。TopicDeletionManager 这个类从名字上去看,它是 Topic 删除的管理器,它是如何实现 Topic 删除管理呢,这里先看下该类的几个重要的成员变量:
前面一小节,简单介绍了 TopicDeletionManager、DeleteTopicsThread 的启动以及它们之间的关系,这里我们看下一个 Topic 被设置删除后,其处理的整理流程,简单做了一个小图,如下所示:
这里先简单讲述上面的流程,当一个 Topic 设置为删除后:
先看下 DeleteTopicsListener 的实现,如下:
其 doHandleChildChange() 的实现逻辑如下:
接下来,看下 Topic 删除线程 DeleteTopicsThread 的实现,如下所示:
doWork() 方法处理逻辑如下:
先看下 onTopicDeletion() 方法,这是 Topic 最开始删除时的实现,如下所示:
Topic 的删除的真正实现方法还是在 startReplicaDeletion() 方法中,Topic 删除时,会先调用 onPartitionDeletion() 方法删除所有的 Partition,然后在 Partition 删除时,执行 startReplicaDeletion() 方法删除该 Partition 的副本,该方法的实现如下:
该方法的执行逻辑如下:
在将副本状态从 OfflineReplica 转移成 ReplicaDeletionStarted 时,会设置一个回调方法 deleteTopicStopReplicaCallback(),该方**将删除成功的 Replica 设置为 ReplicaDeletionSuccessful 状态,删除失败的 Replica 设置为 ReplicaDeletionIneligible 状态(需要根据 StopReplica 请求处理的过程,看下哪些情况下 Replica 会删除失败,这个会在后面讲解)。
下面看下这个方法 completeDeleteTopic(),当一个 Topic 的所有 Replica 都删除成功时,即其状态都在 ReplicaDeletionSuccessful 时,会调用这个方法,如下所示:
当一个 Topic 所有副本都删除后,会进行如下处理:
至此,一个 Topic 算是真正删除完成。
Kafka的Topic配置详解
配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值。
创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate.
(A)创建topic时配置参数
(B)修改topic时配置参数
覆盖已经有topic参数,下面例子修改"my-topic"的max message属性
(C)删除topic级别配置参数
注:配置的kafka集群的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。
cleanup.policy
delete.retention.ms
delete.retention.ms
flush.messages
flush.ms
index.interval.bytes
message.max.bytes
min.cleanable.dirty.ratio
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
log.roll.hours
参考资料:
***隐藏网址***
***隐藏网址***
Kafka之主题创建与修改
向一个不存在的主题发送和消费都会创建一个新的主题,很多时候,非预期的创建主题,会导致很多意想不到的问题,建议关掉该特性。
创建主题的更加推荐采用脚本的方式,位置为:
其中日志路径下查看broker的分配信息,也可以在ZooKeeper中获取分配信息:
查看主题详情:展示主题,分区数,副本数(AR),Leader副本所在broker,ISR等信息
如果没有指定--topic则会展示全部主题信息
有相应的Java API对应kafka-topics.sh脚本实现的功能。
Kafka
深入了解Kafka
kafka极简入门(三)--创建topic
回顾 kafka极简入门(二)--安装
topic是kafka的生产者和消费者最小交互的单位,我们先从topic入手,创建第一个topic.
或
所以执行上面命令将会创建一个名为mytest的topic,该topic下面有1个分区,并且该分区只有1个副本。
PS:除了手动创建主题外,还可以将代理配置为在发布不存在的主题时自动创建主题
Partition:0 表示该分区的id为0
leader: 9 表示分区的首领副本所在的broker(本例子中broker.id配置为9,所以这里显示9,具体在config/server.properties配置。这里只有一个分区,所以首领分区也就是自己)
Replicas: 9 表示分区的跟随副本所在的broker
Isr: 9 表示分区的同步副本所在的broker(同步副本可以认为跟首领副本准实时同步的副本,可以配置判断条件,后面会讲,首领副本挂掉后,服务器会从同步副本中选举新的首领)
发送三个消息,分别是hello, world和!
注意: --from-beginning 表示从最开始的offset处开始消费。如果不写表示从最新的offset处消费,那么先发送了消息再开启消费者是收不到已发送的信息的
kafka 常见命令以及增加topic的分区数
kafka-topics.sh --bootstrap-server ${kafkaAddress} --create --topic ${topicName} --partiti*** ${partiparti***} --replication-factor ${replication}
kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --describe
kafka-topics.sh --bootstrap-server ${kafkaAddress} --delete--topic ${topicName} --partiti*** ${partiti***} --replication-factor ${replication}
kafka-topics.sh --bootstrap-server ${kafkaAddress} --list
kafka-c***ole-c***umer.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --from-beginning
kafka-c***umer-groups.sh --describe --bootstrap-server ${kafkaAddress} --group ${groupName}
a.修改partiti***数量
kafka-topics.sh --bootstrap-server ${kafkaAddress} --topic ${topicName} --alter --partiti*** 4
b.创建increase-replication-factor.json in config,配置各分区replication-factor位置
c.更新replication-factor
kafka-reassign-partiti***.sh --bootstrap-server ${kafkaAddress} --reassignment-json-file config/increase-replication-factor.json --execute
更多文章:
sqlserver存储过程面试题(**Lserver 数据库触发器 存储过程问题)
2026年4月1日 18:00
crm客户管理系统模板(CRM客户管理系统如何进行数据统计分析的)
2026年4月1日 17:20
javascript onfocus(javascript中怎么设置文本框获得焦点)
2026年4月1日 16:40
kafka创建topic(ApacheKafka开源消息系统_kafka源码分析)
2026年4月1日 16:20
cmd grep命令(如何在Command Line 命令中使用Grep)
2026年4月1日 15:20
excel条件函数怎么用(excel中多条件函数的使用方法(excel多条件函数公式))
2026年4月1日 15:00




