Using Design Patterns to Load Kafka Data into HBase

Kafka and HBase must be running first:
Start Kafka in the background: kafka-server-start.sh -daemon /opt/server.properties
Start HBase: start-hbase.sh
Open the HBase shell: hbase shell
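
The target namespace and tables are assumed to already exist. For reference, they can be created in the HBase shell roughly as follows; the table and column-family names are taken from the handler code further down, so adjust them if your schema differs:

create_namespace 'events_db'
create 'events_db:user_friend', 'uf'
create 'events_db:event_attendee', 'euat'
create 'events_db:users', 'profile', 'region', 'registration'
create 'events_db:events', 'creator', 'schedule', 'location', 'remark'
create 'events_db:train', 'eu'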
1. Create the three interfaces

The pipeline is split into three roles so each can vary independently: IParseRecord turns a batch of Kafka records into a list of HBase Put objects, IWriter writes that list into a target table, and IWorker drives the consume loop that ties the two together.

The IParseRecord interface:

public interface IParseRecord {
    public List<Put> parse(ConsumerRecords<String, String> records);
}

The IWorker interface:

public interface IWorker {
    public void fillData();
}

The IWriter interface:

public interface IWriter {
    public int write(ConsumerRecords<String, String> records, String tableName) throws IOException;
}

2. Create the Worker and Writer classes
(1)ParentWorker

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/11
 * @Description
 */

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.util.Properties;


public abstract class ParentWorker implements IWorker{
    protected Properties prop;
    public ParentWorker( String groupName) throws IOException {
        prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.134.104:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Auto-commit is disabled, so the commit interval on the next line has no
        // effect; offsets are only stored in Kafka if they are committed explicitly.
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }
}

(2)HBaseWorker

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/11
 * @Description
 */

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;


public class HBaseWorker extends ParentWorker {
    private IWriter writer;
    private String topic;
    private String target;

    public HBaseWorker(IWriter writer, String topic, String targetTable) throws IOException {
        this(writer, "myGroupDefault", topic, targetTable);
    }

    public HBaseWorker(IWriter writer, String groupName, String topic, String targetTable) throws IOException {
        super(groupName);
        this.topic = topic;
        this.writer = writer;
        this.target = targetTable;

    }

    @Override
    public void fillData() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton(this.topic));

        try {
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                int rowNum = writer.write(poll, this.target);
                System.out.println("行数:" + rowNum);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
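
Because ParentWorker disables auto-commit, fillData() above never commits consumer offsets, so on every restart the group falls back to the earliest offset and reprocesses the whole topic. A possible variant, shown as a sketch below (the method name fillDataWithCommit is illustrative, not part of the original project), commits the offsets only after a batch has been written successfully and closes the consumer when the loop ends:

    public void fillDataWithCommit() {
        // try-with-resources makes sure the consumer is closed if the loop exits
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            consumer.subscribe(Collections.singleton(this.topic));
            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                if (poll.isEmpty()) {
                    continue;
                }
                int rowNum = writer.write(poll, this.target);
                consumer.commitSync(); // mark the batch as processed only after the HBase write succeeded
                System.out.println("rows written: " + rowNum);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }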

(3)HBaseWrite

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/8
 * @Description
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;


public class HBaseWrite implements IWriter {
    private Connection connection;
    private IParseRecord parsedRecord;

    public IParseRecord getParsedRecord() {
        return parsedRecord;
    }

    public void setParsedRecord(IParseRecord parsedRecord) {
        this.parsedRecord = parsedRecord;
    }

    public HBaseWrite(IParseRecord parsedRecord) throws IOException {
        this.parsedRecord = parsedRecord;

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://hadoop004:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "hadoop004");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        connection = ConnectionFactory.createConnection(conf);
    }

    @Override
    public int write(ConsumerRecords<String, String> records, String tableName) throws IOException {
        List<Put> datas = parsedRecord.parse(records);
        // Table instances are lightweight, but they must be closed after each use
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            if (!datas.isEmpty()) {
                table.put(datas);
            }
        }
        return datas.size();
    }
}
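
HBaseWrite opens the table and writes synchronously on every poll. If throughput ever becomes a concern, an alternative is to keep a long-lived BufferedMutator for the target table (the worker always writes to a single table here) and let the HBase client batch the mutations. A sketch of such a method inside HBaseWrite, assuming an extra import of org.apache.hadoop.hbase.client.BufferedMutator (the name writeBuffered is illustrative):

    private BufferedMutator mutator; // created lazily and reused for every batch

    public int writeBuffered(ConsumerRecords<String, String> records, String tableName) throws IOException {
        List<Put> datas = parsedRecord.parse(records);
        if (mutator == null) {
            mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
        }
        mutator.mutate(datas); // queued on the client side
        mutator.flush();       // pushed to the region servers once per batch
        return datas.size();
    }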

3. Create the corresponding Handler for each requirement
(1)EventAttendHandler :

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/11
 * @Description
 */
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;

public class EventAttendHandler implements IParseRecord{
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> p : records) {
            System.out.println(p.value());
            String[] split = p.value().split(",");
            Put put = new Put(Bytes.toBytes((split[0] + split[1] + split[2]).hashCode()));
            put.addColumn("euat".getBytes(), "eventid".getBytes(), split[0].getBytes());
            put.addColumn("euat".getBytes(), "userid".getBytes(), split[1].getBytes());
            put.addColumn("euat".getBytes(), "state".getBytes(), split[2].getBytes());
            datas.add(put);
        }
        return datas;
    }
}
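
Note that this handler (and the ones below) uses the int hashCode of the key fields as the rowkey, so two different keys that happen to hash to the same value would overwrite each other. A collision-free alternative, sketched here and not part of the original code, is to use the natural key itself as the rowkey:

            // use the concatenated natural key instead of its hashCode
            Put put = new Put(Bytes.toBytes(split[0] + "_" + split[1] + "_" + split[2]));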

(2)EventsHandler :

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/12
 * @Description
 */

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;

public class EventsHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            String[] split = record.value().split(",");
            Put put = new Put(Bytes.toBytes(split[0].hashCode()));
            put.addColumn("creator".getBytes(), "user_id".getBytes(), split[1].getBytes());
            put.addColumn("schedule".getBytes(), "start_time".getBytes(), split[2].getBytes());
            put.addColumn("location".getBytes(), "city".getBytes(), split[3].getBytes());
            put.addColumn("location".getBytes(), "state".getBytes(), split[4].getBytes());
            put.addColumn("location".getBytes(), "zip".getBytes(), split[5].getBytes());
            put.addColumn("location".getBytes(), "country".getBytes(), split[6].getBytes());
            put.addColumn("location".getBytes(), "lat".getBytes(), split[7].getBytes());
            put.addColumn("location".getBytes(), "lng".getBytes(), split[8].getBytes());
            put.addColumn("remark".getBytes(), "common_words".getBytes(), split[9].getBytes());
            datas.add(put);
        }
        return datas;
    }
}

(3)TrainHandler

package nj.zb.kb09.kafkaToHBaseGJ;

/**
 * @Author Jalrs
 * @Date 2021/1/12
 * @Description
 */

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;

public class TrainHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            String[] split = record.value().split(",");
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
            put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
            put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
            put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
            put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
            put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());

            datas.add(put);
        }
        return datas;
    }
}

(4)UserFriendHandler

package nj.zb.kb09.kafkaToHBaseGJ;
/**
 * @Author Jalrs
 * @Date 2021/1/11
 * @Description
 */

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;

public class UserFriendHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();

        for (ConsumerRecord<String, String> p : records) {
            System.out.println(p.value());
            String[] split = p.value().split(",");
            Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));
            put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
            put.addColumn("uf".getBytes(), "friendid".getBytes(), split[1].getBytes());
            datas.add(put);
        }
        return datas;
    }
}

(5)UsersHandler

package nj.zb.kb09.kafkaToHBaseGJ;

/**
 * @Author Jalrs
 * @Date 2021/1/12
 * @Description
 */

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.ArrayList;
import java.util.List;


public class UsersHandler implements IParseRecord {
    @Override
    public List<Put> parse(ConsumerRecords<String, String> records) {
        List<Put> datas = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            String[] split = record.value().split(",");
            if (split[0].trim().length() == 0) {
                continue;
            }
            System.out.println(record);
            Put put = new Put(Bytes.toBytes(split[0].hashCode()));
            put.addColumn("profile".getBytes(), "locale".getBytes(), split[1].getBytes());
            put.addColumn("profile".getBytes(), "birthyear".getBytes(), split[2].getBytes());
            put.addColumn("region".getBytes(), "gender".getBytes(), split[3].getBytes());
            if (split.length > 4) {
                put.addColumn("registration".getBytes(), "joinedAt".getBytes(), split[4].getBytes());
                if (split.length > 5) {
                    put.addColumn("region".getBytes(), "location".getBytes(), split[5].getBytes());
                    if (split.length > 6) {
                        put.addColumn("region".getBytes(), "timezone".getBytes(), split[6].getBytes());
                    }
                }
            }
            datas.add(put);
        }
        return datas;
    }
}
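
Each handler depends only on ConsumerRecords, so it can be exercised without Kafka or HBase running. A minimal sketch (the demo class name is illustrative, and it assumes a Kafka client version in which the simple ConsumerRecord and ConsumerRecords constructors are still public):

package nj.zb.kb09.kafkaToHBaseGJ;

import org.apache.hadoop.hbase.client.Put;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.List;

public class UserFriendHandlerDemo {
    public static void main(String[] args) {
        // build a one-record batch by hand; the value layout is "userid,friendid"
        ConsumerRecord<String, String> record =
                new ConsumerRecord<>("user_friends", 0, 0L, null, "1000,2000");
        ConsumerRecords<String, String> records = new ConsumerRecords<>(
                Collections.singletonMap(new TopicPartition("user_friends", 0),
                        Collections.singletonList(record)));

        List<Put> puts = new UserFriendHandler().parse(records);
        System.out.println("puts built: " + puts.size()); // expected: 1
    }
}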

4. Create the Driver class

package nj.zb.kb09.kafkaToHBaseGJ;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * @Author Jalrs
 * @Date 2021/1/7
 * @Description Consume the data in the Kafka topic user_friends into the HBase table events_db:user_friend
 */

public class Driver {
    public static void main(String[] args) throws IOException {

        IParseRecord record = new UserFriendHandler();
        IWriter writer = new HBaseWrite(record);
//        IWorker worker = new HBaseWorker(writer,
//                "userFriend2",
//                "user_friends",
//                "events_db:user_friend");
//        worker.fillData();

//        new HBaseWorker(new HBaseWrite(new EventAttendHandler()),
//                "event_attendees",
//                "events_db:event_attendee")
//                .fillData();

//        new HBaseWorker(new HBaseWrite(new UsersHandler()),
//                "users1",
//                "users",
//                "events_db:users")
//                .fillData();

        new HBaseWorker(new HBaseWrite(new EventsHandler()),
                "events1",
                "events",
                "events_db:events")
                .fillData();

//        new HBaseWorker(new HBaseWrite(new TrainHandler()),
//                "train1",
//                "train",
//                "events_db:train")
//                .fillData();

    }
}
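
Only one worker is active at a time; comment the current block and uncomment another one to load a different topic into its table. After a run, the result can be spot-checked in the HBase shell, for example:

scan 'events_db:events', {LIMIT => 2}
count 'events_db:events'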