HBase Thrift Client Java API in Practice

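The HBase Thrift API definition can be viewed at http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup . To generate cross-language HBase APIs from it, you need to install the Thrift compiler.
First, download the content at the link above and save it as Hbase.thrift.
Then run the following commands to generate the HBase API for different programming languages: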

[hadoop@master hbase]$ thrift --gen as3 Hbase.thrift
[hadoop@master hbase]$ thrift --gen cpp Hbase.thrift
[hadoop@master hbase]$ thrift --gen java Hbase.thrift
[hadoop@master hbase]$ thrift --gen py Hbase.thrift
[hadoop@master hbase]$ thrift --gen perl Hbase.thrift
[hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
[hadoop@master hbase]$ thrift --gen php Hbase.thrift
[hadoop@master hbase]$ thrift --gen js Hbase.thrift
[hadoop@master hbase]$ thrift --gen go Hbase.thrift
[hadoop@master hbase]$ thrift --gen erl Hbase.thrift
[hadoop@master hbase]$ thrift --gen delphi Hbase.thrift
[hadoop@master hbase]$ thrift --gen hs Hbase.thrift
[hadoop@master hbase]$ thrift --gen html Hbase.thrift
[hadoop@master hbase]$ thrift --gen c_glib Hbase.thrift
[hadoop@master hbase]$ thrift --gen cocoa Hbase.thrift
[hadoop@master hbase]$ thrift --gen rb Hbase.thrift
[hadoop@master hbase]$ thrift --gen st Hbase.thrift
[hadoop@master hbase]$ thrift --gen xsd Hbase.thrift
[hadoop@master hbase]$ ls
gen-as3     gen-cocoa  gen-csharp  gen-erl  gen-hs    gen-java  gen-perl  gen-py  gen-st   Hbase.thrift
gen-c_glib  gen-cpp    gen-delphi  gen-go   gen-html  gen-js    gen-php   gen-rb  gen-xsd

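Here we use Java to access HBase tables through the HBase Thrift client API. In practice, if you are working in Java, HBase's native Java API is the better choice for both performance and convenience. The Thrift API is an extra layer wrapped around the HBase API; it may feel awkward the first time you use it, and sometimes you will need to consult the Thrift server's implementation code.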
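Preparation:

    1. Download the Thrift package, extract it, and copy the code under thrift-0.9.0/lib/java/src into your workspace (in your IDE).
    2. Copy the code in the gen-java directory generated above into your workspace.
    3. Make sure the HBase cluster is running normally, then start the HBase Thrift service with the following command: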
bin/hbase thrift -b master -p 9090 start

Here the HBase Thrift service listens on port 9090. This is the port the Thrift API client below connects to, not the HBase service port (60000 by default).
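Because it is easy to point the client at the wrong port, a quick TCP reachability check can help before calling open(). The following is a minimal sketch using only java.net; the class name ThriftPortCheck is hypothetical, and "master"/9090 simply mirror the thrift start command above. A successful connect only shows that something is listening on that port, not that it is the HBase Thrift server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class ThriftPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Unknown hosts, refused connections and timeouts all surface as IOException.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed host/port from the "bin/hbase thrift -b master -p 9090 start" command
        System.out.println("Thrift port reachable: " + isReachable("master", 9090, 2000));
    }
}
```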
Next, let's implement a simple example that accesses an HBase table.
First, create a table in the HBase shell:

create 'test_info', 'info'

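The table is named test_info and has one column family, info.
Next, we implement operations on the HBase table using the Thrift code generated above.
On the client side we add a layer of abstraction that makes it easier to pass the various parameters. The abstract class is AbstractHBaseThriftService; its code is as follows: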
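The client code in this article addresses columns as "family:qualifier" strings, such as "info:birthday" in the info family created above; a bare family name such as "info" refers to the whole family in scans. The sketch below illustrates that naming convention by splitting at the first colon. ColumnSpec is a hypothetical helper for illustration only, not part of the generated Thrift API, and it assumes the server splits column names the same way.

```java
class ColumnSpec {
    final String family;
    final String qualifier; // null when only a family is named

    ColumnSpec(String family, String qualifier) {
        this.family = family;
        this.qualifier = qualifier;
    }

    // Split "family:qualifier" at the first ':'; the qualifier itself may contain ':'
    static ColumnSpec parse(String spec) {
        int idx = spec.indexOf(':');
        if (idx < 0) {
            return new ColumnSpec(spec, null);
        }
        return new ColumnSpec(spec.substring(0, idx), spec.substring(idx + 1));
    }

    @Override
    public String toString() {
        return qualifier == null ? "family=" + family : "family=" + family + ", qualifier=" + qualifier;
    }

    public static void main(String[] args) {
        System.out.println(parse("info:birthday")); // family=info, qualifier=birthday
        System.out.println(parse("info"));          // family=info
    }
}
```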

package org.shirdrn.cloud.hbase.thrift;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

public abstract class AbstractHBaseThriftService {

	protected static final String CHARSET = "UTF-8";
	private final TTransport transport;
	protected final Hbase.Client client;

	// Connect to a Thrift server on localhost:9090 by default
	public AbstractHBaseThriftService() {
		this("localhost", 9090);
	}

	public AbstractHBaseThriftService(String host, int port) {
		// Plain (unframed) socket with the binary protocol; if the Thrift server
		// is started with framed transport, wrap the socket in TFramedTransport
		transport = new TSocket(host, port);
		TProtocol protocol = new TBinaryProtocol(transport, true, true);
		client = new Hbase.Client(protocol);
	}

	public void open() throws TTransportException {
		if(transport != null) {
			transport.open();
		}
	}

	public void close() {
		if(transport != null) {
			transport.close();
		}
	}
	
	public abstract List<String> getTables() throws TException;
	
	public abstract void update(String table, String rowKey, boolean writeToWal,
			String fieldName, String fieldValue, Map<String, String> attributes) throws TException;
	public abstract void update(String table, String rowKey, boolean writeToWal,
			Map<String, String> fieldNameValues, Map<String, String> attributes) throws TException;
	
	public abstract void deleteCell(String table, String rowKey, boolean writeToWal,
			String column, Map<String, String> attributes) throws TException;
	public abstract void deleteCells(String table, String rowKey, boolean writeToWal,
			List<String> columns, Map<String, String> attributes) throws TException;
	
	public abstract void deleteRow(String table, String rowKey,
			Map<String, String> attributes) throws TException;

	public abstract int scannerOpen(String table, String startRow, List<String> columns,
	        Map<String, String> attributes) throws TException;
	public abstract int scannerOpen(String table, String startRow, String stopRow, List<String> columns,
	        Map<String, String> attributes) throws TException;
	public abstract int scannerOpenWithPrefix(String table, String startAndPrefix,
            List<String> columns, Map<String, String> attributes) throws TException;
	public abstract int scannerOpenTs(String table, String startRow,
	        List<String> columns, long timestamp, Map<String, String> attributes) throws TException;
	public abstract int scannerOpenTs(String table, String startRow, String stopRow,
	        List<String> columns, long timestamp, Map<String, String> attributes) throws TException;
		        
	public abstract List<TRowResult> scannerGetList(int id, int nbRows) throws TException;
	public abstract List<TRowResult> scannerGet(int id) throws TException;
	
	public abstract List<TRowResult> getRow(String table, String row,
		        Map<String, String> attributes) throws TException;
	public abstract List<TRowResult> getRows(String table,
             List<String> rows, Map<String, String> attributes) throws TException;
	public abstract List<TRowResult> getRowsWithColumns(String table,
             List<String> rows, List<String> columns, Map<String, String> attributes) throws TException;
	
	public abstract void scannerClose(int id) throws TException;
	
	/**
	 * Print the row key and every column/value of a result row (for testing only).
	 * @param result the row to print
	 */
	public abstract void iterateResults(TRowResult result);

}

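Here is a brief summary of the basic functionality our client API provides:

  • Open and close the connection to the Thrift service: open() and close()
  • Get the names of all tables in HBase: getTables()
  • Insert or update table records: update()
  • Delete cells from a row: deleteCell() and deleteCells()
  • Delete an entire row: deleteRow()
  • Open a scanner and get its id: scannerOpen(), scannerOpenWithPrefix() and scannerOpenTs(); then use the returned id to iterate over records: scannerGetList() and scannerGet()
  • Get rows by row key: getRow(), getRows() and getRowsWithColumns()
  • Close a scanner: scannerClose()
  • Print results for debugging: iterateResults()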

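Pagination, for example, works differently from a traditional relational database. With HBase, you first open a scanner (for example by calling scannerOpen()), which returns an id. You then pass that id to scannerGetList(), whose nbRows argument sets how many records each call returns, and keep calling scannerGetList() until a call returns no results. Finally, release the scanner with scannerClose(). The test cases later show this in practice.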
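The scanner-based paging loop just described can be tried without a running cluster. In the sketch below, the RowScanner interface and InMemoryScanner class are hypothetical stand-ins for scannerGetList()/scannerClose(), with rows as plain strings instead of TRowResult; the loop has the same shape as the one in caseForScan(): fetch up to nbRows rows, stop on an empty batch, and always close the scanner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the scanner calls on the Thrift client
interface RowScanner {
    List<String> nextBatch(int nbRows) throws Exception; // like scannerGetList(id, nbRows)
    void close() throws Exception;                        // like scannerClose(id)
}

// In-memory scanner over pre-sorted rows, used only to exercise the loop
class InMemoryScanner implements RowScanner {
    private final List<String> rows;
    private int pos = 0;
    boolean closed = false;

    InMemoryScanner(List<String> rows) {
        this.rows = rows;
    }

    public List<String> nextBatch(int nbRows) {
        int end = Math.min(pos + nbRows, rows.size());
        List<String> batch = new ArrayList<>(rows.subList(pos, end));
        pos = end;
        return batch; // empty once every row has been returned
    }

    public void close() {
        closed = true;
    }
}

class ScanPager {
    // Read pages of up to pageSize rows until an empty batch, then close the scanner
    static List<List<String>> readAllPages(RowScanner scanner, int pageSize) throws Exception {
        List<List<String>> pages = new ArrayList<>();
        try {
            List<String> batch = scanner.nextBatch(pageSize);
            while (batch != null && !batch.isEmpty()) {
                pages.add(batch);
                batch = scanner.nextBatch(pageSize);
            }
        } finally {
            scanner.close(); // release the scanner even if a call fails
        }
        return pages;
    }

    public static void main(String[] args) throws Exception {
        List<String> rows = Arrays.asList("0005", "0006", "0007", "0008", "0009");
        System.out.println(readAllPages(new InMemoryScanner(rows), 2));
        // [[0005, 0006], [0007, 0008], [0009]]
    }
}
```

Closing in a finally block matters with the real API because each open scanner holds resources on the Thrift server until scannerClose() is called.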
Now, based on the client interface abstracted in AbstractHBaseThriftService, here is a basic implementation:

package org.shirdrn.cloud.hbase.thrift;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.TException;


public class HBaseThriftClient extends AbstractHBaseThriftService {

	public HBaseThriftClient() {
		super();
	}
	
	public HBaseThriftClient(String host, int port) {
		super(host, port);
	}
	
	@Override
	public List<String> getTables() throws TException {
		List<String> list = new ArrayList<String>(0);
		for (ByteBuffer buf : client.getTableNames()) {
			byte[] name = decode(buf);
			list.add(new String(name));
		}
		return list;
	}
	
	static ByteBuffer wrap(String value) {
		ByteBuffer bb = null;
		try {
			bb = ByteBuffer.wrap(value.getBytes(CHARSET));
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		return bb;
	}
	
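	protected byte[] decode(ByteBuffer buffer) {
		// Copy the readable bytes [position, limit) through a duplicate so the
		// caller's buffer position is not consumed (the buffer may be a map key)
		ByteBuffer view = buffer.duplicate();
		byte[] bytes = new byte[view.remaining()];
		view.get(bytes);
		return bytes;
	}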

	@Override
	public void update(String table, String rowKey, boolean writeToWal,
			Map<String, String> fieldNameValues, Map<String, String> attributes) throws TException {
		List<Mutation> mutations = new ArrayList<Mutation>();
		for(Map.Entry<String, String> entry : fieldNameValues.entrySet()) {
			mutations.add(new Mutation(false, wrap(entry.getKey()), wrap(entry.getValue()), writeToWal));
		}
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		ByteBuffer tableName = wrap(table);
		ByteBuffer row = wrap(rowKey);
		client.mutateRow(tableName, row, mutations, wrappedAttributes);
	}

	@Override
	public void update(String table, String rowKey, boolean writeToWal, 
			String fieldName, String fieldValue, Map<String, String> attributes) throws IOError, TException {
		Map<String, String> fieldNameValues = new HashMap<String, String>();
		fieldNameValues.put(fieldName, fieldValue);
		update(table, rowKey, writeToWal, fieldNameValues, attributes);
	}
	
	
	@Override
	public void deleteCells(String table, String rowKey, boolean writeToWal, 
			List<String> columns, Map<String, String> attributes) throws TException {
		List<Mutation> mutations = new ArrayList<Mutation>();
		for(String column : columns) {
			// isDelete = true marks this mutation as a delete; with false it would be sent as a put
			mutations.add(new Mutation(true, wrap(column), null, writeToWal));
		}
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		ByteBuffer tableName = wrap(table);
		ByteBuffer row = wrap(rowKey);
		client.mutateRow(tableName, row, mutations, wrappedAttributes);
	}

	@Override
	public void deleteCell(String table, String rowKey, boolean writeToWal, 
			String column, Map<String, String> attributes) throws TException {
		List<String> columns = new ArrayList<String>(1);
		columns.add(column);
		deleteCells(table, rowKey, writeToWal, columns, attributes);
	}
	
	
	@Override
	public void deleteRow(String table, String rowKey, Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		ByteBuffer row = wrap(rowKey);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		client.deleteAllRow(tableName, row, wrappedAttributes);
	}
	
	
	@Override
	public int scannerOpen(String table, String startRow, List<String> columns, 
			Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> fl = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.scannerOpen(tableName, wrap(startRow), fl, wrappedAttributes);
	}

	@Override
	public int scannerOpen(String table, String startRow, String stopRow, List<String> columns, 
			Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> fl = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.scannerOpenWithStop(tableName, wrap(startRow), wrap(stopRow), fl, wrappedAttributes);
	}

	@Override
	public int scannerOpenWithPrefix(String table, String startAndPrefix, List<String> columns, 
			Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> fl = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.scannerOpenWithPrefix(tableName, wrap(startAndPrefix), fl, wrappedAttributes);
	}

	@Override
	public int scannerOpenTs(String table, String startRow, List<String> columns, 
			long timestamp, Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> fl = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.scannerOpenTs(tableName, wrap(startRow), fl, timestamp, wrappedAttributes);
	}

	@Override
	public int scannerOpenTs(String table, String startRow, String stopRow, List<String> columns, 
			long timestamp, Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> fl = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.scannerOpenWithStopTs(tableName, wrap(startRow), wrap(stopRow), fl, timestamp, wrappedAttributes);
	}

	@Override
	public List<TRowResult> scannerGetList(int id, int nbRows) throws TException {
		return client.scannerGetList(id, nbRows);
	}

	@Override
	public List<TRowResult> scannerGet(int id) throws TException {
		return client.scannerGetList(id, 1);
	}
	
	@Override
	public List<TRowResult> getRow(String table, String row, 
			Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.getRow(tableName, wrap(row), wrappedAttributes);
	}

	@Override
	public List<TRowResult> getRows(String table, List<String> rows, 
			Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		List<ByteBuffer> wrappedRows = encodeRows(rows);
		return client.getRows(tableName, wrappedRows, wrappedAttributes);
	}

	@Override
	public List<TRowResult> getRowsWithColumns(String table, List<String> rows, 
			List<String> columns, Map<String, String> attributes) throws TException {
		ByteBuffer tableName = wrap(table);
		List<ByteBuffer> wrappedRows = encodeRows(rows);
		List<ByteBuffer> wrappedColumns = encodeColumns(columns);
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes);
		return client.getRowsWithColumns(tableName, wrappedRows, wrappedColumns, wrappedAttributes);
	}
	
	private List<ByteBuffer> encodeColumns(List<String> columns) {
		List<ByteBuffer> fl = new ArrayList<ByteBuffer>(0);
		for(String column : columns) {
			fl.add(wrap(column));
		}
		return fl;
	}
	
	private Map<ByteBuffer, ByteBuffer> encodeAttributes(Map<String, String> attributes) {
		Map<ByteBuffer, ByteBuffer> wrappedAttributes = null;
		if(attributes != null && !attributes.isEmpty()) {
			wrappedAttributes = new HashMap<ByteBuffer, ByteBuffer>(1);
			for(Map.Entry<String, String> entry : attributes.entrySet()) {
				wrappedAttributes.put(wrap(entry.getKey()), wrap(entry.getValue()));
			}
		}
		return wrappedAttributes;
	}
	
	private List<ByteBuffer> encodeRows(List<String> rows) {
		List<ByteBuffer> list = new ArrayList<ByteBuffer>(0);
		for(String row : rows) {
			list.add(wrap(row));
		}
		return list;
	}
	
	@Override
	public void iterateResults(TRowResult result) {
		Iterator<Entry<ByteBuffer, TCell>> iter = result.columns.entrySet().iterator();
		System.out.println("RowKey=" + new String(result.getRow()));
		while (iter.hasNext()) {
			Entry<ByteBuffer, TCell> entry = iter.next();
			System.out.println("\tCol=" + new String(decode(entry.getKey())) + ", Value=" + new String(entry.getValue().getValue()));
		}
	}
	
	@Override
	public void scannerClose(int id) throws TException {
		client.scannerClose(id);		
	}
}
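Most of the client code converts between Java Strings and the ByteBuffer values the Thrift API uses. The following standalone sketch (the class name ByteBufferCodec is hypothetical) shows a UTF-8 round trip that reads only the bytes between position and limit through duplicate(), leaving the original buffer untouched. The slice case matters because a ByteBuffer need not start at index 0 of its backing array; code that sizes its array by limit() while reading from the current position would throw BufferUnderflowException on such a buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class ByteBufferCodec {
    // Encode a String as UTF-8 bytes wrapped in a ByteBuffer
    static ByteBuffer wrap(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    // Copy only the readable bytes [position, limit) without moving the caller's position
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    static String toUtf8String(ByteBuffer buffer) {
        return new String(toBytes(buffer), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = wrap("info:birthday");
        System.out.println(toUtf8String(buf)); // info:birthday
        System.out.println(buf.position());    // 0 (unchanged)
        // A buffer whose position starts at 2 inside its backing array
        ByteBuffer slice = ByteBuffer.wrap("xxtest_info".getBytes(StandardCharsets.UTF_8), 2, 9);
        System.out.println(toUtf8String(slice)); // test_info
    }
}
```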

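The HBaseThriftClient class above provides a basic implementation. Next come test cases that call our client operations to interact with the HBase table. The test class is as follows: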

package org.shirdrn.cloud.hbase.thrift;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;

public class Test {

	private static final String CHARSET = "UTF-8";
	static DecimalFormat formatter = new DecimalFormat("00");
	private final AbstractHBaseThriftService client;
	
	public Test(String host, int port) {
		client = new HBaseThriftClient(host, port);
		try {
			client.open();
		} catch (TTransportException e) {
			e.printStackTrace();
		}
	}
	
	public Test() {
		this("master", 9090);
	}
	
	static String randomlyBirthday() {
		Random r = new Random();
		int year = 1900 + r.nextInt(100);
		int month = 1 + r.nextInt(12);
		int date = 1 + r.nextInt(30);
		return String.valueOf(year + "-" + formatter.format(month) + "-" + formatter.format(date));
	}
	
	static String randomlyGender() {
		Random r = new Random();
		int flag = r.nextInt(2);
		return flag == 0 ? "M" : "F";
	}
	
	static String randomlyUserType() {
		Random r = new Random();
		int flag = 1 + r.nextInt(10);
		return String.valueOf(flag);
	}
	
	static ByteBuffer wrap(String value) {
		ByteBuffer bb = null;
		try {
			bb = ByteBuffer.wrap(value.getBytes(CHARSET));
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
		return bb;
	}
	
	static DecimalFormat rowKeyFormatter = new DecimalFormat("0000");
	
	public void caseForUpdate() throws TException {
		boolean writeToWal = false;
		Map<String, String> attributes = new HashMap<String, String>(0);
		String table = setTable();
		// put kv pairs
		for (long i = 0; i < 10000; i++) {
			String rowKey = rowKeyFormatter.format(i);
			Map<String, String> fieldNameValues = new HashMap<String, String>();
			fieldNameValues.put("info:birthday", randomlyBirthday());
			fieldNameValues.put("info:user_type", randomlyUserType());
			fieldNameValues.put("info:gender", randomlyGender());
			client.update(table, rowKey, writeToWal, fieldNameValues, attributes);
		}
	}
	
	public void caseForDeleteCells() throws TException {
		boolean writeToWal = false;
		Map<String, String> attributes = new HashMap<String, String>(0);
		String table = setTable();
		// delete the info:birthday cell of rows 0005-0009
		for (long i = 5; i < 10; i++) {
			String rowKey = rowKeyFormatter.format(i);
			List<String> columns = new ArrayList<String>(0);
			columns.add("info:birthday");
			client.deleteCells(table, rowKey, writeToWal, columns, attributes);
		}
	}

	private String setTable() {
		String table = "test_info";
		return table;
	}
	
	public void caseForDeleteRow() throws TException {
		Map<String, String> attributes = new HashMap<String, String>(0);
		String table = setTable();
		// delete rows
		for (long i = 5; i < 10; i++) {
			String rowKey = rowKeyFormatter.format(i);
			client.deleteRow(table, rowKey, attributes);
		}
	}
	
	public void caseForScan() throws TException {
		Map<String, String> attributes = new HashMap<String, String>(0);
		String table = setTable();
		String startRow = "0005";
		String stopRow = "0015";
		List<String> columns = new ArrayList<String>(0);
		columns.add("info:birthday");
		int id = client.scannerOpen(table, startRow, stopRow, columns, attributes);
		int nbRows = 2;
		List<TRowResult> results = client.scannerGetList(id, nbRows);
		while(results != null && !results.isEmpty()) {
			for(TRowResult result : results) {
				client.iterateResults(result);
			}
			results = client.scannerGetList(id, nbRows);
		}
		client.scannerClose(id);
	}
	
	public void caseForGet() throws TException {
		Map<String, String> attributes = new HashMap<String, String>(0);
		String table = setTable();
		List<String> rows = new ArrayList<String>(0);
		rows.add("0009");
		rows.add("0098");
		rows.add("0999");
		List<String> columns = new ArrayList<String>(0);
		columns.add("info:birthday");
		columns.add("info:gender");
		List<TRowResult> results = client.getRowsWithColumns(table, rows, columns, attributes);
		for(TRowResult result : results) {
			client.iterateResults(result);
		}
	}
	
	public static void main(String[] args) 
			throws IOError, IllegalArgument, TException, UnsupportedEncodingException {
		Test test = new Test();
		try {
//			test.caseForUpdate(); // insert or update rows/cells
//			test.caseForDeleteCells(); // delete cells
//			test.caseForDeleteRow(); // delete rows
//			test.caseForScan(); // scan rows
			test.caseForGet(); // get rows
		} finally {
			test.client.close(); // release the Thrift socket
		}
	}
	
}
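caseForScan() relies on two properties of HBase row keys: rows are sorted lexicographically by their bytes, and the stop row passed to scannerOpenWithStop is exclusive. That is why the test formats row keys with DecimalFormat("0000"). The sketch below simulates this with a TreeSet of String keys, which for ASCII digit keys sorts the same way as byte order; it is an illustration rather than HBase code, and RowKeyRangeDemo is a hypothetical name. With padded keys, ["0005", "0015") yields exactly 0005 through 0014, whereas with unpadded keys a range from "1" to "2" also picks up 10 through 19.

```java
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class RowKeyRangeDemo {
    // Keys in [startRow, stopRow): start inclusive, stop exclusive, like an HBase scan range
    static List<String> scanRange(TreeSet<String> keys, String startRow, String stopRow) {
        return new ArrayList<>(keys.subSet(startRow, true, stopRow, false));
    }

    // Build the keys 0..count-1, either zero-padded to 4 digits (as in Test) or unpadded
    static TreeSet<String> buildKeys(int count, boolean padded) {
        DecimalFormat formatter = new DecimalFormat("0000");
        TreeSet<String> keys = new TreeSet<>();
        for (long i = 0; i < count; i++) {
            keys.add(padded ? formatter.format(i) : String.valueOf(i));
        }
        return keys;
    }

    public static void main(String[] args) {
        // 10 rows: 0005 .. 0014
        System.out.println(scanRange(buildKeys(100, true), "0005", "0015"));
        // 11 rows: 1, 10, 11, ..., 19; unpadded keys do not sort numerically
        System.out.println(scanRange(buildKeys(100, false), "1", "2"));
    }
}
```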

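The test cases in the Test class exercise basic operations on HBase table data. In addition, in the generated Thrift client code, org.apache.hadoop.hbase.thrift.generated.Hbase.Iface declares the complete service interface, so you can pick the operations you need. org.apache.hadoop.hbase.thrift.generated.Hbase.Client implements the client-side logic for talking to the Thrift server, and an instance of it acts as a proxy for the Thrift service that HBase provides.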


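This article is published under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must keep the author attribution 时延军 (including the link http://shiyanjun.cn), you may not use it for commercial purposes, and any work modified from this article must be published under the same license. If you have any questions, please contact me.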
