Zookeeper Too many connections情况

采菊篱下 回复了问题 • 2 人关注 • 1 个回复 • 554 次浏览 • 2016-11-03 12:05 • 来自相关话题

Elasticsearch中常用的API接口整理

小白菜 发表了文章 • 0 个评论 • 640 次浏览 • 2016-10-27 18:21 • 来自相关话题

Elasticsearch中常用API分类

elasticsearch中常用的API分类如下:
文档API: 提供对文档的增删改查操作搜索API: 提供对文档进行某个字段的查询索引API: 提供对索引进行操作,查看索引信息等查看API: 按照更直观的形式返回数据,更适用于控制台请求展示集群API: 对集群进行查看和操作的API
下面简单的一一介绍记录一下。
 

文档类API

Index API: 创建并建立索引
PUT twitter/tweet/1
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}官方文档参考:Index API 。
 
Get API: 获取文档
curl -XGET 'http://localhost:9200/twitter/tweet/1'官方文档参考:Get API 。
 
DELETE API: 删除文档
$ curl -XDELETE 'http://localhost:9200/twitter/tweet/1'官方文档参考:Delete API 。
 
UPDATE API: 更新文档
PUT test/type1/1
{
"counter" : 1,
"tags" : ["red"]
}官方文档参考:Update API 。
 
Multi Get API: 一次批量获取文档
curl 'localhost:9200/_mget' -d '{
"docs" : [
{
"_index" : "test",
"_type" : "type",
"_id" : "1"
},
{
"_index" : "test",
"_type" : "type",
"_id" : "2"
}
]
}'官方文档参考:Multi Get API 。
 
Bulk API: 批量操作,批量操作中可以执行增删改查
$ curl -s -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"type1","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}官方文档参考:Bulk API 。
 
DELETE By Query API: 根据查询删除
POST twitter/_delete_by_query
{
"query": {
"match": {
"message": "some message"
}
}
}官方文档参考:Delete By Query API 。
 
Update By Query API: 根据查询更新
POST twitter/_update_by_query?conflicts=proceed官方文档参考:Update By Query API 。
 
Reindex API:重建索引
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}官方文档参考:Reindex API 。
 
Term Vectors: 词组分析,只能针对一个文档
curl -XGET 'http://localhost:9200/twitter/tweet/1/_termvectors?pretty=true'官方文档参考:Term Vectors 。
 
Multi termvectors API: 多个文档的词组分析
curl 'localhost:9200/_mtermvectors' -d '{
"docs": [
{
"_index": "testidx",
"_type": "test",
"_id": "2",
"term_statistics": true
},
{
"_index": "testidx",
"_type": "test",
"_id": "1",
"fields": [
"text"
]
}
]
}'官方文档参考:Multi termvectors API 。 更多关于文档类API请参考:Document APIs 。
 

搜索类API

URI Search:url中传参
GET twitter/tweet/_search?q=user:kimchy官方文档参考:URI Search 。
 
Request Body搜索接口: 搜索的条件在请求的body中
GET /twitter/tweet/_search
{
"query" : {
"term" : { "user" : "kimchy" }
}
}官方文档参考:Request Body Search 。  
 
搜索模版设置接口: 可以设置搜索的模版,模版的功能是可以根据不同的传入参数,进行不同的实际搜索搜索分片查询接口: 查询这个搜索会使用到哪个索引和分片Suggest接口: 搜索建议接口,输入一个词,根据某个字段,返回搜索建议。批量搜索接口: 把批量请求放在一个文件中,批量搜索接口读取这个文件,进行搜索查询Count接口: 只返回符合搜索的文档个数文档存在接口: 判断是否有符合搜索的文档存在验证接口: 判断某个搜索请求是否合法,不合法返回错误信息解释接口: 使用这个接口能返回某个文档是否符合某个查询,为什么符合等信息抽出器接口: 简单来说,可以用这个接口指定某个文档符合某个搜索,事先未文档建立对应搜索
官方文档参考:Search APIS 。
 

索引类API

创建索引接口(POST my_index)删除索引接口(DELETE my_index)获取索引信息接口(GET my_index)索引是否存在接口(HEAD my_index)打开/关闭索引接口(my_index/_close, my_index/_open)设置索引映射接口(PUT my_index/_mapping)获取索引映射接口(GET my_index/_mapping)获取字段映射接口(GET my_index/_mapping/field/my_field)类型是否存在接口(HEAD my_index/my_type)删除映射接口(DELTE my_index/_mapping/my_type)索引别名接口(_aliases)更新索引设置接口(PUT my_index/_settings)获取索引设置接口(GET my_index/_settings)分析接口(_analyze): 分析某个字段是如何建立索引的建立索引模版接口(_template): 为索引建立模版,以后新创建的索引都可以按照这个模版进行初始化预热接口(_warmer): 某些查询可以事先预热,这样预热后的数据存放在内存中,增加后续查询效率状态接口(_status): 索引状态批量索引状态接口(_stats): 批量查询索引状态分片信息接口(_segments): 提供分片信息级别的信息索引恢复接口(_recovery): 进行索引恢复操作清除缓存接口(_cache/clear): 清除所有的缓存输出接口(_flush)刷新接口(_refresh)优化接口(_optimize): 对索引进行优化升级接口(_upgrade): 这里的升级指的是把索引升级到lucence的最新格式
官方文档参考:Indices APIS 。
 

查看类API

查看别名接口(_cat/aliases): 查看索引别名查看分配资源接口(_cat/allocation)查看文档个数接口(_cat/count)查看字段分配情况接口(_cat/fielddata)查看健康状态接口(_cat/health)查看索引信息接口(_cat/indices)查看master信息接口(_cat/master)查看nodes信息接口(_cat/nodes)查看正在挂起的任务接口(_cat/pending_tasks)查看插件接口(_cat/plugins)查看修复状态接口(_cat/recovery)查看线城池接口(_cat/thread_pool)查看分片信息接口(_cat/shards)查看lucence的段信息接口(_cat/segments)
官方文档参考:Cat APIS 。
 

集群类API

查看集群健康状态接口(_cluster/health)查看集群状况接口(_cluster/state)查看集群统计信息接口(_cluster/stats)查看集群挂起的任务接口(_cluster/pending_tasks)集群重新路由操作(_cluster/reroute)更新集群设置(_cluster/settings)节点状态(_nodes/stats)节点信息(_nodes)节点的热线程(_nodes/hot_threads)关闭节点(/nodes/_master/_shutdown)
官方文档参考:Cluster APIS 。  尽在:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html 查看全部
elasticsearch.png


Elasticsearch中常用API分类


elasticsearch中常用的API分类如下:
  • 文档API: 提供对文档的增删改查操作
  • 搜索API: 提供对文档进行某个字段的查询
  • 索引API: 提供对索引进行操作,查看索引信息等
  • 查看API: 按照更直观的形式返回数据,更适用于控制台请求展示
  • 集群API: 对集群进行查看和操作的API

下面简单的一一介绍记录一下。
 


文档类API


Index API: 创建并建立索引
PUT twitter/tweet/1
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"message" : "trying out Elasticsearch"
}
官方文档参考:Index API 。
 
Get API: 获取文档
curl -XGET 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Get API 。
 
DELETE API: 删除文档
$ curl -XDELETE 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Delete API 。
 
UPDATE API: 更新文档
PUT test/type1/1
{
"counter" : 1,
"tags" : ["red"]
}
官方文档参考:Update API 。
 
Multi Get API: 一次批量获取文档
curl 'localhost:9200/_mget' -d '{
"docs" : [
{
"_index" : "test",
"_type" : "type",
"_id" : "1"
},
{
"_index" : "test",
"_type" : "type",
"_id" : "2"
}
]
}'
官方文档参考:Multi Get API 。
 
Bulk API: 批量操作,批量操作中可以执行增删改查
$ curl -s -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"type1","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
官方文档参考:Bulk API 。
 
DELETE By Query API: 根据查询删除
POST twitter/_delete_by_query
{
"query": {
"match": {
"message": "some message"
}
}
}
官方文档参考:Delete By Query API 。
 
Update By Query API: 根据查询更新
POST twitter/_update_by_query?conflicts=proceed
官方文档参考:Update By Query API 。
 
