一、创建数据表
-- Connection settings for ONS consumers, one row per consumer id.
-- enabled uses an inverted flag: '0' = active, '1' = inactive (see the column comment).
-- DATETIME with DEFAULT / ON UPDATE CURRENT_TIMESTAMP needs MySQL 5.6.5 or later
-- (the original definition already relied on ON UPDATE for a DATETIME column).
CREATE TABLE rkmq_consumer_config
(
    consumer_id   varchar(64)  NOT NULL,
    consumer_name varchar(128) NOT NULL,
    -- NOTE(review): "comsumer" is a misspelling of "consumer" (compare producer_desc /
    -- listener_desc); the name is kept so existing readers of this column keep working.
    comsumer_desc varchar(256) DEFAULT NULL,
    -- access_key, secret_key and ons_addr are nullable here, but ConsumerManager reads
    -- all three when it builds a consumer from a row.
    access_key    varchar(128) DEFAULT NULL,
    secret_key    varchar(128) DEFAULT NULL,
    ons_addr      varchar(256) DEFAULT NULL,
    enabled       varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    -- Defaults added so an INSERT that omits the audit columns no longer fails in strict mode.
    create_date   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_date   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (consumer_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消费者配置';
-- Listener subscriptions: each row binds a topic/tag pair to a listener class.
-- ConsumerManager loads rows with enabled='0' ('0' = active, '1' = inactive) and
-- instantiates listener_class_name through reflection.
-- DATETIME with DEFAULT / ON UPDATE CURRENT_TIMESTAMP needs MySQL 5.6.5 or later.
CREATE TABLE rkmq_listener
(
    listener_id         int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    listener_name       varchar(128) NOT NULL,
    listener_desc       varchar(256) DEFAULT NULL,
    consumer_id         varchar(64)  DEFAULT NULL COMMENT 'consumer配置id',
    topic               varchar(32)  NOT NULL,
    tag                 varchar(32)  NOT NULL,
    listener_class_name varchar(256) NOT NULL COMMENT '消费者处理类',
    -- Fix: create_date previously carried ON UPDATE CURRENT_TIMESTAMP, which overwrote
    -- the creation time on every row update. It is now set once on insert.
    create_date         datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_date         datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    enabled             varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    data_source         varchar(256) DEFAULT NULL,
    PRIMARY KEY (listener_id)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='消费者监听配置';
-- Connection settings for ONS producers, one row per producer id.
-- enabled uses an inverted flag: '0' = active, '1' = inactive (see the column comment).
-- DATETIME with DEFAULT / ON UPDATE CURRENT_TIMESTAMP needs MySQL 5.6.5 or later
-- (the original definition already relied on ON UPDATE for a DATETIME column).
CREATE TABLE rkmq_producer_config
(
    producer_id   varchar(64)  NOT NULL,
    producer_name varchar(128) NOT NULL,
    producer_desc varchar(256) DEFAULT NULL,
    -- access_key, secret_key and ons_addr are nullable here, but ProducerManager reads
    -- all three when it builds a producer from a row.
    access_key    varchar(128) DEFAULT NULL,
    secret_key    varchar(128) DEFAULT NULL,
    ons_addr      varchar(256) DEFAULT NULL,
    enabled       varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    -- Defaults added so an INSERT that omits the audit columns no longer fails in strict mode.
    create_date   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_date   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (producer_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='生产者配置';
二、创建管理接口
package com.sgcc.mq;
import javax.sql.DataSource;
/**
 * Lifecycle operations implemented by both the consumer manager and the producer manager.
 * Every per-instance operation identifies its target by a string id.
 */
public interface Manager {

    /** Starts all managed instances. */
    void start();

    /** Shuts down all managed instances. */
    void shutdown();

    /**
     * Starts a single instance.
     *
     * @param id identifier of the instance to start
     */
    void start(String id);

    /**
     * Shuts down a single instance.
     *
     * @param id identifier of the instance to shut down
     */
    void shutdown(String id);

    /**
     * Restarts a single instance.
     *
     * @param id identifier of the instance to restart
     */
    void restart(String id);

    /**
     * Enables a single instance.
     *
     * @param id identifier of the instance to enable
     */
    void enabled(String id);

    /**
     * Disables a single instance.
     *
     * @param id identifier of the instance to disable
     */
    void disabled(String id);

    /**
     * Reports whether a single instance is started.
     *
     * @param id identifier of the instance to query
     * @return {@code true} if the instance is started
     */
    boolean isStart(String id);

    /**
     * Supplies the data source the manager uses for its database access.
     *
     * @param dataSource data source to use
     */
    void setDataSource(DataSource dataSource);
}
三、创建Consumer和Producer的管理类
package com.sgcc.mq;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.JdbcTemplate;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.sgcc.exception.ConsumerException;
/**
 * Manages the ONS {@link ConsumerBean} instances used by the application.
 *
 * <p>
 * When a {@code ConsumerBean} is injected through {@link #setConsumerBean(ConsumerBean)},
 * every active row of {@code rkmq_listener} is attached to that single bean. Otherwise one
 * bean is built for each active row of {@code rkmq_consumer_config}. In both tables the
 * flag is inverted: {@code enabled='0'} means active and {@code enabled='1'} means inactive.
 * </p>
 *
 * <p>
 * The registry of beans is a static {@link HashMap} without synchronization, so it is shared
 * by every instance of this class.
 * </p>
 *
 * @author mengjinyuan
 *
 * @date 2019-03-14
 */
public class ConsumerManager implements ApplicationContextAware, InitializingBean, Manager {

    /** Optional pre-configured consumer; when set, rkmq_consumer_config is not read at startup. */
    private ConsumerBean consumerBean;

    /** Created consumers keyed by consumer id. */
    private static Map<String, ConsumerBean> consumerBeans = new HashMap<>();

    private ApplicationContext applicationContext;

    private JdbcTemplate jdbcTemplate;

    /** Registry key under which the injected consumer bean is stored. */
    private final static String CID_DEFAULT = "CID_DEFAULT";

    private static final Logger logger = LoggerFactory.getLogger(ConsumerManager.class);

    public void setConsumerBean(ConsumerBean consumerBean) {
        this.consumerBean = consumerBean;
    }

    /**
     * Starts every registered consumer that has at least one subscription.
     */
    @Override
    public void start() {
        if (consumerBeans.size() > 0) {
            logger.info("开启所有监听,consumerBean个数:" + consumerBeans.size());
            consumerBeans.forEach((consumerId, consumer) -> {
                if (!consumer.getSubscriptionTable().isEmpty()) {
                    start(consumerId);
                }
            });
        }
    }

    /**
     * Shuts down every registered consumer.
     */
    @Override
    public void shutdown() {
        if (!consumerBeans.isEmpty()) {
            logger.info("关闭所有监听");
            consumerBeans.forEach((consumerId, consumer) -> shutdown(consumerId));
        }
    }

    /**
     * Starts one consumer; logs a warning when the id is not registered.
     *
     * @param consumerId id of the consumer to start
     */
    @Override
    public void start(String consumerId) {
        ConsumerBean consumer = consumerBeans.get(consumerId);
        logger.info("开启consumer,consumerId=" + consumerId);
        if (consumer != null) {
            consumer.start();
        } else {
            logger.warn("consumer不存在,开启失败!,consumerId=" + consumerId);
        }
    }

    /**
     * Shuts down one consumer; logs a warning when the id is not registered.
     *
     * @param consumerId id of the consumer to shut down
     */
    @Override
    public void shutdown(String consumerId) {
        ConsumerBean consumer = consumerBeans.get(consumerId);
        logger.info("关闭consumer,consumerId=" + consumerId);
        if (consumer != null) {
            consumer.shutdown();
        } else {
            logger.warn("consumer不存在,关闭失败!,consumerId=" + consumerId);
        }
    }

    /**
     * Restarts one consumer by shutting it down and starting it again.
     *
     * @param consumerId id of the consumer to restart
     */
    @Override
    public void restart(String consumerId) {
        logger.info("重启消费者,consumerId=" + consumerId);
        shutdown(consumerId);
        start(consumerId);
        logger.info("重启完成");
    }

    /**
     * Enables one consumer: marks it and its listeners active in the database, creates the
     * bean if it is not registered yet, and starts it.
     *
     * @param consumerId id of the consumer to enable
     * @throws ConsumerException if no configuration row or no usable listener row exists for the id
     */
    @Override
    public void enabled(String consumerId) {
        logger.info("启用consumer,consumerId=" + consumerId);
        // The flags must be written before the bean is created: both creation queries
        // filter on enabled='0', so a consumer turned off by disabled() would otherwise
        // never be found again.
        updateEnabledFlag(consumerId, "0");
        ConsumerBean consumer = consumerBeans.get(consumerId);
        if (consumer == null) {
            createConsumerBean(consumerId);
            consumer = consumerBeans.get(consumerId);
            if (consumer == null) {
                // createSubscriptionTable registers nothing when no listener row matches.
                throw new ConsumerException("Consumer创建失败,找不到相关listener配置,consumerId=" + consumerId);
            }
            consumer.start();
        } else if (consumer.isClosed()) {
            consumer.start();
        }
        logger.info("启用成功");
    }

    /**
     * Disables one consumer: shuts it down, removes it from the registry and marks it and
     * its listeners inactive in the database. Logs a warning when the id is not registered.
     *
     * @param consumerId id of the consumer to disable
     */
    @Override
    public void disabled(String consumerId) {
        logger.info("禁用consumer,consumerId=" + consumerId);
        ConsumerBean consumer = consumerBeans.get(consumerId);
        if (consumer != null) {
            consumer.shutdown();
            consumerBeans.remove(consumerId);
            updateEnabledFlag(consumerId, "1");
            logger.info("禁用成功");
        } else {
            logger.warn("禁用失败,consumer不存在,consumerId=" + consumerId);
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Builds the consumer registry once all properties are injected. With an injected
     * consumer bean all active listeners share it; otherwise the consumers are read from
     * the database.
     *
     * @throws IllegalStateException if no data source has been set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("开始初始化Consumer属性");
        if (jdbcTemplate == null) {
            // Fail with a clear message instead of a NullPointerException in the first query.
            throw new IllegalStateException("dataSource must be set on ConsumerManager");
        }
        if (this.consumerBean == null) {
            logger.info("consumerBean为空,开始从数据库查找配置实例化consumerBean并设置属性");
            createConsumerBean();
        } else {
            createSubscriptionTable(consumerBean, CID_DEFAULT);
        }
    }

    @Override
    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Writes the enabled flag for a consumer and all of its listeners.
     *
     * @param consumerId id of the consumer whose rows are updated
     * @param flag       "0" for active, "1" for inactive
     */
    private void updateEnabledFlag(String consumerId, String flag) {
        jdbcTemplate.update("update rkmq_consumer_config set enabled=? where consumer_id=?", flag, consumerId);
        jdbcTemplate.update("update rkmq_listener set enabled=? where consumer_id=?", flag, consumerId);
    }

    /**
     * Creates the consumer for one id from its active configuration row.
     *
     * @param consumerId id of the consumer to create
     * @throws ConsumerException if no active configuration row exists for the id
     */
    private void createConsumerBean(String consumerId) {
        String sql = "select consumer_id, access_key, secret_key, ons_addr"
                + " from rkmq_consumer_config where enabled='0' and consumer_id=?";
        List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql, consumerId);
        logger.info("创建consumer,consumerId=" + consumerId);
        if (consumerConfigs.isEmpty()) {
            throw new ConsumerException("Consumer创建失败,找不到相关配置,consumerId=" + consumerId);
        }
        consumerConfigs.forEach(this::registerConsumer);
    }

    /**
     * Creates a consumer for every active configuration row; logs a warning when there is none.
     */
    private void createConsumerBean() {
        String sql = "select consumer_id, access_key, secret_key, ons_addr"
                + " from rkmq_consumer_config where enabled='0'";
        List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql);
        if (consumerConfigs.isEmpty()) {
            logger.warn("Consumer创建失败,找不到相关配置");
            return;
        }
        consumerConfigs.forEach(this::registerConsumer);
    }

    /**
     * Builds one consumer bean from a configuration row and attaches its listeners.
     *
     * @param consumerConfig row of rkmq_consumer_config
     * @throws ConsumerException if a required column of the row is null
     */
    private void registerConsumer(Map<String, Object> consumerConfig) {
        String consumerId = requiredColumn(consumerConfig, "consumer_id");
        String onsAddr = requiredColumn(consumerConfig, "ons_addr");
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ConsumerId, consumerId);
        properties.put(PropertyKeyConst.AccessKey, requiredColumn(consumerConfig, "access_key"));
        properties.put(PropertyKeyConst.SecretKey, requiredColumn(consumerConfig, "secret_key"));
        properties.put(PropertyKeyConst.ONSAddr, onsAddr);
        // The access key and secret key are credentials and are deliberately kept out of the log.
        logger.info("连接属性:consumerId=" + consumerId + ",onsAddr=" + onsAddr);
        ConsumerBean bean = new ConsumerBean();
        bean.setProperties(properties);
        createSubscriptionTable(bean, consumerId);
    }

    /**
     * Reads a column that must not be null.
     *
     * @param row    configuration row
     * @param column column name
     * @return the column value as a string
     * @throws ConsumerException if the column value is null
     */
    private static String requiredColumn(Map<String, Object> row, String column) {
        Object value = row.get(column);
        if (value == null) {
            throw new ConsumerException(
                    "Consumer配置不完整," + column + "为空,consumerId=" + row.get("consumer_id"));
        }
        return value.toString();
    }

    /**
     * Instantiates the active listeners for a consumer, sets them as its subscription table
     * and registers the consumer. Nothing is registered when no listener row matches.
     *
     * @param consumerBean consumer that receives the subscriptions
     * @param consumerId   registry key; {@code null} or {@code CID_DEFAULT} selects every active listener
     */
    private void createSubscriptionTable(ConsumerBean consumerBean, String consumerId) {
        String sql = "select topic, tag, listener_class_name from rkmq_listener where enabled='0' ";
        List<Map<String, Object>> listenerRows;
        if (consumerId == null || CID_DEFAULT.equals(consumerId)) {
            listenerRows = jdbcTemplate.queryForList(sql);
        } else {
            listenerRows = jdbcTemplate.queryForList(sql + " and consumer_id=?", consumerId);
        }
        if (listenerRows == null || listenerRows.isEmpty()) {
            logger.warn("未找到相关Consumer!请检查数据库配置");
            return;
        }
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        for (Map<String, Object> row : listenerRows) {
            String className = row.get("listener_class_name").toString();
            try {
                Class<?> classz = Class.forName(className);
                logger.info("开始为" + classz.getName() + "创建对象");
                MessageListener messageListener = (MessageListener) classz.newInstance();
                // Autowire the reflectively created object so that it can use injection
                // annotations although it is not a container-managed bean.
                applicationContext.getAutowireCapableBeanFactory().autowireBean(messageListener);
                Subscription subscription = new Subscription();
                subscription.setTopic(row.get("topic").toString());
                subscription.setExpression(row.get("tag").toString());
                subscriptionTable.put(subscription, messageListener);
            } catch (ReflectiveOperationException | ClassCastException e) {
                // Best effort: a listener that cannot be created is skipped, the others still load.
                logger.error("listener创建失败,已跳过,listenerClassName=" + className, e);
            }
        }
        consumerBean.setSubscriptionTable(subscriptionTable);
        consumerBeans.put(consumerId, consumerBean);
        logger.info("consumer初始化完成");
    }

    /**
     * Reports whether a consumer is started.
     *
     * @param consumerId id of the consumer to query
     * @return the bean's started state, or {@code false} when the id is not registered
     */
    @Override
    public boolean isStart(String consumerId) {
        ConsumerBean consumer = consumerBeans.get(consumerId);
        if (consumer != null) {
            return consumer.isStarted();
        }
        return false;
    }
}
package com.sgcc.mq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcTemplate;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.sgcc.exception.ProducerException;
/**
 * Manages the ONS {@link ProducerBean} instances used by the application.
 *
 * <p>
 * One bean is built for each active row of {@code rkmq_producer_config}. The flag is
 * inverted: {@code enabled='0'} means active and {@code enabled='1'} means inactive.
 * </p>
 *
 * <p>
 * The registry of beans is a static {@link HashMap} without synchronization, so it is shared
 * by every instance of this class.
 * </p>
 *
 * @author mengjinyuan
 *
 * @date 2019-03-14
 */
public class ProducerManager implements InitializingBean, Manager {

    /** Created producers keyed by producer id. */
    private static Map<String, ProducerBean> producerBeans = new HashMap<>();

    protected JdbcTemplate jdbcTemplate;

    private static final Logger logger = LoggerFactory.getLogger(ProducerManager.class);

    /**
     * Starts every registered producer.
     */
    @Override
    public void start() {
        if (producerBeans.size() > 0) {
            logger.info("开启所有生产者,producerBean个数:" + producerBeans.size());
            producerBeans.forEach((producerId, producer) -> start(producerId));
        }
    }

    /**
     * Shuts down every registered producer.
     */
    @Override
    public void shutdown() {
        if (!producerBeans.isEmpty()) {
            logger.info("关闭所有生产者");
            producerBeans.forEach((producerId, producer) -> shutdown(producerId));
        }
    }

    /**
     * Starts one producer; logs a warning when the id is not registered.
     *
     * @param producerId id of the producer to start
     */
    @Override
    public void start(String producerId) {
        ProducerBean producer = producerBeans.get(producerId);
        logger.info("开启producer,producerId=" + producerId);
        if (producer != null) {
            producer.start();
        } else {
            logger.warn("producer不存在,开启失败!,producerId=" + producerId);
        }
    }

    /**
     * Shuts down one producer; logs a warning when the id is not registered.
     *
     * @param producerId id of the producer to shut down
     */
    @Override
    public void shutdown(String producerId) {
        ProducerBean producer = producerBeans.get(producerId);
        logger.info("关闭producer,producerId=" + producerId);
        if (producer != null) {
            producer.shutdown();
        } else {
            logger.warn("producer不存在,关闭失败!,producerId=" + producerId);
        }
    }

    /**
     * Restarts one producer by shutting it down and starting it again.
     *
     * @param producerId id of the producer to restart
     */
    @Override
    public void restart(String producerId) {
        // The message previously said "consumer" and lacked the '=' separator.
        logger.info("重启生产者,producerId=" + producerId);
        shutdown(producerId);
        start(producerId);
        logger.info("重启完成");
    }

    /**
     * Enables one producer: marks it active in the database, creates the bean if it is not
     * registered yet, and starts it.
     *
     * @param producerId id of the producer to enable
     * @throws ProducerException if no configuration row exists for the id
     */
    @Override
    public void enabled(String producerId) {
        logger.info("启用producer,producerId=" + producerId);
        // The flag must be written before the bean is created: the creation query filters
        // on enabled='0', so a producer turned off by disabled() would otherwise never be
        // found again.
        updateEnabledFlag(producerId, "0");
        ProducerBean producer = producerBeans.get(producerId);
        if (producer == null) {
            createProducer(producerId);
            producer = producerBeans.get(producerId);
            if (producer == null) {
                // The bean is registered under the id stored in the row, which may differ
                // from the requested id if the database compares ids loosely.
                throw new ProducerException("producer创建失败,找不到相关配置,producerId=" + producerId);
            }
            producer.start();
        } else if (producer.isClosed()) {
            producer.start();
        }
        logger.info("启用成功");
    }

    /**
     * Disables one producer: shuts it down, removes it from the registry and marks it
     * inactive in the database. Logs a warning when the id is not registered.
     *
     * @param producerId id of the producer to disable
     */
    @Override
    public void disabled(String producerId) {
        logger.info("禁用producer,producerId=" + producerId);
        ProducerBean producer = producerBeans.get(producerId);
        if (producer != null) {
            producer.shutdown();
            producerBeans.remove(producerId);
            updateEnabledFlag(producerId, "1");
            logger.info("禁用成功");
        } else {
            logger.warn("禁用失败,producer不存在,producerId=" + producerId);
        }
    }

    /**
     * Builds the producer registry from the database once all properties are injected,
     * unless the registry already contains producers.
     *
     * @throws IllegalStateException if the registry is empty and no data source has been set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("开始创建生产者");
        if (producerBeans.isEmpty()) {
            if (jdbcTemplate == null) {
                // Fail with a clear message instead of a NullPointerException in the query.
                throw new IllegalStateException("dataSource must be set on ProducerManager");
            }
            createProducer();
        }
        logger.info("生产者创建完成");
    }

    @Override
    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    /**
     * Looks up a registered producer.
     *
     * @param producerId id of the producer
     * @return the producer bean, or {@code null} when the id is not registered
     */
    public ProducerBean getProducer(String producerId) {
        return producerBeans.get(producerId);
    }

    /**
     * Replaces the shared producer registry.
     *
     * @param producerBeans new registry; {@code null} resets it to an empty map
     */
    public void setProducerBeans(Map<String, ProducerBean> producerBeans) {
        // A null registry would make every later call fail with a NullPointerException.
        ProducerManager.producerBeans = producerBeans != null ? producerBeans : new HashMap<>();
    }

    /**
     * Writes the enabled flag for a producer.
     *
     * @param producerId id of the producer whose row is updated
     * @param flag       "0" for active, "1" for inactive
     */
    private void updateEnabledFlag(String producerId, String flag) {
        jdbcTemplate.update("update rkmq_producer_config set enabled=? where producer_id=?", flag, producerId);
    }

    /**
     * Creates a producer for every active configuration row; logs a warning when there is none.
     */
    private void createProducer() {
        String sql = "select producer_id, access_key, secret_key, ons_addr"
                + " from rkmq_producer_config where enabled='0'";
        List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql);
        if (producerConfigs.isEmpty()) {
            logger.warn("producer创建失败,找不到相关配置");
            return;
        }
        producerConfigs.forEach(this::registerProducer);
    }

    /**
     * Creates the producer for one id from its active configuration row.
     *
     * @param producerId id of the producer to create
     * @throws ProducerException if no active configuration row exists for the id
     */
    private void createProducer(String producerId) {
        String sql = "select producer_id, access_key, secret_key, ons_addr"
                + " from rkmq_producer_config where enabled='0' and producer_id=?";
        List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql, producerId);
        if (producerConfigs.isEmpty()) {
            throw new ProducerException("producer创建失败,找不到相关配置,producerId=" + producerId);
        }
        producerConfigs.forEach(this::registerProducer);
    }

    /**
     * Builds one producer bean from a configuration row and registers it under its id.
     *
     * @param producerConfig row of rkmq_producer_config
     * @throws ProducerException if a required column of the row is null
     */
    private void registerProducer(Map<String, Object> producerConfig) {
        String producerId = requiredColumn(producerConfig, "producer_id");
        String onsAddr = requiredColumn(producerConfig, "ons_addr");
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ProducerId, producerId);
        properties.put(PropertyKeyConst.AccessKey, requiredColumn(producerConfig, "access_key"));
        properties.put(PropertyKeyConst.SecretKey, requiredColumn(producerConfig, "secret_key"));
        properties.put(PropertyKeyConst.ONSAddr, onsAddr);
        // The access key and secret key are credentials and are deliberately kept out of the log.
        logger.info("连接属性:producerId=" + producerId + ",onsAddr=" + onsAddr);
        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(properties);
        producerBeans.put(producerId, producerBean);
    }

    /**
     * Reads a column that must not be null.
     *
     * @param row    configuration row
     * @param column column name
     * @return the column value as a string
     * @throws ProducerException if the column value is null
     */
    private static String requiredColumn(Map<String, Object> row, String column) {
        Object value = row.get(column);
        if (value == null) {
            throw new ProducerException(
                    "producer配置不完整," + column + "为空,producerId=" + row.get("producer_id"));
        }
        return value.toString();
    }

    /**
     * Reports whether a producer is started.
     *
     * @param producerId id of the producer to query
     * @return the bean's started state, or {@code false} when the id is not registered
     */
    @Override
    public boolean isStart(String producerId) {
        ProducerBean producer = producerBeans.get(producerId);
        if (producer != null) {
            return producer.isStarted();
        }
        return false;
    }
}
四、创建监听类
package com.sgcc.mq.listener;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.sgcc.web.entity.Demo;
import com.sgcc.web.entity.DemoExample;
import com.sgcc.web.entity.DemoExample.Criteria;
import com.sgcc.web.mapper.DemoMapper;
/**
 * Sample message listener: logs every delivered message and acknowledges it.
 */
public class MyMessageListener implements MessageListener {

    private static Logger logger = LoggerFactory.getLogger(MyMessageListener.class);

    /**
     * Logs the delivered message together with its consume context, then commits it.
     *
     * @param message the delivered message
     * @param context the consume context passed along with the message
     * @return always {@link Action#CommitMessage}
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        StringBuilder received = new StringBuilder("收到消息,message=");
        received.append(message).append(" context=").append(context);
        logger.info(received.toString());
        return Action.CommitMessage;
    }
}
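在rkmq_listener表的listener_class_name字段中填入该类的全限定类名(例如 com.sgcc.mq.listener.MyMessageListener),并将enabled设为'0'(启用)、consumer_id设为对应的消费者id,这样ConsumerManager就可以通过反射实例化该监听对象了。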
五、spring中配置bean
spring-mq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Connection pool used by the MQ managers to read their configuration tables.
NOTE(review): the pool settings use druid.* placeholder names although the pool class is
commons-dbcp BasicDataSource; the placeholders only need to resolve to valid values. -->
<bean id="mqDataSource" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<!-- Initial number of connections in the pool -->
<property name="initialSize" value="${druid.initialSize}"></property>
<!-- Maximum number of active connections -->
<property name="maxActive" value="${druid.maxActive}"></property>
<!-- Maximum number of idle connections -->
<property name="maxIdle" value="${druid.maxIdle}"></property>
<!-- Minimum number of idle connections -->
<property name="minIdle" value="${druid.minIdle}"></property>
<!-- Maximum time to wait for a connection -->
<property name="maxWait" value="${druid.maxWait}"></property>
<property name="validationQuery" value="${druid.validationQuery}" />
<property name="testOnBorrow" value="${druid.testOnBorrow}" />
</bean>
<!-- Optional single shared consumer, disabled by default. Uncomment it together with the
consumerBean property below to use one consumer for all listeners.
<bean id="mqConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean">
<property name="properties"> consumer connection settings
<props>
replace with your own consumer id, e.g. CID_mainstation_control
<prop key="ConsumerId">CID_TEST100</prop>
<prop key="AccessKey">xxxxxx</prop>
<prop key="SecretKey">xxxxxx</prop>
<prop key="ONSAddr">http://xxxxx/rocketmq/nsaddr4broker-internal
</prop>
</props>
</property>
</bean> -->
<!-- ConsumerBean manager. Usage:
If consumerBean is injected, every listener configured in the database uses that one consumer.
Otherwise the consumers are read from table rkmq_consumer_config and instantiated; several may be configured.
The dataSource property is mandatory; it may be the same data source that mybatis/hibernate uses.
The class implements InitializingBean, so the consumers are built before init-method "start" starts them;
"shutdown" closes them when the context is destroyed.
-->
<bean id="consumerManager" class="com.sgcc.mq.ConsumerManager"
init-method="start" destroy-method="shutdown">
<!-- <property name="consumerBean" ref="mqConsumer"></property> -->
<property name="dataSource" ref="mqDataSource"></property>
</bean>
</beans>
spring-mq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- ProducerBean manager. The producers are read from table rkmq_producer_config.
The class implements InitializingBean, so the producers are built before init-method "start" starts them;
"shutdown" closes them when the context is destroyed.
mqDataSource is not defined in this file: it is the bean declared in spring-mq-consumer.xml,
so both files must be loaded into the same application context. -->
<bean id="producerManager" class="com.sgcc.mq.ProducerManager"
init-method="start" destroy-method="shutdown">
<property name="dataSource" ref="mqDataSource"></property>
</bean>
</beans>
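注:spring-mq-producer.xml 引用了 spring-mq-consumer.xml 中定义的 mqDataSource,因此两个文件必须加载到同一个Spring容器中;也可以把两部分配置写在同一个配置文件里。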