一、创建数据表
-- Schema for the database-driven RocketMQ/ONS consumer and producer managers.
--
-- Convention used by every `enabled` column (see the column COMMENT and the
-- application queries, which all filter on enabled='0'):
--   '0' = enabled, '1' = disabled. New rows are disabled by default.
--
-- DATETIME with DEFAULT/ON UPDATE CURRENT_TIMESTAMP requires MySQL 5.6.5+.

-- One row per ONS consumer (connection credentials and name-server address).
CREATE TABLE IF NOT EXISTS `rkmq_consumer_config` (
    `consumer_id`   varchar(64)  NOT NULL,
    `consumer_name` varchar(128) NOT NULL,
    -- NOTE: the column name is misspelled ("comsumer"); kept as-is so that
    -- existing databases and queries stay compatible.
    `comsumer_desc` varchar(256) DEFAULT NULL,
    `access_key`    varchar(128) DEFAULT NULL,
    `secret_key`    varchar(128) DEFAULT NULL,
    `ons_addr`      varchar(256) DEFAULT NULL,
    `enabled`       varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    -- Set once at insert time; never touched by updates.
    `create_date`   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    -- Initialised at insert time and refreshed on every row update.
    `update_date`   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`consumer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消费者配置';

-- One row per message listener; listeners are attached to a consumer through
-- `consumer_id` and instantiated by reflection from `listener_class_name`.
CREATE TABLE IF NOT EXISTS `rkmq_listener` (
    `listener_id`         int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
    `listener_name`       varchar(128) NOT NULL,
    `listener_desc`       varchar(256) DEFAULT NULL,
    `consumer_id`         varchar(64)  DEFAULT NULL COMMENT 'consumer配置id',
    `topic`               varchar(32)  NOT NULL,
    `tag`                 varchar(32)  NOT NULL,
    `listener_class_name` varchar(256) NOT NULL COMMENT '消费者处理类',
    -- Fixed: this column previously carried ON UPDATE CURRENT_TIMESTAMP, which
    -- overwrote the creation time on every update of the row.
    `create_date`         datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `update_date`         datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `enabled`             varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    `data_source`         varchar(256) DEFAULT NULL,
    PRIMARY KEY (`listener_id`),
    -- Listeners are looked up and bulk-updated by consumer_id.
    KEY `rkmq_listener_consumer_id_idx` (`consumer_id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COMMENT='消费者监听配置';

-- One row per ONS producer (connection credentials and name-server address).
CREATE TABLE IF NOT EXISTS `rkmq_producer_config` (
    `producer_id`   varchar(64)  NOT NULL,
    `producer_name` varchar(128) NOT NULL,
    `producer_desc` varchar(256) DEFAULT NULL,
    `access_key`    varchar(128) DEFAULT NULL,
    `secret_key`    varchar(128) DEFAULT NULL,
    `ons_addr`      varchar(256) DEFAULT NULL,
    `enabled`       varchar(1)   NOT NULL DEFAULT '1' COMMENT '是否激活 0是 1否',
    `create_date`   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    `update_date`   datetime     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`producer_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='生产者配置';
二、创建管理接口
package com.sgcc.mq;

import javax.sql.DataSource;

/**
 * Lifecycle contract shared by the message-queue managers in this package.
 * Every per-item operation addresses a managed item by its string id.
 */
public interface Manager {

    /** Starts every managed item. */
    void start();

    /** Shuts down every managed item. */
    void shutdown();

    /**
     * Starts a single item.
     *
     * @param id id of the item to start
     */
    void start(String id);

    /**
     * Shuts down a single item.
     *
     * @param id id of the item to shut down
     */
    void shutdown(String id);

    /**
     * Restarts a single item.
     *
     * @param id id of the item to restart
     */
    void restart(String id);

    /**
     * Enables a single item.
     *
     * @param id id of the item to enable
     */
    void enabled(String id);

    /**
     * Disables a single item.
     *
     * @param id id of the item to disable
     */
    void disabled(String id);

    /**
     * Reports whether a single item is started.
     *
     * @param id id of the item to query
     * @return {@code true} if the item is started
     */
    boolean isStart(String id);

    /**
     * Supplies the data source used by the manager.
     *
     * @param dataSource data source to use
     */
    void setDataSource(DataSource dataSource);
}
三、创建Consumer和Producer的管理类
package com.sgcc.mq;\r\n\r\nimport java.util.ArrayList;\r\nimport java.util.HashMap;\r\nimport java.util.List;\r\nimport java.util.Map;\r\nimport java.util.Properties;\r\n\r\nimport javax.sql.DataSource;\r\n\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\nimport org.springframework.beans.BeansException;\r\nimport org.springframework.beans.factory.InitializingBean;\r\nimport org.springframework.context.ApplicationContext;\r\nimport org.springframework.context.ApplicationContextAware;\r\nimport org.springframework.jdbc.core.JdbcTemplate;\r\n\r\nimport com.aliyun.openservices.ons.api.MessageListener;\r\nimport com.aliyun.openservices.ons.api.PropertyKeyConst;\r\nimport com.aliyun.openservices.ons.api.bean.ConsumerBean;\r\nimport com.aliyun.openservices.ons.api.bean.Subscription;\r\nimport com.sgcc.exception.ConsumerException;\r\n\r\n/**\r\n * \r\n * \r\n * <p>\r\n * Title: ConsumerManager\r\n * </p>\r\n * \r\n * <p>\r\n * Description: ConsumerBean管理器,从数据库中读取配置并实例化ConsumerBean,一个或多个\r\n * </p>\r\n * \r\n * @author mengjinyuan\r\n * \r\n * @date 2019年3月14日\r\n */\r\npublic class ConsumerManager implements ApplicationContextAware, InitializingBean,Manager {\r\n\r\n private ConsumerBean consumerBean;\r\n\r\n private static Map<String, ConsumerBean> consumerBeans = new HashMap<>();\r\n\r\n private ApplicationContext applicationContext;\r\n\r\n private JdbcTemplate jdbcTemplate;\r\n\r\n private final static String CID_DEFAULT = "CID_DEFAULT";\r\n\r\n private static Logger logger = LoggerFactory.getLogger(ConsumerManager.class);\r\n\r\n public void setConsumerBean(ConsumerBean consumerBean) {\r\n this.consumerBean = consumerBean;\r\n }\r\n\r\n @Override\r\n public void start() {\r\n if (consumerBeans.size() > 0) {\r\n logger.info("开启所有监听,consumerBean个数:" + consumerBeans.size());\r\n consumerBeans.forEach((consumerId, consumer) -> {\r\n if (!consumer.getSubscriptionTable().isEmpty()) {\r\n start(consumerId);\r\n }\r\n });\r\n }\r\n }\r\n @Override\r\n public 
void shutdown() {\r\n if (!consumerBeans.isEmpty()) {\r\n logger.info("关闭所有监听");\r\n consumerBeans.forEach((consumerId, consumer) -> {\r\n shutdown(consumerId);\r\n });\r\n }\r\n }\r\n\r\n /**\r\n * 启动某个消费者\r\n * \r\n * @param consumerId\r\n */\r\n @Override\r\n public void start(String consumerId) {\r\n ConsumerBean consumer = consumerBeans.get(consumerId);\r\n logger.info("开启consumer,consumerId=" + consumerId);\r\n if (consumer != null) {\r\n consumer.start();\r\n } else {\r\n logger.warn("consumer不存在,开启失败!,consumerId=" + consumerId);\r\n }\r\n }\r\n\r\n /**\r\n * 关闭某个消费者\r\n * \r\n * @param consumerId\r\n */\r\n @Override\r\n public void shutdown(String consumerId) {\r\n ConsumerBean consumer = consumerBeans.get(consumerId);\r\n logger.info("关闭consumer,consumerId=" + consumerId);\r\n if (consumer != null) {\r\n consumer.shutdown();\r\n } else {\r\n logger.warn("consumer不存在,关闭失败!,consumerId=" + consumerId);\r\n }\r\n }\r\n\r\n /**\r\n * 重启某个消费者\r\n * \r\n * @param consumerId\r\n */\r\n @Override\r\n public void restart(String consumerId) {\r\n logger.info("重启消费者,consumerId"+consumerId);\r\n shutdown(consumerId);\r\n start(consumerId);\r\n logger.info("重启完成");\r\n }\r\n \r\n /**\r\n * 启用某个consumer\r\n */\r\n @Override\r\n public void enabled(String consumerId){\r\n logger.info("启用consumer,consumerId="+consumerId);\r\n ConsumerBean consumer=consumerBeans.get(consumerId);\r\n if(consumer==null){\r\n createConsumerBean(consumerId);\r\n consumer=consumerBeans.get(consumerId);\r\n consumer.start();\r\n }else{\r\n if(consumer.isClosed()){\r\n consumer.start();\r\n }\r\n }\r\n jdbcTemplate.update("update rkmq_consumer_config set enabled='0' where consumer_id=?", consumerId);\r\n jdbcTemplate.update("update rkmq_listener set enabled='0' where consumer_id=?", consumerId);\r\n logger.info("启用成功");\r\n }\r\n \r\n /**\r\n * 禁用某个consumer\r\n */\r\n @Override\r\n public void disabled(String consumerId){\r\n logger.info("禁用consumer,consumerId="+consumerId);\r\n ConsumerBean 
consumer=consumerBeans.get(consumerId);\r\n if(consumer!=null){\r\n consumer.shutdown();\r\n consumerBeans.remove(consumerId);\r\n jdbcTemplate.update("update rkmq_consumer_config set enabled='1' where consumer_id=?", consumerId);\r\n jdbcTemplate.update("update rkmq_listener set enabled='1' where consumer_id=?", consumerId);\r\n logger.info("禁用成功");\r\n }else{\r\n logger.warn("禁用失败,consumer不存在,consumerId="+consumerId);\r\n }\r\n }\r\n \r\n @Override\r\n public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {\r\n this.applicationContext = applicationContext;\r\n }\r\n\r\n @Override\r\n public void afterPropertiesSet() throws Exception {\r\n logger.info("开始初始化Consumer属性");\r\n /**\r\n * 如果传了consumerBean,就所有的listener都用同一个配置,否则从数据库读取配置\r\n */\r\n if (this.consumerBean == null) {\r\n logger.info("consumerBean为空,开始从数据库查找配置实例化consumerBean并设置属性");\r\n // TODO 从数据库中读取ConsumerBean的配置,动态的配置多个ConsumerBean\r\n createConsumerBean();\r\n } else {\r\n createSubscriptionTable(consumerBean, CID_DEFAULT);\r\n }\r\n }\r\n\r\n public void setDataSource(DataSource dataSource) {\r\n this.jdbcTemplate = new JdbcTemplate(dataSource);\r\n }\r\n\r\n /**\r\n * 创建单个consumer\r\n * @param consumerId\r\n */\r\n private void createConsumerBean(String consumerId) {\r\n String sql = "select * from rkmq_consumer_config where enabled='0' and consumer_id=?";\r\n List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql, consumerId);\r\n logger.info("创建consumer,consumerId=" + consumerId);\r\n if (consumerConfigs.size() > 0) {\r\n consumerConfigs.stream().forEach(consumerConfig -> {\r\n ConsumerBean consumerBean = new ConsumerBean();\r\n Properties properties = new Properties();\r\n properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());\r\n properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());\r\n properties.put(PropertyKeyConst.SecretKey, 
consumerConfig.get("secret_key").toString());\r\n properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());\r\n logger.info("连接属性:" + properties);\r\n consumerBean.setProperties(properties);\r\n this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());\r\n });\r\n }else{\r\n throw new ConsumerException("Consumer创建失败,找不到相关配置,consumerId="+consumerId);\r\n }\r\n }\r\n \r\n /**\r\n * 创建所有consumer\r\n */\r\n private void createConsumerBean() {\r\n String sql = "select * from rkmq_consumer_config where enabled='0'";\r\n List<Map<String, Object>> consumerConfigs = jdbcTemplate.queryForList(sql);\r\n if (consumerConfigs.size() > 0) {\r\n consumerConfigs.stream().forEach(consumerConfig -> {\r\n ConsumerBean consumerBean = new ConsumerBean();\r\n Properties properties = new Properties();\r\n properties.put(PropertyKeyConst.ConsumerId, consumerConfig.get("consumer_id").toString());\r\n properties.put(PropertyKeyConst.AccessKey, consumerConfig.get("access_key").toString());\r\n properties.put(PropertyKeyConst.SecretKey, consumerConfig.get("secret_key").toString());\r\n properties.put(PropertyKeyConst.ONSAddr, consumerConfig.get("ons_addr").toString());\r\n logger.info("连接属性:" + properties);\r\n consumerBean.setProperties(properties);\r\n this.createSubscriptionTable(consumerBean, consumerConfig.get("consumer_id").toString());\r\n });\r\n }else{\r\n logger.warn("Consumer创建失败,找不到相关配置");\r\n }\r\n }\r\n /**\r\n * 创建监听类对象\r\n * \r\n * @param consumerBean\r\n * @param consumerId\r\n */\r\n private void createSubscriptionTable(ConsumerBean consumerBean, String consumerId) {\r\n // 用反射生成消息监听对象,配置在数据库中\r\n StringBuilder sql = new StringBuilder("select * from rkmq_listener where enabled='0' ");\r\n List<Map<String, Object>> consumers = new ArrayList<>();\r\n if (consumerId == null || CID_DEFAULT.equals(consumerId)) {\r\n consumers = jdbcTemplate.queryForList(sql.toString());\r\n } else {\r\n sql.append(" and 
consumer_id=?");\r\n consumers = jdbcTemplate.queryForList(sql.toString(), consumerId);\r\n }\r\n Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();\r\n if (consumers != null && !consumers.isEmpty()) {\r\n consumers.stream().forEach(consumer -> {\r\n Subscription subscription = new Subscription();\r\n subscription.setTopic(consumer.get("topic").toString());\r\n subscription.setExpression(consumer.get("tag").toString());\r\n try {\r\n Class<?> classz = Class.forName(consumer.get("listener_class_name").toString());\r\n logger.info("开始为" + classz.getName() + "创建对象");\r\n MessageListener messageListener = (MessageListener) classz.newInstance();\r\n /**\r\n * 增强bean,让容器之外的bean获得自动注入的能力,这样反射生成的对象也可以用 @Autowire\r\n * 注解了,spring牛逼\r\n * \r\n */\r\n applicationContext.getAutowireCapableBeanFactory().autowireBean(messageListener);\r\n subscriptionTable.put(subscription, messageListener);\r\n } catch (ClassNotFoundException e) {\r\n e.printStackTrace();\r\n } catch (InstantiationException e1) {\r\n e1.printStackTrace();\r\n } catch (IllegalAccessException e2) {\r\n e2.printStackTrace();\r\n }\r\n\r\n });\r\n consumerBean.setSubscriptionTable(subscriptionTable);\r\n consumerBeans.put(consumerId, consumerBean);\r\n logger.info("consumer初始化完成");\r\n } else {\r\n logger.warn("未找到相关Consumer!请检查数据库配置");\r\n }\r\n }\r\n\r\n @Override\r\n public boolean isStart(String consumerId) {\r\n ConsumerBean consumer=consumerBeans.get(consumerId);\r\n if(consumer!=null){\r\n return consumer.isStarted();\r\n }\r\n return false;\r\n }\r\n}
package com.sgcc.mq;\r\n\r\nimport java.util.HashMap;\r\nimport java.util.List;\r\nimport java.util.Map;\r\nimport java.util.Properties;\r\n\r\nimport javax.sql.DataSource;\r\n\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\nimport org.springframework.beans.factory.InitializingBean;\r\nimport org.springframework.jdbc.core.JdbcTemplate;\r\n\r\nimport com.aliyun.openservices.ons.api.PropertyKeyConst;\r\nimport com.aliyun.openservices.ons.api.bean.ProducerBean;\r\nimport com.sgcc.exception.ProducerException;\r\n\r\n/**\r\n * \r\n * \r\n * <p>\r\n * Title: ProducerManager\r\n * </p>\r\n * \r\n * <p>\r\n * Description:生产者管理\r\n * </p>\r\n * \r\n * @author mengjinyuan\r\n * \r\n * @date 2019年3月14日\r\n */\r\npublic class ProducerManager implements InitializingBean, Manager {\r\n\r\n private static Map<String, ProducerBean> producerBeans = new HashMap<>();\r\n\r\n protected JdbcTemplate jdbcTemplate;\r\n\r\n private static Logger logger = LoggerFactory.getLogger(ProducerManager.class);\r\n\r\n @Override\r\n public void start() {\r\n if (producerBeans.size() > 0) {\r\n logger.info("开启所有生产者,producerBean个数:" + producerBeans.size());\r\n producerBeans.forEach((producerId, producer) -> {\r\n start(producerId);\r\n });\r\n }\r\n }\r\n\r\n @Override\r\n public void shutdown() {\r\n if (!producerBeans.isEmpty()) {\r\n logger.info("关闭所有生产者");\r\n producerBeans.forEach((producerId, producer) -> {\r\n shutdown(producerId);\r\n });\r\n }\r\n }\r\n\r\n @Override\r\n public void start(String producerId) {\r\n ProducerBean producer = producerBeans.get(producerId);\r\n logger.info("开启producer,producerId=" + producerId);\r\n if (producer != null) {\r\n producer.start();\r\n } else {\r\n logger.warn("producer不存在,开启失败!,producerId=" + producerId);\r\n }\r\n }\r\n\r\n @Override\r\n public void shutdown(String producerId) {\r\n ProducerBean producer = producerBeans.get(producerId);\r\n logger.info("关闭producer,producerId=" + producerId);\r\n if (producer != null) {\r\n 
producer.shutdown();\r\n } else {\r\n logger.warn("producer不存在,关闭失败!,producerId=" + producerId);\r\n }\r\n\r\n }\r\n\r\n @Override\r\n public void restart(String producerId) {\r\n logger.info("重启消费者,producerId" + producerId);\r\n shutdown(producerId);\r\n start(producerId);\r\n logger.info("重启完成");\r\n }\r\n\r\n @Override\r\n public void enabled(String producerId) {\r\n logger.info("启用producer,producerId=" + producerId);\r\n ProducerBean producer = producerBeans.get(producerId);\r\n if (producer == null) {\r\n createProducer(producerId);\r\n producer = producerBeans.get(producerId);\r\n producer.start();\r\n } else {\r\n if (producer.isClosed()) {\r\n producer.start();\r\n }\r\n }\r\n jdbcTemplate.update("update rkmq_producer_config set enabled='0' where producer_id=?", producerId);\r\n logger.info("启用成功");\r\n }\r\n\r\n @Override\r\n public void disabled(String producerId) {\r\n logger.info("禁用producer,producerId=" + producerId);\r\n ProducerBean producer = producerBeans.get(producerId);\r\n if (producer != null) {\r\n producer.shutdown();\r\n producerBeans.remove(producerId);\r\n jdbcTemplate.update("update rkmq_producer_config set enabled='1' where producer_id=?", producerId);\r\n logger.info("禁用成功");\r\n } else {\r\n logger.warn("禁用失败,producer不存在,producerId=" + producerId);\r\n }\r\n }\r\n\r\n @Override\r\n public void afterPropertiesSet() throws Exception {\r\n logger.info("开始创建生产者");\r\n if (producerBeans.isEmpty()) {\r\n createProducer();\r\n }\r\n logger.info("生产者创建完成");\r\n }\r\n\r\n public void setDataSource(DataSource dataSource) {\r\n this.jdbcTemplate = new JdbcTemplate(dataSource);\r\n }\r\n\r\n public ProducerBean getProducer(String producerId) {\r\n return producerBeans.get(producerId);\r\n }\r\n\r\n public void setProducerBeans(Map<String, ProducerBean> producerBeans) {\r\n ProducerManager.producerBeans = producerBeans;\r\n }\r\n\r\n /**\r\n * 创建所有生产者bean\r\n */\r\n private void createProducer() {\r\n String sql = "select * from 
rkmq_producer_config where enabled='0'";\r\n List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql);\r\n if (producerConfigs.size() > 0) {\r\n producerConfigs.stream().forEach(producerConfig -> {\r\n ProducerBean producerBean = new ProducerBean();\r\n Properties properties = new Properties();\r\n properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());\r\n properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());\r\n properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());\r\n properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());\r\n logger.info("连接属性:" + properties);\r\n producerBean.setProperties(properties);\r\n producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);\r\n });\r\n } else {\r\n logger.warn("producer创建失败,找不到相关配置");\r\n }\r\n\r\n }\r\n\r\n /**\r\n * 创建一个生产者bean\r\n */\r\n private void createProducer(String producerId) {\r\n String sql = "select * from rkmq_producer_config where enabled='0' and producer_id=?";\r\n List<Map<String, Object>> producerConfigs = jdbcTemplate.queryForList(sql, producerId);\r\n if (producerConfigs.size() > 0) {\r\n producerConfigs.stream().forEach(producerConfig -> {\r\n ProducerBean producerBean = new ProducerBean();\r\n Properties properties = new Properties();\r\n properties.put(PropertyKeyConst.ProducerId, producerConfig.get("producer_id").toString());\r\n properties.put(PropertyKeyConst.AccessKey, producerConfig.get("access_key").toString());\r\n properties.put(PropertyKeyConst.SecretKey, producerConfig.get("secret_key").toString());\r\n properties.put(PropertyKeyConst.ONSAddr, producerConfig.get("ons_addr").toString());\r\n logger.info("连接属性:" + properties);\r\n producerBean.setProperties(properties);\r\n producerBeans.put(producerConfig.get("producer_id").toString(), producerBean);\r\n });\r\n } else {\r\n throw new 
ProducerException("producer创建失败,找不到相关配置,producerId="+producerId);\r\n }\r\n\r\n }\r\n\r\n @Override\r\n public boolean isStart(String producerId) {\r\n ProducerBean producer = producerBeans.get(producerId);\r\n if (producer != null) {\r\n return producer.isStarted();\r\n }\r\n return false;\r\n }\r\n}
四、创建监听类
package com.sgcc.mq.listener;\r\n\r\nimport java.util.List;\r\n\r\nimport org.slf4j.Logger;\r\nimport org.slf4j.LoggerFactory;\r\nimport org.springframework.beans.factory.annotation.Autowired;\r\n\r\nimport com.aliyun.openservices.ons.api.Action;\r\nimport com.aliyun.openservices.ons.api.ConsumeContext;\r\nimport com.aliyun.openservices.ons.api.Message;\r\nimport com.aliyun.openservices.ons.api.MessageListener;\r\nimport com.sgcc.web.entity.Demo;\r\nimport com.sgcc.web.entity.DemoExample;\r\nimport com.sgcc.web.entity.DemoExample.Criteria;\r\nimport com.sgcc.web.mapper.DemoMapper;\r\n\r\npublic class MyMessageListener implements MessageListener{\r\n \r\n private static Logger logger=LoggerFactory.getLogger(MyMessageListener.class);\r\n \r\n @Override\r\n public Action consume(Message message, ConsumeContext context) {\r\n logger.info("收到消息,message="+message+" context="+context);\r\n return Action.CommitMessage;\r\n }\r\n}
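在rkmq_listener表的listener_class_name字段中填入该类的全限定类名(例如 com.sgcc.mq.listener.MyMessageListener),并把该行的enabled设为'0'(启用),这样ConsumerManager就可以通过反射实例化该监听对象了。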
五、spring中配置bean
spring-mq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Connection pool used to read the MQ configuration tables. -->
    <bean id="mqDataSource"
          class="org.apache.commons.dbcp.BasicDataSource"
          destroy-method="close">
        <property name="driverClassName" value="${jdbc.driverClassName}"/>
        <property name="url" value="${jdbc.url}"/>
        <property name="username" value="${jdbc.username}"/>
        <property name="password" value="${jdbc.password}"/>
        <!-- Initial number of connections -->
        <property name="initialSize" value="${druid.initialSize}"/>
        <!-- Maximum number of active connections -->
        <property name="maxActive" value="${druid.maxActive}"/>
        <!-- Maximum number of idle connections -->
        <property name="maxIdle" value="${druid.maxIdle}"/>
        <!-- Minimum number of idle connections -->
        <property name="minIdle" value="${druid.minIdle}"/>
        <!-- Maximum time to wait for a connection -->
        <property name="maxWait" value="${druid.maxWait}"/>

        <property name="validationQuery" value="${druid.validationQuery}"/>
        <property name="testOnBorrow" value="${druid.testOnBorrow}"/>
    </bean>

    <!-- Optional shared consumer (disabled). Uncomment to use one fixed configuration.
    <bean id="mqConsumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean">
        <property name="properties"> consumer configuration
            <props>
                replace CID_mainstation_control with your own value
                <prop key="ConsumerId">CID_TEST100</prop>
                <prop key="AccessKey">xxxxxx</prop>
                <prop key="SecretKey">xxxxxx</prop>
                <prop key="ONSAddr">http://xxxxx/rocketmq/nsaddr4broker-internal
                </prop>
            </props>
        </property>
    </bean> -->

    <!-- ConsumerBean manager. Usage:
         If consumerBean is injected, every listener configured in the database
         listens through that single configuration.
         Otherwise the configuration is read from table rkmq_consumer_config and
         a consumerBean is instantiated for it; several can be configured.
         The dataSource must be injected; it may be the same data source used by
         mybatis/hibernate.
    -->
    <bean id="consumerManager"
          class="com.sgcc.mq.ConsumerManager"
          init-method="start"
          destroy-method="shutdown">
        <!-- <property name="consumerBean" ref="mqConsumer"/> -->
        <property name="dataSource" ref="mqDataSource"/>
    </bean>
</beans>
spring-mq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- Producer manager; started when the bean is initialised and shut down
         when it is destroyed. The referenced mqDataSource bean is not defined
         in this file. -->
    <bean id="producerManager"
          class="com.sgcc.mq.ProducerManager"
          init-method="start"
          destroy-method="shutdown">
        <property name="dataSource" ref="mqDataSource"/>
    </bean>
</beans>
注:spring-mq-producer.xml 中引用的 mqDataSource 定义在 spring-mq-consumer.xml 里,因此两个文件必须加载到同一个Spring容器中;也可以把它们写在一个配置文件里。