大橙子网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要为大家展示了“Flink1.8如何批量Sink到HBase”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Flink1.8如何批量Sink到HBase”这篇文章吧。
专注于为中小企业提供网站制作、成都做网站服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业日照免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了近1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
实现背景:
消费Kafka数据写入HBase时,单条处理效率太低。需要批量插入hbase,这里自定义时间窗口countWindowAll 实现100条hbase插入一次Hbase
前面我就不写了 直接上核心代码
/*每10秒一个处理窗口*/DataStream> putList = filterData.countWindowAll(Constants.windowCount).apply(new AllWindowFunction
, GlobalWindow>() { @Override public void apply(GlobalWindow window, Iterable message, Collector > out) throws Exception { List
putList=new ArrayList (); for (String value : message) { String rowKey=value.replace("::","_"); Put put = new Put(Bytes.toBytes(rowKey.toString())); String[] column=value.split("::"); for (int i = 0; i < column.length; i++) { put.addColumn(Bytes.toBytes(Constants.columnFamily),Bytes.toBytes(Constants.columnArray[i]),Bytes.toBytes(column[i])); } putList.add(put); } out.collect(putList); } }).setParallelism(4);
putList.addSink(new HBaseSinkFunction()).setParallelism(1);
这里sink需要继承Flink的RichSinkFunction接口,实现其中的三个比较重要的函数:
1.open()任务开始只调用一次
2.invoke()每接收一条记录调用一次,多条记录调用多次
3.close()任务关闭只调用一次
写HBase自定义Sink为
HBaseSinkFunction extends RichSinkFunction>{@Overridepublic void open(Configuration parameters) throws Exception { super.open(parameters); HbaseUtils.connectHbase(); TableName table=TableName.valueOf(Constants.tableNameStr); Admin admin = HbaseUtils.connection.getAdmin(); if(!admin.tableExists(table)){ HTableDescriptor tableDescriptor = new HTableDescriptor(Constants.tableNameStr); tableDescriptor.addFamily(new HColumnDescriptor(Constants.columnFamily)); admin.createTable(tableDescriptor); }}@Overridepublic void invoke(List
putList, Context context) throws Exception { Table table=HbaseUtils.connection.getTable(TableName.valueOf(Constants.tableNameStr)); table.put(putList);}@Overridepublic void close() throws Exception { super.close(); HbaseUtils.closeHBaseConnect();}}
以上是“Flink1.8如何批量Sink到HBase”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注创新互联行业资讯频道!