ElasticSearch-2.0.0集群安装配置与API使用实践

ElasticSearch是基于全文搜索引擎库Lucene构建的分布式搜索引擎,我们可以直接使用ElasticSearch实现分布式搜索系统的搭建与使用,都知道,Lucene只是一个搜索框架,它提供了搜索引擎操作的基本API,如果要实现一个能够使用的搜索引擎系统,还需要自己基于Lucene的API去实现,工作量很大,而且还需要很好地掌握Lucene的底层实现原理。
ElasticSearch是一个完整的分布式搜索引擎系统,它的一些基本特性包括如下:

  • 全文检索
  • 提供插件机制,可以共享重用插件的功能
  • 分布式文件存储
  • 分布式实时索引和搜索
  • 实时统计分析
  • 可以横向扩展,支持大规模数据的搜索
  • 简单易用的RESTful API
  • 基于Replication实现了数据的高可用特性
  • 与其他系统的集成
  • 支持结构化和非结构化数据
  • 灵活的Schema设计(Mappings)
  • 支持多编程语言客户端

我个人感觉,ElasticSearch尽量屏蔽底层Lucene相关的技术细节,让你根本无从感觉底层Lucene相关的内容,这样你可以省去了了解Lucene 的成本,学习曲线比较平缓,不像Solr,如果想要构造负责的查询(Query),还是要对Lucene有所了解的。另外,在分布式设计方面,ElasticSearch更轻量一些,用起来更简单,而使用Solr的分布式分片功能需要使用SolrCloud,它基于ZooKeeper来实现配置管理,以及Replication功能,而且Solr需要使用Web容器来部署,相对来说有点复杂一些(我个人之前使用的SolrCloud版本大概是3.1~3.5左右,比较早,现在可能更加完善了)。

基本概念

我们熟悉一下ElasticSearch中涉及到的一些基本概念:

  • 索引(Index)

索引(Index)是文档的集合,它是根据实际业务逻辑进行划分的,通常会把相对独立且具有相似结构或者性质的数据作为文档,放在一起,形成一个索引,比如,用户相关信息可以作为一个索引,交易相关信息也可应作为另一个索引。

  • 类型(Type)

类型(Type)是索引内部的一个逻辑划分,在一个索引内部可以定义多个类型(Type),类型将一个索引在逻辑上划分为多个集合,每个类型包含多个属性(字段)。比如,我们基于手机客户端应用App,创建一个了用户相关信息的索引,然后再在这个索引内部定义多个类型:基本信息类型、设备信息类型、行为信息类型,基本信息类型中包含用户编号、证件号码、名称、手机号码、年龄、出生日期,设备信息类型包括设备类型、设备名称、App版本号、渠道来源、系统版本、IMEI、mac地址,用户行为信息包含用户编号、事件编号、事件类型、时间、浏览页面代码、地区编码,这样有3个类型在一个索引当中。ElasticSearch中类型,与HBase中列簇(Column Family)的概念很相似。

  • 文档(Document)

文档(Document)是索引的基本单元,它与关系数据库中的一条记录相类似,包含了一组属性信息,同时包含一个唯一标识这一组属性值的ID,通过该ID可以更新一个文档,也可以删除一个文档。

  • 分片(Shards)&副本(Replicas)

一个索引是很多文档的集合,将一个索引进行分割,分成多个片段(一个索引的子集),每一个片段称为一个分片(Shard),这样划分可以很好地管理索引,跨节点存储,为分布式存储于搜索提供了便利。副本(Replica)是为了保证一个分片(Shard)的可用性,冗余复制存储,当一个分片对应的数据无法读取时,可以读取其副本,正常提供搜索服务。

集群安装配置

ElasticSearch集群安装配置非常容易,安装可以执行如下命令行:

wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip
unzip elasticsearch-2.0.0.zip

拿出集群的一个节点的进行配置,修改配置文件config/elasticsearch.yml的内容,如下所示:

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please see the documentation for further information on configuration options:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: dw_search_engine
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: esnode-01
#
# Add custom attributes to the node:
#
# node.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /data/dw_search_storage
#
# Path to log files:
#
path.logs: /tmp/es/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
# bootstrap.mlockall: true
#
# Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
# available on the system and that the owner of the process is allowed to use this limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind adress to a specific IP (IPv4 or IPv6):
#
network.host: 10.10.2.62
#
# Set a custom port for HTTP:
#
http.port: 9200
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
# gateway.recover_after_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Elasticsearch nodes will find each other via unicast, by default.
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
#
# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
# discovery.zen.minimum_master_nodes: 3
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
#
# ---------------------------------- Various -----------------------------------
#
# Disable starting multiple nodes on a single system:
#
# node.max_local_storage_nodes: 1
#
# Require explicit names when deleting indices:
#
# action.destructive_requires_name: true

