Jelajahi Sumber

实现canal消费队列

wenzhaoqin 7 tahun lalu
induk
melakukan
ee71cc36ab

+ 10 - 0
pom.xml

@@ -47,6 +47,10 @@
 					<groupId>ch.qos.logback</groupId>
 					<groupId>ch.qos.logback</groupId>
 					<artifactId>logback-classic</artifactId>
 					<artifactId>logback-classic</artifactId>
 				</exclusion>
 				</exclusion>
+				<exclusion>
+					<groupId>org.springframework</groupId>
+					<artifactId>spring</artifactId>
+				</exclusion>
 			</exclusions>
 			</exclusions>
 		</dependency>
 		</dependency>
 		<dependency>
 		<dependency>
@@ -70,5 +74,11 @@
 			<version>3.8.1</version>
 			<version>3.8.1</version>
 			<scope>test</scope>
 			<scope>test</scope>
 		</dependency>
 		</dependency>
+		<dependency>
+        	<groupId>org.springframework</groupId>
+        	<artifactId>spring-context</artifactId>
+        	<version>4.3.18.RELEASE</version>
+        	<scope>runtime</scope>
+    	</dependency>
 	</dependencies>
 	</dependencies>
 </project>
 </project>

+ 4 - 3
src/main/java/com/lewaimai/mysql/elasticsearch/ConsumerLauncher.java

@@ -7,7 +7,8 @@ import javax.xml.bind.JAXBException;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
-import com.lewaimai.mysql.elasticsearch.common.config.ConfigInstance;
+import com.lewaimai.mysql.elasticsearch.common.config.ApplicationConfig;
+import com.lewaimai.mysql.elasticsearch.common.config.ConfigUtils;
 
 
 /**
 /**
  * Canal消费者入口类
  * Canal消费者入口类
@@ -20,10 +21,10 @@ public class ConsumerLauncher
     {
     {
     	String conf = System.getProperty("consumer.conf", "classpath:config.xml");
     	String conf = System.getProperty("consumer.conf", "classpath:config.xml");
     	//初始化配置
     	//初始化配置
-    	ConfigInstance.instance().init(conf);
+    	ApplicationConfig.getContext().getBean(ConfigUtils.class).init(conf);
     	
     	
     	//启动consumer server
     	//启动consumer server
-    	ConsumerServer server = new ConsumerServer();
+    	ConsumerServer server = ApplicationConfig.getContext().getBean(ConsumerServer.class);
     	server.start();
     	server.start();
     }
     }
 }
 }

+ 36 - 7
src/main/java/com/lewaimai/mysql/elasticsearch/ConsumerServer.java

@@ -1,9 +1,12 @@
 package com.lewaimai.mysql.elasticsearch;
 package com.lewaimai.mysql.elasticsearch;
 
 
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.util.List;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnector;
 import com.alibaba.otter.canal.client.CanalConnectors;
 import com.alibaba.otter.canal.client.CanalConnectors;
@@ -11,25 +14,30 @@ import com.alibaba.otter.canal.protocol.Message;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
 import com.lewaimai.mysql.elasticsearch.common.AbstractConsumerLifeCycle;
 import com.lewaimai.mysql.elasticsearch.common.AbstractConsumerLifeCycle;
 import com.lewaimai.mysql.elasticsearch.common.BatchQueue;
 import com.lewaimai.mysql.elasticsearch.common.BatchQueue;
-import com.lewaimai.mysql.elasticsearch.common.ConsumerLifeCycle;
+import com.lewaimai.mysql.elasticsearch.common.config.ApplicationConfig;
 import com.lewaimai.mysql.elasticsearch.common.config.Canal;
 import com.lewaimai.mysql.elasticsearch.common.config.Canal;
 import com.lewaimai.mysql.elasticsearch.common.config.ConfigBean;
 import com.lewaimai.mysql.elasticsearch.common.config.ConfigBean;
-import com.lewaimai.mysql.elasticsearch.common.config.ConfigInstance;
+import com.lewaimai.mysql.elasticsearch.common.config.ConfigUtils;
 
 
 /**
 /**
  * 消费者服务,主要负责从Canal拉取数据变更事件。
  * 消费者服务,主要负责从Canal拉取数据变更事件。
  * @author jogin
  * @author jogin
  *
  *
  */
  */