Reindex API:重建索引
POST _reindex
{
"source": {
"index": "twitter"
},
"dest": {
"index": "new_twitter"
}
}
官方文档参考:Reindex API 。
 
Term Vectors: 词组分析,只能针对一个文档
curl -XGET 'http://localhost:9200/twitter/tweet/1/_termvectors?pretty=true'
官方文档参考:Term Vectors 。
 
Multi termvectors API: 多个文档的词组分析
curl 'localhost:9200/_mtermvectors' -d '{
"docs": [
{
"_index": "testidx",
"_type": "test",
"_id": "2",
"term_statistics": true
},
{
"_index": "testidx",
"_type": "test",
"_id": "1",
"fields": [
"text"
]
}
]
}'
官方文档参考:Multi termvectors API 。 更多关于文档类API请参考:Document APIs 。
 


搜索类API


URI Search:url中传参
GET twitter/tweet/_search?q=user:kimchy
官方文档参考:URI Search 。
 
Request Body搜索接口: 搜索的条件在请求的body中
GET /twitter/tweet/_search
{
"query" : {
"term" : { "user" : "kimchy" }
}
}
官方文档参考:Request Body Search 。  
 
  • 搜索模版设置接口: 可以设置搜索的模版,模版的功能是可以根据不同的传入参数,进行不同的实际搜索
  • 搜索分片查询接口: 查询这个搜索会使用到哪个索引和分片
  • Suggest接口: 搜索建议接口,输入一个词,根据某个字段,返回搜索建议。
  • 批量搜索接口: 把批量请求放在一个文件中,批量搜索接口读取这个文件,进行搜索查询
  • Count接口: 只返回符合搜索的文档个数
  • 文档存在接口: 判断是否有符合搜索的文档存在
  • 验证接口: 判断某个搜索请求是否合法,不合法返回错误信息
  • 解释接口: 使用这个接口能返回某个文档是否符合某个查询,为什么符合等信息
  • 抽出器接口: 简单来说,可以用这个接口指定某个文档符合某个搜索,事先未文档建立对应搜索

官方文档参考:Search APIS 。
 


索引类API


  • 创建索引接口(POST my_index)
  • 删除索引接口(DELETE my_index)
  • 获取索引信息接口(GET my_index)
  • 索引是否存在接口(HEAD my_index)
  • 打开/关闭索引接口(my_index/_close, my_index/_open)
  • 设置索引映射接口(PUT my_index/_mapping)
  • 获取索引映射接口(GET my_index/_mapping)
  • 获取字段映射接口(GET my_index/_mapping/field/my_field)
  • 类型是否存在接口(HEAD my_index/my_type)
  • 删除映射接口(DELTE my_index/_mapping/my_type)
  • 索引别名接口(_aliases)
  • 更新索引设置接口(PUT my_index/_settings)
  • 获取索引设置接口(GET my_index/_settings)
  • 分析接口(_analyze): 分析某个字段是如何建立索引的
  • 建立索引模版接口(_template): 为索引建立模版,以后新创建的索引都可以按照这个模版进行初始化
  • 预热接口(_warmer): 某些查询可以事先预热,这样预热后的数据存放在内存中,增加后续查询效率
  • 状态接口(_status): 索引状态
  • 批量索引状态接口(_stats): 批量查询索引状态
  • 分片信息接口(_segments): 提供分片信息级别的信息
  • 索引恢复接口(_recovery): 进行索引恢复操作
  • 清除缓存接口(_cache/clear): 清除所有的缓存
  • 输出接口(_flush)
  • 刷新接口(_refresh)
  • 优化接口(_optimize): 对索引进行优化
  • 升级接口(_upgrade): 这里的升级指的是把索引升级到lucence的最新格式

官方文档参考:Indices APIS 。
 


查看类API


  • 查看别名接口(_cat/aliases): 查看索引别名
  • 查看分配资源接口(_cat/allocation)
  • 查看文档个数接口(_cat/count)
  • 查看字段分配情况接口(_cat/fielddata)
  • 查看健康状态接口(_cat/health)
  • 查看索引信息接口(_cat/indices)
  • 查看master信息接口(_cat/master)
  • 查看nodes信息接口(_cat/nodes)
  • 查看正在挂起的任务接口(_cat/pending_tasks)
  • 查看插件接口(_cat/plugins)
  • 查看修复状态接口(_cat/recovery)
  • 查看线城池接口(_cat/thread_pool)
  • 查看分片信息接口(_cat/shards)
  • 查看lucence的段信息接口(_cat/segments)

官方文档参考:Cat APIS 。
 


集群类API


  • 查看集群健康状态接口(_cluster/health)
  • 查看集群状况接口(_cluster/state)
  • 查看集群统计信息接口(_cluster/stats)
  • 查看集群挂起的任务接口(_cluster/pending_tasks)
  • 集群重新路由操作(_cluster/reroute)
  • 更新集群设置(_cluster/settings)
  • 节点状态(_nodes/stats)
  • 节点信息(_nodes)
  • 节点的热线程(_nodes/hot_threads)
  • 关闭节点(/nodes/_master/_shutdown)

官方文档参考:Cluster APIS 。  尽在:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

kafka consumer offset out of range错误

回复

Ansible 回复了问题 • 1 人关注 • 2 个回复 • 848 次浏览 • 2016-10-27 14:18 • 来自相关话题

Kafka topic 常用命令介绍

采菊篱下 发表了文章 • 0 个评论 • 559 次浏览 • 2016-10-26 22:47 • 来自相关话题

本文主要记录平时kafka topic命令常使用的命令集,包括listTopic,createTopic,deleteTopic和describeTopic和alertTopic等,我这里是基于kafka 0.8.1.1版本,具体情况如下所示。
 
一、 describe topic 显示topic详细信息# ./kafka-topics.sh --describe --zookeeper localhost:2181
Topic:mobTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: mobTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mobTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: mobTopic Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: mobTopic Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic:serverTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: serverTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: serverjsTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: serverjsTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: serverjsTopic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic:bugTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: bugTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: bugTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: bugTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: bugTopic Partition: 3 Leader: 1 Replicas: 1 Isr: 11. 如上面可见,如果指定了--topic就是只显示给定topic的信息,否则显示所有topic的详细信息。
2. 如果指定了under-replicated-partitions,那么就显示那些副本数量不足的分区(ISR size < AR.size)
3. 如果指定了unavailable-partitions,那么就显示那些leader副本已不可用的分区
4. 从zookeeper上获取当前所有可用的broker
5. 遍历每个要describe的topic,
6. 获取这个topic的分区副本分配信息,若该信息不存在说明topic不存在
7. 否则将分配信息按照分区号进行排序
10. 如果没有指定步骤2中的参数也没有指定步骤3中的参数,那么显示分区数信息、副本系数信息以及配置信息
11. 默认情况下还会显示各个分区的信息
12. 从zookeeper中获取每个分区的ISR、Leader、AR信息并显示
 
二、create topic 创建topic# kafka-topics.sh --zookeeper localhost:2181 --create --topic mobTopic --replication-factor 1 --partitions 4
 从命令行中获取要创建的topic名称解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic获取/brokers/ids下所有broker并按照broker id进行升序排序在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在确保某个分区的多个副本不会被分配到同一个broker若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点
  
三、delete topic 删除topic# ./kafka-topics.sh --zookeeper locahost:2181 --delete --topic mobTopic
获取待删除的topic,如果没有指定--topic就是删除所有的topic对于每个要删除的topic,在zookeeper上的/admin/delete_topics下创建对应的子节点。kafka目前的删除topic逻辑只是在Zookeeper上标记而已,会有专门的线程负责监听该路径下的变更并负责更新zookeeper上其他节点上的数据,但底层的日志文件目前还是需要手动删除。
   
四、alert 修改topic的partion# ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mobTopic --partitions 10减少目前kakfa应该是不支持. 查看全部
kafka.png

本文主要记录平时kafka topic命令常使用的命令集,包括listTopic,createTopic,deleteTopic和describeTopic和alertTopic等,我这里是基于kafka 0.8.1.1版本,具体情况如下所示。
 