其它节点的配置,在保证基本存储目录相同的前提下,可以根据需要修改如下几个参数:

node.name
network.host
http.port

最后,在每个节点上分别启动ElasticSearch,执行如下命令:

cd elasticsearch-2.0.0
bin/elasticsearch -d

然后可以查看Web管理界面,需要安装插件elasticsearch-head,后面会介绍,Web管理界面,如下所示:
es-cluster-plugin-head
上图中,我们已经创建了一个索引,可以看到节点的状态,及其分片(Shard)的情况。

RESTful API基本操作

尤其是在进行搜索的时候,为了使得其他系统能够与ElasticSearch搜索系统很好地解耦合,使用ElasticSearch提供的RESTful API是一种不错的选择。下面,我们介绍RESTful API的基本操作。

  • 插件管理

插件的存放目录为elasticsearch-2.0.0/plugins/,插件都是基于该存储目录进行操作的。
安装插件:

bin/plugin install analysis-icu
bin/plugin install mobz/elasticsearch-head

可以从不同的位置安装插件,上面第一个称为Core Elasticsearch plugin,它是Elasticsearch提供的,会从Elasticsearch上下载并安装;上面第一个是从Github上自动下载安装。还有其他的方式安装,如从特定的文件系统等进行安装。
列出插件:

bin/plugin list

删除插件:

bin/plugin remove analysis-icu

安装完一个插件,我们可以查看,例如查看elasticsearch_head插件,查看如下链接:


http://10.10.2.62:9200/_plugin/head/

  • 创建索引
curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'

创建的索引名称为basis_device_info,我们也可以不指定一个索引对应的Mappings,而是在索引的时候自动生成Mappings,所以如果没有指定一个索引的Mappings,则这个索引可以支持任何的Mappings。同样可知,一个索引可以自动地增加不同的type,非常灵活。
也可以指定索引的基本配置,如分片(Shard)数目、副本(Replica)数目,如下所示:

curl -XPUT 'http://10.10.2.62:9200/basis_device_info /' -d '{
    "settings" : {
        "index" : {
            "number_of_shards" : 10,
            "number_of_replicas" : 1
        }
    }
}'

默认是5个分片,不进行复制,上面配置表示索引basis_device_info有10个分片,每个分片1个副本。
下面在创建索引的时候,指定设计的schema,即配置mappings,如下所示:

curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '
{
  "mappings": {
    "user": {
      "_all":       { "enabled": false  },
      "properties": {
        "installid":    { "type": "string"  },
        "appid":    { "type": "string"  },
        "channel":  { "type":   "string", "index":  "analyzed" },
        "version":    { "type": "string"  },
        "osversion":    { "type": "string"  },
        "device_name":    { "type": "string", "index":  "analyzed"   },
        "producer":    { "type": "string"  },
        "device_type":    { "type": "string"  },
        "resolution":    { "type": "string", "index":  "analyzed"  },
        "screen_size":    { "type": "string", "index":  "analyzed"  },
        "mac":    { "type": "string", "index":  "not_analyzed"  },
        "idfa":    { "type": "string"  },
        "idfv":    { "type": "string", "index":  "not_analyzed"  },
        "imei":    { "type": "string", "index":  "not_analyzed"  },
        "create_time":  {
          "type":   "date",
          "format": "yyyy-MM-dd HH:mm:ss",
       "index":  "not_analyzed"
        }
      }
    }
  }
}'

上面创建了索引basis_device_info,同时type为user,有了mappings,我们就知道需要索引的数据的格式了。

  • 删除索引
curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'

删除索引basis_device_info。

  • 索引文档
curl -PUT 'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '{
        "installid":    "0000000L",
        "appid":    "0",
        "udid":     "CC49E748588490D41BFB89584007B0FA",
        "channel":  "wulei1",
        "version":    "3.1.2",
        "osversion":    "8.1",
        "device_name":    "iPhone Retina4 Simulator",
        "producer":    "apple",
        "device_type":    "1",
        "resolution":    "640*1136",
        "screen_size":    "320*568",
        "mac":    "600308A20C5E",
        "idfa":    "dbbbs-fdsfa-fafda-321saf",
        "idfv":    "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
        "imei":    "af-sfd0fdsa-fad-ff",
        "create_time":  "2015-01-14 20:32:05"
}'

