程序那些事

逆水行舟,不进则退。

基于数据库配置rocketmq

2019.04.08 | 73阅读 | 0条评论 | java

一、创建数据表

CREATE TABLE `rkmq_consumer_config` (
  `consumer_id` varchar(64) NOT NULL,
  `consumer_name` varchar(128) NOT NULL,
  `comsumer_desc` varchar(256) DEFAULT NULL,
  `access_key` varchar(128) DEFAULT NULL,
  `secret_key` varchar(128) DEFAULT NULL,
  `ons_addr` varchar(256) DEFAULT NULL,
  `enabled` varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
  `create_date` datetime NOT NULL,
  `update_date` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`consumer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消费者配置';

CREATE TABLE `rkmq_listener` (
  `listener_id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `listener_name` varchar(128) NOT NULL,
  `listener_desc` varchar(256) DEFAULT NULL,
  `consumer_id` varchar(64) DEFAULT NULL COMMENT 'consumer配置id',
  `topic` varchar(32) NOT NULL,
  `tag` varchar(32) NOT NULL,
  `listener_class_name` varchar(256) NOT NULL COMMENT '消费者处理类',
  `create_date` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  `update_date` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  `enabled` varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
  `data_source` varchar(256) DEFAULT NULL,
  PRIMARY KEY (`listener_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='消费者监听配置';

CREATE TABLE `rkmq_producer_config` (
  `producer_id` varchar(64) NOT NULL,
  `producer_name` varchar(128) NOT NULL,
  `producer_desc` varchar(256) DEFAULT NULL,
  `access_key` varchar(128) DEFAULT NULL,
  `secret_key` varchar(128) DEFAULT NULL,
  `ons_addr` varchar(256) DEFAULT NULL,
  `enabled` varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
  `create_date` datetime NOT NULL,
  `update_date` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`producer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='生产者配置';

二、创建管理接口

package com.sgcc.mq;

import javax.sql.DataSource;

public interface Manager {
	
	/**
	 * 启动所有
	 */
	public void start();
	
	/**
	 * 关闭所有
	 */
	public void shutdown();
	
	/**
	 * 开启某个
	 * @param id
	 */
	public void start(String id);
	
	/**
	 * 关闭某个
	 * @param id
	 */
	public void shutdown(String id);
	
	/**
	 * 重启某个
	 * @param id
	 */
	public void restart(String id);
	
	/**
	 * 启用某个
	 * @param id
	 */
	public void enabled(String id);
	
	/**
	 * 禁用某个
	 * @param id
	 */
	public void disabled(String id);
	
	/**
	 * 是否开启
	 * @param id
	 */
	public boolean isStart(String id);
	
	/**
	 * 设置数据源
	 * @param dataSource
	 */
	public void setDataSource(DataSource dataSource);
}

三、创建Consumer和Producer的管理类

package com.sgcc.mq;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.JdbcTemplate;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.sgcc.exception.ConsumerException;

/**
 * 
 * 
 * <p>
 * Title: ConsumerManager
 * </p>
 * 
 * <p>
 * Description: ConsumerBean管理器,从数据库中读取配置并实例化ConsumerBean,一个或多个
 * </p>
 * 
 * @author mengjinyuan
 * 
 * @date 2019年3月14日
 */
public class ConsumerManager implements ApplicationContextAware, InitializingBean,Manager {

	private ConsumerBean consumerBean;

	private static Map<String, ConsumerBean> consumerBeans = new HashMap<>();

	private ApplicationContext applicationContext;

	private JdbcTemplate jdbcTemplate;

	private final static String CID_DEFAULT = "CID_DEFAULT";

	private static Logger logger = LoggerFactory.getLogger(ConsumerManager.class);

	public void setConsumerBean(ConsumerBean consumerBean) {
		this.consumerBean = consumerBean;
	}

	@Override
	public void start() {
		if (consumerBeans.size() > 0) {
			logger.info("开启所有监听,consumerBean个数:" + consumerBeans.size());
			consumerBeans.forEach((consumerId, consumer) -> {
				if (!consumer.getSubscriptionTable().isEmpty()) {
					start(consumerId);
				}
			});
		}
	}
	@Override
	public void shutdown() {
		if (!consumerBeans.isEmpty()) {
			logger.info("关闭所有监听");
			consumerBeans.forEach((consumerId, consumer) -> {
				shutdown(consumerId);
			});
		}
	}

	/**
	 * 启动某个消费者
	 * 
	 * @param consumerId
	 */
	@Override
	public void start(String consumerId) {
		ConsumerBean consumer = consumerBeans.get(consumerId);
		logger.info("开启consumer,consumerId=" + consumerId);
		if (consumer != null) {
			consumer.start();
		} else {
			logger.warn("consumer不存在,开启失败!,consumerId=" + consumerId);
		}
	}

	/**
	 * 关闭某个消费者
	 * 
	 * @param consumerId
	 */
	@Override
	public void shutdown(String consumerId) {
		ConsumerBean consumer = consumerBeans.get(consumerId);
		logger.info("关闭consumer,consumerId=" + consumerId);
		if (consumer != null) {
			consumer.shutdown();
		} else {
			logger.warn("consumer不存在,关闭失败!,consumerId=" + consumerId);
		}
	}

	/**
	 * 重启某个消费者
	 * 
	 * @param consumerId
	 */
	@Override
	public void restart(String consumerId) {
		logger.info("重启消费者,consumerId"+consumerId);
		shutdown(consumerId);
		start(consumerId);
		logger.info("重启完成");
	}
	
	/**
	 * 启用某个consumer
	 */
	@Override
	public void enabled(String consumerId){
		logger.info("启用consumer,consumerId="+consumerId);
		ConsumerBean consumer=consumerBeans.get(consumerId);
		if(consumer==null){
			createConsumerBean(consumerId);
			consumer=consumerBeans.get(consumerId);
			consumer.start();
		}else{
			if(consumer.isClosed()){
				consumer.start();
			}
		}
		jdbcTemplate.update("update rkmq_consumer_config set enabled='0' where consumer_id=?", consumerId);
		jdbcTemplate.update("update rkmq_listener set enabled='0' where consumer_id=?", consumerId);
		logger.info("启用成功");
	}
	
	/**
	 * 禁用某个consumer
	 */
	@Override
	public void disabled(String consumerId){
		logger.info("禁用consumer,consumerId="+consumerId);
		ConsumerBean consumer=consumerBeans.get(consumerId);
		if(consumer!=null){
			consumer.shutdown();
			consumerBeans.remove(consumerId);
			jdbcTemplate.update("update rkmq_consumer_config set enabled='1' where consumer_id=?", consumerId);
			jdbcTemplate.update("update rkmq_listener set enabled='1' where consumer_id=?", consumerId);
			logger.info("禁用成功");
		}else{
			logger.warn("禁用失败,consumer不存在,consumerId="+consumerId);
		}
	}
	
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		logger.info("开始初始化Consumer属性");
		/**
		 * 如果传了consumerBean,就所有的listener都用同一个配置,否则从数据库读取配置
		 */
		if (this.consumerBean == null) {
			logger.info("consumerBean为空,开始从数据库查找配置实例化consumerBean并设置属性");
			// TODO 从数据库中读取ConsumerBean的配置,动态的配置多个ConsumerBean
			createConsumerBean();
		} else {
			createSubscriptionTable(consumerBean, CID_DEFAULT);
		}
	}

	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	/**
	 * 创建单个consumer
	 * @param consumerId
	 */
	private void createConsumerBean(String consumerId) {
		String sql = "select * from rkmq_consumer_config where enabled='0' and consumer_id=?";
		List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql, consumerId);
		logger.info("创建consumer,consumerId=" + consumerId);
		if (consumerConfigs.size() > 0) {
			consumerConfigs.stream().forEach(consumerConfig -> {
				ConsumerBean consumerBean = new ConsumerBean();
				Properties properties = new Properties();
				properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());
				properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());
				properties.put(PropertyKeyConst.SecretKey, consumerConfig.get("secret_key").toString());
				properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());
				logger.info("连接属性:" + properties);
				consumerBean.setProperties(properties);
				this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());
			});
		}else{
			throw new ConsumerException("Consumer创建失败,找不到相关配置,consumerId="+consumerId);
		}
	}
	
	/**
	 * 创建所有consumer
	 */
	private void createConsumerBean() {
		String sql = "select * from rkmq_consumer_config where enabled='0'";
		List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql);
		if (consumerConfigs.size() > 0) {
			consumerConfigs.stream().forEach(consumerConfig -> {
				ConsumerBean consumerBean = new ConsumerBean();
				Properties properties = new Properties();
				properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());
				properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());
				properties.put(PropertyKeyConst.SecretKey, consumerConfig.get("secret_key").toString());
				properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());
				logger.info("连接属性:" + properties);
				consumerBean.setProperties(properties);
				this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());
			});
		}else{
			logger.warn("Consumer创建失败,找不到相关配置");
		}
	}
	/**
	 * 创建监听类对象
	 * 
	 * @param consumerBean
	 * @param consumerId
	 */
	private void createSubscriptionTable(ConsumerBean consumerBean, String consumerId) {
		// 用反射生成消息监听对象,配置在数据库中
		StringBuilder sql = new StringBuilder("select * from rkmq_listener where enabled='0' ");
		List<Map<String, Object>> consumers = new ArrayList<>();
		if (consumerId == null || CID_DEFAULT.equals(consumerId)) {
			consumers = jdbcTemplate.queryForList(sql.toString());
		} else {
			sql.append(" and consumer_id=?");
			consumers = jdbcTemplate.queryForList(sql.toString(), consumerId);
		}
		Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
		if (consumers != null && !consumers.isEmpty()) {
			consumers.stream().forEach(consumer -> {
				Subscription subscription = new Subscription();
				subscription.setTopic(consumer.get("topic").toString());
				subscription.setExpression(consumer.get("tag").toString());
				try {
					Class<?> classz = Class.forName(consumer.get("listener_class_name").toString());
					logger.info("开始为" + classz.getName() + "创建对象");
					MessageListener messageListener = (MessageListener) classz.newInstance();
					/**
					 * 增强bean,让容器之外的bean获得自动注入的能力,这样反射生成的对象也可以用 @Autowire
					 * 注解了,spring牛逼
					 * 
					 */
					applicationContext.getAutowireCapableBeanFactory().autowireBean(messageListener);
					subscriptionTable.put(subscription, messageListener);
				} catch (ClassNotFoundException e) {
					e.printStackTrace();
				} catch (InstantiationException e1) {
					e1.printStackTrace();
				} catch (IllegalAccessException e2) {
					e2.printStackTrace();
				}

			});
			consumerBean.setSubscriptionTable(subscriptionTable);
			consumerBeans.put(consumerId, consumerBean);
			logger.info("consumer初始化完成");
		} else {
			logger.warn("未找到相关Consumer!请检查数据库配置");
		}
	}

	@Override
	public boolean isStart(String consumerId) {
		ConsumerBean consumer=consumerBeans.get(consumerId);
		if(consumer!=null){
			return consumer.isStarted();
		}
		return false;
	}
}
package com.sgcc.mq;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcTemplate;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.sgcc.exception.ProducerException;