一、 describe topic 显示topic详细信息
# ./kafka-topics.sh --describe --zookeeper localhost:2181
Topic:mobTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: mobTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mobTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: mobTopic Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: mobTopic Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic:serverTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: serverTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: serverjsTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: serverjsTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: serverjsTopic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic:bugTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: bugTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: bugTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: bugTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: bugTopic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
1. 如上面可见,如果指定了--topic就是只显示给定topic的信息,否则显示所有topic的详细信息。
2. 如果指定了under-replicated-partitions,那么就显示那些副本数量不足的分区(ISR size < AR.size)
3. 如果指定了unavailable-partitions,那么就显示那些leader副本已不可用的分区
4. 从zookeeper上获取当前所有可用的broker
5. 遍历每个要describe的topic,
6. 获取这个topic的分区副本分配信息,若该信息不存在说明topic不存在
7. 否则将分配信息按照分区号进行排序
10. 如果没有指定步骤2中的参数也没有指定步骤3中的参数,那么显示分区数信息、副本系数信息以及配置信息
11. 默认情况下还会显示各个分区的信息
12. 从zookeeper中获取每个分区的ISR、Leader、AR信息并显示
 
二、create topic 创建topic
# kafka-topics.sh --zookeeper localhost:2181 --create --topic mobTopic --replication-factor 1  --partitions 4

  1.  从命令行中获取要创建的topic名称
  2. 解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式
  3. 若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情
  4. 检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic
  5. 获取/brokers/ids下所有broker并按照broker id进行升序排序
  6. 在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)
  7. 检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数
  8. 若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在
  9. 确保某个分区的多个副本不会被分配到同一个broker
  10. 若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据
  11. 创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点

  
三、delete topic 删除topic
# ./kafka-topics.sh --zookeeper locahost:2181 --delete --topic mobTopic

  1. 获取待删除的topic,如果没有指定--topic就是删除所有的topic
  2. 对于每个要删除的topic,在zookeeper上的/admin/delete_topics下创建对应的子节点。kafka目前的删除topic逻辑只是在Zookeeper上标记而已,会有专门的线程负责监听该路径下的变更并负责更新zookeeper上其他节点上的数据,但底层的日志文件目前还是需要手动删除。

   
四、alert 修改topic的partion
# ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mobTopic --partitions 10
减少目前kakfa应该是不支持.

Druid任务分配策略配置详解

Geek小A 发表了文章 • 1 个评论 • 455 次浏览 • 2016-10-21 14:20 • 来自相关话题

在说任务配置策略之前,先给大家看一下druid任务处理的大概架构图




如上图可以看出overlord节点如何将任务分配到middlemanager节点进行处理,如果在架构中有多个middlemanager节点,那任务将怎么分配呢,分配的测试是什么?
 
默认策略是fillCapacity, 意思是当一个MiddleManager的worker capacity满了的时候,再有任务到来时,才会分配给另外的MiddleManager节点。
 
补充: middlemanager的capacity意思是,能容纳任务的数量,通过修改middleManager节点下的 runtime.properties配置文件里的druid.worker.capacity属性配置。 




那么,除了这个策略,还有其他策略吗?另外,这个策略如何修改呢? 除了这个策略,还有fillCapacityWithAffinity, equalDistribution and javascript策略,那么策略如何修改呢? 
 
通过向Overlord节点发送个一个HTTP请求来修改,实质上是修改保存druid元数据的数据库,即 MetadataStorage,修改步骤如下: 
http://10.1.3.9:8090/druid/indexer/v1/worker(http://<OVERLORD_IP>: <port>/druid/indexer/v1/worke
X-Druid-Author sdx(修改配置的作者,可以随意写) X-Druid-Comment equal policy(修改配置的注释,可以随意写) Content-Type application/json


http://10.1.3.9:8090/druid/indexer/v1/worker(http://<OVERLORD_IP>: <port>/druid/indexer/v1/worker)
{
"selectStrategy": {
"type": "equalDistribution" }
}



通过访问http://10.1.3.9:8090/druid/indexer/v1/worker/history查看配置是否成功 




或者通过查看MetadataStorage的druid_conifg和druid_audit表查看是否配置成功 
 
注意: linux上通过如下指令配置:curl -XPOST -H 'X-Druid-Author: lucky' -H 'X-Druid-Comment: lucky' -H 'Content-Type: application/json' http://10.1.3.9:8090/druid/indexer/v1/worker -d '{ "selectStrategy": { "type": "equalDistribution" } }'更多内容请参考官网:http://druid.io/docs/0.9.1.1/configuration/indexing-service.html。 查看全部
Druid.png

在说任务配置策略之前,先给大家看一下druid任务处理的大概架构图
DruidWorkFlow.png

如上图可以看出overlord节点如何将任务分配到middlemanager节点进行处理,如果在架构中有多个middlemanager节点,那任务将怎么分配呢,分配的测试是什么?
 
默认策略是fillCapacity, 意思是当一个MiddleManager的worker capacity满了的时候,再有任务到来时,才会分配给另外的MiddleManager节点。
 
补充: middlemanager的capacity意思是,能容纳任务的数量,通过修改middleManager节点下的 runtime.properties配置文件里的druid.worker.capacity属性配置。 
capacity.png

那么,除了这个策略,还有其他策略吗?另外,这个策略如何修改呢? 除了这个策略,还有fillCapacityWithAffinity, equalDistribution and javascript策略,那么策略如何修改呢? 
 
通过向Overlord节点发送个一个HTTP请求来修改,实质上是修改保存druid元数据的数据库,即 MetadataStorage,修改步骤如下: 
http://10.1.3.9:8090/druid/indexer/v1/worker(http://<OVERLORD_IP>: <port>/druid/indexer/v1/worke
X-Druid-Author sdx(修改配置的作者,可以随意写) X-Druid-Comment equal policy(修改配置的注释,可以随意写) Content-Type application/json
postman.png
http://10.1.3.9:8090/druid/indexer/v1/worker(http://<OVERLORD_IP>: <port>/druid/indexer/v1/worker)
{
"selectStrategy": {
"type": "equalDistribution" }
}
send.png

通过访问http://10.1.3.9:8090/druid/indexer/v1/worker/history查看配置是否成功 
history.png

或者通过查看MetadataStorage的druid_conifg和druid_audit表查看是否配置成功 
 
注意: linux上通过如下指令配置:
curl -XPOST -H 'X-Druid-Author: lucky' -H 'X-Druid-Comment: lucky' -H 'Content-Type: application/json' http://10.1.3.9:8090/druid/indexer/v1/worker -d '{ "selectStrategy": { "type": "equalDistribution" } }'
更多内容请参考官网:http://druid.io/docs/0.9.1.1/configuration/indexing-service.html

Druid中Segements保留和自动删除规则配置

采菊篱下 发表了文章 • 0 个评论 • 421 次浏览 • 2016-10-20 22:43 • 来自相关话题

经测试发现: 
DeepStorage里所有的segements都需要在Historical节点中有一份。其实这样说是不严格的,有时候我们需要DeepStorage里所有的segements(或者某类datasource)在Historical节点中有一份或者n份。这样做的好处是,提高数据查询效率,那么这个n在哪里配置呢?
 
原来是在druid_rules表里面配置,默认情况下,druid_rules表里面只有一条数据,其中payload字段默认值如下:[{"tieredReplicants":{"_default_tier":2},"type":"loadForever"}]意思是 保证deepstorage里面的数据,在Historical节点集群存在两份,即副本为2,这两份数据一定保存在不同的Historical服务器。
 
如果只有一台Historical服务器,那么则只会有一份数据,如果你添加一台Historical服务器,则就会在新的节点复制一份数据。
 
如果想修改默认的副本数,不需要数据备份,进行如下操作就好:update druid_rules set payload='[{"tieredReplicants":{"_default_tier":1},"type":"loadForever"}]' where id="_default_2016-09-23T08:50:09.457Z";只需把_default_tier的值改为1即可,id得看druid_rules表中的具体值。
 
segment执行过程如下:
聚合任务生成segment将segment push到Deep Storage Historical节点从 DeepStorage加载segmentsegment加载成功后,调用回调方法结束任务 
 
所以,如果Historical节点硬盘上缓存的segment占满磁盘空间,任务会一直挂起, 最后任务数量达到MiddleManager节点的容量,导致任务排队。 
 