基于我们前面创建的type为user的索引,索引一个文档,文档_id为CC49E748588490D41BFB89584007B0FA,文档内容为一个用户设备信息,使用JSON格式表示。

  • 批量索引

批量索引,可以根据自己熟悉的编程语言或者脚本来实现,ElasticSearch也提供了一些客户端库。下面我们首先根据数据文件,构造成ElasticSearch索引支持的JSON格式,导出文件,然后通过curl工具去进行批量索引,实际上使用的是ElasticSearch提供的bulk API来实现的。
首先处理原始带索引数据,代码如下所示:

package org.shirdrn.es;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;

import net.sf.json.JSONObject;

import com.google.common.base.Throwables;

public class EsIndexingClient {

     public static void closeQuietly(Closeable... closeables) {
          if(closeables != null) {
               for(Closeable closeable : closeables) {
                    try {
                         closeable.close();
                    } catch (Exception e) { }
               }
          }
     }
    
     public static void main(String[] args) {
          String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
          String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
          File in = new File(f);
          BufferedReader reader = null;
          BufferedWriter writer = null;
          try {
               writer = new BufferedWriter(new FileWriter(out));
               reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
               String line = null;
               while((line = reader.readLine()) != null) {
                    String[] a = line.split("\t", -1);
                    if(a.length == 16) {
                         String udid = a[2];
                        
                         JSONObject c = new JSONObject();
                         c.put("_index", "basis_device_info");
                         c.put("_type", "user");
                         c.put("_id", udid);
                        
                         JSONObject index = new JSONObject();
                         index.put("index", c);
                        
                         JSONObject doc = new JSONObject();
                         doc.put("installid", a[0]);
                         doc.put("appid", a[1]);
                         doc.put("udid", a[2]);
                         doc.put("channel", a[3]);
                         doc.put("version", a[4]);
                         doc.put("osversion", a[5]);
                         doc.put("device_name", a[6]);
                         doc.put("producer", a[7]);
                         doc.put("device_type", a[8]);
                         doc.put("resolution", a[9]);
                         doc.put("screen_size", a[10]);
                         doc.put("mac", a[11]);
                         doc.put("idfa", a[12]);
                         doc.put("idfv", a[13]);
                         doc.put("imei", a[14]);
                         doc.put("create_time", a[15]);
                        
                         writer.write(index.toString() + "\n");
                         writer.write(doc.toString() + "\n");
                    }
               }
              
          } catch (Exception e) {
               throw Throwables.propagate(e);
          } finally {
               closeQuietly(reader, writer);
          }

     }
}

运行代码,输出的数据文件为basis_device_info.json,该文件的格式了,示例如下所示:

{"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
{"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
{"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
{"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
{"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}

奇数编号行的内容为索引的指令信息,包括索引名称(_index)、类型(_type)、唯一标识(_id),偶数编号行的内容为实际待索引的文档数据。
然后,通过curl命令来进行批量索引,执行如下命令:

curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary "@basis_device_info.json"
  • 搜索文档

简单的搜索,可以通过GET方式搜索,如下所示:


http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA


http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud

上面第一个根据唯一的_id进行搜索,结果返回0个或者1个文档;第二个通过指定GET方式参数,其中_search和q是ElasticSearch内置的接口关键字,通过指定字段名称和搜索关键词的方式进行搜索,结果以JSON格式返回。

  • Request Body搜索

可以设置请求的body内容,能够支持更加复杂的查询条件然后请求搜索,如下所示:

curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{
    "query" : {
        "term" : { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" }
    }
}'

结果示例,如下所示:

{"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
  • 基于Lucene查询语法搜索

如果熟悉Lucene查询(Query),可以构造通过构造复杂的Term关系字符串来进行搜索,示例如下所示:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
  "query": {
    "query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" }
   }
}'

查询query字符串的含义是:从channel字段搜索baidu,从device_name字段搜索HUAWEI,然后两者取并集,这实际上一个布尔查询,返回最终结果。

  • 使用multi_match搜索

ElasticSearch支持给定搜索关键词,从多个字段中进行搜索,示例如下所示:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
    "query": {
        "multi_match" : {
            "query":    "HTC",
            "fields": [ "channel", "device_name" ]
        }
    }
}'

这样,只要在channel和device_name两个字段中出现关键词HTC,则都返回结果,结果应该是两个字段匹配上的文档集合的并集。

  • 支持Filter搜索

可以在制定Filter进行搜索。例如下面是一个按照时间范围进行过滤,得到搜索结果的查询:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
{
  "query": {
    "filtered": {
            "query": { "match_all": {} },
            "filter" : {
                "range" : {
                    "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
                 }
            }
      }
    }
}'
  • 分页搜索

ElasticSearch支持分页搜索,可以通过在RESTful连接中指定size和from参数,来进行分页搜索,如下所示:

curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d '
{
  "query": {
    "filtered": {
            "query": { "match_all": {} },
            "filter" : {
                "range" : {
                    "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
                 }
            }
      }
    }
}'

上面搜索的含义是:按照时间范围搜索,从第20个文档开始,返回10个文档,相当于一页取10个文档。

Java客户端

如果熟悉Java语言,而不想使用脚本等其他方式操作ElasticSearch搜索集群,则可以使用ElasticSearch提供的Java客户端API来编码实现,能够更加灵活地控制。ElasticSearch提供的Java客户端支持全部常用操作,如更新索引、索引文档、搜索文档、删除索引等等操作,而且还支持其他一些功能,如同步异步模式、explain查询等,下面我们通过代码来了解一下。
如果使用Maven管理Java代码,可以在pom.xml文件中加入如下依赖:

          <dependency>
               <groupId>org.elasticsearch</groupId>
               <artifactId>elasticsearch</artifactId>
               <version>2.0.0</version>
          </dependency>

创建一个ElasticSearch客户端,代码如下所示:

          // create & configure client
          Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "dw_search_engine")
                    .put("client.transport.sniff", true)
                    .build();
          final Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(newAddress("es-01", 9300))
                    .addTransportAddress(newAddress("es-02", 9300));

可以将你的ElasticSearch集群的节点通过上面的addTransportAddress方法,都与Client对象关联起来,这样在操作ElasticSearch集群中的索引/更新/删除/搜索文档的时候,就能够自动感知。上面newAddress方法如下:

     private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException {
          return new InetSocketTransportAddress(InetAddress.getByName(host), port);
     }

另外,也可以通过在配置文件elasticsearch.yml中指定相关配置,例如:

cluster.name: dw_search_engine
client.transport.sniff: true
client.transport.ping_timeout: 10s
client.transport.nodes_sampler_interval: 10s

那么,创建客户端需要从配置文件中读取配置内容,具体可以查看官方文档。

  • 准备工作

索引的时候,我们是从一个本地文件中读取数据,并构建索引文档需要的格式,然后请求ElasticSearch集群执行索引操作,下面代码是一些基本准备工作:

          final String index = "basis_device_info";
          final String type = "user";
         
          // index documents
          String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
          File in = new File(f);

从文件中,每次读取一行记录,然后构建一个JSON格式字符串,通过XContentBuilder来表示,代码如下所示:

     protected static XContentBuilder createSource(String[] a) throws IOException {
          return jsonBuilder()
                  .startObject()
                      .field("installid", a[0])
                         .field("appid", a[1])
                         .field("udid", a[2])
                         .field("channel", a[3])
                         .field("version", a[4])
                         .field("osversion", a[5])
                         .field("device_name", a[6])
                         .field("producer", a[7])
                         .field("device_type", a[8])
                         .field("resolution", a[9])
                         .field("screen_size", a[10])
                         .field("mac", a[11])
                         .field("idfa", a[12])
                         .field("idfv", a[13])
                         .field("imei", a[14])
                         .field("create_time", a[15])
                  .endObject();
     }

下面我们从API的功能入手,分别详细说明,并附加代码展示用法。

  • 创建索引

可以直接通过Java客户端库来创建索引,代码如下所示:

     protected static void createIndex(final Client client, String index) {
          Map<String, Object> indexSettings = Maps.newHashMap();
          indexSettings.put("number_of_shards", "4");
          indexSettings.put("number_of_replicas", "1");
          CreateIndexRequest createIndexRequest = new CreateIndexRequest(
                    index, Settings.settingsBuilder().put(indexSettings).build());
          CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet();
          System.out.println(createIndexResponse);
     }
  • 创建Mappings

通过Java客户端创建Mappings,相对比较复杂一点,需要拼接对应的JSON字符串,实现代码如下所示:

     protected static void createMappings(final Client client, String index) throws IOException, InterruptedException, ExecutionException {
          XContentBuilder basisInfoMapping = jsonBuilder()
                    .startObject()
                         .startObject("_all")
                              .field("enabled", "false")
                         .endObject()
                         .startObject("properties")
                              .startObject("id")
                                   .field("type", "string")
                              .endObject()
                              .startObject("name")
                                   .field("type", "string")
                                   .field("index", "analyzed")
                              .endObject()
                              .startObject("age")
                                   .field("type", "int")
                              .endObject()
                              .startObject("birthday")
                                   .field("type", "date")
                                   .field("format", "yyyy-MM-dd HH:mm:ss")
                                   .field("index", "not_analyzed")
                              .endObject()
                         .endObject()
                    .endObject();
         
          XContentBuilder deviceInfoMapping = jsonBuilder()
                    .startObject()
                         .startObject("_all")
                              .field("enabled", "false")
                         .endObject()
                         .startObject("properties")
                              .startObject("udid")
                                   .field("type", "string")
                              .endObject()
                              .startObject("device_name")
                                   .field("type", "string")
                                   .field("index", "analyzed")
                              .endObject()
                              .startObject("privoder")
                                   .field("type", "string")
                                   .field("index", "analyzed")
                              .endObject()
                              .startObject("os_version")
                                   .field("type", "string")
                              .endObject()
                         .endObject()
                    .endObject();
         
          PutMappingRequest putMappingRequest = Requests.putMappingRequest(index)
               .type("basic_info")
               .source(basisInfoMapping)
              .type("device_info")
              .source(deviceInfoMapping);
         
          System.out.println(putMappingRequest.indicesOptions());
         
          PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
          System.out.println(putMappingResponse);
     }

上面代码创建了一个名称为app_user_info的索引,该索引具有basic_info和device_info这2个type,可以通过elasticsearch_head插件,在Web管理页面上查看对应的索引信息。

  • 索引单个文档

从文件中读取数据,一条记录构造一个文档,然后执行索引,代码如下所示:

     protected static void indexDocs(final Client client, final String index, final String type, File in) {
          BufferedReader reader = null;
          try {
               reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
               String line = null;
               while((line = reader.readLine()) != null) {
                    String[] a = line.split("\t", -1);
                    if(a.length == 16) {
                         String udid = a[2];
                         IndexResponse response =
                                   client
                                   .prepareIndex(index, type, udid)
                                   .setSource(createSource(a))
                                   .get();
                         System.out.println(response.toString());
                    }
               }
              
          } catch (Exception e) {
               throw Throwables.propagate(e);
          } finally {
               closeQuietly(reader);
          }
     }
  • 批量索引

批量索引有多种方式,首先,通过Bulk API进行索引,我们自己控制每一个batch的大小,代码如下所示:

     protected static void indexBulk(final Client client, final String index, final String type, File in) {
          BulkRequestBuilder bulkRequest = client.prepareBulk();
          final int batchSize = 100;
          int counter = 0;
          BufferedReader reader = null;
          try {
               reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
               String line = null;
               while((line = reader.readLine()) != null) {
                    String[] a = line.split("\t", -1);
                    if(a.length == 16) {
                         String udid = a[2];
                         IndexRequestBuilder indexRequestBuilder =
                                   client
                                   .prepareIndex(index, type, udid)
                                   .setSource(createSource(a));
                         bulkRequest.add(indexRequestBuilder);
                         if(++counter >= batchSize) {
                              System.out.println(!bulkRequest.get().hasFailures());
                              counter = 0;
                              bulkRequest = client.prepareBulk();
                         }
                    }
               }
              
          } catch (Exception e) {
               throw Throwables.propagate(e);
          } finally {
               System.out.println(!bulkRequest.get().hasFailures());
               closeQuietly(reader);
          }
     }

另一种方式,是根据ElasticSearch提供的Bulk Processor来实现,只需要设置相关参数,就可以实现批量索引,这种方式更加灵活,示例如下所示:

     protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in) throws InterruptedException {
          String name = "device_info_processor";
          int bulkActions = 1000;
          ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB);
          TimeValue flushInterval = TimeValue.timeValueSeconds(60);
          int concurrentRequests = 12;
         
          // create bulk processor
          final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {

               public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
                    System.out.println("id=" + id + ", resp=" + resp);
               }

               public void afterBulk(long id, BulkRequest req, Throwable cause) {
                    System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);              
               }

               public void beforeBulk(long id, BulkRequest req) {
                    System.out.println("id=" + id + ", req=" + req);              
               }
              
          })
          .setName(name)
          .setBulkActions(bulkActions)
          .setBulkSize(bulkSize)
          .setFlushInterval(flushInterval)
          .setConcurrentRequests(concurrentRequests)
          .build();
         
          // index documents
          BufferedReader reader = null;
          try {
               reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
               String line = null;
               while((line = reader.readLine()) != null) {
                    String[] a = line.split("\t", -1);
                    if(a.length == 16) {
                         String udid = a[2];
                         bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
                    }
               }
              
          } catch (Exception e) {
               throw Throwables.propagate(e);
          } finally {
               closeQuietly(reader);
              
               // close bulk processor
               bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
          }
     }

