Solr DIH: 基于MySQL表数据建立索引

选择使用Solr,对数据库中数据进行索引,可以单独写程序将数据库中的数据导出并建立索引,这个过程可能对于数据处理的控制更灵活一些,但是却可能带来很大的工作量。选择使用Solr的DIH组件,可以很方便的对数据库表中数据进行索引,下面基于MySQL数据库实现建立索引。

首先,需要设计你的schema,最主要的工作是,将数据库表中字段映射为Lucene索引(Solr直接使用Lucene的索引格式和数据)的Field,从而将数据表中的一条记录映射为Lucene中的Document,然后进行索引。另外,在schema.xml配置文件中,还需要指定各个字段在索引数据中的属性信息(如是否索引、是否存储、是否分词、排序规则等),以及Field所使用的分析器、过滤器等。在schema.xml文件进行配置,下面是配置实例:

<?xml version="1.0" ?>
<schema name="example core zero" version="1.1">
	<types>
		<fieldtype name="int" class="solr.IntField" omitNorms="true" />
		<fieldtype name="string" class="solr.TextField" sortMissingLast="true" omitNorms="true">
			<analyzer type="index">
				<charFilter class="solr.MappingCharFilterFactory" mapping="mapping-ISOLatin1Accent.txt" />
				<tokenizer class="solr.KeywordTokenizerFactory" />
				<filter class="solr.LowerCaseFilterFactory" />
			</analyzer>
			<analyzer type="query">
				<tokenizer class="solr.KeywordTokenizerFactory" />
				<filter class="solr.LowerCaseFilterFactory" />
			</analyzer>
		</fieldtype>
		<fieldtype name="long" class="solr.LongField" omitNorms="true" />
		<fieldtype name="date" class="solr.TrieDateField" sortMissingLast="true" omitNorms="true" />
		<fieldtype name="text" class="solr.TextField" sortMissingLast="true" omitNorms="true">
			<analyzer type="index">
				<tokenizer class="solr.StandardTokenizerFactory" />
			</analyzer>
			<analyzer type="query">
				<tokenizer class="solr.StandardTokenizerFactory" />
			</analyzer>
		</fieldtype>
	</types>

	<fields>
		<field name="id" type="int" indexed="true" stored="true" multiValued="false" required="true" />
		<field name="domain" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="alex_rank" type="int" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="server_port" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_validity_notBefore" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_validity_notAfter_yyyyMMdd" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_issuer_brand" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_validation" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_isMultiDomain" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_issuer_brand_isXRelated" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_subject_C" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_isWildcard" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="cert_notAfter" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="special_ssl" type="int" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="competitor_logo" type="string" indexed="true" stored="true" multiValued="false" required="false" />
		<field name="segment" type="text" indexed="true" stored="false" multiValued="true" required="false" />
	</fields>

	<!-- field to use to determine and enforce document uniqueness. -->
	<uniqueKey>id</uniqueKey>

	<!-- field for the QueryParser to use when an explicit fieldname is absent -->
	<defaultSearchField>domain</defaultSearchField>

	<!-- SolrQueryParser configuration: defaultOperator="AND|OR" -->
	<solrQueryParser defaultOperator="OR" />
</schema>

定义好上面的内容,就应该考虑从数据库表中如何查询出记录,然后通过处理进行索引。通常,对于已经存在的基于数据库的系统或应用,很可能需要对某些字段的值进行一些处理,然后再进行索引,使用Solr定义的一些组件,在一定程度上可以满足需要。比如,数据表中时间字段包含到毫秒,实际我们只需要到日期,所以进行索引之前要把时间字符串做截断处理,等等。
通过使用Solr的DIH(Data Import Handler)组件,可以很容易地进行配置,就能实现将数据库的数据进行索引,而且还提供了一些方便的操作,如全量索引、增量索引等功能。
下面,在solrconfig.xml中配置DIH对应的requestHandler,实际身上是暴露一个REST接口来实现数据库数据导出并索引,配置如下:

	<requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
		<lst name="defaults">
			<str name="config">data-config.xml</str>
		</lst>
	</requestHandler>

上面的请求接口为“/dataimport”,可以通过如下类似的URL执行数据导入处理:


http://172.0.8.212:8080/seaarch-server/core0/dataimport?command=full-import

上面DIH对应的requestHandler配置中,配置文件data-config.xml定义了数据库的基本配置,以及导出数据的映射规则,即导出数据库表中对应哪些字段的值,以及对特定字段的值做如何处理,下面是一个示例:

<dataConfig>
	<dataSource name="jdbc" driver="com.mysql.jdbc.Driver" url="jdbc:mysql://172.0.8.249:5606/marketing_db_saved?zeroDateTimeBehavior=convertToNull" user="developer" password="sedept@shiyanjun.cn" />
	<document name="mkt_data">
		<entity name="marketing_data" pk="id" query="select * from marketing_data where id between ${dataimporter.request.offset} and ${dataimporter.request.offset}+1000000" deltaQuery="select * from marketing_data where updated_at &gt; '${dih.last_index_time}'" transformer="RegexTransformer">
			<field column="id" name="id" />
			<field column="domain" name="domain" />
			<field column="alex_rank" name="alex_rank" />
			<field column="server_port" name="server_port" />
			<field column="cert_validity_notBefore" name="cert_validity_notBefore" />
			<field column="cert_validity_notAfter" />
			<field column="cert_validity_notAfter_yyyyMMdd" regex="(.*?)\s+.*" name="cert_validity_notAfter_yyyyMMdd" sourceColName="cert_validity_notAfter" />
			<field column="cert_issuer_brand" name="cert_issuer_brand" />
			<field column="cert_validation" name="cert_validation" />
			<field column="cert_isMultiDomain" name="cert_isMultiDomain" />
			<field column="cert_issuer_brand_isXRelated" name="cert_issuer_brand_isXRelated" />
			<field column="cert_isWildcard" name="cert_isWildcard" />
			<field column="cert_notAfter" name="cert_notAfter" />
			<field column="special_ssl" name="special_ssl" />
			<field column="competitor_logo" name="competitor_logo" />
			<field column="segment" name="segment" />
		</entity>
	</document>
</dataConfig>

我们说明一下上面配置中的一些关键点:

全量索引

下面的SQL语句是与全量索引相关的:

SELECT * from marketing_data WHERE id between ${dataimporter.request.offset} AND ${dataimporter.request.offset}+1000000

从表marketing_data中查询,表主键为id,查询条件为id between ${dataimporter.request.offset} and ${dataimporter.request.offset}+1000000,也就是id属于一个区间,而不是直接SELECT全表。Solr的DIH暴露了请求中传递的变量 ${dataimporter.request.offset},也就是在请求的requestHandler中可以附带附加属性条件,例如,下面请求URL中的offset=5000000参数:


http://172.0.8.212:8080/seaarch-server/core0/dataimport?command=full-import&offset=5000000

另外,还有一个参数是很重要的,它决定着是否清除已经存在的索引数据,默认为clean=true,如果不想删除以前的索引数据,一定要在请求的URL中指定该属性为false,请求URL如下:


http://172.0.8.212:8080/seaarch-server/core0/dataimport?command=full-import&offset=5000000&clean=false

另外,索引完成后一半需要执行commit操作,将内存中索引数据持久化到文件系统,防止改变丢失,所以需要在请求的URL中增加commit=true,例如:


http://172.0.8.212:8080/seaarch-server/core0/dataimport?command=full-import&offset=5000000&clean=false&commit=true

对于数据表中数据量很大的应用场景,通过这种方式,可以实现每次请求处理一批数据,避免对整个表造成过大的压力,影响正常线上业务操作数据库。

增量索引

上面requestHandler的配置中,属性配置内容:

deltaQuery="SELECT * from marketing_data WHERE updated_at &gt; '${dih.last_index_time}'

表示请求中指定的命令为增量索引方式,只需要通过指定请求的命令为delta-import即可,对应的请求URL为:


http://172.0.8.212:8080/search-server/core0/dataimport?command=delta-import

字段updated_at是数据表中时间戳的字段,where条件的含义是,如果上次索引时间点之后,表中记录的时间戳发生变化(即发生在上次索引之后),则对这些最近更新的记录进行索引(因为数据库表字段到LuceneField的映射中,使用的表主键id,如果是新增记录,则添加索引,就增加Document,如果只是更新表中记录,则对索引中已经存在的id相同的Document的数据进行更新)。

不过,上面两种方式还是需要手动干预处理批次,或者写个附加脚本进行成批索引。
还有一种更好的方式,可以直接在配置文件中进行配置,那就是属性文件dataimport.properties,它能够记录很多有用的状态,以及配置很多有用的选项。然而,这个功能在Solr 3.x版本中不能使用,如果使用的是Solr 4.x,可以使用。下面看一个示例配置:

#Tue Jul 21 12:10:50 CEST 2010
metadataObject.last_index_time=2010-09-20 11\:12\:47
last_index_time=2010-09-20 11\:12\:47


#################################################
#                                               #
#       dataimport scheduler properties         #
#                                               #
#################################################

#  to sync or not to sync
#  1 - active; anything else - inactive
syncEnabled=1

#  which cores to schedule
#  in a multi-core environment you can decide which cores you want syncronized
#  leave empty or comment it out if using single-core deployment
syncCores=coreHr,coreEn

#  solr server name or IP address
#  [defaults to localhost if empty]
server=localhost

#  solr server port
#  [defaults to 80 if empty]
port=8080

#  application name/context
#  [defaults to current ServletContextListener's context (app) name]
webapp=solrTest_WEB

#  URL params [mandatory]
#  remainder of URL
params=/search?qt=/dataimport&command=delta-import&clean=false&commit=true

#  schedule interval
#  number of minutes between two runs
#  [defaults to 30 if empty]
interval=10

上面配置中,最后一个选项interval=10可以指定程序自动根据配置的时间间隔进行索引,而且上面的配置适合增量索引(可以从params=/search?qt=/dataimport&command=delta-import&clean=false&commit=true中的command=delta-import看出),因为它只需要根据对比上次索引时间和数据表中的更新时间戳字段来判断哪些数据需要进行索引或者更新索引数据。如果是实时性较强的应用,这个间隔自然可以设置短一些,保证基于搜索的应用的查询能够更接近实时,不过要根据自己应用的实际的需要去选择。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>