那么现实业务中,如果DeepStorage里所有的segments 都需要在Historical节点中有一份,会非常 浪费空间,浪费空间就是浪费金钱。 
 
很不能理解这种方式,并且我们对DeepStorage节点和Historical节点之间的关系一直都是这样理解的,当查询的数据不在Historical节点的时候,才会从DeepStorage加载。但是,现实是残酷的,现实不是这样的。 
 
如何解决这个问题呢?这时我们就需要用到druid的数据保留和自动删除规则配置。 通过这个配置,我们可以为每个datasource配置一个据保留和自动删除规则。 
 
这个配置可以通过druid提供的HTTP接口配置,也可以通过Coordinator界面配置,如下: 




此配置的意思是: 我们为agentToic-1m设置了两个rule,第一个rule的意思是(Load-Period-P30D)保 留最近30天的数据。第二个rule的意思是(Drop-Forever)删除所有的数据。另外还需要填写,修改 配置的作者和注释。最后Save all rules。 通过如下界面查看,配置是否成功,或者通过查看MetadataStorage的druid_rules表查看配置是否成功。




最后,结合下图,观察左侧segment列表是否会发生变化(shareds的数量和intergvals的数量)。 




经过验证,左侧列表只会展示最近30天的数据,通过查看MetadataStorage的druid_segments表, 发现30天以前的数据都被假删了,即used字段设置为了0,表示配置成功。 
 
注意: 
如果druid_segments表中的某条数据used字段为0,即此条数据对应的segment不再支持可查,同 时会再Historical节点删除。 如果上面的P30D改为P1M,意思是1个月,这个月不是自然月,而是最近30天的意思。 如果按照如上的方式设置了,再修改第一条配置规则,改为P50D,那么数据是不能恢复的,即还 是只会保留最近30天的数据。 一个笨的恢复数据方法是,可以通过修改MetadataStorage的druid_segments表中的used字段来恢复数据。 
 
论坛:If you configure a per datasource rule that drops data for the current month, and there i
s a default rule where everything is loaded, then yes, data for the current month is dropp ed and all older data is loaded. If you instead configure a load rule for the current month followed by a drop rule for everything else, then the current month of data is kept, and
all older data is dropped.具体内容,参考官网:http://druid.io/docs/latest/operations/rule-configuration.html
任务分配策略,会让Geek小A  明天写! 查看全部
Druid.png

经测试发现: 
DeepStorage里所有的segements都需要在Historical节点中有一份。其实这样说是不严格的,有时候我们需要DeepStorage里所有的segements(或者某类datasource)在Historical节点中有一份或者n份。这样做的好处是,提高数据查询效率,那么这个n在哪里配置呢?
 
原来是在druid_rules表里面配置,默认情况下,druid_rules表里面只有一条数据,其中payload字段默认值如下:
[{"tieredReplicants":{"_default_tier":2},"type":"loadForever"}]
意思是 保证deepstorage里面的数据,在Historical节点集群存在两份,即副本为2,这两份数据一定保存在不同的Historical服务器。
 
如果只有一台Historical服务器,那么则只会有一份数据,如果你添加一台Historical服务器,则就会在新的节点复制一份数据。
 
如果想修改默认的副本数,不需要数据备份,进行如下操作就好:
update druid_rules set payload='[{"tieredReplicants":{"_default_tier":1},"type":"loadForever"}]' where id="_default_2016-09-23T08:50:09.457Z";
只需把_default_tier的值改为1即可,id得看druid_rules表中的具体值。
 
segment执行过程如下:
  1. 聚合任务生成segment
  2. 将segment push到Deep Storage 
  3. Historical节点从 DeepStorage加载segment
  4. segment加载成功后,调用回调方法结束任务 

 
所以,如果Historical节点硬盘上缓存的segment占满磁盘空间,任务会一直挂起, 最后任务数量达到MiddleManager节点的容量,导致任务排队。 
 
那么现实业务中,如果DeepStorage里所有的segments 都需要在Historical节点中有一份,会非常 浪费空间,浪费空间就是浪费金钱。 
 
很不能理解这种方式,并且我们对DeepStorage节点和Historical节点之间的关系一直都是这样理解的,当查询的数据不在Historical节点的时候,才会从DeepStorage加载。但是,现实是残酷的,现实不是这样的。 
 
如何解决这个问题呢?这时我们就需要用到druid的数据保留和自动删除规则配置。 通过这个配置,我们可以为每个datasource配置一个据保留和自动删除规则。 
 
这个配置可以通过druid提供的HTTP接口配置,也可以通过Coordinator界面配置,如下: 
coordinate.png

此配置的意思是: 我们为agentToic-1m设置了两个rule,第一个rule的意思是(Load-Period-P30D)保 留最近30天的数据。第二个rule的意思是(Drop-Forever)删除所有的数据。另外还需要填写,修改 配置的作者和注释。最后Save all rules。 通过如下界面查看,配置是否成功,或者通过查看MetadataStorage的druid_rules表查看配置是否成功。
rules.png

最后,结合下图,观察左侧segment列表是否会发生变化(shareds的数量和intergvals的数量)。 
datasource.png

经过验证,左侧列表只会展示最近30天的数据,通过查看MetadataStorage的druid_segments表, 发现30天以前的数据都被假删了,即used字段设置为了0,表示配置成功。 
 
注意: 
如果druid_segments表中的某条数据used字段为0,即此条数据对应的segment不再支持可查,同 时会再Historical节点删除。 如果上面的P30D改为P1M,意思是1个月,这个月不是自然月,而是最近30天的意思。 如果按照如上的方式设置了,再修改第一条配置规则,改为P50D,那么数据是不能恢复的,即还 是只会保留最近30天的数据。 一个笨的恢复数据方法是,可以通过修改MetadataStorage的druid_segments表中的used字段来恢复数据。 
 
论坛:
If you configure a per datasource rule that drops data for the current month, and there i
s a default rule where everything is loaded, then yes, data for the current month is dropp ed and all older data is loaded. If you instead configure a load rule for the current month followed by a drop rule for everything else, then the current month of data is kept, and
all older data is dropped.
具体内容,参考官网:http://druid.io/docs/latest/operations/rule-configuration.html
任务分配策略,会让Geek小A  明天写!

Python的数据序列化「Json & Pickle」

采菊篱下 发表了文章 • 0 个评论 • 549 次浏览 • 2016-10-20 00:06 • 来自相关话题

在介绍Python的数据序列化模块「Json & Pickle」之前,我们先来看看为什么需要数据序列化,什么是数据序列化。
 
为什么需要数据序列化,我认为有如下两种原因:
一个原因是将对象(一切皆对象)的状态保持在存储媒介(硬盘、网盘......)中,以便可以在以后重新创建精确的副本,相当于镜像的概念,比如我们平时利用VMware虚拟机中的挂起功能,这个挂起功能就是利用数据的序列化,把虚拟机当前的状态序列化保存在本地磁盘的文件中,然后恢复的时候只需反序列化,把状态恢复即可。
 
另一个原因是通过值将对象从一个应用程序域发送到另一个应用程序域中。例如,你利用Python监控采集程序采集到的数据想传送给Zabbix处理。当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。
 
序列化和反序列化:
序列化: 将数据结构或对象转换成二进制串的过程。反序列化:将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。

序列化的目的就是为了跨进程传递格式化数据和保存某个时刻的状态。
 
什么是数据序列化:
数据序列化就是将对象或者数据结构转化成特定的格式,使其可在网络中传输,或者可存储在内存或者文件中。反序列化则是相反的操作,将对象从序列化数据中还原出来。而对象序列化后的数据格式可以是二进制,可以是XML,也可以是JSON等任何格式。对象/数据序列化的重点在于数据的交换和传输,例如在远程调用技术(如EJB,XML-RPC, Web Service),或者在GUI控件开发(JavaBean)等等。
 
清楚了数据格式化的必要和简单认识了什么是数据格式化之后,我们就来看看Python中两个数据格式化模块的使用。
 

Json Module

Json:用于字符串和 python数据类型间进行转换;
Json模块提供了四个功能:dumps、dump、loads、load
dumps把数据类型转换成字符串 dump把数据类型转换成字符串并存储在文件中 loads把字符串转换成数据类型  load把文件打开从字符串转换成数据类型
实例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import json

