一、创建数据表

CREATE TABLE rkmq_consumer_config (

  consumer_id varchar(64) NOT NULL,

  consumer_name varchar(128) NOT NULL,

  comsumer_desc varchar(256) DEFAULT NULL,

  access_key varchar(128) DEFAULT NULL,

  secret_key varchar(128) DEFAULT NULL,

  ons_addr varchar(256) DEFAULT NULL,

  enabled varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',

  create_date datetime NOT NULL,

  update_date datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`consumer_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消费者配置';

CREATE TABLE rkmq_listener (

  listener_id int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',

  listener_name varchar(128) NOT NULL,

  listener_desc varchar(256) DEFAULT NULL,

  consumer_id varchar(64) DEFAULT NULL COMMENT 'consumer配置id',

  topic varchar(32) NOT NULL,

  tag varchar(32) NOT NULL,

  listener_class_name varchar(256) NOT NULL COMMENT '消费者处理类',

  create_date datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,

  update_date datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,

  enabled varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',

  data_source varchar(256) DEFAULT NULL,

  PRIMARY KEY (`listener_id`)

) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='消费者监听配置';

CREATE TABLE rkmq_producer_config (

  producer_id varchar(64) NOT NULL,

  producer_name varchar(128) NOT NULL,

  producer_desc varchar(256) DEFAULT NULL,

  access_key varchar(128) DEFAULT NULL,

  secret_key varchar(128) DEFAULT NULL,

  ons_addr varchar(256) DEFAULT NULL,

  enabled varchar(1) NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',

  create_date datetime NOT NULL,

  update_date datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,

  PRIMARY KEY (`producer_id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='生产者配置';

二、创建管理接口

package com.sgcc.mq;

import javax.sql.DataSource;

public interface Manager {

/**

 * 启动所有

 */

public void start();

/**

 * 关闭所有

 */

public void shutdown();

/**

 * 开启某个

 * @param id

 */

public void start(String id);

/**

 * 关闭某个

 * @param id

 */

public void shutdown(String id);

/**

 * 重启某个

 * @param id

 */

public void restart(String id);

/**

 * 启用某个

 * @param id

 */

public void enabled(String id);

/**

 * 禁用某个

 * @param id

 */

public void disabled(String id);

/**

 * 是否开启

 * @param id

 */

public boolean isStart(String id);

/**

 * 设置数据源

 * @param dataSource

 */

public void setDataSource(DataSource dataSource);

}

三、创建Consumer和Producer的管理类

package com.sgcc.mq;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import javax.sql.DataSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import org.springframework.jdbc.core.JdbcTemplate;

import com.aliyun.openservices.ons.api.MessageListener;

import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.bean.ConsumerBean;

import com.aliyun.openservices.ons.api.bean.Subscription;

import com.sgcc.exception.ConsumerException;

/**

 * 

 * 

 * <p>

 * Title: ConsumerManager

 * </p>

 * 

 * <p>

 * Description: ConsumerBean管理器,从数据库中读取配置并实例化ConsumerBean,一个或多个

 * </p>

 * 

 * @author mengjinyuan

 * 

 * @date 2019年3月14日

 */

public class ConsumerManager implements ApplicationContextAware, InitializingBean,Manager {

private ConsumerBean consumerBean;

private static Map<String, ConsumerBean> consumerBeans = new HashMap<>();

private ApplicationContext applicationContext;

private JdbcTemplate jdbcTemplate;

private final static String CID_DEFAULT = "CID_DEFAULT";

private static Logger logger = LoggerFactory.getLogger(ConsumerManager.class);

public void setConsumerBean(ConsumerBean consumerBean) {

this.consumerBean = consumerBean;

}

@Override

public void start() {

if (consumerBeans.size() > 0) {

logger.info("开启所有监听,consumerBean个数:" + consumerBeans.size());

consumerBeans.forEach((consumerId, consumer) -> {

if (!consumer.getSubscriptionTable().isEmpty()) {

start(consumerId);

}

});

}

}

@Override

public void shutdown() {

if (!consumerBeans.isEmpty()) {

logger.info("关闭所有监听");

consumerBeans.forEach((consumerId, consumer) -> {

shutdown(consumerId);

});

}

}

/**

 * 启动某个消费者

 * 

 * @param consumerId

 */

@Override

public void start(String consumerId) {

ConsumerBean consumer = consumerBeans.get(consumerId);

logger.info("开启consumer,consumerId=" + consumerId);

if (consumer != null) {

consumer.start();

} else {

logger.warn("consumer不存在,开启失败!,consumerId=" + consumerId);

}

}

/**

 * 关闭某个消费者

 * 

 * @param consumerId

 */

@Override

public void shutdown(String consumerId) {

ConsumerBean consumer = consumerBeans.get(consumerId);

logger.info("关闭consumer,consumerId=" + consumerId);

if (consumer != null) {

consumer.shutdown();

} else {

logger.warn("consumer不存在,关闭失败!,consumerId=" + consumerId);

}

}

/**

 * 重启某个消费者

 * 

 * @param consumerId

 */

@Override

public void restart(String consumerId) {

logger.info("重启消费者,consumerId"+consumerId);

shutdown(consumerId);

start(consumerId);

logger.info("重启完成");

}

/**

 * 启用某个consumer

 */

@Override

public void enabled(String consumerId){

logger.info("启用consumer,consumerId="+consumerId);

ConsumerBean consumer=consumerBeans.get(consumerId);

if(consumer==null){

createConsumerBean(consumerId);

consumer=consumerBeans.get(consumerId);

consumer.start();

}else{

if(consumer.isClosed()){

consumer.start();

}

}

jdbcTemplate.update("update rkmq_consumer_config set enabled='0' where consumer_id=?", consumerId);

jdbcTemplate.update("update rkmq_listener set enabled='0' where consumer_id=?", consumerId);

logger.info("启用成功");

}

/**

 * 禁用某个consumer

 */

@Override

public void disabled(String consumerId){

logger.info("禁用consumer,consumerId="+consumerId);

ConsumerBean consumer=consumerBeans.get(consumerId);

if(consumer!=null){

consumer.shutdown();

consumerBeans.remove(consumerId);

jdbcTemplate.update("update rkmq_consumer_config set enabled='1' where consumer_id=?", consumerId);

jdbcTemplate.update("update rkmq_listener set enabled='1' where consumer_id=?", consumerId);

logger.info("禁用成功");

}else{

logger.warn("禁用失败,consumer不存在,consumerId="+consumerId);

}

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@Override

public void afterPropertiesSet() throws Exception {

logger.info("开始初始化Consumer属性");

/**

 * 如果传了consumerBean,就所有的listener都用同一个配置,否则从数据库读取配置

 */

if (this.consumerBean == null) {

logger.info("consumerBean为空,开始从数据库查找配置实例化consumerBean并设置属性");

// TODO 从数据库中读取ConsumerBean的配置,动态的配置多个ConsumerBean

createConsumerBean();

} else {

createSubscriptionTable(consumerBean, CID_DEFAULT);

}

}

public void setDataSource(DataSource dataSource) {

this.jdbcTemplate = new JdbcTemplate(dataSource);

}

/**

 * 创建单个consumer

 * @param consumerId

 */

private void createConsumerBean(String consumerId) {

String sql = "select * from rkmq_consumer_config where enabled='0' and consumer_id=?";

List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql, consumerId);

logger.info("创建consumer,consumerId=" + consumerId);

if (consumerConfigs.size() > 0) {

consumerConfigs.stream().forEach(consumerConfig -> {

ConsumerBean consumerBean = new ConsumerBean();

Properties properties = new Properties();

properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());

properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());

properties.put(PropertyKeyConst.SecretKey, consumerConfig.get("secret_key").toString());

properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());

logger.info("连接属性:" + properties);

consumerBean.setProperties(properties);

this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());

});

}else{

throw new ConsumerException("Consumer创建失败,找不到相关配置,consumerId="+consumerId);

}

}