+@Component
 public class ConsumerServer extends AbstractConsumerLifeCycle {
 public class ConsumerServer extends AbstractConsumerLifeCycle {
 	private static Logger logger = LoggerFactory.getLogger(ConsumerLauncher.class);
 	private static Logger logger = LoggerFactory.getLogger(ConsumerLauncher.class);
+	@Autowired
+	private BatchQueue batchQueue;
+	
 	public ConsumerServer() {
 	public ConsumerServer() {
 	}
 	}
 
 
 	public void start() {
 	public void start() {
 		super.start();
 		super.start();
+		ConfigUtils configUtils = ApplicationConfig.getContext().getBean(ConfigUtils.class);
 		//创建连接
 		//创建连接
-		Canal canalConfig = ConfigInstance.instance().getConfig().getCanal();
+		Canal canalConfig = configUtils.getConfig().getCanal();
     	CanalConnector connector = CanalConnectors
     	CanalConnector connector = CanalConnectors
 				.newSingleConnector(
 				.newSingleConnector(
 						new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
 						new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),
@@ -37,7 +45,7 @@ public class ConsumerServer extends AbstractConsumerLifeCycle {
 						canalConfig.getUser(), 
 						canalConfig.getUser(), 
 						canalConfig.getPassword());
 						canalConfig.getPassword());
     	try {
     	try {
-    		ConfigBean configBean = ConfigInstance.instance().getConfig();
+    		ConfigBean configBean = configUtils.getConfig();
 			connector.connect();
 			connector.connect();
 			connector.subscribe(configBean.getDbName() + "\\..*");
 			connector.subscribe(configBean.getDbName() + "\\..*");
 			connector.rollback();
 			connector.rollback();
@@ -56,10 +64,18 @@ public class ConsumerServer extends AbstractConsumerLifeCycle {
 				} else {
 				} else {
 					//处理binlog
 					//处理binlog
 					logger.debug("处理消息. ");
 					logger.debug("处理消息. ");
-					
-					BatchQueue.instance().add(batchId, message.getEntries());
+					int batchQueueSize = batchQueue.getSize();
+					while (batchQueueSize == configBean.getBatchQueueSize()) {
+						try {
+							logger.debug("batch队列已满, sleep...");
+							batchAck(connector);
+							Thread.sleep(1000);
+						} catch (InterruptedException e) {
+						}
+					}
+					batchQueue.add(batchId, message.getEntries());
+					batchAck(connector);
 				}
 				}
-//				connector.ack(batchId); // 提交确认
 			}
 			}
 		} catch (CanalClientException e) {
 		} catch (CanalClientException e) {
 			logger.error("Canel server连接不上.");
 			logger.error("Canel server连接不上.");
@@ -69,4 +85,17 @@ public class ConsumerServer extends AbstractConsumerLifeCycle {
 			logger.info("退出Consumer Server.");
 			logger.info("退出Consumer Server.");
 		}
 		}
 	}
 	}
+	
+	/**
+	 * 批量确认Canal消息
+	 * @param connector
+	 */
+	public void batchAck(CanalConnector connector) {
+		List<Long> batchIds = batchQueue.getCompleteBatch();
+		if (batchIds != null) {
+			for (Long batchId : batchIds) {
+				connector.ack(batchId);
+			}
+		}
+	}
 }
 }

+ 30 - 21
src/main/java/com/lewaimai/mysql/elasticsearch/common/BatchQueue.java

@@ -5,6 +5,8 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.List;
 
 
+import org.springframework.stereotype.Component;
+
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 
 
 /**
 /**
@@ -12,23 +14,16 @@ import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  * @author jogin
  * @author jogin
  *
  *
  */
  */
+@Component
 public class BatchQueue {
 public class BatchQueue {
 	private LinkedList<Task> queue;
 	private LinkedList<Task> queue;
+	private Object mutex = new Object();
 	
 	
-	private static class SingletonHolder {
-		private static final BatchQueue INSTANCE = new BatchQueue();
-	}
-	
-	private BatchQueue()
+	public BatchQueue()
 	{
 	{
 		queue = new LinkedList<BatchQueue.Task>();
 		queue = new LinkedList<BatchQueue.Task>();
 	}
 	}
 	
 	
-	public static BatchQueue instance()
-	{
-		return SingletonHolder.INSTANCE;
-	}
-	
 	/**
 	/**
 	 * 添加Canal批处理任务
 	 * 添加Canal批处理任务
 	 * @param batchId
 	 * @param batchId
@@ -39,7 +34,9 @@ public class BatchQueue {
 		Task task = new Task();
 		Task task = new Task();
 		task.setBatchId(batchId);
 		task.setBatchId(batchId);
 		task.setEntrys(entries);
 		task.setEntrys(entries);
-		queue.add(task);
+		synchronized (mutex) {
+			queue.add(task);
+		}
 	}
 	}
 	
 	
 	/**
 	/**
@@ -48,10 +45,12 @@ public class BatchQueue {
 	 */
 	 */
 	public void completeSubTask(long batchId)
 	public void completeSubTask(long batchId)
 	{
 	{
-		for (Task task : queue) {
-			if (batchId == task.batchId) {
-				task.completeSubTask();
-				break;
+		synchronized (mutex) {
+			for (Task task : queue) {
+				if (batchId == task.batchId) {
+					task.completeSubTask();
+					break;
+				}
 			}
 			}
 		}
 		}
 	}
 	}
@@ -63,11 +62,13 @@ public class BatchQueue {
 	public List<Long> getCompleteBatch()
 	public List<Long> getCompleteBatch()
 	{
 	{
 		ArrayList<Long> batchIds = new ArrayList<Long>();
 		ArrayList<Long> batchIds = new ArrayList<Long>();
-		for (Task task : queue) {
-			if (task.subTasks != task.completed) {
-				break;
+		synchronized (mutex) {
+			for (Task task : queue) {
+				if (task.subTasks != task.completed || task.subTasks == 0) {
+					break;
+				}
+				batchIds.add(task.batchId);
 			}
 			}
-			batchIds.add(task.batchId);
 		}
 		}
 		return batchIds.size() > 0 ? batchIds : null;
 		return batchIds.size() > 0 ? batchIds : null;
 	}
 	}
@@ -78,7 +79,11 @@ public class BatchQueue {
 	 */
 	 */
 	public int getSize()
 	public int getSize()
 	{
 	{
-		return queue.size();
+		int size;
+		synchronized (mutex) {
+			size = queue.size();
+		}
+		return size;
 	}
 	}
 	
 	
 	/**
 	/**
@@ -87,6 +92,8 @@ public class BatchQueue {
 	public class Task {
 	public class Task {
 		private long batchId;
 		private long batchId;
 		private List<Entry> entrys;
 		private List<Entry> entrys;
+		private Object lock = new Object();
+		
 		//已经完成子任务数
 		//已经完成子任务数
 		private int completed = 0;
 		private int completed = 0;
 		//子任务数
 		//子任务数
@@ -122,7 +129,9 @@ public class BatchQueue {
 		 */
 		 */
 		public void completeSubTask()
 		public void completeSubTask()
 		{
 		{
-			this.subTasks++;
+			synchronized (lock) {
+				this.completed++;
+			}
 		}
 		}
 	}
 	}
 }
 }