info = {'1MinLoad': 5, 'MemUse': '5G', 'DiskUse': '80G'}

print('dumps 操作之前数据类型: %s' % type(info))
JsonInfo = json.dumps(info)
print(JsonInfo)
# dumps 将数据通过特殊的形式转换为所有程序语言都识别的字符串
print('dumps 操作之后数据类型: %s' % type(JsonInfo))

# loads 将字符串通过特殊的形式转为python是数据类型 (将字符串转为字典)
NewInfo = json.loads(JsonInfo)
print('loads 操作之后数据类型为: %s' % type(NewInfo))

print('分割线'.center(50, '-'))

# dump 将数据通过特殊的形式转换为所有语言都识别的字符串并写入文件
with open('SystemInfo.txt', 'w') as f:
json.dump(info, f)
print('dump file end!!')

# load 从文件读取字符串并转换为python的数据类型
with open('SystemInfo.txt', 'r') as f:
LoadInfo = json.load(f)
print('load file end, data type is %s' % type(LoadInfo), LoadInfo)结果如下:
dumps 操作之前数据类型: <class 'dict'>
{"MemUse": "5G", "DiskUse": "80G", "1MinLoad": 5}
dumps 操作之后数据类型: <class 'str'>
loads 操作之后数据类型为: <class 'dict'>
-----------------------分割线------------------------
dump file end!!
load file end, data type is <class 'dict'> {'MemUse': '5G', '1MinLoad': 5, 'DiskUse': '80G'}一个错误案例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import json


def test():
print('Test Func')

info = {'Name': 'crh', 'age': 18, 'Func': test}

json.dumps(info)结果:
raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <function test at 0x108e7a0d0> is not JSON serializable如上可知函数不能被json序列化。
 

Pickle Module

pickle,用于python特有的类型 和 python的数据类型间进行转换
Pickle模块同样提供了四个功能:dumps、dump、loads、load
dumps把数据类型转换成字符串 dump把数据类型转换成字符串并存储在文件中 loads把字符串转换成数据类型  load把文件打开从字符串转换成数据类型

Pickle可以序列化一些较复杂的数据,和json的区别在于pickle序列化的时候,存放的是二进制的文件,所以打开一个文件的时候,我们要以二进制的格式打开。
 实例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import pickle


def test(name):
print('%s write Test Func' % name)

info = {'Name': 'crh', 'age': 18, 'Func': test}

print('dumps 之前数据的类型为: %s' % type(info))

# pickle.dumps 将数据通过特殊的形式转换为只有python语言认识bytes类型(Python2.*中是字符串类型)
NewInfo = pickle.dumps(info)
print('dumps result is %s, data type is %s' % (NewInfo, type(NewInfo)))

# pickle.loads 将bytes通过特殊的形式转为python是数据类型
LoadInfo = pickle.loads(NewInfo)
print('loads result is %s, data type is %s' % (LoadInfo, type(LoadInfo)))
LoadInfo['Func']('crh')

print('分割线'.center(50, '-'))

# pickle.dump 将数据通过特殊的形式转换为只有python语言认识的字符串,并写入文件
with open('pickle.rb', 'wb') as f:
pickle.dump(info, f)

# pickle.load 从文件读取只有python语言认识的字符串并转换为python的数据类型
with open('pickle.rb', 'rb') as f:
Info = pickle.load(f)

print(Info, 'type is %s' % type(Info))结果如下:
dumps 之前数据的类型为: <class 'dict'>
dumps result is b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x12X\x04\x00\x00\x00Nameq\x02X\x03\x00\x00\x00crhq\x03X\x04\x00\x00\x00Funcq\x04c__main__\ntest\nq\x05u.', data type is <class 'bytes'>
loads result is {'age': 18, 'Name': 'crh', 'Func': <function test at 0x1032f10d0>}, data type is <class 'dict'>
crh write Test Func
-----------------------分割线------------------------
{'age': 18, 'Name': 'crh', 'Func': <function test at 0x1032f10d0>} type is <class 'dict'>
 

总结

很多情况下不同的程序之间传送数据我们一般通过文件的方式,但是这个方法是最原始的,而dumps可以直接让数据格式化传送给对方,但是不是所有的程序都是python的,所以只利用pickle是不现实的,比如一个python的程序需要发送一段数据给一个java程序开发的应用,这时候很多内存数据的交换,就得用json了。
 
并且josn能dump的结果更可读,那么有人就问了,那还用pickle做什么不直接用josn,是这样的josn只能把常用的数据类型序列化(列表、字典、列表、字符串、数字、),比如日期格式、类对象!josn就不行了。
 
为什么他不能序列化上面的东西呢?因为josn是跨语言的!注定了它只能规范出一些通用的数据类型的格式,统一标准。
  查看全部
jason.jpg
在介绍Python的数据序列化模块「Json & Pickle」之前,我们先来看看为什么需要数据序列化,什么是数据序列化。
 
为什么需要数据序列化,我认为有如下两种原因:
一个原因是将对象(一切皆对象)的状态保持在存储媒介(硬盘、网盘......)中,以便可以在以后重新创建精确的副本,相当于镜像的概念,比如我们平时利用VMware虚拟机中的挂起功能,这个挂起功能就是利用数据的序列化,把虚拟机当前的状态序列化保存在本地磁盘的文件中,然后恢复的时候只需反序列化,把状态恢复即可。
 
另一个原因是通过值将对象从一个应用程序域发送到另一个应用程序域中。例如,你利用Python监控采集程序采集到的数据想传送给Zabbix处理。当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。
 
序列化和反序列化:
  • 序列化: 将数据结构或对象转换成二进制串的过程。
  • 反序列化:将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。


序列化的目的就是为了跨进程传递格式化数据和保存某个时刻的状态。
 
什么是数据序列化:
数据序列化就是将对象或者数据结构转化成特定的格式,使其可在网络中传输,或者可存储在内存或者文件中。反序列化则是相反的操作,将对象从序列化数据中还原出来。而对象序列化后的数据格式可以是二进制,可以是XML,也可以是JSON等任何格式。对象/数据序列化的重点在于数据的交换和传输,例如在远程调用技术(如EJB,XML-RPC, Web Service),或者在GUI控件开发(JavaBean)等等。
 
清楚了数据格式化的必要和简单认识了什么是数据格式化之后,我们就来看看Python中两个数据格式化模块的使用。
 


Json Module


Json:用于字符串和 python数据类型间进行转换;
Json模块提供了四个功能:dumps、dump、loads、load
  1. dumps把数据类型转换成字符串 
  2. dump把数据类型转换成字符串并存储在文件中 
  3. loads把字符串转换成数据类型  
  4. load把文件打开从字符串转换成数据类型

实例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import json

info = {'1MinLoad': 5, 'MemUse': '5G', 'DiskUse': '80G'}

print('dumps 操作之前数据类型: %s' % type(info))
JsonInfo = json.dumps(info)
print(JsonInfo)
# dumps 将数据通过特殊的形式转换为所有程序语言都识别的字符串
print('dumps 操作之后数据类型: %s' % type(JsonInfo))

# loads 将字符串通过特殊的形式转为python是数据类型 (将字符串转为字典)
NewInfo = json.loads(JsonInfo)
print('loads 操作之后数据类型为: %s' % type(NewInfo))

print('分割线'.center(50, '-'))

# dump 将数据通过特殊的形式转换为所有语言都识别的字符串并写入文件
with open('SystemInfo.txt', 'w') as f:
json.dump(info, f)
print('dump file end!!')

# load 从文件读取字符串并转换为python的数据类型
with open('SystemInfo.txt', 'r') as f:
LoadInfo = json.load(f)
print('load file end, data type is %s' % type(LoadInfo), LoadInfo)
结果如下:
dumps 操作之前数据类型: <class 'dict'>
{"MemUse": "5G", "DiskUse": "80G", "1MinLoad": 5}
dumps 操作之后数据类型: <class 'str'>
loads 操作之后数据类型为: <class 'dict'>
-----------------------分割线------------------------
dump file end!!
load file end, data type is <class 'dict'> {'MemUse': '5G', '1MinLoad': 5, 'DiskUse': '80G'}
一个错误案例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import json