/**

 * 创建所有consumer

 */

private void createConsumerBean() {

String sql = "select * from rkmq_consumer_config where enabled='0'";

List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql);

if (consumerConfigs.size() > 0) {

consumerConfigs.stream().forEach(consumerConfig -> {

ConsumerBean consumerBean = new ConsumerBean();

Properties properties = new Properties();

properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());

properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());

properties.put(PropertyKeyConst.SecretKey, consumerConfig.get("secret_key").toString());

properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());

logger.info("连接属性:" + properties);

consumerBean.setProperties(properties);

this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());

});

}else{

logger.warn("Consumer创建失败,找不到相关配置");

}

}

/**

 * 创建监听类对象

 * 

 * @param consumerBean

 * @param consumerId

 */

private void createSubscriptionTable(ConsumerBean consumerBean, String consumerId) {

// 用反射生成消息监听对象,配置在数据库中

StringBuilder sql = new StringBuilder("select * from rkmq_listener where enabled='0' ");

List<Map<String, Object>> consumers = new ArrayList<>();

if (consumerId == null || CID_DEFAULT.equals(consumerId)) {

consumers = jdbcTemplate.queryForList(sql.toString());

} else {

sql.append(" and consumer_id=?");

consumers = jdbcTemplate.queryForList(sql.toString(), consumerId);

}

Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

if (consumers != null && !consumers.isEmpty()) {

consumers.stream().forEach(consumer -> {

Subscription subscription = new Subscription();

subscription.setTopic(consumer.get("topic").toString());

subscription.setExpression(consumer.get("tag").toString());

try {

Class<?> classz = Class.forName(consumer.get("listener_class_name").toString());

logger.info("开始为" + classz.getName() + "创建对象");

MessageListener messageListener = (MessageListener) classz.newInstance();

/**

 * 增强bean,让容器之外的bean获得自动注入的能力,这样反射生成的对象也可以用 @Autowire

 * 注解了,spring牛逼

 * 

 */

applicationContext.getAutowireCapableBeanFactory().autowireBean(messageListener);

subscriptionTable.put(subscription, messageListener);

} catch (ClassNotFoundException e) {

e.printStackTrace();

} catch (InstantiationException e1) {

e1.printStackTrace();

} catch (IllegalAccessException e2) {

e2.printStackTrace();

}

});

consumerBean.setSubscriptionTable(subscriptionTable);

consumerBeans.put(consumerId, consumerBean);

logger.info("consumer初始化完成");

} else {

logger.warn("未找到相关Consumer!请检查数据库配置");

}

}

