Using ZooKeeper to dynamically change the data format received by Flume

Project repository: https://github.com/wty19920422/mydesign

The code can be customized to fit your own requirements; below is a brief walkthrough of part of the flow.

When Flume is used to receive TCP data, product requirements sometimes call for changing the data format on the fly, for example adding company information, data-supervisor information, or other custom fields. The data arrives in real time, so format changes must take effect promptly. To achieve this, multiple Flume servers can all watch the same znode in ZooKeeper; as soon as the znode changes, the data format used by Flume is updated immediately.


1. Architecture overview

-- Flume is deployed on multiple servers to receive data

-- The Flume source uses the built-in syslog source to receive TCP data

-- The Flume sink is a custom output class, so that multiple destinations are supported and ZooKeeper configuration changes can be picked up immediately

-- ZooKeeper: by watching the /storedata znode for changes, the data output format is changed in real time

-- Storage is multi-target: depending on the configuration, data can be written to HDFS, file, Kafka, MongoDB, Redis, MySQL, and so on

2. Implementation overview

1) Flume source

Configuration:

a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 6666
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
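
The source snippet above assumes the agent, channel and sink names are declared elsewhere; a minimal skeleton it could plug into (the channel settings here are illustrative, not from the original post) would be:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000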

2) Flume sink

A custom sink is used. The configuration comes first, followed by the sink implementation (ZKDataTransmitSink); feel free to skim the code.

a1.sinks.k1.type = cn.zkjt.data.transmit.flume_ng_zk_datatransmit.ZKDataTransmitSink
a1.sinks.k1.sendtype = logger or file or hdfs or kafka or mongo or redis
a1.sinks.k1.zookeeperlist = 0.0.0.0
a1.sinks.k1.zookeeperpath = /zkdata
a1.sinks.k1.channel = c1

# file part
a1.sinks.k1.filewritepath = /data/filewrite

# hdfs part
a1.sinks.k1.hdfslist = hdfs://192.168.20.18:9000
a1.sinks.k1.hdfsuser = hadoop
a1.sinks.k1.hdfswritepath = /writedata

# kafka part
a1.sinks.k1.kafkabrokerlist = 192.168.10.7:9092
a1.sinks.k1.kafkatopic = zktest
a1.sinks.k1.partionnum = 2

# mongo part
a1.sinks.k1.mongohost = 192.168.20.35
a1.sinks.k1.mongoport = 27017
a1.sinks.k1.mongouser = flume
a1.sinks.k1.mongopassword = CskAMpk=
a1.sinks.k1.mongodb = zzzsj
a1.sinks.k1.mongocollection = test

# redis part
a1.sinks.k1.redishost = 192.168.20.20
a1.sinks.k1.redisport = 6379
a1.sinks.k1.redispassword = 123456
a1.sinks.k1.redisdb = 7

 

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;

import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.ConstantData;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.ZKThread;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriter;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriterFactory;


public class ZKDataTransmitSink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory.getLogger(ZKDataTransmitSink.class);

	private int batchSize;
	private SinkCounter sinkCounter;
	
	private String type;
	private DataWriter dataWriter;
	private Context context;
	
	private Thread zkThread;

	public Status process() throws EventDeliveryException {
		Status status = Status.READY;
		Channel channel = getChannel();
		Transaction transaction = channel.getTransaction();

		try {
			transaction.begin();
			long count = 0;
			for (count = 0; count < batchSize; ++count) {
				Event event = channel.take();
				if (event == null) {
					break;
				}
				// commonData holds the latest value read from the znode by the ZooKeeper watcher thread
				String commonData = ConstantData.commonData;
				logger.info("common data : {}", commonData);
				dataWriter.write(event, commonData);
			}
			if (count <= 0) {
				sinkCounter.incrementBatchEmptyCount();
				status = Status.BACKOFF;
			} else {
				if (count < batchSize) {
					sinkCounter.incrementBatchUnderflowCount();
					status = Status.BACKOFF;
				} else {
					sinkCounter.incrementBatchCompleteCount();
				}

				sinkCounter.addToEventDrainAttemptCount(count);
			}

			transaction.commit();
			sinkCounter.addToEventDrainSuccessCount(count);

		} catch (Throwable t) {
			try {
				transaction.rollback();
			} catch (Exception e) {
				logger.error("Exception during transaction rollback.", e);
			}

			logger.error("Failed to commit transaction. Transaction rolled back.", t);
			if (t instanceof Error || t instanceof RuntimeException) {
				Throwables.propagate(t);
			} else {
				throw new EventDeliveryException("Failed to commit transaction. Transaction rolled back.", t);
			}
		} finally {
			if (transaction != null) {
				transaction.close();
			}
		}

		return status;
	}

	@Override
	public synchronized void start() {	
		logger.info("Starting sink");
		
		dataWriter = new DataWriterFactory().createWriter(type);
		dataWriter.setContext(context);
		dataWriter.start();
		
		zkThread = new Thread(new ZKThread(context));
		zkThread.start();
		
		
		sinkCounter.start();
		try {
			sinkCounter.incrementConnectionCreatedCount();
		} catch (Exception e) {
			logger.error("Exception while starting the sink", e);
			sinkCounter.incrementConnectionFailedCount();
		}

		super.start();
		logger.info("Sink started");
	}

	@Override
	public synchronized void stop() {
		logger.info("Stopping sink");
		dataWriter.stop();
		sinkCounter.incrementConnectionClosedCount();
		sinkCounter.stop();
		super.stop();
		logger.info("Sink stopped");
	}

	public void configure(Context context) {
		this.context = context;
		type = context.getString(ZKDataTransmitSinkConstants.SENDTYPE, ZKDataTransmitSinkConstants.DEFAULT_SENDTYPE);		
		batchSize = context.getInteger(ZKDataTransmitSinkConstants.BATCH_SIZE, ZKDataTransmitSinkConstants.DEFAULT_BATCH_SIZE);
		if (sinkCounter == null) {
			sinkCounter = new SinkCounter(getName());
		}
	}

}
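
configure() reads its keys from a ZKDataTransmitSinkConstants class that is not listed in the post. Matching the sink configuration above, it is presumably along these lines (the batchsize key name and the default values are assumptions):

public class ZKDataTransmitSinkConstants {

	// keys as they appear in the flume configuration file
	public static final String SENDTYPE = "sendtype";
	public static final String ZOOKEEPER_LIST = "zookeeperlist";
	public static final String ZOOKEEPER_PATH = "zookeeperpath";
	public static final String BATCH_SIZE = "batchsize";        // assumed key name

	// assumed defaults
	public static final String DEFAULT_SENDTYPE = "logger";
	public static final String DEFAULT_ZOOKEEPER_PATH = "/zkdata";
	public static final int DEFAULT_BATCH_SIZE = 100;
}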

Creating the corresponding writer (DataWriterFactory):

import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.process.TypeMap;

public class DataWriterFactory implements DataWriterFactoryInterface{
	
	private String classPath;
	
	public DataWriterFactory() {
		// writers live in the sibling "datawriter" package of this "writer" package
		String packageName = getClass().getPackage().getName();
		classPath = packageName.replace("writer", "datawriter");
	}
	
	public DataWriter createWriter(String typeName) {
		DataWriter dataWriter = null;
		TypeMap typeMap = new TypeMap();
		// e.g. typeName "kafka" -> sub-package ...datawriter.kafka, class KafkaDataWriter
		String className = typeMap.getClassName(typeName);
		String writerClassName = classPath + "." + typeName + "." + className;
		try {
			Class<?> clazz = Class.forName(writerClassName);
			dataWriter = (DataWriter) clazz.newInstance();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return dataWriter;
	}
}
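
createWriter() depends on a DataWriter interface and a TypeMap lookup that are not shown in the post. Based on how they are called above, a rough sketch (two separate source files; the concrete class names in the map are inferred, not confirmed) could look like this:

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;

// interface implemented by every destination-specific writer
public interface DataWriter {
	void setContext(Context context);            // read destination-specific settings
	void start();                                // open clients/connections
	void write(Event event, String commonData);  // write one event plus the ZooKeeper-managed common data
	void stop();                                 // release resources
}

// maps a sendtype such as "kafka" to the simple class name of its writer
public class TypeMap {
	private static final Map<String, String> TYPES = new HashMap<String, String>();
	static {
		TYPES.put("kafka", "KafkaDataWriter");   // confirmed by the class shown below
		TYPES.put("hdfs", "HdfsDataWriter");     // assumed naming for the other destinations
		TYPES.put("redis", "RedisDataWriter");
	}
	public String getClassName(String typeName) {
		return TYPES.get(typeName);
	}
}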

Kafka writer (KafkaDataWriter):

import java.util.Properties;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.configure.kafka.KafkaWriteConstants;
import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.writer.DataWriter;

public class KafkaDataWriter implements DataWriter {

	private String brokerList;
	private String topic;
	private int partitionNum;

	private KafkaProducer<String, byte[]> producer;

	private ProducerRecord<String, byte[]> record;
	
	public void write(Event event, String flag) {
		// pick a partition by hashing the event body; Math.floorMod avoids the
		// negative result Math.abs() yields when hashCode() is Integer.MIN_VALUE
		int partitionID = Math.floorMod(new String(event.getBody()).hashCode(), partitionNum);

		// note: this writer sends only the raw event body; the common data passed
		// in as flag is not appended here
		record = new ProducerRecord<String, byte[]>(topic, partitionID, "data",
				event.getBody());
		producer.send(record);
	}

	public void setContext(Context context) {
		brokerList = context.getString(KafkaWriteConstants.KAFKA_BROKERLIST);
		topic = context.getString(KafkaWriteConstants.KAFKA_TOPIC);
		partitionNum = context.getInteger(KafkaWriteConstants.KAFKA_PARTITION_NUM, KafkaWriteConstants.DEFAULT_KAFKA_PARTITION_NUM);
	}

	public void start() {
		Properties properties = new Properties();
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.ByteArraySerializer");
		// fixed client id for this producer
		properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");

		producer = new KafkaProducer<String, byte[]>(properties);
	}

	public void stop() {
		producer.close();
	}

}
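
setContext() above uses a KafkaWriteConstants class that is not listed; matching the "# kafka part" of the sink configuration, it presumably looks roughly like this (the default partition count is an assumption):

public class KafkaWriteConstants {
	// keys as they appear in the flume sink configuration
	public static final String KAFKA_BROKERLIST = "kafkabrokerlist";
	public static final String KAFKA_TOPIC = "kafkatopic";
	public static final String KAFKA_PARTITION_NUM = "partionnum";

	// assumed default
	public static final int DEFAULT_KAFKA_PARTITION_NUM = 1;
}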

3. ZooKeeper part

import java.io.IOException;
import org.apache.flume.Context;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.zkjt.data.transmit.flume_ng_zk_datatransmit.ZKDataTransmitSinkConstants;

import org.apache.zookeeper.ZooDefs.Ids;

public class ZKThread implements Runnable{

	private String connectString;
	private int sessionTimeout = 2000;
	private String storePath;
	private ZooKeeper zk;
	
	private Logger logger = LoggerFactory.getLogger(getClass());
	
	public ZKThread(Context context) {
		connectString = context.getString(ZKDataTransmitSinkConstants.ZOOKEEPER_LIST);
		storePath = context.getString(ZKDataTransmitSinkConstants.ZOOKEEPER_PATH, ZKDataTransmitSinkConstants.DEFAULT_ZOOKEEPER_PATH);
	}
	
	@Override
	public void run() {
		try {
			// The default watcher fires on the initial connection event and on every
			// change to the watched znode; each time it reads the znode again with
			// watch=true, which re-registers the watch for the next change.
			zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
				public void process(WatchedEvent event) {
					try {
						logger.info("path:{}, list:{}", storePath, connectString);
						Stat exists = zk.exists(storePath, false);
						if (exists == null) {
							// create the znode with empty data if it does not exist yet
							zk.create(storePath, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						}
						byte[] data = zk.getData(storePath, true, null);
						// publish the latest common data for the sink to pick up
						ConstantData.commonData = new String(data);
					} catch (Exception e) {
						logger.error("Failed to read data from znode " + storePath, e);
					}
				}
			});
		} catch (IOException e) {
			logger.error("Failed to connect to ZooKeeper at " + connectString, e);
		}
	}

}
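
Both the sink and the watcher above share data through ConstantData.commonData, which is also not listed; it is presumably just a simple holder, something like the following (volatile is my addition to make cross-thread visibility explicit):

public class ConstantData {
	// latest common/format data read from the znode by the ZooKeeper watcher;
	// volatile (assumed) so the sink thread sees updates immediately
	public static volatile String commonData = "";
}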

4. Storage part

(screenshot)

5. Data demo

Start Flume:

bin/flume-ng agent -c conf/ -f conf/zk.conf -n a1 -Dflume.root.logger=INFO,console

Start a netcat session to send data to Flume:

nc 0.0.0.0 6666
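
For example, a test line can also be piped in non-interactively (any text works; the syslogtcp source should tag non-syslog lines as invalid in a header while still forwarding the body):

echo '<13>Jun 10 12:00:00 localhost test message' | nc 127.0.0.1 6666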

(screenshots)

Now modify the configuration stored in ZooKeeper:
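
With the standard ZooKeeper CLI this is just a set command on the watched znode; the path follows the sample sink configuration above, and the payload (a made-up example here) is whatever common data the writers should attach:

bin/zkCli.sh -server <zookeeper-host>:2181
set /zkdata companyA-supervisorB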

(screenshot)

Check the output again:

(screenshot)

 