def test():
print('Test Func')

info = {'Name': 'crh', 'age': 18, 'Func': test}

json.dumps(info)
结果:
 raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <function test at 0x108e7a0d0> is not JSON serializable
如上可知函数不能被json序列化。
 


Pickle Module


pickle,用于python特有的类型 和 python的数据类型间进行转换
Pickle模块同样提供了四个功能:dumps、dump、loads、load
  1. dumps把数据类型转换成字符串 
  2. dump把数据类型转换成字符串并存储在文件中 
  3. loads把字符串转换成数据类型  
  4. load把文件打开从字符串转换成数据类型


Pickle可以序列化一些较复杂的数据,和json的区别在于pickle序列化的时候,存放的是二进制的文件,所以打开一个文件的时候,我们要以二进制的格式打开。
 实例如下:
#!/usr/bin/env python3
# _*_coding:utf-8_*_
# Author: Lucky.chen

import pickle


def test(name):
print('%s write Test Func' % name)

info = {'Name': 'crh', 'age': 18, 'Func': test}

print('dumps 之前数据的类型为: %s' % type(info))

# pickle.dumps 将数据通过特殊的形式转换为只有python语言认识bytes类型(Python2.*中是字符串类型)
NewInfo = pickle.dumps(info)
print('dumps result is %s, data type is %s' % (NewInfo, type(NewInfo)))

# pickle.loads 将bytes通过特殊的形式转为python是数据类型
LoadInfo = pickle.loads(NewInfo)
print('loads result is %s, data type is %s' % (LoadInfo, type(LoadInfo)))
LoadInfo['Func']('crh')

print('分割线'.center(50, '-'))

# pickle.dump 将数据通过特殊的形式转换为只有python语言认识的字符串,并写入文件
with open('pickle.rb', 'wb') as f:
pickle.dump(info, f)

# pickle.load 从文件读取只有python语言认识的字符串并转换为python的数据类型
with open('pickle.rb', 'rb') as f:
Info = pickle.load(f)

print(Info, 'type is %s' % type(Info))
结果如下:
dumps 之前数据的类型为: <class 'dict'>
dumps result is b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x12X\x04\x00\x00\x00Nameq\x02X\x03\x00\x00\x00crhq\x03X\x04\x00\x00\x00Funcq\x04c__main__\ntest\nq\x05u.', data type is <class 'bytes'>
loads result is {'age': 18, 'Name': 'crh', 'Func': <function test at 0x1032f10d0>}, data type is <class 'dict'>
crh write Test Func
-----------------------分割线------------------------
{'age': 18, 'Name': 'crh', 'Func': <function test at 0x1032f10d0>} type is <class 'dict'>

 


总结


很多情况下不同的程序之间传送数据我们一般通过文件的方式,但是这个方法是最原始的,而dumps可以直接让数据格式化传送给对方,但是不是所有的程序都是python的,所以只利用pickle是不现实的,比如一个python的程序需要发送一段数据给一个java程序开发的应用,这时候很多内存数据的交换,就得用json了。
 
并且josn能dump的结果更可读,那么有人就问了,那还用pickle做什么不直接用josn,是这样的josn只能把常用的数据类型序列化(列表、字典、列表、字符串、数字、),比如日期格式、类对象!josn就不行了。
 
为什么他不能序列化上面的东西呢?因为josn是跨语言的!注定了它只能规范出一些通用的数据类型的格式,统一标准。
 

Elasticsearch应用在数据中心的实时协议分析和安全威胁检测

小白菜 发表了文章 • 0 个评论 • 434 次浏览 • 2016-10-17 21:08 • 来自相关话题

数据中心面临的挑战

被DDOS攻击: 网络瘫痪,大面积影响业务植入后门发包: 占用带宽资源,消耗成本运营“黑盒子”: 无法分辨“好人”、“坏人”监控粒度粗: 无法及时响应并定位事件
 
早期解决方案 
Cacti 利用SNMP监控交换 机出入口流量交换机推送Sflow流量采样 数据,使用Solarwids监控遇到DDOS时,使用手动 Sniffer抓包分析
 
第一期改造后 




 
推送Netflow/Sflow 劣势
消耗路由器CPU资源100-1000:1采样比,监测粒度粗业务和应用识别依赖端口号,无法识别日新月异 的业务类型 
 
如何用数据驱动IDC运营 




 
NSM架构设计 




 
第二期改造后 




 
10G下的NSM :




 

实际效果展示 

NSM架构解析 




实时协议分析:Bro日志类型 




Flow: 数据格式 




实时安全威胁检测引擎 




Suricata Today 




实时流量 +ELK + VirusTotal 




构建10G+ NSM的几个关键点 
1、抓包网卡   
2、内核优化 
3、驱动与rss 
4、PF-Ring_zc 
5、ntop、nprobe、ndpi 
6、跨数据中心es 
流量抓包与网卡 




ELK部分的关键点 
1、用Logstash Kafka input接收数据
2、数据量大,处理结构复杂时:
      预设Kafka分区 
      开启多个Logstash实例,分别读取Kafka分区数据 
      分别写入不同es节点
3、多集群互联 
跨数据中心es集群 




10G NSM平台样例 




万兆 实时 安全大数据架构 





作者:张磊@Zooboa  查看全部


数据中心面临的挑战


  1. 被DDOS攻击: 网络瘫痪,大面积影响业务
  2. 植入后门发包: 占用带宽资源,消耗成本
  3. 运营“黑盒子”: 无法分辨“好人”、“坏人”
  4. 监控粒度粗: 无法及时响应并定位事件

 
早期解决方案 
  • Cacti 利用SNMP监控交换 机出入口流量
  • 交换机推送Sflow流量采样 数据,使用Solarwids监控
  • 遇到DDOS时,使用手动 Sniffer抓包分析

 
第一期改造后 
arch.png

 
推送Netflow/Sflow 劣势
  • 消耗路由器CPU资源
  • 100-1000:1采样比,监测粒度粗
  • 业务和应用识别依赖端口号,无法识别日新月异 的业务类型 

 
如何用数据驱动IDC运营 
nsm.png

 
NSM架构设计 
nsmarch.png

 
第二期改造后 
erqi.png

 
10G下的NSM :
nsm10g.png

 


实际效果展示 


NSM架构解析 
nsmp.png

实时协议分析:Bro日志类型 
bro.png

Flow: 数据格式 
format.png

实时安全威胁检测引擎 
sbro.png

Suricata Today 
suricata.png

实时流量 +ELK + VirusTotal 
ELK.png

构建10G+ NSM的几个关键点 
1、抓包网卡   
2、内核优化 
3、驱动与rss 
4、PF-Ring_zc 
5、ntop、nprobe、ndpi 
6、跨数据中心es 
流量抓包与网卡 
ll.png

ELK部分的关键点 
1、用Logstash Kafka input接收数据
2、数据量大,处理结构复杂时:
      预设Kafka分区 
      开启多个Logstash实例,分别读取Kafka分区数据 
      分别写入不同es节点
3、多集群互联 
跨数据中心es集群 
zone.png

10G NSM平台样例 
10g.png

万兆 实时 安全大数据架构 
WZ.png


作者:张磊@Zooboa 


Name node is in safe mode解决

Nock 发表了文章 • 0 个评论 • 286 次浏览 • 2016-10-16 21:05 • 来自相关话题

将本地文件拷贝到hdfs上去,结果上错误:Name node is in safe mode

这是因为在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。
 
可以通过以下命令来手动离开安全模式:
bin/hadoop dfsadmin -safemode leave 用户可以通过dfsadmin -safemode value 来操作安全模式,参数value的说明如下:
enter - 进入安全模式
leave - 强制NameNode离开安全模式
get - 返回安全模式是否开启的信息
wait - 等待,一直到安全模式结束。 查看全部
将本地文件拷贝到hdfs上去,结果上错误:Name node is in safe mode

这是因为在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。
 
可以通过以下命令来手动离开安全模式:
bin/hadoop dfsadmin -safemode leave  
用户可以通过dfsadmin -safemode value 来操作安全模式,参数value的说明如下:
enter - 进入安全模式
leave - 强制NameNode离开安全模式
get - 返回安全模式是否开启的信息
wait - 等待,一直到安全模式结束。