@Override

public boolean isStart(String consumerId) {

ConsumerBean consumer=consumerBeans.get(consumerId);

if(consumer!=null){

return consumer.isStarted();

}

return false;

}

}

package com.sgcc.mq;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import javax.sql.DataSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.jdbc.core.JdbcTemplate;

import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.bean.ProducerBean;

import com.sgcc.exception.ProducerException;

/**

 * 

 * 

 * <p>

 * Title: ProducerManager

 * </p>

 * 

 * <p>

 * Description:生产者管理

 * </p>

 * 

 * @author mengjinyuan

 * 

 * @date 2019年3月14日

 */

public class ProducerManager implements InitializingBean, Manager {

private static Map<String, ProducerBean> producerBeans = new HashMap<>();

protected JdbcTemplate jdbcTemplate;

private static Logger logger = LoggerFactory.getLogger(ProducerManager.class);

@Override

public void start() {

if (producerBeans.size() > 0) {

logger.info("开启所有生产者,producerBean个数:" + producerBeans.size());

producerBeans.forEach((producerId, producer) -> {

start(producerId);

});

}

}

@Override

public void shutdown() {

if (!producerBeans.isEmpty()) {

logger.info("关闭所有生产者");

producerBeans.forEach((producerId, producer) -> {

shutdown(producerId);

});

}

}

@Override

public void start(String producerId) {

ProducerBean producer = producerBeans.get(producerId);

logger.info("开启producer,producerId=" + producerId);

if (producer != null) {

producer.start();

} else {

logger.warn("producer不存在,开启失败!,producerId=" + producerId);

}

}

@Override

public void shutdown(String producerId) {

ProducerBean producer = producerBeans.get(producerId);

logger.info("关闭producer,producerId=" + producerId);

if (producer != null) {

producer.shutdown();

} else {

logger.warn("producer不存在,关闭失败!,producerId=" + producerId);

}

}

@Override

public void restart(String producerId) {

logger.info("重启消费者,producerId" + producerId);

shutdown(producerId);

start(producerId);

logger.info("重启完成");

}

@Override

public void enabled(String producerId) {

logger.info("启用producer,producerId=" + producerId);

ProducerBean producer = producerBeans.get(producerId);

if (producer == null) {

createProducer(producerId);

producer = producerBeans.get(producerId);

producer.start();

} else {

if (producer.isClosed()) {

producer.start();

}

}

jdbcTemplate.update("update rkmq_producer_config set enabled='0' where producer_id=?", producerId);

logger.info("启用成功");

}

@Override

public void disabled(String producerId) {

logger.info("禁用producer,producerId=" + producerId);

ProducerBean producer = producerBeans.get(producerId);

if (producer != null) {

producer.shutdown();

producerBeans.remove(producerId);

jdbcTemplate.update("update rkmq_producer_config set enabled='1' where producer_id=?", producerId);

logger.info("禁用成功");

} else {

logger.warn("禁用失败,producer不存在,producerId=" + producerId);

}

}

@Override

public void afterPropertiesSet() throws Exception {

logger.info("开始创建生产者");

if (producerBeans.isEmpty()) {

createProducer();

}

logger.info("生产者创建完成");

}

public void setDataSource(DataSource dataSource) {

this.jdbcTemplate = new JdbcTemplate(dataSource);

}

public ProducerBean getProducer(String producerId) {

return producerBeans.get(producerId);

}

public void setProducerBeans(Map<String, ProducerBean> producerBeans) {

ProducerManager.producerBeans = producerBeans;

}

/**

 * 创建所有生产者bean

 */

private void createProducer() {

String sql = "select * from rkmq_producer_config where enabled='0'";

List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql);

if (producerConfigs.size() > 0) {

producerConfigs.stream().forEach(producerConfig -> {

ProducerBean producerBean = new ProducerBean();

Properties properties = new Properties();

properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());

properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());

properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());

properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());