/**
 * 
 * 
 * <p>
 * Title: ProducerManager
 * </p>
 * 
 * <p>
 * Description:生产者管理
 * </p>
 * 
 * @author mengjinyuan
 * 
 * @date 2019年3月14日
 */
public class ProducerManager implements InitializingBean, Manager {

	private static Map<String, ProducerBean> producerBeans = new HashMap<>();

	protected JdbcTemplate jdbcTemplate;

	private static Logger logger = LoggerFactory.getLogger(ProducerManager.class);

	@Override
	public void start() {
		if (producerBeans.size() > 0) {
			logger.info("开启所有生产者,producerBean个数:" + producerBeans.size());
			producerBeans.forEach((producerId, producer) -> {
				start(producerId);
			});
		}
	}

	@Override
	public void shutdown() {
		if (!producerBeans.isEmpty()) {
			logger.info("关闭所有生产者");
			producerBeans.forEach((producerId, producer) -> {
				shutdown(producerId);
			});
		}
	}

	@Override
	public void start(String producerId) {
		ProducerBean producer = producerBeans.get(producerId);
		logger.info("开启producer,producerId=" + producerId);
		if (producer != null) {
			producer.start();
		} else {
			logger.warn("producer不存在,开启失败!,producerId=" + producerId);
		}
	}

