HQueue基于HBase的消息隊(duì)列
1. HQueue簡(jiǎn)介
HQueue是一淘搜索網(wǎng)頁(yè)抓取離線系統(tǒng)團(tuán)隊(duì)基于HBase開發(fā)的一套分布式、持久化消息隊(duì)列。它利用HTable存儲(chǔ)消息數(shù)據(jù),借助HBase Coprocessor將原始的KeyValue數(shù)據(jù)封裝成消息數(shù)據(jù)格式進(jìn)行存儲(chǔ),并基于HBase Client API封裝了HQueue Client API用于消息存取。
HQueue可以有效使用在需要存儲(chǔ)時(shí)間序列數(shù)據(jù)、作為MapReduce Job和iStream等輸入、輸出供上下游共享數(shù)據(jù)等場(chǎng)合。
2. HQueue特性
由于HQueue是基于HBase進(jìn)行消息存取的,因此站在HDFS和HBase的肩膀上,使得其具備如下特點(diǎn):
(1)支持多Partitions,可根據(jù)需求設(shè)置Queue的規(guī)模,支持高并發(fā)訪問(wèn)(HBase的多Region);
(2)支持自動(dòng)Failover,任何機(jī)器Down掉,Partition可自動(dòng)遷移至其他機(jī)器(HBase的Failover機(jī)制);
(3)支持動(dòng)態(tài)負(fù)載均衡,Partition可以動(dòng)態(tài)被調(diào)度到最合理的機(jī)器上(HBase的LoadBalance機(jī)制,可動(dòng)態(tài)調(diào)整);
(4)利用HBase進(jìn)行消息的持久化存儲(chǔ),不丟失數(shù)據(jù)(HBase HLog和HDFS Append);
(5)隊(duì)列的讀寫模式與HBase的存儲(chǔ)特性天然切合,具備良好的并發(fā)讀寫性能(最新消息存儲(chǔ)在MemStore中,寫消息直接寫入MemStore,通常場(chǎng)景下都是內(nèi)存級(jí)操作);
(6)支持消息按Topic進(jìn)行分類存?。℉Base中的Qualifier);
(7)支持消息TTL,自動(dòng)清理過(guò)期消息(HBase支持KeyValue級(jí)別的TTL);
(8)HQueue = HTable Schema Design + HQueue Coprocessor + HBase Client Wrapper,完全擴(kuò)展開發(fā),無(wú)任何Hack工作,可隨HBase自動(dòng)升級(jí);
(9)HQueue Client API基于HBase Client Wrapper進(jìn)行簡(jiǎn)單封裝,HBase的ThriftServer使得其支持多語(yǔ)言API,因此HQueue也很容易封裝出多語(yǔ)言API;
(10)HQueue Client API可以天然支持Hadoop MapReduce Job和iStream的InputFormat機(jī)制,利用Locality特性將計(jì)算調(diào)度到存儲(chǔ)最近的機(jī)器;
(11)HQueue支持消息訂閱機(jī)制(HQueue 0.3及后續(xù)版本)。
3. HQueue系統(tǒng)設(shè)計(jì)及處理流程
3.1. HQueue系統(tǒng)結(jié)構(gòu)
HQueue系統(tǒng)結(jié)構(gòu)如圖(1)所示:
圖(1):HQueue系統(tǒng)結(jié)構(gòu)
其中:
(1)每個(gè)Queue對(duì)應(yīng)一個(gè)HTable,創(chuàng)建Queue可以通過(guò)Presharding Table方式創(chuàng)建,有利于負(fù)載均衡。
(2)每個(gè)Queue可以有多個(gè)Partitions(HBase Regions),這些Partitions均勻分布在HBase集群中的多個(gè)Region Servers中。
(3)每個(gè)Partition可以在HBase集群的多個(gè)Region Servers中動(dòng)態(tài)遷移。任何一臺(tái)Region Server掛掉,運(yùn)行在其上的HQueue Partition可以自動(dòng)遷移到其他Region Server上,并且數(shù)據(jù)不會(huì)丟失。當(dāng)集群負(fù)載不均衡時(shí),HQueue Partition會(huì)自動(dòng)被HMaster遷移到負(fù)載低的Region Server。
(4)每個(gè)Message對(duì)應(yīng)一個(gè)HBase KeyValue Pair,按MessageID即時(shí)間順序存儲(chǔ)在HBase Region中。MessageID由Timestamp和同一Timestamp下自增的SequenceID構(gòu)成,詳細(xì)信息參見(jiàn)《Message存儲(chǔ) 結(jié)構(gòu)》部分。
3.2. Message存儲(chǔ)結(jié)構(gòu)
Message存儲(chǔ)結(jié)構(gòu)如圖(2)所示:
圖(2):Message存儲(chǔ)結(jié)構(gòu)
其中:
(1)RowKey:由PartitionID和MessageID構(gòu)成。
PartitionID:一個(gè)Queue可以有多個(gè)Partitions,目前最多支持Short.MAX_VALUE個(gè) Partitions。Partition ID可以不在創(chuàng)建Message對(duì)象時(shí)指定,而是在發(fā)送消息時(shí)設(shè)定,或者不指定而使用一個(gè)隨機(jī)Partition ID。
MessageID:即消息ID,它由Timestamp和SequenceID兩部分組成。Timestamp是消息寫入HQueue時(shí)的時(shí) 間戳,單位為毫秒。SequenceID是同一Timestamp下消息的順序編號(hào),目前最多支持同一Timestamp下 Short.MAX_VALUE個(gè)Messages。
(2)Column:由Column Family和Message Topic構(gòu)成。
Column Family:HBase Column Family,此處為固定值“message”。
Message Topic :HBase Column Qualifier,消息Topic名稱。用戶可以根據(jù)需要將Message存儲(chǔ)在不同的Topics之下,也可以從Queue中獲取感興趣的Topics消息數(shù)據(jù)。
(3)Value:即消息內(nèi)容。
3.3. HQueue消息寫入及Coprocessor處理流程
HQueue利用HQueue Client API寫入消息數(shù)據(jù),為保證消息唯一和有序,HQueue利用Coprocessor處理用戶寫入消息的MessageID,然后立即放入HBase MemStore中,使其可以被訪問(wèn)到,最后持久化的HLog中。具體的處理邏輯如圖(3)所示:
圖(3):數(shù)據(jù)寫入及Coprocessor處理流程
其中:
(1)HQueue封裝了HQueue Client API,用戶可以使用其提供Put等方法向HQueue中寫入消息。
(2)HQueue Client會(huì)使用Message.makeKeyValueRow()用于完成將Message數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換成HBase Rowkey。HQueue所要求的RowKey格式可以參加上述內(nèi)容。
(3)HQueue Client在完成RowKey的轉(zhuǎn)換后,會(huì)調(diào)用HTable的put方法按照HBase標(biāo)準(zhǔn)的寫入流程來(lái)完成消息的寫入。
(4)HQueue上注冊(cè)有HQueueCoprocessor,它擴(kuò)展自BaseRegionObserver。HRegion在真正寫入消息數(shù)據(jù)前, 會(huì)調(diào)用HQueueCoprocessor的preBatchMutate方法,該方法主要用于調(diào)整MessageID,保證MessageID唯一并且 有序。
(5)在HQueueCoprocessor的preBatchMutate方法中同時(shí)會(huì)調(diào)整Durability為SKIP_WAL,這樣HBase將不會(huì)主動(dòng)將消息數(shù)據(jù)持久化進(jìn)HLog。
(6)HRegion在寫入消息數(shù)據(jù)后,會(huì)調(diào)用HQueueCoprocessor的postBatchMutate方法,該方法主要完成將消息數(shù)據(jù)持久化進(jìn)HLog的功能。
3.4. HQueue Scan處理流程
為了方便從Queue中Scan數(shù)據(jù),HQueue封裝了ClientScanner,提供了QueueScanner、 PartitionScanner和CombinedPartitionScanner等Scanner,用于不同的場(chǎng)景。HQueue Scan的具體處理流程如圖(4)所示:
圖(4):HQueue Scan處理流程
其中:
(1)用戶可以根據(jù)需要從HQueue Client中獲取所需的Queue Scanner,目前主要提供三種Scanner:
QueueScanner:用于Scan Queue中全部Partitions的數(shù)據(jù);
PartitionScanner:用于Scan Queue中指定Partition的數(shù)據(jù);
CombinedPartitionScanner:用于Scan Queue中若干指定Partitions的數(shù)據(jù)。
(2)用戶獲取到Scanner之后,可以循環(huán)調(diào)用Scanner的next方法依次取出消息數(shù)據(jù),直至無(wú)數(shù)據(jù)返回,本次Scan結(jié)束。Scan結(jié)束后,用戶應(yīng)主動(dòng)關(guān)閉Scanner以便及時(shí)釋放資源。
(3)用戶在不再使用先前創(chuàng)建的Queue對(duì)象時(shí),應(yīng)主動(dòng)關(guān)閉Queue以便及時(shí)釋放資源。
3.5. HQueue訂閱流程
3.5.1. 整體流程
HQueue自0.3版本開始提供訂閱功能,一個(gè)訂閱者可以訂閱一個(gè)Queue的多個(gè)Partitions、多個(gè)Topics。與用戶使用 Scanner主動(dòng)Scan消息數(shù)據(jù)的方式相比,訂閱方式具有(1)消息數(shù)據(jù)一旦寫入Queue便會(huì)被主動(dòng)推送至訂閱者,消息送達(dá)更為及時(shí);(2)訂閱者 被動(dòng)接收新消息,可以省去HQueue無(wú)新消息數(shù)據(jù)時(shí)多余的Scan操作,減少系統(tǒng)開銷等優(yōu)點(diǎn)。
HQueue訂閱流程處理邏輯如圖(5)所示:
圖(5):HQueue訂閱流程處理邏輯
其中:
(1)HQueue訂閱主要由Subscriber、ZooKeeper和Coprocessor這三部分組成。其中:
Subscrier:即訂閱者。主要完成向ZoeoKeeper寫入訂閱信息、啟動(dòng)監(jiān)聽(tīng)、接收新消息并回調(diào)注冊(cè)在其上的消息處理函數(shù)(MessageListener)等功能。
ZooKeeper:用于保存訂閱者提交的訂閱信息,主要包括訂閱者訂閱的Queue、Partitions和Topics;訂閱者的地址和Checkpoint等信息,更為詳細(xì)信息參見(jiàn)后續(xù)描述。
Coprocessor:主要完成從ZooKeeper獲取訂閱信息、使用InternalScanner從Queue中Scan最新的消息、將新消息發(fā)送至訂閱者并將當(dāng)前Checkpoint更新至ZooKeeper等功能。
(2)Coprocessor的主要處理流程如下:
Step 1:創(chuàng)建Subscriber,添加訂閱信息和消息處理函數(shù),將訂閱信息寫入ZooKeeper,啟動(dòng)監(jiān)聽(tīng)等待接收新消息。寫入ZooKeeper中的訂閱信息主要包括:
訂閱者訂閱的Queue名稱;
訂閱者訂閱的Queuee Partitions以及各Partition上消息的起始ID。一個(gè)訂閱者可以訂閱多個(gè)Partitions,如果沒(méi)有指定,那么認(rèn)為訂閱該Queue的所有Partitions。
訂閱者訂閱的消息Topics。一個(gè)訂閱者可以訂閱多個(gè)主題,如果沒(méi)有指定,那么認(rèn)為訂閱該Queue上的所有Topics。
訂閱者的Addresss/Hostname和監(jiān)聽(tīng)端口。用戶創(chuàng)建訂閱者時(shí)可以指定監(jiān)聽(tīng)端口,如果沒(méi)有指定,那么會(huì)隨機(jī)選擇一個(gè)當(dāng)前可用端口作為監(jiān)聽(tīng)端口。
Step 2:Coprocessor從ZooKeeper獲取訂閱信息并向ZooKeeper注冊(cè)相關(guān)Watcher,以便ZooKeeper中訂閱信息發(fā)生變化 時(shí)ZooKeeper能夠及時(shí)通知Coprocessor。Coprocessor在獲取到訂閱信息后,會(huì)根據(jù)需要?jiǎng)?chuàng)建 SubscriptionWorker等工作線程,以便從HQueue Partition中Scan消息并將消息發(fā)送至Subscriber。
Step 3:Coprocessor從HQueue Partition中Scan新消息。
Step 4:Coprocessor將新消息發(fā)送至Subscriber。
Step 5:Subscriber在接收到新消息時(shí),會(huì)回調(diào)注冊(cè)在其上的回調(diào)函數(shù)。
Step 6:待新消息發(fā)送成功后,Coprocessor會(huì)將消息的Checkpoint更新至ZooKeeper以便后續(xù)使用。
Step 7:Subscriber取消訂閱,并從ZooKeeper中刪除必要的訂閱信息。
Step 8:ZooKeeper會(huì)通過(guò)注冊(cè)在其上的Watcher將Subscriber訂閱信息的變化通知至Coprocessor,Coprocessor根據(jù)訂閱信息的變化,暫停SubscriptionWorker等工作線程等。
3.5.2. HQueue Subscriber
HQueue Subscriber結(jié)構(gòu)和主要處理邏輯如圖(6)所示:
圖(6):HQueue Subscriber結(jié)構(gòu)和主要處理邏輯
其中:
(1)Subscriber主要由兩部分組成:SubscriberZooKeeper和Thrift Server。其中,SubscriberZooKeeper主要完成與ZooKeeper相關(guān)的若干操作,包括寫入訂閱信息、刪除訂閱信息等。 Coprocessor與Subscriber之間的通訊通過(guò)Thrift來(lái)完成,Subscriber中啟動(dòng)Thrift Server,監(jiān)聽(tīng)指定的端口,等待接收Coprocessor發(fā)送過(guò)來(lái)的新消息。
(2)Subscriber通過(guò)Thrift Server接收到新消息后,會(huì)回調(diào)注冊(cè)在其上的回調(diào)函數(shù)(MessageListeners),并將狀態(tài)碼返回給Coprocessor。
(3)可以在一個(gè)Subscriber上注冊(cè)多個(gè)MessageListeners,多個(gè)MessageListeners會(huì)被依次調(diào)用。
3.5.3. HQueue Coprocessor
HQueue Coprocessor結(jié)構(gòu)和主要處理邏輯如圖(7)所示:
圖(7):HQueue Coprocessor結(jié)構(gòu)和主要處理邏輯
其中:
(1)Coprocessor:主要由兩部分構(gòu)成SubscriptionZooKeeper和SubscriptionWorker。
SubscriptionZooKeeper:主要完成與ZooKeeper相關(guān)的工作,包括從ZooKeeper獲取訂閱信息并注冊(cè)相關(guān)Watcher、SubscriptionWorker將Checkpoint更新至ZooKeeper等操作。
SubscriptionWorker又主要包括MessageScanner和MessageSender兩部分,主要完成Scan新消息、發(fā)送消息至Subscriber和更新Checkpoint等操作。
(2)MessageScanner主要完成創(chuàng)建InternalScanner,從Queue Partition中Scan新消息,并將其放入緩沖隊(duì)列中等操作。
當(dāng)緩沖隊(duì)列中沒(méi)有空閑空間時(shí),MessageScanner會(huì)等待直至緩沖隊(duì)列中的消息被MessageSender消費(fèi)掉,騰出剩余空間。
當(dāng)Queue Partition中沒(méi)有新消息時(shí),MessageScanner會(huì)主動(dòng)Sleep,當(dāng)有新消息寫入時(shí),Coprocessor會(huì)通過(guò)SubscriptionWorker喚醒MessageScanner,開始新一輪Scan。
(3)MessageSender主要完成從緩沖隊(duì)列中取出新消息,將其發(fā)送至Subscriber,并等待Subscriber發(fā)回響應(yīng)等操作。當(dāng)緩沖隊(duì)列中沒(méi)有新消息時(shí),MessageSender會(huì)等待直至有新消息到來(lái)。
(4)MessageSender中的CheckpointUpdater會(huì)定時(shí)將當(dāng)前的Checkpoint寫入ZooKeeper中的相關(guān)訂閱節(jié)點(diǎn)中,以便后續(xù)使用。
3.5.4. 訂閱信息層次結(jié)構(gòu)
HQueue相關(guān)訂閱信息保存在ZooKeeper,ZooKeeper中訂閱信息的層次結(jié)構(gòu)如圖(8)所示:
圖(8):訂閱信息層次結(jié)構(gòu)
其中:
(1)訂閱者節(jié)點(diǎn)(subscriber_x)上會(huì)記錄該訂閱者在Queue Partition上的Checkpoint。該Checkpoint由Subscriber在發(fā)起訂閱時(shí)寫入,并由 SubscriptionWorker MessageSender中的CheckpointUpdater來(lái)更新。
(2)訂閱者節(jié)點(diǎn)下會(huì)有兩個(gè)臨時(shí)性節(jié)點(diǎn):address和topics,分別保存訂閱者的IP Address/Hostname:Port和訂閱的主題。當(dāng)訂閱者主動(dòng)取消訂閱時(shí)會(huì)刪除這兩個(gè)臨時(shí)節(jié)點(diǎn),當(dāng)訂閱者意外退出時(shí),等Session失效 后,ZooKeeper會(huì)刪除該臨時(shí)節(jié)點(diǎn)。
3.5.5. 訂閱者Thrift Service
HQueue訂閱功能使用Thrift來(lái)簡(jiǎn)化對(duì)多語(yǔ)言客戶端的支持。Subscriber啟動(dòng)Thrift Server,監(jiān)聽(tīng)指定端口,接收消息,并回調(diào)MessageListeners以便處理消息。用于描述HQueue Subscriber所提供服務(wù)的接口定義如下所示:
namespace java com.etao.hadoop.hbase.queue.thrift.generated
/**
* HQueue MessageID
*/
struct TMessageID {
1: i64 timestamp,
2: i16 sequenceID
}
/**
* HQueue Message
*/
struct TMessage {
1: optional TMessageID id,
2: optional i16 partitionID,
3: binary topic,
4: binary value
}
/**
* HQueue Subscriber Service
*/
service HQueueSubscriberService {
i32 consumeMessages(1:list<TMessage> messages)
}
4. HQueue使用
4.1. HQueue Toolkit
為方便用戶使用,HQueue封裝了HQueue Client API用于存取消息數(shù)據(jù)。自HQueue 0.3版本,HQueue日志運(yùn)維工具集成進(jìn)HQueue Shell中,構(gòu)成HQueue Toolkit,為用戶提供一站式服務(wù),方便用戶管理Queue以及Queue訂閱者。
同HBase Shell使用方式相似,用戶使用$ ${HBASE_HOME}/bin/hqueue shell便可以進(jìn)入HQueue Shell命令行工具。需要注意的是,用戶在使用HQueue Toolkit之前需要確保已經(jīng)部署HQueue Toolkit。
HQueue Toolkit中包括創(chuàng)建Queue、Disable Queue、Enable Queue、刪除Queue和清空Queue等命令。使用示例如下:
(1)創(chuàng)建隊(duì)列
USAGE:create ‘queue_name’, partition_count, ttl, [Configuration Dictionary]
DESCRIPTIONS:
queue_name:待創(chuàng)建的HQueue的名稱,必選參數(shù)。
partition_count:待創(chuàng)建的HQueue的Partition個(gè)數(shù),必選參數(shù)。
ttl:失效時(shí)間,必選參數(shù)。
Configuration Dictonary:可選配置參數(shù)。目前支持的配置參數(shù)為:(1)hbase.hqueue.partitionsPerRegion; (2)hbase.hregion.memstore.flush.size;(3)hbase.hregion.majorcompaction; (4)hbase.hstore.compaction.min;(5)hbase.hstore.compaction.max; (6)hbase.hqueue.compression;(7)hbase.hstore.blockingStoreFiles等。
EXAMPLES:
hqueue> create ‘q1′, 32, 86400
hqueue> create ‘q1′, 32, 86400, {‘hbase.hqueue.partitionsPerRegion’ => ’4′, ‘hbase.hstore.compaction.min’ => ’16′, ‘hbase.hstore.compaction.max’ => ’32′}
(2)清空隊(duì)列
USAGE:truncate_queue 'queue_name' DESCRIPTIONS:
queue_name:待清空的Queue名稱,必選參數(shù)。
EXAMPLES:
hqueue(main):013:0> truncate_queue 'replication_dev_2_test_queue'
需要注意的是:該命令與HBase Shell中的truncate有所不同,該命令僅會(huì)刪除Queue中的數(shù)據(jù),而保留Queue的Presharding信息。 更多操作請(qǐng)參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#Queue.E7.AE.A1.E7.90.86 (3)新增訂閱者 USAGE:add_subscriber 'queue_name', 'subscriber_name' DESCRIPTIONS:
queue_name:隊(duì)列名稱,必選參數(shù)。
subscriber_name:訂閱者名稱,必選參數(shù)。
EXAMPLES:
add_subscriber 'replication_dev_2_test_queue', 'subscriber_1'
(4)刪除訂閱者
USAGE:delete_subscriber 'subscriber_name', 'queue_name' DESCRIPTIONS:
queue_name:訂閱者所訂閱的Queue名稱,必選參數(shù)。
subscriber_name:訂閱者名稱,必選參數(shù)。
EXAMPLES:
hqueue(main):040:0> delete_subscriber 'replication_dev_2_test_queue', 'subscriber_1'
更多信息可以參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#.E8.AE.A2.E9.98.85.E8.80.85.E7.AE.A1.E7.90.86
4.2. Put
HQueue Client API中的Put相關(guān)操作可以完成將用戶消息數(shù)據(jù)寫入HQueue中,Put支持批量操作,具體使用方式示例如下:
HQueue queue = new HQueue(queueName); String topic1 = "crawler"; String value1 = "http://www.360test.com"; // 寫入單條消息數(shù)據(jù),不指定Partition ID。在不指定Partition ID的情況下,將會(huì)在Queue的所有Partitions中隨機(jī)選取一個(gè)。 Message message1 = new Message(Bytes.toBytes(topic1), Bytes.toBytes(value1)); queue.put(message); // 寫入Message時(shí),顯式指定PartitionID。 short partitionID = 10; queue.put(partitionID, message1); List<Message> messages = new ArrayList<Message>(); messages.add(message1); String topic2 = "dump"; String value2 = "http://www.jd.com"; Message message2 = new Message(Bytes.toBytes(topic2), Bytes.toBytes(value2)); messages.add(message2); // 寫入多條消息數(shù)據(jù),不指定Partition ID。 queue.put(messages); // 寫入多條消息數(shù)據(jù),指定Partition ID。 queue.put(partitionID, messages); queue.close();
4.3. Scan
為方便用戶從Queue中Scan消息數(shù)據(jù),HQueue Client API提供了三種自定義Scanner,分別為:QueueScanner、PartitionScanner和CombinedPartitionScanner,使用示例如下:
String queueName = "subscription_queue";
Queue queue = new HQueue(queueName);
// 起始時(shí)間戳
long currentTimestamp = System.currentTimeMillis();
MessageID startMessageID = new MessageID(currentTimestamp - 6000);
MessageID stopMessageID = new MessageID(currentTimestamp);
Scan scan = new Scan(startMessageID, stopMessageID);
// 添加主題
scan.addTopic(Bytes.toBytes("topic1"));
scan.addTopic(Bytes.toBytes("topic2"));
Message message = null;
// 使用QueueScanner,掃描Queue下全部Partitions中的數(shù)據(jù)
QueueScanner queueScanner = queue.getQueueScanner(scan);
while ((message = queueScanner.next()) != null) {
// no-op
}
queueScanner.close();
short partitionID1 = 1;
// 使用PartitionScanner,掃描Queue中指定的Partition的數(shù)據(jù)
PartitionScanner partitionScanner = queue.getPartitionScanner(partitionID1, scan);
while ((message = partitionScanner.next()) != null) {
// no-op
}
partitionScanner.close();
short partitionID2 = 2;
Map<Short, Scan> partitions = new HashMap<Short, Scan>();
// 添加多個(gè)Partitions
partitions.put(partitionID1, scan);
partitions.put(partitionID2, scan);
CombinedPartitionScanner combinedScanner = queue.getCombinedPartitionScanner(partitions);
while ((message = combinedScanner.next()) != null) {
// no-op
}
combinedScanner.close();
queue.close();
4.4. 訂閱消息
HQueue自0.3版本開始提供訂閱功能,使用方式示例如下:
HQueue queue = null;
HQueueSubscriber subscriber = null;
try {
String queueName = "subscription_queue";
queue = new HQueue(queueName);
Set<Pair<Short, MessageID>> partitions = new HashSet<Pair<Short, MessageID>>();
// 添加所訂閱的Partitions
Pair<Short, MessageID> partition1 = new Pair<Short, MessageID>((short)0, null);
partitions.add(partition1);
Pair<Short, MessageID> partition2 = new Pair<Short, MessageID>((short)1, null);
partitions.add(partition2);
Pair<Short, MessageID> partition3 = new Pair<Short, MessageID>((short)2, null);
partitions.add(partition3);
// 添加所訂閱的Topics
Set<String> topics = new HashSet<String>();
topics.add("topic_1");
topics.add("topic_2");
topics.add("topic_3");
// 訂閱者名稱
String subscriberName = "subscriber_1";
Subscription subscription = new Subscription(subscriberName, topics);
subscription.addPartitions(partitions);
// 添加回調(diào)函數(shù)
List<MessageListener> listeners = new LinkedList<MessageListener>();
MessageListener blackHoleListener = new BlackHoleMessageListener(subscriberName);
listeners.add(blackHoleListener);
// 創(chuàng)建訂閱者
subscriber = queue.createSubscriber(subscription, listeners);
subscriber.start();
Thread.sleep(600000L);
subscriber.stop("Time out, request to stop subscriber:" + subscriberName);
} catch (Exception ex) {
LOG.error("Received unexpected exception when testing subscription.", ex);
} finally {
if (queue != null) {
try {
queue.close();
queue=null;
} catch (IOException ex) {
// ignore the exception
}
}
}
4.5. ThriftServer API
HBase自帶的ThriftServer實(shí)現(xiàn)了對(duì)HTable的多語(yǔ)言API支持,HQueue在HBase ThriftServer中擴(kuò)展了對(duì)HQueue的支持,使得C++、Python和PHP等語(yǔ)言也可以方便地訪問(wèn)HQueue。
HQueue目前提供的Thrift API如下所示:
| 1 | ScannerID messageScannerOpen(1:Text queueName,2:i16 partitionID,3:TMessageScan messageScan) | 根據(jù)Scan,打開Queue中某個(gè)Partition上的Scanner |
| 2 | TMessage messageScannerGet(1:ScannerID id) | 逐條獲取Message |
| 3 | list<TMessage> messageScannerGetList(1:ScannerID id,2:i32 nbMessages) | 批量獲取Messages |
| 4 | void messageScannerClose(1:ScannerID id) | 關(guān)閉ScannerID |
| 5 | void putMessage(1:Text queueName,2:TMessage tMessage) | 向Queue中寫入Message,使用隨機(jī)的Partition ID |
| 6 | void putMessages(1:Text queueName,2:list<TMessage> tMessages) | 向Queue中批量寫入Messages,使用隨機(jī)的Partition ID |
| 7 | void putMessageWithPid(1:Text queueName,2:i16 partitionID,3:TMessage tMessage) | 向Queue中寫入Message,使用指定的Partition ID |
| 8 | void putMessagesWithPid(1:Text queueName,2:i16 partitionID,3:list<TMessage> tMessages) | 向Queue中批量寫入Messages,使用指定的Partition ID |
| 9 | list<Text> getQueueLocations(1:Text queueName) | 獲取Queue中所有Partition所在主機(jī)的地址 |
5. 總結(jié)
以上是對(duì)HQueue概念、特性、系統(tǒng)設(shè)計(jì)、處理流程以及應(yīng)用等方面的簡(jiǎn)單闡述,希望對(duì)大家有所幫助。