logger.info("连接属性:" + properties);

producerBean.setProperties(properties);

producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);

});

} else {

logger.warn("producer创建失败,找不到相关配置");

}

}

/**

 * 创建一个生产者bean

 */

private void createProducer(String producerId) {

String sql = "select * from rkmq_producer_config where enabled='0' and producer_id=?";

List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql, producerId);

if (producerConfigs.size() > 0) {

producerConfigs.stream().forEach(producerConfig -> {

ProducerBean producerBean = new ProducerBean();

Properties properties = new Properties();

properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());

properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());

properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());

properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());

logger.info("连接属性:" + properties);

producerBean.setProperties(properties);

producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);

});

} else {

throw new ProducerException("producer创建失败,找不到相关配置,producerId="+producerId);

}

}

@Override

public boolean isStart(String producerId) {

ProducerBean producer = producerBeans.get(producerId);

if (producer != null) {

return producer.isStarted();

}

return false;

}

}

四、创建监听类

package com.sgcc.mq.listener;

import java.util.List;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import com.aliyun.openservices.ons.api.Action;

import com.aliyun.openservices.ons.api.ConsumeContext;

import com.aliyun.openservices.ons.api.Message;

import com.aliyun.openservices.ons.api.MessageListener;

import com.sgcc.web.entity.Demo;

import com.sgcc.web.entity.DemoExample;

import com.sgcc.web.entity.DemoExample.Criteria;

import com.sgcc.web.mapper.DemoMapper;

public class MyMessageListener implements MessageListener{

private static Logger logger=LoggerFactory.getLogger(MyMessageListener.class);

@Override

public Action consume(Message message, ConsumeContext context) {

logger.info("收到消息,message="+message+" context="+context);

return Action.CommitMessage;

}

}

在rkmq_listener表中添加上该类的包路径,这样就可以用反射实例化对象了。

五、spring中配置bean

spring-mq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="mqDataSource" class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<property name="driverClassName" value="${jdbc.driverClassName}" />

<property name="url" value="${jdbc.url}" />

<property name="username" value="${jdbc.username}" />

<property name="password" value="${jdbc.password}" />

<!-- 初始化连接大小 -->

<property name="initialSize" value="${druid.initialSize}"></property>

<!-- 连接池最大数量 -->

<property name="maxActive" value="${druid.maxActive}"></property>

<!-- 连接池最大空闲 -->

<property name="maxIdle" value="${druid.maxIdle}"></property>

<!-- 连接池最小空闲 -->

<property name="minIdle" value="${druid.minIdle}"></property>

<!-- 获取连接最大等待时间 -->

<property name="maxWait" value="${druid.maxWait}"></property>

<property name="validationQuery" value="${druid.validationQuery}" />

<property name="testOnBorrow" value="${druid.testOnBorrow}" />

</bean>

<!-- <bean id="mqConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean">

<property name="properties"> 消费者配置信息

<props>

请替换 CID_mainstation_control

<prop key="ConsumerId">CID_TEST100</prop>

<prop key="AccessKey">xxxxxx</prop>

<prop key="SecretKey">xxxxxx</prop>

<prop key="ONSAddr">http://xxxxx/rocketmq/nsaddr4broker-internal

</prop>

</props>

</property>

</bean> -->

<!-- ConsumerBean管理类,用法:

如果注入consumerBean的话数据库中配的所有listener全都使用此配置来监听

否则从数据表rkmq_consumer_config中获取配置并实例化consumerBean,可以配置多个.

dataSource数据源必须注入,可以设置为mybatis/hibernate的数据源

 -->

<bean id="consumerManager" class="com.sgcc.mq.ConsumerManager"

init-method="start" destroy-method="shutdown">

<!-- <property name="consumerBean" ref="mqConsumer"></property> -->

<property name="dataSource" ref="mqDataSource"></property>

</bean>

</beans>

spring-mq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="producerManager" class="com.sgcc.mq.ProducerManager"

init-method="start" destroy-method="shutdown">

<property name="dataSource" ref="mqDataSource"></property>

</bean>

</beans>

注:也可以写在一个配置文件里