kafka删除topic分析

官方kafka中提供了创建和查看topic的命令: [attach]502[/attach] 从0.8的官方文档提供了一个删除topic的命令:
kafka-topics.sh --delete[size=13] [/size]
但是在运行时会报错找不到这个方法。 kafka-topics.sh最终是运行了kafka.admin.TopicCommand这个类,在0.8的源码中这个类中没有找到有delete topic相关的代码。   在kafka的admin包下,提供了一个DeleteTopicCommand的类,可以实现删除topic的功能。  kafka.admin.DeleteTopicCommand  其中删除topic的具体实现代码如下:
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Utils, ZKStringSerializer, ZkUtils}
.......
    val topic = options.valueOf(topicOpt)
    val zkConnect = options.valueOf(zkConnectOpt)
    var zkClient: ZkClient = null
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
      zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))  //其实最终还是通过删除zk里面对应的路径来实现删除topic的功能
      println("deletion succeeded!")
    }
    catch {
      case e: Throwable =>
        println("delection failed because of " + e.getMessage)
        println(Utils.stackTrace(e))
    }
    finally {
      if (zkClient != null)
        zkClient.close()
    }
因为这个命令只会删除zk里面的信息,真实的数据还是没有删除,所以需要登录各个broker,把对应的topic的分区数据目录删除,也可能正因为这一点,delete命令才没有集成到kafka.admin.TopicCommand这个类。   Kafka删除的操作,首先,在zookeeper的/admin/delete_topic中添加要删除的topic,此时并不真实删除broker中的数据。 如果Kafka的配置中配置了delete.topic.enable=true,broker检查到zookeeper中的delete_topic中有要删除的项时,才会真实的去删除topic数据。否则(delete.topic.enable=false),topic数据不会被删除。

0 个评论

要回复文章请先登录注册