背景
在HMaster、RegionServer内部,创建了RpcServer实例,并与Client三者之间实现了Rpc调用,HBase0.95内部引入了Google-Protobuf作为中间数据组织方式,并在Protobuf提供的Rpc接口之上,实现了基于服务的Rpc实现,本文详细阐述了HBase-Rpc实现细节。
HBase的RPC Protocol
在HMaster、RegionServer内部,实现了rpc 多个protocol来完成管理和应用逻辑,具体如下protocol如下:
MasterMonitorProtocol,Client与Master之间的通信,Master是RpcServer端,主要实现HBase集群监控的目的。
HBase-RPC实现机制分析
RpcServer配置三个队列:
1)普通队列callQueue,绝大部分Call请求存在该队列中:callQueue上maxQueueLength为${ipc.server.max.callqueue.length},默认是${hbase.master.handler.count}*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER,目前0.95.1中,每个Handler上CallQueue的最大个数默认值(DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER)为10。
2)优先级队列: PriorityQueue。如果设置priorityHandlerCount的个数,会创建与callQueue相当容量的queue存储Call,该优先级队列对应的Handler的个数由rpcServer实例化时传入。
3)拷贝队列:replicationQueue。由于RpcServer由HMaster和RegionServer共用,该功能仅为RegionServer提供,queue的大小为${ipc.server.max.callqueue.size}指定,默认为1024*1024*1024,handler的个数为hbase.regionserver.replication.handler.count。
RpcServer由三个模块组成:
Listener ===Queue=== Responder
这里以HBaseAdmin.listTables为例,分析一个Rpc请求的函数调用过程:
1) RpcClient创建一个BlockingRpcChannel。
2)以channel为参数创建执行RPC请求需要的stub,此时的stub已经被封装在具体Service下,stub下定义了可执行的rpc接口。
3)stub调用对应的接口,实际内部channel调用callBlockingMethod方法。
RpcClient内实现了protobuf提供的BlockingRpcChannel接口方法callBlockingMethod,
@Override public Message callBlockingMethod(MethodDescriptor md, RpcController controller, Message param, Message returnType) throws ServiceException { return this.rpcClient.callBlockingMethod(md, controller, param, returnType, this.ticket, this.isa, this.rpcTimeout); } |
通过以上的实现细节,最终转换成rpcClient的调用,使用MethodDescriptor封装了不同rpc函数,使用Message基类可以接收基于Message的不同的Request和Response对象。
4)RpcClient创建Call对象,查找或者创建合适的Connection,并唤醒Connection。
5)Connection等待Call的Response,同时rpcClient调用函数中,会使用connection.writeRequest(Call call)将请求写入到RpcServer网络流中。
6)等待Call的Response,然后层层返回给更上层接口,从而完成此次RPC调用。
RPCServer收到的Rpc报文的内部组织如下:
Magic (4Byte) |
Version (1Byte) |
AuthMethod (1Byte) |
Connection HeaderLength (4Byte) |
ConnectionHeader |
Request |
“HBas” |
|||||
验证RpcServer的CURRENT_VERSION 与RPC报文一致 |
目前支持三类: AuthMethod.SIMPLE AuthMethod.KERBEROS AuthMethod.DIGEST |
RPC.proto定义 userInfo = 1; serviceName = 2; sending over optional cell blocks. Server throws exception cellBlockCodecClass = 3 [default = "org.apache. hadoop.hbase.codec. KeyValueCodec"]; if cell block is compressed. Server will throw exception if not supported. hadoop’s CompressionCodec Interface cellBlockCompressorClass = 4; |
整个Request存储是经过编码之后的byte数组,包括如下几个部分:
RequestHeaderLength(RawVarint32) |
RequestHeader |
ParamSize(RawVarint32) |
Param |
CellScanner |
RPC.proto定义: |
Protobuf的基本类型Message, |
从功能上讲,RpcServer上包含了三个模块,
1)Listener。包含了多个Reader线程,通过Selector获取ServerSocketChannel接收来自RpcClient发送来的Connection,并从中重构Call实例,添加到CallQueue队列中。
”IPC Server listener on 60021″ daemon prio=10 tid=0x00007f7210a97800 nid=0x14c6 runnable [0x00007f720e8d0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c43cae68> (a sun.nio.ch.Util$2)
- locked <0x00000000c43cae50> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4322ca8> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:84)
at org.apache.hadoop.hbase.ipc.RpcServer$Listener.run(RpcServer.java:646)
2)Handler。负责执行Call,调用Service的方法,然后返回Pair<Message,CellScanner>
“IPC Server handler 0 on 60021″ daemon prio=10 tid=0x00007f7210eab000 nid=0x14c7 waiting on condition [0x00007f720e7cf000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000c43cad90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1804)
3) Responder。负责把Call的结果返回给RpcClient。
”IPC Server Responder” daemon prio=10 tid=0x00007f7210a97000 nid=0x14c5 runnable [0x00007f720e9d1000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
- locked <0x00000000c4407078> (a sun.nio.ch.Util$2)
- locked <0x00000000c4407060> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000c4345b68> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.doRunLoop(RpcServer.java:833)
at org.apache.hadoop.hbase.ipc.RpcServer$Responder.run(RpcServer.java:816)
RpcClient为Rpc请求建立Connection,通过Connection将Call发送RpcServer,然后RpcClient等待结果的返回。
思考
1)为什么HBase新版本使用了Protobuf,并实现RPC接口?
HBase是Hadoop生态系统内重要的分布式数据库,Hadoop2.0广泛采用Protobuf作为中间数据组织方式,整个系统内Wire-Compatible的统一需求。
2)HBase内部实现的Rpc框架对于服务性能的影响?
目前使用Protobuf作为用户请求和内部数据交换的数据格式,采用更为紧缩编码格式,能够提高传输数据的效率。但是,有些优化仍然可以在该框架内探索:
实现多个Request复用Connection(把多个短连接合并成一个长连接);
在RpcServer内创建多个CallQueue,分别处理不同的Service,分离管理逻辑与应用逻辑的队列,保证互不干扰;
Responder单线程的模式,是否高并发应用的瓶颈所在?
是否可以分离Read/Write请求占用的队列,以及处理的handler,从而使得读写性能能够更加平衡?
针对读写应用的特点,在RpcServer层次内对应用进行分级,建立不同优先级的CallQueue,按照Hadoop-FairScheduler的模式,然后配置中心调度(类似OMega或者Sparrow轻量化调度方案),保证实时应用的低延迟和非实时应用的高吞吐。优先级更好的Call会优先被调度给Handler,而非实时应用可以实现多个Call的合并操作,从而提高吞吐。
3)Protobuf内置编码与传统压缩技术是否可以配合使用?
使用tcpdump获取了一段HMaster得到的RegionServer上报来的信息:
以上的信息几乎是明文出现在tcp-ip连接中,因此,是否在Protobuf-RPC数据格式采取一定的压缩策略,会给scan、multiGet等数据交互较为密集的应用提供一种优化的思路。
参考文献:
[1] HBase Rpc Protocols: http://blog.zahoor.in/2012/08/protocol-buffers-in-hbase/
[2] HBase project 0.95.1
本系列文章属于Binos_ICT在Binospace个人技术博客原创,原文链接为http://www.binospace.com/index.php/in-depth-analysis-hbase-rpc-0-95-version-implementation-mechanism/,未经允许,不得转载。
From Binospace, post 深入分析HBase RPC(Protobuf)实现机制
文章的脚注信息由WordPress的wp-posturl插件自动生成
相关推荐
hbase分页查询实现.pdf
hbase分页查询实现
hbase分页查询实现[归类].pdf
基于hadoop+hbase+springboot实现的分布式网盘系统,适合本科毕业设计 资源包含的整个demo在Hadoop,和Hbase环境搭建好了,可以启动起来。 技术选型 1.Hadoop 2.Hbase 3.SpringBoot ...... 系统实现的功能 1.用户...
离线安装TensorFlow0.12cpu版本时,其它要求包都能容易找到,就这个找不到,想办法在以前的电脑上找到的,用的时候直接解压到python包的安装路径Lib\site-packages下,就可以了,其它作用没试。
协议分析,可以分析protobuf消息。动态配置文件。 把文件放在(wireshark安装目录)\plugins目录下面。 把配置文件和proto文件放在(wireshark安装目录)\protobuf目录下面 插件基于wireshark 1.8.6 和protobuf ...
Hbase HLog源代码阅读笔记 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)...
HBaseCoprocessor的实现与应用.pdf
可以参考:http://baike.baidu.com/view/32726.htm)机制分析的博客一直耽搁了下来。昨天晚上胡老大和我抱怨说:最近乱的很。呵呵,老是往武汉跑,能不乱嘛。不过差不多腾讯面试的事就该告一段落了。五一期间,...
基于spring boot 的spring-boot-starter-hbase自动注解实现,HbaseTemplate的直接使用
10.hbase的整体工作机制--集群角色功能介绍--存储机制.mp4
基于hadoop+hbase+springboot实现分布式网盘系统
支持google protobuf 2.5版本, 1.1 之前的都是只支持2.4,自己封装了一个
HBase性能深度分析HBase性能深度分析
介绍一些hbase的底层存储机制,以更好认识hbase的原理
基于hadoop+hbase+springboot实现分布式网盘系统源码+数据集+详细文档(高分毕业设计).zip基于hadoop+hbase+springboot实现分布式网盘系统源码+数据集+详细文档(高分毕业设计).zip基于hadoop+hbase+springboot...
基于hbase + spark 实现常用推荐算法(主要用于精准广告投放和推荐系统).zip
HBase_SI_--_实现HBase_ACID的理论
基于数据冗余的HBase合并机制研究_HBase列式数据库的所有操作均以追加数据的方式写入,导致其合并机制占用资源过多,影响系统读性能。
这是hbase对数据存储的代码实现,让你轻松秒懂hbase,