十年網站開發(fā)經驗 + 多家企業(yè)客戶 + 靠譜的建站團隊
量身定制 + 運營維護+專業(yè)推廣+無憂售后,網站問題一站解決
這篇文章主要為大家展示了“Flink1.8如何批量Sink到HBase”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“Flink1.8如何批量Sink到HBase”這篇文章吧。
專注于為中小企業(yè)提供網站制作、成都做網站服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)日照免費做網站提供優(yōu)質的服務。我們立足成都,凝聚了一批互聯(lián)網行業(yè)人才,有力地推動了近1000家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網站建設實現(xiàn)規(guī)模擴充和轉變。
實現(xiàn)背景:
消費Kafka數(shù)據寫入HBase時,單條處理效率太低。需要批量插入hbase,這里自定義時間窗口countWindowAll 實現(xiàn)100條hbase插入一次Hbase
前面我就不寫了 直接上核心代碼
/*每10秒一個處理窗口*/DataStream> putList = filterData.countWindowAll(Constants.windowCount).apply(new AllWindowFunction
, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable message, Collector > out) throws Exception { List
putList=new ArrayList (); for (String value : message) { String rowKey=value.replace("::","_"); Put put = new Put(Bytes.toBytes(rowKey.toString())); String[] column=value.split("::"); for (int i = 0; i < column.length; i++) { put.addColumn(Bytes.toBytes(Constants.columnFamily),Bytes.toBytes(Constants.columnArray[i]),Bytes.toBytes(column[i])); } putList.add(put); } out.collect(putList); } }).setParallelism(4);
putList.addSink(new HBaseSinkFunction()).setParallelism(1);
這里sink需要繼承Flink的RichSinkFunction接口,實現(xiàn)其中的三個比較重要的函數(shù):
1.open()任務開始只調用一次
2.invoke()每接收一條記錄調用一次,多條記錄調用多次
3.close()任務關閉只調用一次
寫HBase自定義Sink為
HBaseSinkFunction extends RichSinkFunction>{@Overridepublic void open(Configuration parameters) throws Exception { super.open(parameters); HbaseUtils.connectHbase(); TableName table=TableName.valueOf(Constants.tableNameStr); Admin admin = HbaseUtils.connection.getAdmin(); if(!admin.tableExists(table)){ HTableDescriptor tableDescriptor = new HTableDescriptor(Constants.tableNameStr); tableDescriptor.addFamily(new HColumnDescriptor(Constants.columnFamily)); admin.createTable(tableDescriptor); }}@Overridepublic void invoke(List
putList, Context context) throws Exception { Table table=HbaseUtils.connection.getTable(TableName.valueOf(Constants.tableNameStr)); table.put(putList);}@Overridepublic void close() throws Exception { super.close(); HbaseUtils.closeHBaseConnect();}}
以上是“Flink1.8如何批量Sink到HBase”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注創(chuàng)新互聯(lián)行業(yè)資訊頻道!