流程

  • kafka配置
  • 创建消费者
  • 关注主题ct
  • 获取数据
  • 将数据写入HBase
consumer.properties 是 Kafka 集群的配置信息，Calllog 是数据封装对象。
package com.csw.ct.consumer.bean;

import com.csw.ct.common.bean.Consumer;
import com.csw.ct.common.constant.Names;
import com.csw.ct.consumer.dao.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;

/**
 * Call-log consumer: polls call-log records from Kafka and hands each one
 * to {@link HBaseDao} for insertion.
 */
public class CalllogConsumer implements Consumer {

    /** Classpath resource holding the Kafka consumer configuration. */
    private static final String CONFIG_RESOURCE = "consumer.properties";

    /**
     * Consumes data.
     *
     * <p>Loads the Kafka configuration from {@code consumer.properties} on the
     * context class loader's classpath, subscribes to the topic named by
     * {@code Names.TOPIC}, and then polls forever, wrapping every record value
     * in a {@link Calllog} and passing it to {@code HBaseDao.insertDate}.
     *
     * <p>The method only returns when an exception breaks the poll loop. Any
     * exception is printed to standard error and not rethrown (unchanged from
     * the previous behavior); the configuration stream and the Kafka consumer
     * are closed before the method returns.
     */
    public void consume() {

        try {
            // Build the configuration object. The stream is closed as soon as
            // the properties are loaded instead of being left open.
            Properties prop = new Properties();
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            try (InputStream in = loader.getResourceAsStream(CONFIG_RESOURCE)) {
                if (in == null) {
                    // getResourceAsStream returns null for a missing resource;
                    // fail with a clear message rather than an opaque NPE from load().
                    throw new IllegalStateException(
                            CONFIG_RESOURCE + " not found on the classpath");
                }
                prop.load(in);
            }

            // Kafka consumer for the collected data (the original note says the
            // data is gathered by Flume). try-with-resources guarantees close()
            // runs when the loop below is aborted by an exception.
            try (KafkaConsumer<String, String> consumer =
                         new KafkaConsumer<String, String>(prop)) {

                // Subscribe to the topic
                consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

                // HBase data-access object
                HBaseDao dao = new HBaseDao();
                // Initialize it before the first insert
                dao.init();

                // Consume data: poll with a 100 ms timeout, forever
                while (true) {
                    ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println(consumerRecord.value());

                        // Wrap the raw record value and insert it
                        Calllog log = new Calllog(consumerRecord.value());
                        dao.insertDate(log);
                    }
                }
            }
        } catch (Exception e) {
            // Boundary handler: kept as print-and-return so callers observe the
            // same "never throws" behavior as before.
            e.printStackTrace();
        }

    }


    /**
     * Releases resources held by this object.
     *
     * <p>Intentionally empty: this class keeps no fields, and everything
     * opened by {@link #consume()} is closed inside that method.
     *
     * @throws IOException never thrown by this implementation; declared to keep
     *                     the existing signature
     */
    public void close() throws IOException {

    }
}

写入HBase具体代码
https://www.cnblogs.com/chenshaowei/p/12736522.html

相关文章:

  • 2022-12-23
  • 2021-11-20
  • 2021-12-14
  • 2022-12-23
  • 2021-07-21
  • 2021-09-12
  • 2021-12-21
猜你喜欢
  • 2021-12-17
  • 2022-01-25
  • 2021-09-04
  • 2021-04-06
  • 2021-11-03
  • 2021-04-13
  • 2022-01-04
相关资源
相似解决方案