Practice: Operating HBase with C# + Thrift

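When developing against HBase in Java, you can use HBase's native API directly to operate on table data. If you don't mind the extra effort, you can also use the Thrift client Java API; see my earlier post, "HBase Thrift Client Java API in Practice", for reference. Developers from other programming-language backgrounds can still get the benefits of HBase by using the HBase Thrift client API generated for their language to interact with HBase.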
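Here we use a C# client to operate on HBase. The definition of HBase's Thrift interface is available at http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup. To generate the cross-language HBase API you need to install the Thrift compiler; the version used here is 0.9.0. Make sure the version of the Thrift compiler you install matches the version of the Thrift library you import for your target language. Otherwise you will run into all kinds of problems, because the library API for a given language can change between Thrift versions.
First, download the content at the link above and save it as Hbase.thrift.
Then run the following command to generate the HBase Thrift client API for C#: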

[hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
[hadoop@master hbase]$ ls
gen-csharp

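Here we access HBase tables from C# through HBase's Thrift client API. If you are working in Java, however, you are better off using HBase's native API, which offers a better experience in both performance and convenience. Access through the Thrift API is really an extra layer wrapped around the HBase API. It can feel awkward at first, and at times you may need to consult the Thrift server's implementation code.
Preparation:

    1. Download the Thrift package, extract it, and copy the code under thrift-0.9.0/lib/csharp/src (the C# Thrift runtime library) into your workspace (in your development tool)
    2. Copy the code from the gen-csharp directory generated above into your workspace
    3. Make sure the HBase cluster is running normally, then start HBase's Thrift service with the following command: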
bin/hbase thrift -b master -p 9090 start

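Here the HBase Thrift service listens on port 9090. This is the port you need when accessing HBase through the Thrift API below, not HBase's own service port (60000 by default).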
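Before running the client, it can help to confirm that something is actually listening on the Thrift port. The following is a minimal sketch that uses only System.Net.Sockets. The class name PortProbe and the 2-second default timeout are illustrative assumptions, not part of Thrift or HBase. The sketch only checks that a TCP connection can be established. It does not confirm that the service on the other end speaks Thrift.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper (not part of Thrift or HBase): reports whether a TCP
// port accepts connections, e.g. the HBase Thrift server on master:9090.
public static class PortProbe
{
    public static bool IsReachable(string host, int port, int timeoutMs = 2000)
    {
        using (var tcp = new TcpClient())
        {
            try
            {
                // Wait(timeout) bounds how long we block on the connect attempt.
                bool completed = tcp.ConnectAsync(host, port).Wait(timeoutMs);
                return completed && tcp.Connected;
            }
            catch (AggregateException)
            {
                // The connect task faulted: connection refused, unknown host, etc.
                return false;
            }
            catch (SocketException)
            {
                return false;
            }
        }
    }
}
```

For example, `PortProbe.IsReachable("master", 9090)` should return true once the Thrift service started above is running.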
Next, we implement a simple example that accesses an HBase table.
First, create a table in the HBase Shell:

create 'test_info', 'info'

The table is named test_info, and its column family is named info.
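The Thrift API identifies a column by a single string of the form family:qualifier. That is why the test code later passes names such as info:birthday for the info family of test_info. A small helper like the hypothetical ColumnName class sketched here (not part of the generated code) keeps that format in one place. It splits only at the first colon, because a column family name cannot contain ':' while a qualifier may.

```csharp
using System;

// Hypothetical helper: builds and splits the "family:qualifier" column
// names passed to the Thrift API (e.g. "info:birthday" in test_info).
public static class ColumnName
{
    public static string Of(string family, string qualifier)
    {
        // Family names are non-empty and may not contain ':'.
        if (string.IsNullOrEmpty(family) || family.Contains(":"))
            throw new ArgumentException("invalid column family", nameof(family));
        return family + ":" + qualifier;
    }

    // Splits at the first ':' only; everything after it is the qualifier.
    // A bare family name ("info") yields an empty qualifier.
    public static (string Family, string Qualifier) Split(string column)
    {
        int i = column.IndexOf(':');
        return i < 0 ? (column, "") : (column.Substring(0, i), column.Substring(i + 1));
    }
}
```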
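Next, we implement operations on the HBase table on top of the Thrift code generated above.
What we are really doing here is translating the Java code from the HBase Thrift client Java API post into C#. On the client side we add a layer of abstraction that makes it easier to pass the various parameters. The abstract class is AbstractHBaseThriftService, in the namespace HbaseThrift.HBase.Thrift, and it is implemented as follows: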

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using Thrift.Transport;
using Thrift.Protocol;

namespace HbaseThrift.HBase.Thrift
{
    public abstract class AbstractHBaseThriftService
    {
        protected static readonly string CHARSET = "UTF-8";
	    private string host = "localhost";
	    private int port = 9090;
	    private readonly TTransport transport;
	    protected readonly Hbase.Client client;

        public AbstractHBaseThriftService() : this("localhost", 9090)
        {
            
        }

        public AbstractHBaseThriftService(string host, int port)
        {
            this.host = host;
            this.port = port;
            transport = new TSocket(host, port);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            client = new Hbase.Client(protocol);
        }

        public void Open() {
            if (transport != null)
            {
                transport.Open();
            }
	    }

        public void Close()
        {
            if (transport != null)
            {
                transport.Close();
            }
        }

        public abstract List<string> GetTables();
	
	    public abstract void Update(string table, string rowKey, bool writeToWal,
			string fieldName, string fieldValue, Dictionary<string, string> attributes);
        public abstract void Update(string table, string rowKey, bool writeToWal,
			Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes);
	
	    public abstract void DeleteCell(string table, string rowKey, bool writeToWal,
			    string column, Dictionary<string, string> attributes);
	    public abstract void DeleteCells(string table, string rowKey, bool writeToWal,
			    List<string> columns, Dictionary<string, string> attributes);
	
	     public abstract void DeleteRow(string table, string rowKey,
		            Dictionary<string, string> attributes);
		        
	    public abstract int ScannerOpen(string table, string startRow, List<string> columns,
	            Dictionary<string, string> attributes);
	    public abstract int ScannerOpen(string table, string startRow, string stopRow, List<string> columns,
	            Dictionary<string, string> attributes);
	    public abstract int ScannerOpenWithPrefix(string table, string startAndPrefix,
                List<string> columns, Dictionary<string, string> attributes);
	    public abstract int ScannerOpenTs(string table, string startRow,
	            List<string> columns, long timestamp, Dictionary<string, string> attributes);
	    public abstract int ScannerOpenTs(string table, string startRow, string stopRow,
	            List<string> columns, long timestamp, Dictionary<string, string> attributes);
		        
	    public abstract List<TRowResult> ScannerGetList(int id, int nbRows);
	    public abstract List<TRowResult> ScannerGet(int id);
	
	    public abstract List<TRowResult> GetRow(string table, string row,
		            Dictionary<string, string> attributes);
	    public abstract List<TRowResult> GetRows(string table,
                 List<string> rows, Dictionary<string, string> attributes);
	    public abstract List<TRowResult> GetRowsWithColumns(string table,
                 List<string> rows, List<string> columns, Dictionary<string, string> attributes);
	
	    public abstract void ScannerClose(int id);
	
        /// <summary>
        /// Iterates over the columns of a result row (for testing purposes only).
        /// </summary>
        /// <param name="result">the row to print</param>
        public abstract void IterateResults(TRowResult result);

    }
}
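AbstractHBaseThriftService exposes Open() and Close() but does not implement IDisposable. As a result, an exception thrown between the two calls leaves the socket open. One option, sketched below on the assumption that you do not want to change the class itself, is a small hypothetical adapter, ThriftSession. It takes the two methods as delegates so that a using block always closes the connection.

```csharp
using System;

// Hypothetical adapter: turns an Open()/Close() pair into an IDisposable,
// so a `using` block closes the Thrift transport even if an operation throws.
public sealed class ThriftSession : IDisposable
{
    private readonly Action close;
    private bool disposed;

    public ThriftSession(Action open, Action close)
    {
        if (open == null) throw new ArgumentNullException(nameof(open));
        this.close = close ?? throw new ArgumentNullException(nameof(close));
        open();                 // e.g. service.Open
    }

    public void Dispose()
    {
        if (disposed) return;   // make Close() run at most once
        disposed = true;
        close();                // e.g. service.Close
    }
}
```

With an instance `service` of a concrete subclass, `using (new ThriftSession(service.Open, service.Close)) { var tables = service.GetTables(); }` opens the transport on entry and closes it on exit, even if GetTables() throws. Having the abstract class implement IDisposable directly would be the more conventional design.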

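Here is a brief summary of the basic features our client API provides:

  • Open a connection to the Thrift service: Open(); close it: Close()
  • Get the names of all tables in HBase: GetTables()
  • Update records in an HBase table: Update()
  • Delete data (cells) from a row of an HBase table: DeleteCell() and DeleteCells()
  • Delete an entire row of an HBase table: DeleteRow()
  • Open a scanner, which returns an id: ScannerOpen(), ScannerOpenWithPrefix() and ScannerOpenTs(); then use the returned id to iterate over records: ScannerGetList() and ScannerGet()
  • Get row results: GetRow(), GetRows() and GetRowsWithColumns()
  • Close a scanner: ScannerClose()
  • Iterate over results, for debugging: IterateResults()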

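Implementing pagination, for example, works somewhat differently than with a traditional relational database. With an HBase table, you first open a scanner (for example by calling ScannerOpen()), which returns an id. You then call ScannerGetList() with that id to get a list of records, using the nbRows parameter to set how many records each call returns. You keep calling ScannerGetList() until a call returns no results. The test cases later show this in practice.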
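The paging loop just described can be sketched generically as follows. This is an illustration under stated assumptions. The delegate getList stands in for ScannerGetList(id, nbRows), and close stands in for ScannerClose(id). FakeScanner is a hypothetical in-memory substitute so the loop can be tried without a cluster. It models a scanner that returns rows from startRow (inclusive) to stopRow (exclusive). A null or empty batch ends the loop, and the finally block closes the scanner even if the caller stops enumerating early.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Sketch of scanner-based paging. getList stands in for
// ScannerGetList(id, nbRows) and close for ScannerClose(id).
public static class ScanPaging
{
    public static IEnumerable<List<T>> Pages<T>(
        Func<int, int, List<T>> getList, Action<int> close, int scannerId, int nbRows)
    {
        try
        {
            while (true)
            {
                List<T> batch = getList(scannerId, nbRows);
                // An empty (or null) batch means the scanner is exhausted.
                if (batch == null || batch.Count == 0) yield break;
                yield return batch;
            }
        }
        finally
        {
            // Runs when the loop ends or the caller disposes the enumerator early.
            close(scannerId);
        }
    }
}

// Hypothetical in-memory stand-in for an HBase scanner, for trying the loop
// without a cluster. It yields keys in [startRow, stopRow) in ordinal order,
// which matches HBase's byte order for ASCII row keys.
public sealed class FakeScanner
{
    private readonly Queue<string> remaining;
    public bool Closed { get; private set; }

    public FakeScanner(IEnumerable<string> keys, string startRow, string stopRow)
    {
        remaining = new Queue<string>(keys
            .Where(k => string.CompareOrdinal(k, startRow) >= 0
                     && string.CompareOrdinal(k, stopRow) < 0)
            .OrderBy(k => k, StringComparer.Ordinal));
    }

    // Returns at most nbRows keys; an empty list once all keys were handed out.
    public List<string> GetList(int id, int nbRows)
    {
        var batch = new List<string>();
        while (batch.Count < nbRows && remaining.Count > 0)
            batch.Add(remaining.Dequeue());
        return batch;
    }

    public void Close(int id) => Closed = true;
}
```

Against the real service, this would be `foreach (var page in ScanPaging.Pages<TRowResult>(client.ScannerGetList, client.ScannerClose, id, 2)) { ... }`. Testing for an empty list, and not only for null, matters here: as far as I know, the HBase Thrift server returns an empty list rather than null once the scanner has no more rows.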
Now, building on the AbstractHBaseThriftService client interface defined above, here is a basic implementation:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HbaseThrift.HBase.Thrift
{
    class HBaseThriftClient : AbstractHBaseThriftService
    {
        public HBaseThriftClient() : this("localhost", 9090)
        {

        }

        public HBaseThriftClient(string host, int port) : base(host, port)
        {
            
        }

        public override List<string> GetTables()
        {
            List<byte[]> tables = client.getTableNames();
            List<String> list = new List<String>();
            foreach(byte[] table in tables)
            {
                list.Add(Decode(table));
            }
            return list;
        }

        public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = false;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(fieldName);
            mutation.Value = Encode(fieldValue);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            foreach (KeyValuePair<String, String> pair in fieldNameValues)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = false;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(pair.Key);
                mutation.Value = Encode(pair.Value);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = true;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            foreach (string column in columns)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = true;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(column);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            client.deleteAllRow(tableName, row, encodedAttributes);
        }

        public override int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] prefix = Encode(startAndPrefix);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
        }

        public override int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
        }

        public override List<TRowResult> ScannerGetList(int id, int nbRows)
        {
            return client.scannerGetList(id, nbRows);
        }

        public override List<TRowResult> ScannerGet(int id)
        {
            return client.scannerGet(id);
        }

        public override List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] startRow = Encode(row);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRow(tableName, startRow, encodedAttributes);
        }

        public override List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRows(tableName, encodedRows, encodedAttributes);
        }

        public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
        }

        public override void ScannerClose(int id)
        {
            client.scannerClose(id);
        }

        public override void IterateResults(TRowResult result)
        {
            foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
            {
                Console.WriteLine("\tCol=" + Decode(pair.Key) + ", Value=" + Decode(pair.Value.Value));
            }
        }

        private String Decode(byte[] bs)
        {
            // Always UTF-8: Encoding.Default (which UTF8Encoding.Default resolves to)
            // is the system ANSI code page on .NET Framework, not UTF-8
            return Encoding.UTF8.GetString(bs);
        }

        private byte[] Encode(String str)
        {
            return Encoding.UTF8.GetBytes(str);
        }

        private Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<String, String> attributes)
        {
            Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
            // Treat a null attribute map like an empty one
            if (attributes != null)
            {
                foreach (KeyValuePair<String, String> pair in attributes)
                {
                    encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
                }
            }
            return encodedAttributes;
        }

        private List<byte[]> EncodeStringList(List<String> strings) 
        {
            List<byte[]> list = new List<byte[]>();
            if (strings != null)
            {
                foreach (String str in strings)
                {
                    list.Add(Encode(str));
                }
            }
            return list;
        }
    }
}
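The Encode()/Decode() helpers above turn every table name, row key and value into bytes, and HBase sorts row keys by comparing those bytes lexicographically. That is why the test cases that follow pad numeric ids to four digits: unpadded, "10" sorts before "9". The sketch below (RowKeys is a hypothetical class, not from the article) shows the UTF-8 round trip and a byte-wise comparison of the kind HBase applies to row keys. It uses Encoding.UTF8 explicitly, because Encoding.Default, which is also what UTF8Encoding.Default resolves to, is the system ANSI code page on .NET Framework rather than UTF-8.

```csharp
using System;
using System.Text;

// Hypothetical sketch: string <-> byte conversion and row key ordering.
public static class RowKeys
{
    // Explicit UTF-8, independent of the platform's default encoding.
    public static byte[] Encode(string s) => Encoding.UTF8.GetBytes(s);
    public static string Decode(byte[] bs) => Encoding.UTF8.GetString(bs);

    // Unsigned lexicographic comparison: the order HBase uses for row keys.
    public static int CompareBytes(byte[] a, byte[] b)
    {
        int n = Math.Min(a.Length, b.Length);
        for (int i = 0; i < n; i++)
        {
            int d = a[i] - b[i];   // C# bytes are unsigned, so this is 0..255 arithmetic
            if (d != 0) return d;
        }
        return a.Length - b.Length; // a shorter prefix sorts first
    }

    // Zero-pad numeric ids so that byte order matches numeric order.
    public static string Pad(long id, int width) => id.ToString().PadLeft(width, '0');
}
```

For instance, `RowKeys.CompareBytes(RowKeys.Encode("10"), RowKeys.Encode("9"))` is negative, while the padded keys "0009" and "0010" compare in numeric order.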

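The HBaseThriftClient class above provides a basic implementation. Next come the test cases, which call the client operations we implemented to interact with the HBase table. The test class is as follows: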

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HbaseThrift.HBase.Thrift
{
    class Test
    {
        private readonly AbstractHBaseThriftService client;

        public Test(String host, int port)
        {
            client = new HBaseThriftClient(host, port);
            // The transport must be opened before any RPC; Close() releases it
            client.Open();
        }

        public Test() : this("master", 9090)
        {
            
        }

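        // One shared Random instance: on .NET Framework, instances created in quick
        // succession can share a time-based seed and return the same values
        private static readonly Random random = new Random();

        static String RandomlyBirthday()
        {
            int year = 1900 + random.Next(100);
            int month = 1 + random.Next(12);
            // Stay within the actual number of days in that month
            int date = 1 + random.Next(DateTime.DaysInMonth(year, month));
            return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
        }

        static String RandomlyGender()
        {
            int flag = random.Next(2);
            return flag == 0 ? "M" : "F";
        }

        static String RandomlyUserType()
        {
            int flag = 1 + random.Next(10);
            return flag.ToString();
        }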

        public void Close()
        {
            client.Close();
        }

        public void CaseForUpdate() {
		    bool writeToWal = false;
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
		    string table = SetTable();
		    // put kv pairs
            // 4-digit row keys 0000-9999, so byte order matches numeric order
            for (int i = 0; i < 10000; i++) {
                string rowKey = i.ToString().PadLeft(4, '0');
                Dictionary<String, String> fieldNameValues = new Dictionary<String, String>();
			    fieldNameValues.Add("info:birthday", RandomlyBirthday());
			    fieldNameValues.Add("info:user_type", RandomlyUserType());
			    fieldNameValues.Add("info:gender", RandomlyGender());
			    client.Update(table, rowKey, writeToWal, fieldNameValues, attributes);
		    }
	    }

        public void CaseForDeleteCells() {
		    bool writeToWal = false;
            Dictionary<String, String> attributes = new Dictionary<String, String>(0);
		    String table = SetTable();
            // delete the info:birthday cell from rows 0005-0009
		    for (long i = 5; i < 10; i++) {
			    String rowKey = i.ToString().PadLeft(4, '0');
			    List<String> columns = new List<String>(0);
			    columns.Add("info:birthday");
			    client.DeleteCells(table, rowKey, writeToWal, columns, attributes);
		    }
	    }

        public void CaseForDeleteRow() {
		    Dictionary<String, String> attributes = new Dictionary<String, String>(0);
		    String table = SetTable();
		    // delete rows
		    for (long i = 5; i < 10; i++) {
			    String rowKey = i.ToString().PadLeft(4, '0');
			    client.DeleteRow(table, rowKey, attributes);
		    }
	    }
	
	    public void CaseForScan() {
		    Dictionary<String, String> attributes = new Dictionary<String, String>(0);
		    String table = SetTable();
		    String startRow = "0005";
		    String stopRow = "0015";
		    List<String> columns = new List<String>(0);
		    columns.Add("info:birthday");
		    int id = client.ScannerOpen(table, startRow, stopRow, columns, attributes);
		    int nbRows = 2;
		    List<TRowResult> results = client.ScannerGetList(id, nbRows);
            // an empty batch means the scanner has no more rows
            while (results != null && results.Count > 0) {
			    foreach(TRowResult result in results) {
				    client.IterateResults(result);
			    }
			    results = client.ScannerGetList(id, nbRows);
		    }
		    client.ScannerClose(id);
	    }
	
	    public void CaseForGet() {
		    Dictionary<String, String> attributes = new Dictionary<String, String>(0);
		    String table = SetTable();
		    List<String> rows = new List<String>(0);
		    rows.Add("0009");
		    rows.Add("0098");
		    rows.Add("0999");
		    List<String> columns = new List<String>(0);
		    columns.Add("info:birthday");
		    columns.Add("info:gender");
		    List<TRowResult> results = client.GetRowsWithColumns(table, rows, columns, attributes);
		    foreach(TRowResult result in results) {
			    client.IterateResults(result);
		    }
	    }

        private string SetTable()
        {
            string table = "test_info";
            return table;
        }

        static void Main(string[] args)
        {
            Test test = new Test();
            //test.CaseForUpdate(); // insert or update rows/cells
            //test.CaseForDeleteCells(); // delete cells
            //test.CaseForDeleteRow(); // delete rows
            test.CaseForScan(); // scan rows
            //test.CaseForGet(); // get rows

            test.Close();
        }

    }
}

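The tests above operate on data in the HBase table. Also note that in the generated Thrift client code, the Iface interface declares all of the service operations, so you can pick the ones you need. The Client class implements the logic for interacting with the Thrift server, and an instance of it acts as a proxy for the Thrift service that HBase provides.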
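Code that needs only part of the service, such as listing tables, does not have to depend on the concrete Hbase.Client. The sketch below takes a Func<List<byte[]>> in place of client.getTableNames, which returns List<byte[]> in the generated code used above. This lets the logic be checked against a fake without a running Thrift server. TableCatalog is a hypothetical name, and depending on the generated Hbase.Iface interface instead would be an equally valid choice.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical sketch: logic that only needs the table list depends on a
// delegate instead of the concrete Hbase.Client.
public static class TableCatalog
{
    // listTables stands in for client.getTableNames (returns List<byte[]>).
    public static List<string> Names(Func<List<byte[]>> listTables) =>
        listTables().Select(b => Encoding.UTF8.GetString(b)).ToList();

    // True if a table with exactly this name exists.
    public static bool Exists(Func<List<byte[]>> listTables, string table) =>
        Names(listTables).Contains(table);
}
```

With a connected Hbase.Client `client`, `TableCatalog.Exists(client.getTableNames, "test_info")` checks for the table before the tests run.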



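This article is published under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use and republish it, provided that you keep the author attribution 时延军 (including the link: http://shiyanjun.cn) and do not use it for commercial purposes. Any work based on modifications of this article must be released under the same license. If you have any questions, please contact me.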
