wenzhaoqin 7 anni fa
parent
commit
3dd9290f01

+ 3 - 0
.gitignore

@@ -23,3 +23,6 @@ buildNumber.properties
 # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
 hs_err_pid*
 
+.settings
+.classpath
+.project

+ 74 - 0
pom.xml

@@ -0,0 +1,74 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<groupId>com.lewaimai</groupId>
+	<artifactId>mysql-elasticsearch</artifactId>
+	<version>1.0.0</version>
+	<packaging>jar</packaging>
+
+	<name>mysql-elasticsearch</name>
+	<url>http://maven.apache.org</url>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<dependencies>
+		<!-- http://mvnrepository.com/artifact/com.sun.xml.bind/jaxb-core -->
+		<dependency>
+			<groupId>com.sun.xml.bind</groupId>
+			<artifactId>jaxb-core</artifactId>
+			<version>2.2.11</version>
+		</dependency>
+		<!-- http://mvnrepository.com/artifact/javax.xml/jaxb-api -->
+		<dependency>
+			<groupId>javax.xml</groupId>
+			<artifactId>jaxb-api</artifactId>
+			<version>2.1</version>
+		</dependency>
+		<!-- http://mvnrepository.com/artifact/com.sun.xml.bind/jaxb-impl -->
+		<dependency>
+			<groupId>com.sun.xml.bind</groupId>
+			<artifactId>jaxb-impl</artifactId>
+			<version>2.2.11</version>
+		</dependency>
+		<dependency>
+			<groupId>com.alibaba.otter</groupId>
+			<artifactId>canal.client</artifactId>
+			<version>1.0.24</version>
+			<exclusions>
+				<exclusion>
+					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>ch.qos.logback</groupId>
+					<artifactId>logback-classic</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>log4j</groupId>
+			<artifactId>log4j</artifactId>
+			<version>1.2.17</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+			<version>1.7.21</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>1.7.21</version>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>3.8.1</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

+ 29 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/ConsumerLauncher.java

@@ -0,0 +1,29 @@
+package com.lewaimai.mysql.elasticsearch;
+
+import java.io.FileNotFoundException;
+
+import javax.xml.bind.JAXBException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lewaimai.mysql.elasticsearch.common.config.ConfigInstance;
+
+/**
+ * Canal消费者入口类
+ *
+ */
+public class ConsumerLauncher 
+{
+	private static Logger logger = LoggerFactory.getLogger(ConsumerLauncher.class);
+    public static void main( String[] args ) throws FileNotFoundException, JAXBException
+    {
+    	String conf = System.getProperty("consumer.conf", "classpath:config.xml");
+    	//初始化配置
+    	ConfigInstance.instance().init(conf);
+    	
+    	//启动consumer server
+    	ConsumerServer server = new ConsumerServer();
+    	server.start();
+    }
+}

+ 72 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/ConsumerServer.java

@@ -0,0 +1,72 @@
+package com.lewaimai.mysql.elasticsearch;
+
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.client.CanalConnectors;
+import com.alibaba.otter.canal.protocol.Message;
+import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.lewaimai.mysql.elasticsearch.common.AbstractConsumerLifeCycle;
+import com.lewaimai.mysql.elasticsearch.common.BatchQueue;
+import com.lewaimai.mysql.elasticsearch.common.ConsumerLifeCycle;
+import com.lewaimai.mysql.elasticsearch.common.config.Canal;
+import com.lewaimai.mysql.elasticsearch.common.config.ConfigBean;
+import com.lewaimai.mysql.elasticsearch.common.config.ConfigInstance;
+
+/**
+ * 消费者服务,主要负责从Canal拉取数据变更事件。
+ * @author jogin
+ *
+ */
+public class ConsumerServer extends AbstractConsumerLifeCycle {
+	private static Logger logger = LoggerFactory.getLogger(ConsumerLauncher.class);
+	public ConsumerServer() {
+	}
+
+	public void start() {
+		super.start();
+		//创建连接
+		Canal canalConfig = ConfigInstance.instance().getConfig().getCanal();
+    	CanalConnector connector = CanalConnectors
+				.newSingleConnector(
+						new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
+						canalConfig.getDestination(), 
+						canalConfig.getUser(), 
+						canalConfig.getPassword());
+    	try {
+    		ConfigBean configBean = ConfigInstance.instance().getConfig();
+			connector.connect();
+			connector.subscribe(configBean.getDbName() + "\\..*");
+			connector.rollback();
+			logger.info("开始监听消息.");
+			int batchSize = configBean.getBatchSize();
+			while (isRunning()) {
+				Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
+				long batchId = message.getId();
+				int size = message.getEntries().size();
+				if (batchId == -1 || size == 0) {
+					try {
+						logger.debug("没有消息.");
+						Thread.sleep(1000);
+					} catch (InterruptedException e) {
+					}
+				} else {
+					//处理binlog
+					logger.debug("处理消息. ");
+					
+					BatchQueue.instance().add(batchId, message.getEntries());
+				}
+//				connector.ack(batchId); // 提交确认
+			}
+		} catch (CanalClientException e) {
+			logger.error("Canel server连接不上.");
+		}
+    	finally {
+			connector.disconnect();
+			logger.info("退出Consumer Server.");
+		}
+	}
+}

