kafka命令总结

topic的相关操作

创建topic

  • 创建一个叫做“test”的topic,它只有二个分区,一个副本

  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test

    名词 解释
    --zookeeper 后面加zookeeper的地址
    --replication-factor 副本数
    --partitions 分区数
    --topic topic的名字

删除topic

  • bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic name】

修改topic

  • bin/kafka-topics.sh --zookeeper localhost:2181 --alter --partitions 20 --topic test
  • 修改topic为test的分区为20

查看topic

  • bin/kafka-topics.sh --list --zookeeper localhost:2181

  • 查看集群中 topic的数量

查看某个topic的详细内容

  • bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消费数据

消费数据相关

  • bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test <--from-beginning>
    --from-beginning 从头开始
  • bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group cjpt
  • bin/kafka-consumer-groups.sh --group cjpt-test-htga --describe --zookeeper localhost:2181
  • 2.2版本后的
  • bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.64:9092 --from-beginning --topic test
  • 查看分区情况
  • bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test_topic --time -1 --broker-list localhost:9092 --partitions 5
  • 从头开始消费
  • bin/kafka-console-consumer.sh --zookeeper 10.4.2.66:2181 --from-beginning --topic test --consumer.config config/consumer.properties

查看消费情况

  • bin/kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
  • 查看testGroup的消费情况的
  • bin/kafka-consumer-groups.sh --zookeeper 10.4.2.66:2181 --describe --group test-consumer-group

查看数据的生产情况

  • bin/kafka-console-producer.sh --broker-list docker-master:9092,docker-slave01:9092,docker-slave02:9092 --topic topic_test

生产数据

  • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka性能测试

生产性能测试

bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --record-size 102400 --num-records 10000 --throughput -1 --topic test

消费性能测试

bin/kafka-consumer-perf-test.sh --zookeeper 192.168.1.63:2181,192.168.1.64:2181,192.168.1.65:2181 --broker-list 192.168.1.63:9092,192.168.1.64:9092,192.168.1.65:9092 --fetch-size 1024 --messages 100000 --topic test --threads 3 --group producer_consumer

kafka权限

赋Consumer权限命令

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

赋Producer权限命令,由管理员用户操作。

1bin/kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群IP:24002/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>

分区计划

 1修改topic的备份数量
 2//校验是否正确
 3bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --verify
 4
 5输出信息:修改之前报这个信息,说备份数量和文本不一致(因为文本内容是要添加副本数,肯定不一致),证明文本没有问题,可以执行
 6
 7[root@cjmaster01 kafka_2.11-0.9.0.1]# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --verify
 8Status of partition reassignment:
 9ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (0,1) for partition [topic-test,1]
10ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (1,2) for partition [topic-test,3]
11ERROR: Assigned replicas (2) don't match the list of replicas for reassignment (2,1) for partition [topic-test,0]
12ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,2) for partition [topic-test,2]
13ERROR: Assigned replicas (1) don't match the list of replicas for reassignment (1,0) for partition [topic-test,5]
14ERROR: Assigned replicas (0) don't match the list of replicas for reassignment (1,0) for partition [topic-test,4]
15Reassignment of partition [topic-test,1] failed
16Reassignment of partition [topic-test,3] failed
17Reassignment of partition [topic-test,0] failed
18Reassignment of partition [topic-test,2] failed
19Reassignment of partition [topic-test,5] failed
20Reassignment of partition [topic-test,4] failed
21
22//上边返回成功了再执行
23bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --reassignment-json-file ./topic-test.json  --execute
24
25topic-test.json文件的格式
26{
27        "partitions": [{
28                        "topic": "topic-test"
29                        "partition": 0,
30                        "replicas": [2, 1]
31                },
32                {
33                        "topic": "topic-test",
34                        "partition": 1,
35                        "replicas": [0, 1]
36                }
37	}
38  ],
39  "version": 1
40
41}
42

kafka 相关例子

1bin/kafka-console-producer.sh \
2--broker-list master:9092,slave01:9092,slave02:9092 \
3--topic test
4
5
6

kafka 集群的注意点

获取到 Kafka 集群的 zookeeper.connect 信息。

 1通过读取 config/server.properties 文件,获取到 Kafka 集群的 zookeeper.connect 信息。
 2为简化后续步骤的信息输入,这里 创建一个临时变量 zkUrls 保存这一信息,
 3例如:
 43. cat config/server.properties | grep  "^zookeeper.connect " 然后对 zkUrls 进行赋值:
 54. zkUrls=" 192.168.115.11:24002,192.168.115.12:24002,192.168.115.3:24002/kafka"
 65. 创建一个 Topic,名为 testTopic: bin/kafka-topics.sh --zookeeper $zkUrls
 7--create  --topic  testTopic  --partitions 1 --replication-factor 2
 86. 查询集群中的 Topic 信息:
 9bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic
107. 删除 5 中创建的Topic:
11bin/kafka-topics.sh --zookeeper $zkUrls  --delete --topic  testTopic
128. 再次查询集群中的 Topic 信息:
13bin/kafka-topics.sh --zookeeper $zkUrls --list bin/kafka-topics.sh --zookeeper $zkUrls --describe --topic testTopic