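Suppose we have the following requirement:
A proxy platform forwards query requests to back-end servers. The back end hosts several kinds of query servers with different query mechanisms, for example SQL-based relational databases and the Solr search engine. The proxy platform exposes these services so that developers can call them from any programming language.
We can use Thrift to define a language-neutral service interface, then use the Thrift compiler to generate client code skeletons in multiple programming languages. The server side is developed in a chosen language, such as Java, and clients issue queries by connecting to the Thrift server.
For our requirement, the back-end service is implemented in Java, while external callers use C# to invoke it, receive the results, and process them further.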
Thrift Service Definition
First, here is our example service definition. The file is named queryproxy.thrift and contains the following:
namespace java org.shirdrn.queryproxy.thrift.protocol
namespace csharp Query.Proxy.Thrift.Protocol
namespace py queryproxy.thrift.protocol

typedef i16 short
typedef i32 int
typedef i64 long

enum QueryType {
    SOLR = 1,
    SQL = 2
}

struct QueryParams {
    1:QueryType type,
    2:string table,
    3:list<string> paramList
}

struct QueryResult {
    1:int offset,
    2:int length,
    3:list<string> results
}

exception QueryFailureException {
    1:string message
}

service QueryProxyService {
    QueryResult query(1:QueryParams paramList) throws (1:QueryFailureException qe)
}
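The definitions have the following meaning:
- QueryType specifies the query type, one of two values: a query against the Solr server (SOLR) or a SQL query (SQL)
- QueryParams carries the request parameters
- QueryResult is the result object; it wraps the list of query results, which we return as a list of JSON strings
- QueryFailureException is thrown when a query fails
- QueryProxyService defines the service interface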
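Compiling the Thrift Service Definition
From the service defined above, we use the Thrift compiler to generate code for different programming languages: Java code for the server side and C# code for the client that calls the service. Run the following commands: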
thrift --gen java queryproxy.thrift
thrift --gen csharp queryproxy.thrift
The generated code directories then appear under the current directory:
ls
gen-csharp  gen-java
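The server and client code can be developed directly on top of this generated code, as described below.
Thrift Service Implementation
We implement the server-side Thrift service in Java. First, add the jar files shipped in the Thrift distribution to the classpath; the service is developed against this library.
The Java code generated by the Thrift compiler in the previous step contains a service interface:
org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface
We need an implementation for each of the two query services (Solr and SQL). We first add an abstraction layer on top of the service interface: an abstract class that holds the service configuration object (read from a Properties file). The configuration file looks roughly like this: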
query.proxy.thrift.port=9966
query.proxy.thrift.worker.thread.minCount=1
query.proxy.thrift.worker.thread.maxCount=200
query.proxy.solr.zkHost=master:2181
The abstract service class is as follows:
package org.shirdrn.queryproxy.common;

import java.io.Closeable;

import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;

public abstract class ConfiguredQueryService implements Iface, Closeable {

    protected final Configurable context;

    public ConfiguredQueryService(Configurable context) {
        super();
        this.context = context;
    }

}
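The Configurable type (and the PropertiesConfig class that the later listings instantiate) is used throughout the article but never shown. As a rough illustration only, the sketch below assumes nothing beyond the two calls that are visible in the article's code, get(key) and getInt(key, defaultValue). The interface body, the class InMemoryPropertiesConfig, and the ConfigDemo driver are hypothetical names invented for this sketch; the real PropertiesConfig loads a file by name and may well differ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Small driver showing how the sketch could be used.
final class ConfigDemo {
    static Configurable sample() {
        String text = "query.proxy.thrift.port=9966\n"
                + "query.proxy.solr.zkHost=master:2181\n";
        return new InMemoryPropertiesConfig(text);
    }

    public static void main(String[] args) {
        Configurable conf = sample();
        System.out.println(conf.getInt("query.proxy.thrift.port", 0));
        System.out.println(conf.get("query.proxy.solr.zkHost"));
        // Key is absent, so the supplied default is returned.
        System.out.println(conf.getInt("query.proxy.thrift.worker.thread.maxCount", 1));
    }
}

// Hypothetical reconstruction: only these two methods are visible in the article.
interface Configurable {
    String get(String key);
    int getInt(String key, int defaultValue);
}

// Hypothetical Properties-backed implementation that reads from a string
// instead of a file, so the sketch runs without any external resources.
final class InMemoryPropertiesConfig implements Configurable {
    private final Properties props = new Properties();

    InMemoryPropertiesConfig(String propertiesText) {
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public String get(String key) {
        return props.getProperty(key);
    }

    @Override
    public int getInt(String key, int defaultValue) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(value.trim());
    }
}
```

Keeping the default value at the call site, as getInt does here, lets the server start with a partially filled configuration file.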
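Both service types mentioned above are then implemented on top of the abstract class ConfiguredQueryService.
- Solr query service implementation
Because a Solr query server cluster (SolrCloud) already exists on the back end, queries are actually executed through the solrj client, and the Thrift server-side query follows the same approach. The Solr query service implementation class SolrQueryService is shown below: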
package org.shirdrn.queryproxy.thrift.service.solr;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.thrift.TException;
import org.shirdrn.queryproxy.common.Configurable;
import org.shirdrn.queryproxy.common.ConfiguredQueryService;
import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
import org.shirdrn.queryproxy.utils.ResultUtils;

public class SolrQueryService extends ConfiguredQueryService {

    private static final Log LOG = LogFactory.getLog(SolrQueryService.class);
    private CloudSolrServer solrServer;
    private static final String writerType = "json";

    public SolrQueryService(Configurable context) {
        super(context);
        String zkHost = context.get("query.proxy.solr.zkHost");
        try {
            solrServer = new CloudSolrServer(zkHost);
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public QueryResult query(QueryParams params) throws QueryFailureException, TException {
        int offset = 0;
        int length = 10;
        Map<String,String> map = new HashMap<>();
        // The generated getter returns null when paramList was never set.
        Iterator<String> iter = params.getParamListIterator();
        while(iter != null && iter.hasNext()) {
            String kv = iter.next();
            if(kv != null) {
                // Split at the first '=' only, so that values may contain '='.
                String[] items = kv.split("=", 2);
                if(items.length == 2) {
                    String key = items[0].trim();
                    String value = items[1].trim();
                    map.put(key, value);
                    if(key.equals(CommonParams.START)) {
                        offset = Integer.parseInt(value);
                    }
                    if(key.equals(CommonParams.ROWS)) {
                        length = Integer.parseInt(value);
                    }
                }
            }
        }
        map.put("collection", params.getTable());
        map.put("wt", writerType);
        LOG.info("Solr params: " + map);

        // query using Solr
        QueryResponse response = null;
        SolrParams solrParams = new MapSolrParams(map);
        try {
            response = solrServer.query(solrParams);
        } catch (SolrServerException e) {
            LOG.error("Failed to query solr server: ", e);
            throw new QueryFailureException(e.toString());
        }

        // process result
        QueryResult result = new QueryResult();
        result.setOffset(offset);
        result.setLength(length);
        if(response != null) {
            result.setResults(ResultUtils.getJSONResults(response));
        }
        return result;
    }

    @Override
    public void close() throws IOException {
        solrServer.shutdown();
    }

}
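For simplicity, the code above uses a single CloudSolrServer client to connect to the Solr server cluster (through the ZooKeeper ensemble). The query method first parses the query parameters and builds them into the parameter format that Solr supports, then submits them to the Solr cluster and waits for the QueryResponse. It then extracts the matching result documents from the returned QueryResponse and converts them into the result object form required by the Thrift service definition. The conversion is done by the ResultUtils class; getJSONResults(response) is implemented as follows: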
private static final String KEY_VERSION = "_version_";
private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

public static List<String> getJSONResults(QueryResponse response) {
    // SimpleDateFormat is not thread-safe, so create one per call instead of
    // sharing a static instance across the server's worker threads.
    DateFormat df = new SimpleDateFormat(DATE_PATTERN);
    ListIterator<SolrDocument> iter = response.getResults().listIterator();
    List<String> resultDocs = new ArrayList<String>();
    while(iter.hasNext()) {
        SolrDocument doc = iter.next();
        JSONObject jDoc = new JSONObject();
        Set<String> ks = doc.keySet();
        // Drop Solr's internal version field from the output.
        if(ks.contains(KEY_VERSION)) {
            ks.remove(KEY_VERSION);
        }
        for(String key : ks) {
            Object v = doc.getFieldValue(key);
            if(v instanceof Date) {
                jDoc.put(key, df.format((Date) v));
                continue;
            }
            jDoc.put(key, v);
        }
        resultDocs.add(jDoc.toString());
    }
    return resultDocs;
}
Each result document becomes one JSON object, and the method returns a list of JSON objects. This completes the Thrift service for Solr queries.
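The parameter-parsing step of the Solr query method can be looked at in isolation. The following is a minimal sketch using only the JDK; the class ParamParser and its two methods are hypothetical helpers made up for illustration and are not part of the article's project. It splits each "key=value" entry at the first '=' so that values may themselves contain '=', and it falls back to a default when an integer parameter is missing or malformed.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns a list of "key=value" strings into a map.
final class ParamParser {

    static Map<String, String> parse(List<String> paramList) {
        Map<String, String> map = new LinkedHashMap<>();
        if (paramList == null) {
            return map;
        }
        for (String kv : paramList) {
            if (kv == null) {
                continue;
            }
            int idx = kv.indexOf('=');
            if (idx <= 0) {
                continue; // no '=' or empty key: skip the entry
            }
            map.put(kv.substring(0, idx).trim(), kv.substring(idx + 1).trim());
        }
        return map;
    }

    // Reads an integer parameter, returning defaultValue when it is absent or malformed.
    static int intParam(Map<String, String> params, String key, int defaultValue) {
        String value = params.get(key);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(Arrays.asList(
                "q=*:*", "fq=building_type:1", "start=50", "rows=10"));
        System.out.println(p);
        System.out.println("offset=" + intParam(p, "start", 0));
        System.out.println("length=" + intParam(p, "rows", 10));
    }
}
```

Falling back to a default on a malformed number is a choice made for this sketch; a service could equally report the bad value to the caller through QueryFailureException.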
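- SQL query service
SQL queries against a relational database are simpler: we use JDBC directly. We use a MySQL database, and the JDBC configuration file for SQL queries is as follows: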
jdbc.jdbcUrl=jdbc:mysql://localhost:3306/wordpress?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true
jdbc.driverClass=com.mysql.jdbc.Driver
jdbc.user=shirdrn
jdbc.password=shiyanjun
The Thrift query service implementation class is SQLQueryService, implemented as follows:
package org.shirdrn.queryproxy.thrift.service.sql;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.shirdrn.queryproxy.common.Configurable;
import org.shirdrn.queryproxy.common.ConfiguredQueryService;
import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
import org.shirdrn.queryproxy.utils.PropertiesConfig;
import org.shirdrn.queryproxy.utils.ResultUtils;

public class SQLQueryService extends ConfiguredQueryService {

    private static final Log LOG = LogFactory.getLog(SQLQueryService.class);
    private static final String JDBC_PROPERTIES = "jdbc.properties";
    Configurable jdbcConf;
    private String jdbcUrl;
    private String user;
    private String password;
    Connection connection;

    public SQLQueryService(Configurable context) {
        super(context);
        jdbcConf = new PropertiesConfig(JDBC_PROPERTIES);
        String driverClass = jdbcConf.get("jdbc.driverClass");
        try {
            Class.forName(driverClass);
            jdbcUrl = jdbcConf.get("jdbc.jdbcUrl");
            user = jdbcConf.get("jdbc.user");
            password = jdbcConf.get("jdbc.password");
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            LOG.info("JDBC: driver=" + driverClass + ", url=" + jdbcUrl + ", user=" + user + ", password=******");
        }
    }

    @Override
    public QueryResult query(QueryParams params) throws QueryFailureException, TException {
        QueryResult result = new QueryResult();
        if(!params.getParamList().isEmpty()) {
            // By convention the first entry is the SQL statement;
            // the remaining entries are the column names to return.
            String sql = params.getParamList().remove(0);
            Statement stmt = null;
            ResultSet rs = null;
            try {
                Connection conn = getConnection();
                stmt = conn.createStatement();
                rs = stmt.executeQuery(sql);
                result.setResults(ResultUtils.getJSONResults(rs, params.getParamList()));
            } catch (SQLException e) {
                throw new QueryFailureException(e.toString());
            } finally {
                // Release JDBC resources after every query to avoid leaks.
                closeQuietly(rs, stmt);
            }
        }
        return result;
    }

    // Propagates SQLException so that query() can report a connection failure
    // to the caller instead of failing later on a null connection.
    private synchronized final Connection getConnection() throws SQLException {
        if(connection == null || connection.isClosed()) {
            if(user != null) {
                connection = DriverManager.getConnection(jdbcUrl, user, password);
            } else {
                connection = DriverManager.getConnection(jdbcUrl);
            }
        }
        return connection;
    }

    private static void closeQuietly(AutoCloseable... resources) {
        for(AutoCloseable resource : resources) {
            if(resource != null) {
                try {
                    resource.close();
                } catch (Exception e) {
                    LOG.warn("Failed to close JDBC resource: ", e);
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        if(connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                throw new IOException(e);
            }
        }
    }

}
The code above also uses a conversion method from the ResultUtils class, implemented as follows:
public static List<String> getJSONResults(ResultSet rs, List<String> fields) throws SQLException {
    List<String> results = new ArrayList<String>();
    while(rs.next()) {
        JSONObject jo = new JSONObject();
        for(String field : fields) {
            Object value = rs.getObject(field);
            // SQL NULL comes back as a Java null; such fields are omitted
            // rather than calling toString() on null.
            if(value != null) {
                jo.put(field, value.toString());
            }
        }
        results.add(jo.toString());
    }
    return results;
}
It returns a list of JSON objects, and the client only needs to parse each object individually. This completes the SQL-based Thrift service.
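The SQL service relies on a positional convention for paramList: the first entry is the SQL statement and the remaining entries are the column names to return. As a small illustration of that convention, the sketch below separates the two parts without modifying the caller's list. The class SqlRequest is a hypothetical helper invented here and does not appear in the article's project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical value object for the "statement first, columns after" convention.
final class SqlRequest {
    final String sql;
    final List<String> fields;

    private SqlRequest(String sql, List<String> fields) {
        this.sql = sql;
        this.fields = fields;
    }

    static SqlRequest from(List<String> paramList) {
        if (paramList == null || paramList.isEmpty()) {
            throw new IllegalArgumentException("paramList must start with a SQL statement");
        }
        // Copy the tail so the caller's list is left untouched.
        List<String> fields = new ArrayList<>(paramList.subList(1, paramList.size()));
        return new SqlRequest(paramList.get(0), Collections.unmodifiableList(fields));
    }

    public static void main(String[] args) {
        SqlRequest request = SqlRequest.from(Arrays.asList(
                "select id, post_author from wordpress.wp_posts", "id", "post_author"));
        System.out.println("sql=" + request.sql);
        System.out.println("fields=" + request.fields);
    }
}
```

Copying instead of calling remove(0) keeps the incoming request object unchanged, which can make logging and retries simpler.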
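With both services implemented, we still need to combine them into a single service and expose it externally over the Thrift protocol. The combined service class also implements the org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface interface, as shown below: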
package org.shirdrn.queryproxy;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.shirdrn.queryproxy.common.Configurable;
import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;
import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
import org.shirdrn.queryproxy.thrift.protocol.QueryType;
import org.shirdrn.queryproxy.utils.ReflectionUtils;

public class ThriftQueryService implements Iface {

    private static final Log LOG = LogFactory.getLog(ThriftQueryService.class);
    private Configurable context;
    static Map<QueryType, Iface> SERVICES = new HashMap<QueryType, Iface>(0);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for(Map.Entry<QueryType, Iface> entry : SERVICES.entrySet()) {
                    try {
                        ((Closeable) entry.getValue()).close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        LOG.info("Closed: type=" + entry.getKey() + ", service=" + entry.getValue());
                    }
                }
            }
        });
    }

    @Override
    public QueryResult query(QueryParams params) throws QueryFailureException, TException {
        int type = params.getType().getValue();
        Iface service = SERVICES.get(QueryType.findByValue(type));
        if(service == null) {
            throw new QueryFailureException("Unknown service: type=" + params.getType().name());
        }
        return service.query(params);
    }

    public void register(QueryType queryType, Class<?> serviceClass) {
        Iface service = (Iface) ReflectionUtils.getInstance(serviceClass, new Object[] {context});
        SERVICES.put(queryType, service);
    }

    public void setContext(Configurable context) {
        this.context = context;
    }

}
The implementation above uses a register method to register and manage the two services implemented earlier.
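The register-then-dispatch pattern can be shown without any Thrift dependency. The sketch below is a simplified stand-in, not the article's class: QueryDispatcher, the nested QueryHandler interface, and the string-based request and response are all hypothetical, and the QueryType enum is redeclared locally only so that the example compiles on its own.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, Thrift-free illustration of a registry keyed by query type.
final class QueryDispatcher {

    // Local copy of the two values from queryproxy.thrift.
    enum QueryType { SOLR, SQL }

    // Stand-in for the generated service interface.
    interface QueryHandler {
        String query(String request);
    }

    private final Map<QueryType, QueryHandler> handlers =
            new EnumMap<QueryType, QueryHandler>(QueryType.class);

    void register(QueryType type, QueryHandler handler) {
        handlers.put(type, handler);
    }

    String dispatch(QueryType type, String request) {
        QueryHandler handler = handlers.get(type);
        if (handler == null) {
            // Mirrors the "Unknown service" failure path of the combined service.
            throw new IllegalStateException("Unknown service: type=" + type);
        }
        return handler.query(request);
    }

    public static void main(String[] args) {
        QueryDispatcher dispatcher = new QueryDispatcher();
        dispatcher.register(QueryType.SOLR, request -> "solr handled: " + request);
        dispatcher.register(QueryType.SQL, request -> "sql handled: " + request);
        System.out.println(dispatcher.dispatch(QueryType.SQL, "select 1"));
    }
}
```

An EnumMap is used here because the keys are enum constants; a HashMap, as in the article, works the same way for lookups.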
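Next, we use the Thrift library to expose the combined service over the Thrift protocol. In practice this means starting a standalone Thrift server bound to a port. The Thrift server code is as follows: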
package org.shirdrn.queryproxy;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadPoolServer.Args;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.shirdrn.queryproxy.common.Configurable;
import org.shirdrn.queryproxy.thrift.protocol.QueryType;
import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Iface;
import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService.Processor;
import org.shirdrn.queryproxy.thrift.service.solr.SolrQueryService;
import org.shirdrn.queryproxy.thrift.service.sql.SQLQueryService;
import org.shirdrn.queryproxy.utils.PropertiesConfig;

public class QueryProxyServer {

    private static final Log LOG = LogFactory.getLog(QueryProxyServer.class);
    static final String config = "config.properties";
    private final Configurable context;

    public QueryProxyServer() {
        super();
        context = loadContext();
    }

    private Configurable loadContext() {
        return new PropertiesConfig(config);
    }

    public void startUp() throws TTransportException {
        int port = context.getInt("query.proxy.thrift.port", 9966);
        LOG.info("Thrift service port: port=" + port);
        TServerSocket serverTransport = new TServerSocket(port);

        ThriftQueryService service = new ThriftQueryService();
        service.setContext(context);
        service.register(QueryType.SOLR, SolrQueryService.class);
        service.register(QueryType.SQL, SQLQueryService.class);

        int minWorkerThreads = context.getInt("query.proxy.thrift.worker.thread.minCount", 1);
        int maxWorkerThreads = context.getInt("query.proxy.thrift.worker.thread.maxCount", 1);
        LOG.info("Thrift thread pool: minWorkerThreads=" + minWorkerThreads + ", maxWorkerThreads=" + maxWorkerThreads);

        TProcessor processor = new Processor<Iface>(service);
        Factory factory = new TBinaryProtocol.Factory(true, true);
        Args tArgs = new Args(serverTransport);
        tArgs
            .minWorkerThreads(minWorkerThreads)
            .maxWorkerThreads(maxWorkerThreads)
            .processor(processor)
            .protocolFactory(factory);
        TServer server = new TThreadPoolServer(tArgs);
        server.serve();
    }

    public static void main(String[] args) throws Exception {
        QueryProxyServer server = new QueryProxyServer();
        server.startUp();
    }

}
Once the server is started, external clients can connect to the configured port 9966 and issue queries.
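The two worker-thread settings read from the configuration bound the size of the server's thread pool. To get a feel for what a minimum and maximum worker count mean, the sketch below builds a plain java.util.concurrent.ThreadPoolExecutor with the same two bounds. This is an analogy for illustration and not a description of Thrift's internals; the class WorkerPoolSketch, the 60-second idle timeout, and the choice of a SynchronousQueue are assumptions of this example.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a pool bounded by min and max worker counts.
final class WorkerPoolSketch {

    // minWorkers threads are kept alive; the pool can grow up to maxWorkers under load.
    // The constructor rejects maxWorkers < minWorkers with IllegalArgumentException.
    static ThreadPoolExecutor newPool(int minWorkers, int maxWorkers) {
        return new ThreadPoolExecutor(
                minWorkers,
                maxWorkers,
                60L, TimeUnit.SECONDS,              // idle time before extra threads exit
                new SynchronousQueue<Runnable>());  // hand tasks straight to a worker
    }

    public static void main(String[] args) {
        // Values taken from the sample configuration file.
        ThreadPoolExecutor pool = newPool(1, 200);
        System.out.println("min=" + pool.getCorePoolSize() + ", max=" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

One practical consequence worth checking in a deployment: the server code falls back to 1 for the maximum count when the key is missing from the configuration file, which would leave very little room for concurrent clients.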
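Calling the Thrift Service
With the Thrift service published, we can call it over the Thrift protocol. Here we implement query clients in Java and C#.
- Java query client implementation
We show how the client makes calls in the form of test cases.
For a SQL query against the relational database, the client code is as follows: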
package org.shirdrn.queryproxy.thrift.service.sql;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.Test;
import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService;
import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
import org.shirdrn.queryproxy.thrift.protocol.QueryType;

public class TestSQLQueryService {

    private static final Log LOG = LogFactory.getLog(TestSQLQueryService.class);

    @Test
    public void query() throws QueryFailureException, TException {
        String host = "server.query-proxy.local";
        TTransport transport = new TSocket(host, 9966);
        TProtocol protocol = new TBinaryProtocol(transport, true, true);
        transport.open();
        try {
            QueryProxyService.Client client = new QueryProxyService.Client(protocol);
            QueryParams params = new QueryParams();
            params.setTable("wp_posts");
            params.setType(QueryType.SQL);
            // First entry: the SQL statement; following entries: columns to return.
            params.addToParamList("select id, post_author from wordpress.wp_posts");
            params.addToParamList("id");
            params.addToParamList("post_author");

            QueryResult result = client.query(params);
            LOG.info("result=" + result.getResults());
        } finally {
            // Close the transport even if the query fails.
            transport.close();
        }
    }

}
The output looks roughly like this:
result=[{"id":"1","post_author":"2"}, {"id":"4","post_author":"2"}, {"id":"57","post_author":"2"}, {"id":"66","post_author":"2"}, {"id":"72","post_author":"2"}, {"id":"75","post_author":"2"}, {"id":"78","post_author":"2"}, {"id":"94","post_author":"2"}, {"id":"100","post_author":"2"}, {"id":"107","post_author":"2"}]
For a query against the Solr server, the client code is as follows:
package org.shirdrn.queryproxy.thrift.service.solr;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.Test;
import org.shirdrn.queryproxy.thrift.protocol.QueryFailureException;
import org.shirdrn.queryproxy.thrift.protocol.QueryParams;
import org.shirdrn.queryproxy.thrift.protocol.QueryProxyService;
import org.shirdrn.queryproxy.thrift.protocol.QueryResult;
import org.shirdrn.queryproxy.thrift.protocol.QueryType;

public class TestSolrQueryProxy {

    private static final Log LOG = LogFactory.getLog(TestSolrQueryProxy.class);

    @Test
    public void query() throws QueryFailureException, TException {
        String host = "server.query-proxy.local";
        TTransport transport = new TSocket(host, 9966);
        TProtocol protocol = new TBinaryProtocol(transport, true, true);
        transport.open();
        try {
            QueryProxyService.Client client = new QueryProxyService.Client(protocol);
            QueryParams params = new QueryParams();
            params.setTable("collection1");
            params.setType(QueryType.SOLR);
            // Each entry is one Solr parameter in key=value form.
            params.addToParamList("q=上海");
            params.addToParamList("fl=*");
            params.addToParamList("fq=building_type:1");
            params.addToParamList("start=50");
            params.addToParamList("rows=10");
            params.addToParamList("wt=json");

            QueryResult result = client.query(params);
            LOG.info("offset=" + result.getOffset());
            LOG.info("length=" + result.getLength());
            LOG.info("result=" + result.getResults());
        } finally {
            // Close the transport even if the query fails.
            transport.close();
        }
    }

}
The output of the query through the Thrift service looks roughly like this:
result=[{"area":"上海","building_type":1,"floor":28,"category":"住宅","temperature":18,"code":576546387,"latitude":63.054478,"longitude":77.491035,"when":"2013-10-15 02:21:12","id":"0ca9ff70-6b33-42fd-a4e1-23a534b739f5"},
{"area":"上海","building_type":1,"floor":45,"category":"办公建筑","temperature":-71,"code":824153427,"latitude":6.464198,"longitude":14.567751,"when":"2013-10-15 02:21:20","id":"25f46be0-f875-48b7-9f33-077602b2e820"},
{"area":"上海","building_type":1,"floor":21,"category":"工业建筑","temperature":16,"code":215543388,"latitude":66.565796,"longitude":34.4735,"when":"2013-10-15 02:21:36","id":"0b32cd10-ea40-4cdc-b23b-c6edccd7505d"},
{"area":"上海","building_type":1,"floor":53,"category":"工业建筑","temperature":-23,"code":435344533,"latitude":24.783417,"longitude":119.01849,"when":"2013-10-15 02:21:37","id":"f324b65d-f3be-485a-9ed1-2f12a70b5d87"},
{"area":"上海","building_type":1,"floor":78,"category":"教育建筑","temperature":-76,"code":298014795,"latitude":9.153033,"longitude":8.273011,"when":"2013-10-15 02:21:39","id":"7a9ea8d8-fad1-4c0b-bdb4-09851eee1432"},
{"area":"上海","building_type":1,"floor":35,"category":"办公建筑","temperature":-3,"code":228287827,"latitude":13.038927,"longitude":62.056316,"when":"2013-10-15 02:21:42","id":"d6ed8172-a5e6-4391-b039-3b4465eabe9b"},
{"area":"上海","building_type":1,"floor":77,"category":"公寓","temperature":28,"code":942108396,"latitude":61.970222,"longitude":4.418831,"when":"2013-10-15 02:21:42","id":"85a76085-90e9-4f62-b10c-5a4895a4d08b"},
{"area":"上海","building_type":1,"floor":40,"category":"办公建筑","temperature":50,"code":175416908,"latitude":38.17303,"longitude":148.83298,"when":"2013-10-15 02:21:51","id":"1496f73b-8057-4c7b-8ba9-bff4a4ee5189"},
{"area":"上海","building_type":1,"floor":32,"category":"办公建筑","temperature":-52,"code":299718959,"latitude":30.68149,"longitude":54.327663,"when":"2013-10-15 02:22:05","id":"216a0b93-4872-45aa-baec-2f11e128f76e"},
{"area":"上海","building_type":1,"floor":78,"category":"住宅","temperature":5,"code":586785207,"latitude":36.27908,"longitude":143.60094,"when":"2013-10-15 02:22:06","id":"10065088-02bf-422e-8761-54635668e1fe"}]
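- C# query client implementation
To call the query service from C#, first import the C# Thrift library files from the thrift-0.9.1\lib\csharp directory of the Thrift distribution into the development environment, then add the C# code generated by the Thrift compiler as well. The client query code can then be written; our implementation is as follows: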
using System;
using System.Collections.Generic;
using Thrift.Transport;
using Thrift.Protocol;
using Query.Proxy.Thrift.Protocol;

namespace CSharpQueryClient
{
    class QueryClient
    {
        static void Main(string[] args)
        {
            string host = "server.query-proxy.local";
            TTransport transport = new TSocket(host, 9966);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            transport.Open();
            try
            {
                QueryProxyService.Client client = new QueryProxyService.Client(protocol);
                QueryParams queryParams = new QueryParams();
                queryParams.Table = "collection1";
                queryParams.Type = QueryType.SOLR;
                // Each entry is one Solr parameter in key=value form.
                queryParams.ParamList = new List<string>();
                queryParams.ParamList.Add("q=上海");
                queryParams.ParamList.Add("fl=*");
                queryParams.ParamList.Add("fq=building_type:1");
                queryParams.ParamList.Add("start=50");
                queryParams.ParamList.Add("rows=10");
                queryParams.ParamList.Add("wt=json");

                QueryResult result = client.query(queryParams);
                Console.WriteLine("offset=" + result.Offset);
                Console.WriteLine("length=" + result.Length);
                foreach (string record in result.Results)
                {
                    Console.WriteLine("record=" + record);
                }
            }
            finally
            {
                // Close the transport even if the query fails.
                transport.Close();
            }
            Console.Read();
        }
    }
}
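The result is the same as that of the Java client above.
This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and redistribute it, provided that you keep the author attribution 时延军 (including the link: http://shiyanjun.cn), do not use it for commercial purposes, and release any work derived from this article under the same license. If you have any questions, please contact me.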