+ 21 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/AbstractConsumerLifeCycle.java

@@ -0,0 +1,21 @@
+package com.lewaimai.mysql.elasticsearch.common;
+
+public class AbstractConsumerLifeCycle implements ConsumerLifeCycle {
+	private boolean isRunning = false;
+	
+	public void start() {
+		isRunning = true;
+	}
+
+	public void stop() {
+		isRunning = false;
+	}
+	
+	/**
+	 * 是否正在运行
+	 * @return
+	 */
+	public boolean isRunning() {
+		return isRunning;
+	}
+}

+ 128 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/BatchQueue.java

@@ -0,0 +1,128 @@
+package com.lewaimai.mysql.elasticsearch.common;
+
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
+
+/**
+ * Canal批处理任务
+ * @author jogin
+ *
+ */
+public class BatchQueue {
+	private LinkedList<Task> queue;
+	
+	private static class SingletonHolder {
+		private static final BatchQueue INSTANCE = new BatchQueue();
+	}
+	
+	private BatchQueue()
+	{
+		queue = new LinkedList<BatchQueue.Task>();
+	}
+	
+	public static BatchQueue instance()
+	{
+		return SingletonHolder.INSTANCE;
+	}
+	
+	/**
+	 * 添加Canal批处理任务
+	 * @param batchId
+	 * @param entries
+	 */
+	public void add(long batchId, List<Entry> entries)
+	{
+		Task task = new Task();
+		task.setBatchId(batchId);
+		task.setEntrys(entries);
+		queue.add(task);
+	}
+	
+	/**
+	 * 完成子任务
+	 * @param batchId
+	 */
+	public void completeSubTask(long batchId)
+	{
+		for (Task task : queue) {
+			if (batchId == task.batchId) {
+				task.completeSubTask();
+				break;
+			}
+		}
+	}
+	
+	/**
+	 * 按顺序返回已经完成的批处理id, 不能跳跃,必须按入队顺序
+	 * @return
+	 */
+	public List<Long> getCompleteBatch()
+	{
+		ArrayList<Long> batchIds = new ArrayList<Long>();
+		for (Task task : queue) {
+			if (task.subTasks != task.completed) {
+				break;
+			}
+			batchIds.add(task.batchId);
+		}
+		return batchIds.size() > 0 ? batchIds : null;
+	}
+	
+	/**
+	 * 获取队列大小
+	 * @return
+	 */
+	public int getSize()
+	{
+		return queue.size();
+	}
+	
+	/**
+	 * 任务
+	 */
+	public class Task {
+		private long batchId;
+		private List<Entry> entrys;
+		//已经完成子任务数
+		private int completed = 0;
+		//子任务数
+		private int subTasks = 0;
+		
+		public long getBatchId() {
+			return batchId;
+		}
+		public void setBatchId(long batchId) {
+			this.batchId = batchId;
+		}
+		public List<Entry> getEntrys() {
+			return entrys;
+		}
+		public void setEntrys(List<Entry> entrys) {
+			this.entrys = entrys;
+		}
+		public int getCompleted() {
+			return completed;
+		}
+		public void setCompleted(int completed) {
+			this.completed = completed;
+		}
+		public int getSubTasks() {
+			return subTasks;
+		}
+		public void setSubTasks(int subTasks) {
+			this.subTasks = subTasks;
+		}
+		
+		/**
+		 * 完成子任务
+		 */
+		public void completeSubTask()
+		{
+			this.subTasks++;
+		}
+	}
+}

