kafka删除topic分析
官方kafka中提供了创建和查看topic的命令:
[attach]502[/attach]
从0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete[size=13] [/size]但是在运行时会报错找不到这个方法。 kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。 在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。 kafka.admin.DeleteTopicCommand 其中删除topic的具体实现代码如下:
import org.I0Itec.zkclient.ZkClient import kafka.utils.{Utils, ZKStringSerializer, ZkUtils} ....... val topic = options.valueOf(topicOpt) val zkConnect = options.valueOf(zkConnectOpt) var zkClient: ZkClient = null try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) zkClient.deleteRecursive(ZkUtils.getTopicPath(topic)) //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能 println("deletion succeeded!") } catch { case e: Throwable => println("delection failed because of " + e.getMessage) println(Utils.stackTrace(e)) } finally { if (zkClient != null) zkClient.close() }因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。 Kafka删除的操作,首先,在zookeeper的/admin/delete_topic中添加要删除的topic,此时并不真实删除broker中的数据。 如果Kafka的配置中配置了delete.topic.enable=true,broker检查到zookeeper中的delete_topic中有要删除的项时,才会真实的去删除topic数据。否则(delete.topic.enable=false),topic数据不会被删除。