Elasticsearch备份和恢复

采菊篱下 发表了文章 • 0 个评论 • 528 次浏览 • 2016-10-15 13:04 • 来自相关话题

备份

备份数据之前,要创建一个仓库来保存数据,仓库的类型支持共享文件系统、Amazon S3、 HDFS和Azure Cloud。 
 
Elasticsearch的一大特点就是使用简单,api也比较强大,备份也不例外。简单来说,备份分两步:
创建一个仓库备份指定索引
 
一、创建存储仓库
 
共享文件系统实例如下:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
}
}上面代码解释:
创建了一个名为EsBackup的存仓库指定的备份方式为共享文件系统(type: fs)指定共享存储的具体路径(location参数)
注意:共享存储路径,必须是所有的ES节点都可以访问的,最简单的就是nfs系统,然后每个节点都需要挂载到本地。
 
如上所示,创建存储仓库的时候,除了可以指定location参数以外,我们还可以指点max_snapshot_bytes_per_sec和max_restore_bytes_per_sec参数来限制备份和恢复时的速度,默认值都是20mb/s,假设我们有一个非常快的网络环境,我们可以增大默认值:
curl -XPOST http://127.0.0.1:9200/_snapshot/EsBackup
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
"max_snapshot_bytes_per_sec" : "50mb",
"max_restore_bytes_per_sec" : "50mb"
}
}注意:这是在第一段代码的基础上来增加配置,第一段代码利用的是PUT请求来创建存储库,这段代码则是利用POST请求来更新已经存在的存储库的settings配置。 
Amazon S3存储库实例如下:
curl -XPUT 'http://localhost:9200/_snapshot/s3-backup' -d '{
"type": "s3",
"settings": {
"bucket": "esbackup",
"region": "cn-north-1",
"access_key": "xxooxxooxxoo",
"secret_key": "xxxxxxxxxooooooooooooyyyyyyyyy"
}
}'参数名词解释:
Type: 仓库类型Setting: 仓库的额外信息Region: AWS RegionAccess_key: 访问秘钥Secret_key: 私有访问秘钥Bucket: 存储桶名称
不同的ES版本支持的region参考:https://github.com/elastic/elasticsearch-cloud-aws#aws-cloud-plugin-for-elasticsearch
 
使用上面的命令,创建一个仓库(s3-backup),并且还创建了存储桶(esbackup),返回{"acknowledged":true} 信息证明创建成功。
 
确认存储桶是否创建成功:curl -XPOST http://localhost:9200/_snapshot/s3-backup/_verify
查看刚创建的存储桶:curl -XGET localhost:9200/_snapshot/s3-backup?pretty
查看所有的存储桶:curl -XGET localhost:9200/_snapshot/_all?pretty
删除一个快照存储桶:curl -XDELETE localhost:9200/_snapshot/s3-backup?pretty
 
二、备份索引
创建好存储仓库之后就可以开始备份了。一个仓库可以包含多个快照(snapshots),快照可以存所有的索引或者部分索引,当然也可以存储一个单独的索引。(要注意的一点就是快照只会备份open状态的索引,close状态的不会备份)
 
 
备份所有索引
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all上面的代码会将所有正在运行的open状态的索引,备份到EsBacup仓库下一个叫snapshot_all的快照中。上面的api会立刻返回{"accepted":true},然后备份工作在后台运行。如果你想api同步执行,可以加wait_for_completion 标志:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all?wait_for_completion=true上面的方法会在备份完全完成后才返回,如果快照数据量大的话,会花很长时间。

备份部分索引
默认是备份所有open状态的索引,如果你想只备份某些或者某个索引,可以指定indices参数来完成:
curl -XPUT 'http://localhost:9200/_snapshot/EsBackup/snapshot_12' -d '{ "indices": "index_1,index_2" }'


 查看快照信息
查看快照信息,只需要发起GET请求就好:
GET _snapshot/my_backup/snapshot_2这将返回关于快照snapshot_2的详细信息:
{
"snapshots": [
{
"snapshot": "snapshot_2",
"indices": [
".marvel_2014_28_10",
"index1",
"index2"
],
"state": "SUCCESS",
"start_time": "2014-09-02T13:01:43.115Z",
"start_time_in_millis": 1409662903115,
"end_time": "2014-09-02T13:01:43.439Z",
"end_time_in_millis": 1409662903439,
"duration_in_millis": 324,
"failures": [],
"shards": {
"total": 10,
"failed": 0,
"successful": 10
}
}
]
}查看所有快照信息如下:
GET http://127.0.0.1:9200/_snapshot/my_backup/_all另外还有个一api可以看到更加详细的信息:
GET http://127.0.0.1:9200/_snapshot/my_backup/snapshot_2/_status
更多详细内容可以到官网查看-官方文档地址。
 
删除快照
DELETE _snapshot/my_backup/snapshot_2重要的是使用API来删除快照,而不是其他一些机制(如手工删除,或使用自动s3清理工具)。因为快照增量,它是可能的,许多快照依靠old seaments。删除API了解最近仍在使用的数据快照,并将只删除未使用的部分。如果你手动文件删除,但是,你有可能严重破坏你的备份,因为你删除数据仍在使用,如果备份正在后台进行,也可以直接删除来取消此次备份。
 
监控快照进展
wait_for_completion标志提供了一个基本形式的监控,但没有足够的快照恢复甚至中等大小的集群。
 
另外两个api会给你更细节的状态的快照。首先你可以执行一个快照ID,就像我们早些时候得到一个特定的快照信息:
GET _snapshot/my_backup/snapshot_3如果当你调用这个快照还在进步,你会看到信息的时候开始,已经运行多长时间,等等。但是请注意,这个API使用相同的threadpool快照机制。如果你是快照非常大的碎片,之间的时间状态更新可以相当大,因为API是争夺相同的threadpool资源。
 