	@Override
	public void shutdown(String producerId) {
		ProducerBean producer = producerBeans.get(producerId);
		logger.info("关闭producer,producerId=" + producerId);
		if (producer != null) {
			producer.shutdown();
		} else {
			logger.warn("producer不存在,关闭失败!,producerId=" + producerId);
		}

	}

	@Override
	public void restart(String producerId) {
		logger.info("重启消费者,producerId" + producerId);
		shutdown(producerId);
		start(producerId);
		logger.info("重启完成");
	}

	@Override
	public void enabled(String producerId) {
		logger.info("启用producer,producerId=" + producerId);
		ProducerBean producer = producerBeans.get(producerId);
		if (producer == null) {
			createProducer(producerId);
			producer = producerBeans.get(producerId);
			producer.start();
		} else {
			if (producer.isClosed()) {
				producer.start();
			}
		}
		jdbcTemplate.update("update rkmq_producer_config set enabled='0' where producer_id=?", producerId);
		logger.info("启用成功");
	}

	@Override
	public void disabled(String producerId) {
		logger.info("禁用producer,producerId=" + producerId);
		ProducerBean producer = producerBeans.get(producerId);
		if (producer != null) {
			producer.shutdown();
			producerBeans.remove(producerId);
			jdbcTemplate.update("update rkmq_producer_config set enabled='1' where producer_id=?", producerId);
			logger.info("禁用成功");
		} else {
			logger.warn("禁用失败,producer不存在,producerId=" + producerId);
		}
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		logger.info("开始创建生产者");
		if (producerBeans.isEmpty()) {
			createProducer();
		}
		logger.info("生产者创建完成");
	}

	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public ProducerBean getProducer(String producerId) {
		return producerBeans.get(producerId);
	}