+ 18 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/ConsumerLifeCycle.java

@@ -0,0 +1,18 @@
+package com.lewaimai.mysql.elasticsearch.common;
+
+/**
+ * 消费者生命周期接口
+ * @author jogin
+ *
+ */
+public interface ConsumerLifeCycle {
+	/**
+	 * 启动
+	 */
+	void start();
+	
+	/**
+	 * 关闭
+	 */
+	void stop();
+}

+ 55 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/Canal.java

@@ -0,0 +1,55 @@
+package com.lewaimai.mysql.elasticsearch.common.config;
+
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * canal server配置
+ * @author jogin
+ *
+ */
+public class Canal {
+	private String ip;
+	private int port;
+	private String user;
+	private String password;
+	private String destination;
+	
+	@XmlElement(name = "ip")
+	public String getIp() {
+		return ip;
+	}
+	public void setIp(String ip) {
+		this.ip = ip;
+	}
+	
+	@XmlElement(name = "port")
+	public int getPort() {
+		return port;
+	}
+	public void setPort(int port) {
+		this.port = port;
+	}
+	
+	@XmlElement(name = "user")
+	public String getUser() {
+		return user;
+	}
+	public void setUser(String user) {
+		this.user = user;
+	}
+	
+	@XmlElement(name = "password")
+	public String getPassword() {
+		return password;
+	}
+	public void setPassword(String password) {
+		this.password = password;
+	}
+	@XmlElement(name = "destination")
+	public String getDestination() {
+		return destination;
+	}
+	public void setDestination(String destination) {
+		this.destination = destination;
+	}
+}

+ 66 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/ConfigBean.java

@@ -0,0 +1,66 @@
+package com.lewaimai.mysql.elasticsearch.common.config;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+/**
+ * consumer配置
+ * @author jogin
+ */
+@XmlRootElement (name = "config")
+//@XmlAccessorType (XmlAccessType.FIELD)
+public class ConfigBean {
+	private Canal canal;
+	
+	/**
+	 * 数据库名
+	 */
+	private String dbName;
+	
+	/**
+	 * 每次获取canal事件大小
+	 */
+	private int batchSize;
+	
+	/**
+	 * 每个表的同步配置
+	 */
+	private List<Table> sync;
+
+	@XmlElement(name = "canal")
+	public Canal getCanal() {
+		return canal;
+	}
+
+	public void setCanal(Canal canal) {
+		this.canal = canal;
+	}
+
+	@XmlElement(name = "dbName")
+	public String getDbName() {
+		return dbName;
+	}
+
+	public void setDbName(String dbName) {
+		this.dbName = dbName;
+	}
+
+	@XmlElement(name = "sync")
+	public List<Table> getSync() {
+		return sync;
+	}
+
+	public void setSync(List<Table> sync) {
+		this.sync = sync;
+	}
+
+	@XmlElement(name = "batchSize")
+	public int getBatchSize() {
+		return batchSize;
+	}
+
+	public void setBatchSize(int batchSize) {
+		this.batchSize = batchSize;
+	}
+}

+ 70 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/ConfigInstance.java