可以通过实现自定义的BulkProcessor.Listener,它提供了Hook的功能,比如,索引某个文档失败的话,可以在Hook方法中增加处理,实现重试的功能;再比如,如果索引成功,给其他系统服务一个回调,等等。

  • 更新文档

更新文档中的某些字段,需要指定id的值,以及需要更新的字段的值,代码如下所示:

     protected static void updateDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
          String id = "60e90ddcb1a61622028b8d92112a646c";
          UpdateRequest updateRequest = new UpdateRequest(index, type, id);
          updateRequest.doc(jsonBuilder()
                    .startObject()
                      .field("channel", "h-google")
                      .field("appid", "1")
                  .endObject());
          UpdateResponse response = client.update(updateRequest).get();
          System.out.println(response);
     }

如果更新文档的时候,文档不存在,则需要先执行索引操作,再进行更新操作,将这两个操作合并到一起,使用upsert操作,代码如下所示:

     protected static void upsertDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
          String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
          IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder()
                      .startObject()
                          .field("installid", "00000BSe")
                          .field("appid", "0")
                          .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
                          .field("channel", "A-wandoujia")
                          .field("version", "3.1.1")
                          .field("resolution", "960*540")
                          .field("mac", "00:08:22:be:1b:b7")
                          .field("device_type", "0")
                          .field("device_name", "HTC")
                          .field("producer", "alps")
                          .field("create_time", "2015-01-17 17:15:36")
                      .endObject());
         
          UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder()
                      .startObject()
                          .field("resolution", "540*960")
                          .field("channel", "h-baidu")
                          .field("version", "3.1.1")
                          .field("imei", "861622010000056")
                      .endObject())
                  .upsert(indexRequest);             
          UpdateResponse response = client.update(updateRequest).get();
          System.out.println(response);
     }
  • 删除文档