+ 24 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/ApplicationConfig.java

@@ -0,0 +1,24 @@
+package com.lewaimai.mysql.elasticsearch.common.config;
+
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * bean 配置
+ * @author jogin
+ *
+ */
+@ComponentScan(basePackages = {
+		"com.lewaimai.mysql.elasticsearch", 
+		"com.lewaimai.mysql.elasticsearch.common",
+		"com.lewaimai.mysql.elasticsearch.common.config"})
+public class ApplicationConfig {
+	private static class SingletonHolder {
+		private static final ApplicationContext context = new AnnotationConfigApplicationContext(ApplicationConfig.class);
+	}
+	public static BeanFactory getContext() {
+		return SingletonHolder.context;
+	}
+}

+ 14 - 0
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/ConfigBean.java

@@ -24,6 +24,11 @@ public class ConfigBean {
 	private int batchSize;
 	private int batchSize;
 	
 	
 	/**
 	/**
+	 * canal batch事件队列最大限制
+	 */
+	private int batchQueueSize;
+	
+	/**
 	 * 每个表的同步配置
 	 * 每个表的同步配置
 	 */
 	 */
 	private List<Table> sync;
 	private List<Table> sync;
@@ -63,4 +68,13 @@ public class ConfigBean {
 	public void setBatchSize(int batchSize) {
 	public void setBatchSize(int batchSize) {
 		this.batchSize = batchSize;
 		this.batchSize = batchSize;
 	}
 	}
+
+	@XmlElement(name = "batchQueueSize")
+	public int getBatchQueueSize() {
+		return batchQueueSize;
+	}
+
+	public void setBatchQueueSize(int batchQueueSize) {
+		this.batchQueueSize = batchQueueSize;
+	}
 }
 }

+ 6 - 12
src/main/java/com/lewaimai/mysql/elasticsearch/common/config/ConfigInstance.java

@@ -10,25 +10,19 @@ import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.bind.Unmarshaller;
 
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
+import org.springframework.stereotype.Component;
 
 
 /**
 /**
- * 配置实例
+ * 配置工具
  * @author jogin
  * @author jogin
  */
  */
-public class ConfigInstance {
+@Component
+public class ConfigUtils {
 	//配置文件路径
 	//配置文件路径
 	private String configPath;
 	private String configPath;
 	private ConfigBean config;
 	private ConfigBean config;
 	
 	
-	private static class SingletonHolder {
-		private static final ConfigInstance INSTANCE = new ConfigInstance();
-	}
-	
-	public static ConfigInstance instance() {
-		return SingletonHolder.INSTANCE;
-	}
-	
-	private ConfigInstance()
+	public ConfigUtils()
 	{
 	{
 	}
 	}
 	
 	
@@ -59,7 +53,7 @@ public class ConfigInstance {
 		InputStream inputStream;
 		InputStream inputStream;
 		if (configPath.startsWith(classPathPrefix)) {
 		if (configPath.startsWith(classPathPrefix)) {
 			String conf = StringUtils.substringAfter(configPath, classPathPrefix);
 			String conf = StringUtils.substringAfter(configPath, classPathPrefix);
-            inputStream = ConfigInstance.class.getClassLoader().getResourceAsStream(conf);
+            inputStream = ConfigUtils.class.getClassLoader().getResourceAsStream(conf);
         } else {
         } else {
         	inputStream = new FileInputStream(configPath);
         	inputStream = new FileInputStream(configPath);
         }
         }

+ 7 - 0
src/main/resources/spring/bean.xml

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+</beans>