	public void setProducerBeans(Map<String, ProducerBean> producerBeans) {
		ProducerManager.producerBeans = producerBeans;
	}

	/**
	 * 创建所有生产者bean
	 */
	private void createProducer() {
		String sql = "select * from rkmq_producer_config where enabled='0'";
		List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql);
		if (producerConfigs.size() > 0) {
			producerConfigs.stream().forEach(producerConfig -> {
				ProducerBean producerBean = new ProducerBean();
				Properties properties = new Properties();
				properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());
				properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());
				properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());
				properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());
				logger.info("连接属性:" + properties);
				producerBean.setProperties(properties);
				producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);
			});
		} else {
			logger.warn("producer创建失败,找不到相关配置");
		}

	}

	/**
	 * 创建一个生产者bean
	 */
	private void createProducer(String producerId) {
		String sql = "select * from rkmq_producer_config where enabled='0' and producer_id=?";
		List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql, producerId);
		if (producerConfigs.size() > 0) {
			producerConfigs.stream().forEach(producerConfig -> {
				ProducerBean producerBean = new ProducerBean();
				Properties properties = new Properties();
				properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());
				properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());
				properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());
				properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());
				logger.info("连接属性:" + properties);
				producerBean.setProperties(properties);
				producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);
			});
		} else {
			throw new ProducerException("producer创建失败,找不到相关配置,producerId="+producerId);
		}

	}

	@Override
	public boolean isStart(String producerId) {
		ProducerBean producer = producerBeans.get(producerId);
		if (producer != null) {
			return producer.isStarted();
		}
		return false;
	}
}


四、创建监听类

package com.sgcc.mq.listener;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.sgcc.web.entity.Demo;
import com.sgcc.web.entity.DemoExample;
import com.sgcc.web.entity.DemoExample.Criteria;
import com.sgcc.web.mapper.DemoMapper;

public class MyMessageListener implements MessageListener{
	
	private static Logger logger=LoggerFactory.getLogger(MyMessageListener.class);
	
	@Override
	public Action consume(Message message, ConsumeContext context) {
		logger.info("收到消息,message="+message+" context="+context);
		return Action.CommitMessage;
	}
}

在rkmq_listener表中添加上该类的包路径,这样就可以用反射实例化对象了。


五、spring中配置bean

spring-mq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
	
	<bean id="mqDataSource" class="org.apache.commons.dbcp.BasicDataSource"
		destroy-method="close">
		<property name="driverClassName" value="${jdbc.driverClassName}" />
		<property name="url" value="${jdbc.url}" />
		<property name="username" value="${jdbc.username}" />
		<property name="password" value="${jdbc.password}" />
		<!-- 初始化连接大小 -->
		<property name="initialSize" value="${druid.initialSize}"></property>
		<!-- 连接池最大数量 -->
		<property name="maxActive" value="${druid.maxActive}"></property>
		<!-- 连接池最大空闲 -->
		<property name="maxIdle" value="${druid.maxIdle}"></property>
		<!-- 连接池最小空闲 -->
		<property name="minIdle" value="${druid.minIdle}"></property>
		<!-- 获取连接最大等待时间 -->
		<property name="maxWait" value="${druid.maxWait}"></property>

		<property name="validationQuery" value="${druid.validationQuery}" />
		<property name="testOnBorrow" value="${druid.testOnBorrow}" />
	</bean>


	<!-- <bean id="mqConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean">
		<property name="properties"> 消费者配置信息
			<props>
				请替换 CID_mainstation_control
				<prop key="ConsumerId">CID_TEST100</prop>
				<prop key="AccessKey">xxxxxx</prop>
				<prop key="SecretKey">xxxxxx</prop>
				<prop key="ONSAddr">http://xxxxx/rocketmq/nsaddr4broker-internal
				</prop>
			</props>
		</property>
	</bean> -->

	<!-- ConsumerBean管理类,用法:
		如果注入consumerBean的话数据库中配的所有listener全都使用此配置来监听
		否则从数据表rkmq_consumer_config中获取配置并实例化consumerBean,可以配置多个.
		dataSource数据源必须注入,可以设置为mybatis/hibernate的数据源
	 -->
	<bean id="consumerManager" class="com.sgcc.mq.ConsumerManager"
		init-method="start" destroy-method="shutdown">
		<!-- <property name="consumerBean" ref="mqConsumer"></property> -->
		<property name="dataSource" ref="mqDataSource"></property>
	</bean>
</beans>

spring-mq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
	
	<bean id="producerManager" class="com.sgcc.mq.ProducerManager"
		init-method="start" destroy-method="shutdown">
		<property name="dataSource" ref="mqDataSource"></property>
	</bean>
</beans>

注:也可以写在一个配置文件里


赞 (

发表评论

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

召唤伊斯特瓦尔