删除文档,需要指定文档的id的值,代码如下所示:

     protected static void deleteDoc(final Client client, final String index, final String type) {
          String id = "60e90ddcb1a61622028b8d92112a646c";
          DeleteResponse response = client.prepareDelete(index, type, id).get();
          System.out.println(response);
     }
  • 搜索文档

搜索文档,可以根据需要构造指定的查询(Query),可以设置过滤器等等,然后提交搜索,示例代码如下所示:

     protected static void searchDocs(final Client client, final String index, final String type) {
          SearchResponse response = client
               .prepareSearch(index)
               .setTypes(type)
               .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
               .setPostFilter(QueryBuilders.rangeQuery("create_time").from("2015-01-16 00:00:00").to("2015-01-16 23:59:59"))
               .setFrom(30).setSize(10).setExplain(true)
               .execute()
               .actionGet();
          System.out.println(response);
     }

查询(Query)的构造有很多的方式,比如构造布尔查询,指定与、或、非关系,然后提交搜索。执行搜索,可以设置搜索文档的起始偏移位置以及每次取多少个结果文档,这便能实现分页功能。

其他话题

ElasticSearch最经典的软件栈组合就是ELK(ElasticSearch Logstash Kibana),其中ElasticSearch提供了实时查询分析数据的功能,是一个非常通用的搜索引擎系统,而Logstash是一个日志管理工具,能够收集日志,对日志进行管理,Kibana是一个基于页面的前端展示工具,非常方便地使ElasticSearch中的数据可视化,具体使用起来如何,如果感兴趣可以尝试一下。
另外,ElasticSearch也被好多开源大数据系统所拥抱,比如Cloudera的CDH也整合了ElasticSearch作为搜索系统,ElasticSearch也可以和其他系统,如Hadoop、HBase等进行整合,使用领域比较广泛。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>