@@ -0,0 +1,70 @@
+package com.lewaimai.mysql.elasticsearch.common.config;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.StringReader;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * 配置实例
+ * @author jogin
+ */
+public class ConfigInstance {
+	//配置文件路径
+	private String configPath;
+	private ConfigBean config;
+	
+	private static class SingletonHolder {
+		private static final ConfigInstance INSTANCE = new ConfigInstance();
+	}
+	
+	public static ConfigInstance instance() {
+		return SingletonHolder.INSTANCE;
+	}
+	
+	private ConfigInstance()
+	{
+	}
+	
+	public ConfigBean getConfig() {
+		return config;
+	}
+	/**
+	 * 初始化配置
+	 * @param path
+	 * @throws JAXBException 
+	 * @throws FileNotFoundException 
+	 */
+	public void init(String path) throws FileNotFoundException, JAXBException
+	{
+		configPath = path;
+		reload();
+	}
+	
+	/**
+	 * 重新加载配置
+	 * @throws JAXBException 
+	 * @throws FileNotFoundException 
+	 */
+	public void reload() throws JAXBException, FileNotFoundException
+	{
+		
+		String classPathPrefix = "classpath:";
+		InputStream inputStream;
+		if (configPath.startsWith(classPathPrefix)) {
+			String conf = StringUtils.substringAfter(configPath, classPathPrefix);
+            inputStream = ConfigInstance.class.getClassLoader().getResourceAsStream(conf);
+        } else {
+        	inputStream = new FileInputStream(configPath);
+        }
+		JAXBContext context = JAXBContext.newInstance(ConfigBean.class);
+		Unmarshaller unmarshaller = context.createUnmarshaller();
+		config = (ConfigBean) unmarshaller.unmarshal(inputStream);
+	}
+}

+ 60 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/Table.java

@@ -0,0 +1,60 @@
+package com.lewaimai.mysql.elasticsearch.common.config;
+
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ *	针对mysql表的同步配置 
+ * @author jogin
+ */
+public class Table {
+	private String name;
+	private String fields;
+	private String key;
+	private String index;
+	private String type;
+	private String consumer;
+	
+	@XmlElement(name = "name")
+	public String getName() {
+		return name;
+	}
+	public void setName(String name) {
+		this.name = name;
+	}
+	@XmlElement(name = "fields")
+	public String getFields() {
+		return fields;
+	}
+	public void setFields(String fields) {
+		this.fields = fields;
+	}
+	@XmlElement(name = "key")
+	public String getKey() {
+		return key;
+	}
+	public void setKey(String key) {
+		this.key = key;
+	}
+	@XmlElement(name = "index")
+	public String getIndex() {
+		return index;
+	}
+	public void setIndex(String index) {
+		this.index = index;
+	}
+	@XmlElement(name = "type")
+	public String getType() {
+		return type;
+	}
+	public void setType(String type) {
+		this.type = type;
+	}
+	
+	@XmlElement(name = "consumer")
+	public String getConsumer() {
+		return consumer;
+	}
+	public void setConsumer(String consumer) {
+		this.consumer = consumer;
+	}
+}

+ 27 - 0
src/main/resources/config.xml

@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<config>
+	<!-- canal server 配置 -->
+	<canal>
+		<ip>127.0.0.1</ip>
+		<port>11111</port>
+		<user></user>
+		<password></password>
+		<destination>example</destination>
+	</canal>
+	<!-- 监听数据库名 -->
+	<dbName>weixin</dbName>
+	<!-- 每次获取多少数据变更事件 -->
+	<batchSize>256</batchSize>
+	<batchQueueSize>10</batchQueueSize>
+	<!-- 表同步配置 -->
+	<sync>
+		<table>
+			<name>wx_config</name>
+			<fields>id,admin_id,init_date,shopname</fields>
+			<key>id</key>
+			<index>shop</index>
+			<type>shop</type>
+			<consumer>com.lewaimai.mysql.elasticsearch.consumer.DefaultConsumer</consumer>
+		</table>
+	</sync>
+</config>

+ 9 - 0
src/main/resources/log4j.properties

@@ -0,0 +1,9 @@
+log4j.rootLogger=DEBUG,console
+log4j.additivity.org.apache=true
+#console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=DEBUG
+log4j.appender.console.ImmediateFlush=true
+log4j.appender.console.Target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} [%p] %m%n

+ 38 - 0
src/test/java/com/lewaimai/mysql/elasticsearch/AppTest.java

@@ -0,0 +1,38 @@
+package com.lewaimai.mysql.elasticsearch;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest 
+    extends TestCase
+{
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public AppTest( String testName )
+    {
+        super( testName );
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite()
+    {
+        return new TestSuite( AppTest.class );
+    }
+
+    /**
+     * Rigourous Test :-)
+     */
+    public void testApp()
+    {
+        assertTrue( true );
+    }
+}