这时候有个更好的选择_status的api接口:
GET _snapshot/my_backup/snapshot_3/_status_status API立即返回并给出一个更详细的输出的统计:
{
"snapshots": [
{
"snapshot": "snapshot_3",
"repository": "my_backup",
"state": "IN_PROGRESS",
"shards_stats": {
"initializing": 0,
"started": 1,
"finalizing": 0,
"done": 4,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"indices": {
"index_3": {
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 5,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"shards": {
"0": {
"stage": "DONE",
"stats": {
"number_of_files": 1,
"processed_files": 1,
"total_size_in_bytes": 514,
"processed_size_in_bytes": 514,
"start_time_in_millis": 1409663054862,
"time_in_millis": 22
}
},
...快照当前运行将显示IN_PROGRESS作为其状态,这个特定的快照有一个碎片仍然转移(其他四个已经完成)。
 
响应包括总体状况的快照,但还深入每和每个实例统计数据。这给你一个令人难以置信的详细视图快照是如何进展的。碎片可以以不同的方式完成:
INITIALIZING: 集群的碎片是检查状态是否可以快照。这通常是非常快。
STARTED:数据被转移到存储库。
FINALIZING:数据传输完成;碎片现在发送快照的元数据。
DONE:快照完成。
FAILED:在快照过程中错误的出处,这碎片/索引/快照无法完成。检查你的日志以获取更多信息。
 

恢复

备份好后,恢复就更容易了,恢复snapshot_1里的全部索引:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore这个api还有额外的参数:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore
{
"indices": "index_1",
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}参数indices 设置只恢复index_1索引,参数rename_pattern 和rename_replacement用来正则匹配要恢复的索引,并且重命名。和备份一样,api会立刻返回值,然后在后台执行恢复,使用wait_for_completion 标记强制同步执行。
 
另外可以使用下面两个api查看状态:
GET http://127.0.0.1:9200/_recovery/restored_index_3
GET http://127.0.0.1:9200/_recovery/如果要取消恢复过程(不管是已经恢复完,还是正在恢复),直接删除索引即可:
DELETE http://127.0.0.1:9200/restored_index_3更多内容参考-官方文档。
 
参考官方文档地址:

1、https://www.elastic.co/guide/en/elasticsearch/guide/current/backing-up-your-cluster.html#_listing_information_about_snapshots
2、https://www.elastic.co/guide/en/elasticsearch/guide/current/_restoring_from_a_snapshot.html 查看全部
elasticsearch.png


备份


备份数据之前,要创建一个仓库来保存数据,仓库的类型支持共享文件系统、Amazon S3、 HDFS和Azure Cloud。 
 
Elasticsearch的一大特点就是使用简单,api也比较强大,备份也不例外。简单来说,备份分两步:
  1. 创建一个仓库
  2. 备份指定索引

 
一、创建存储仓库
 
共享文件系统实例如下:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
}
}
上面代码解释:
  1. 创建了一个名为EsBackup的存仓库
  2. 指定的备份方式为共享文件系统(type: fs)
  3. 指定共享存储的具体路径(location参数)

注意:共享存储路径,必须是所有的ES节点都可以访问的,最简单的就是nfs系统,然后每个节点都需要挂载到本地。
 
如上所示,创建存储仓库的时候,除了可以指定location参数以外,我们还可以指点max_snapshot_bytes_per_secmax_restore_bytes_per_sec参数来限制备份和恢复时的速度,默认值都是20mb/s,假设我们有一个非常快的网络环境,我们可以增大默认值:
curl -XPOST http://127.0.0.1:9200/_snapshot/EsBackup
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
"max_snapshot_bytes_per_sec" : "50mb",
"max_restore_bytes_per_sec" : "50mb"
}
}
注意:这是在第一段代码的基础上来增加配置,第一段代码利用的是PUT请求来创建存储库,这段代码则是利用POST请求来更新已经存在的存储库的settings配置。 
Amazon S3存储库实例如下:
curl -XPUT 'http://localhost:9200/_snapshot/s3-backup' -d '{
"type": "s3",
"settings": {
"bucket": "esbackup",
"region": "cn-north-1",
"access_key": "xxooxxooxxoo",
"secret_key": "xxxxxxxxxooooooooooooyyyyyyyyy"
}
}'
参数名词解释:
  1. Type: 仓库类型
  2. Setting: 仓库的额外信息
  3. Region: AWS Region
  4. Access_key: 访问秘钥
  5. Secret_key: 私有访问秘钥
  6. Bucket: 存储桶名称

不同的ES版本支持的region参考:https://github.com/elastic/elasticsearch-cloud-aws#aws-cloud-plugin-for-elasticsearch
 
使用上面的命令,创建一个仓库(s3-backup),并且还创建了存储桶(esbackup),返回{"acknowledged":true} 信息证明创建成功。
 
确认存储桶是否创建成功:curl -XPOST http://localhost:9200/_snapshot/s3-backup/_verify
查看刚创建的存储桶:curl -XGET localhost:9200/_snapshot/s3-backup?pretty
查看所有的存储桶:curl -XGET localhost:9200/_snapshot/_all?pretty
删除一个快照存储桶:curl -XDELETE localhost:9200/_snapshot/s3-backup?pretty
 
二、备份索引
创建好存储仓库之后就可以开始备份了。一个仓库可以包含多个快照(snapshots),快照可以存所有的索引或者部分索引,当然也可以存储一个单独的索引。(要注意的一点就是快照只会备份open状态的索引,close状态的不会备份)
 
 
备份所有索引
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all
上面的代码会将所有正在运行的open状态的索引,备份到EsBacup仓库下一个叫snapshot_all的快照中。上面的api会立刻返回{"accepted":true},然后备份工作在后台运行。如果你想api同步执行,可以加wait_for_completion 标志:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all?wait_for_completion=true
上面的方法会在备份完全完成后才返回,如果快照数据量大的话,会花很长时间。

备份部分索引
默认是备份所有open状态的索引,如果你想只备份某些或者某个索引,可以指定indices参数来完成:
curl -XPUT 'http://localhost:9200/_snapshot/EsBackup/snapshot_12' -d '{ "indices": "index_1,index_2" }'


 查看快照信息
查看快照信息,只需要发起GET请求就好:
GET _snapshot/my_backup/snapshot_2
这将返回关于快照snapshot_2的详细信息:
{
"snapshots": [
{
"snapshot": "snapshot_2",
"indices": [
".marvel_2014_28_10",
"index1",
"index2"
],
"state": "SUCCESS",
"start_time": "2014-09-02T13:01:43.115Z",
"start_time_in_millis": 1409662903115,
"end_time": "2014-09-02T13:01:43.439Z",
"end_time_in_millis": 1409662903439,
"duration_in_millis": 324,
"failures": [],
"shards": {
"total": 10,
"failed": 0,
"successful": 10
}
}
]
}
查看所有快照信息如下:
GET http://127.0.0.1:9200/_snapshot/my_backup/_all
另外还有个一api可以看到更加详细的信息:
GET http://127.0.0.1:9200/_snapshot/my_backup/snapshot_2/_status
更多详细内容可以到官网查看-官方文档地址
 
删除快照
DELETE _snapshot/my_backup/snapshot_2
重要的是使用API来删除快照,而不是其他一些机制(如手工删除,或使用自动s3清理工具)。因为快照增量,它是可能的,许多快照依靠old seaments。删除API了解最近仍在使用的数据快照,并将只删除未使用的部分。如果你手动文件删除,但是,你有可能严重破坏你的备份,因为你删除数据仍在使用,如果备份正在后台进行,也可以直接删除来取消此次备份。
 
监控快照进展
wait_for_completion标志提供了一个基本形式的监控,但没有足够的快照恢复甚至中等大小的集群。
 
另外两个api会给你更细节的状态的快照。首先你可以执行一个快照ID,就像我们早些时候得到一个特定的快照信息:
GET _snapshot/my_backup/snapshot_3
如果当你调用这个快照还在进步,你会看到信息的时候开始,已经运行多长时间,等等。但是请注意,这个API使用相同的threadpool快照机制。如果你是快照非常大的碎片,之间的时间状态更新可以相当大,因为API是争夺相同的threadpool资源。
 
这时候有个更好的选择_status的api接口:
GET _snapshot/my_backup/snapshot_3/_status
_status API立即返回并给出一个更详细的输出的统计:
{
"snapshots": [
{
"snapshot": "snapshot_3",
"repository": "my_backup",
"state": "IN_PROGRESS",
"shards_stats": {
"initializing": 0,
"started": 1,
"finalizing": 0,
"done": 4,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"indices": {
"index_3": {
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 5,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"shards": {
"0": {
"stage": "DONE",
"stats": {
"number_of_files": 1,
"processed_files": 1,
"total_size_in_bytes": 514,
"processed_size_in_bytes": 514,
"start_time_in_millis": 1409663054862,
"time_in_millis": 22
}
},
...
快照当前运行将显示IN_PROGRESS作为其状态,这个特定的快照有一个碎片仍然转移(其他四个已经完成)。
 
响应包括总体状况的快照,但还深入每和每个实例统计数据。这给你一个令人难以置信的详细视图快照是如何进展的。碎片可以以不同的方式完成:
INITIALIZING: 集群的碎片是检查状态是否可以快照。这通常是非常快。
STARTED:数据被转移到存储库。
FINALIZING:数据传输完成;碎片现在发送快照的元数据。
DONE:快照完成。
FAILED:在快照过程中错误的出处,这碎片/索引/快照无法完成。检查你的日志以获取更多信息。
 


恢复


备份好后,恢复就更容易了,恢复snapshot_1里的全部索引:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore
这个api还有额外的参数:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore
{
"indices": "index_1",
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}
参数indices 设置只恢复index_1索引,参数rename_pattern 和rename_replacement用来正则匹配要恢复的索引,并且重命名。和备份一样,api会立刻返回值,然后在后台执行恢复,使用wait_for_completion 标记强制同步执行。
 
另外可以使用下面两个api查看状态:
GET http://127.0.0.1:9200/_recovery/restored_index_3
GET http://127.0.0.1:9200/_recovery/
如果要取消恢复过程(不管是已经恢复完,还是正在恢复),直接删除索引即可:
DELETE http://127.0.0.1:9200/restored_index_3
更多内容参考-官方文档
 
参考官方文档地址:


1、https://www.elastic.co/guide/en/elasticsearch/guide/current/backing-up-your-cluster.html#_listing_information_about_snapshots
2、https://www.elastic.co/guide/en/elasticsearch/guide/current/_restoring_from_a_snapshot.html