Spring RabbitMQ配置多数据源——使用RoutingConnectionFactory
环境配置
spring:2.2.4.RELEASE
文档:Spring-AMQP文档翻译
RabbitMQ配置
@Bean("connectionFactory1")
public ConnectionFactory rabbitConnectionFactory1() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitProperties.getAddresses());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean("connectionFactory2")
public ConnectionFactory rabbitConnectionFactory2(ConnectionNameStrategy cns) {
...
return connectionFactory;
}
@Bean("simpleRoutingConnectionFactory")
@Primary
public ConnectionFactory routingConnectionFactory(@Qualifier("connectionFactory1")ConnectionFactory connectionFactory1,
@Qualifier("connectionFactory2")ConnectionFactory connectionFactory2) {
SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
Map<Object,ConnectionFactory> map = new HashMap<>();
map.put("business1",connectionFactory1);
map.put("business2",connectionFactory2);
simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
return simpleRoutingConnectionFactory;
}
这里配置了两个不同的连接工厂,并且RoutingConnectionFactory的lookup key的指定为不同的业务。
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(),routingKey);
rabbitTemplate.convertAndSend(exchange, routingKey, sendContent, (message) -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
correlationData);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
源码跟踪
rabbitTemplate send方法
public void send(final String exchange, final String routingKey,
final Message message, @Nullable final CorrelationData correlationData)
throws AmqpException {
execute(channel -> {...}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}
obtainTargetConnectionFactory方法使用用户设置的(RabbitTemplate数量)SpEL表达式和message来计算出lookup key在查找对应的connectionFactory。这里我们没有单独配置。所以仍然得到的仍然是RoutingConnectionFactory。
接下来会执行到创建connection方法。
if (isChannelTransacted()) {
...
}
else {
//这里usePublisherConnection默认为false
connection = ConnectionFactoryUtils.createConnection(connectionFactory,
this.usePublisherConnection);
}
//创建方法
public static Connection createConnection(final ConnectionFactory connectionFactory,
final boolean publisherConnectionIfPossible) {
if (publisherConnectionIfPossible) {
ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
if (publisherFactory != null) {
return publisherFactory.createConnection();
}
}
return connectionFactory.createConnection();
}
//创建方法有三个实现类,这里使用AbstractRoutingConnectionFactory的
protected ConnectionFactory determineTargetConnectionFactory() {
//从这查找到lookup key,也就是配置那里bind的。
Object lookupKey = determineCurrentLookupKey();
ConnectionFactory connectionFactory = null;
if (lookupKey != null) {
connectionFactory = this.targetConnectionFactories.get(lookupKey);
}
if (connectionFactory == null && (this.lenientFallback || lookupKey == null)) {
connectionFactory = this.defaultTargetConnectionFactory;
}
if (connectionFactory == null) {
throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key ["
lookupKey "]");
}
return connectionFactory;
}
多listener异常解决
以上源码还是很简单的。不过需要注意的是,此时项目中未配置listener。
如果配置了listener可能会看到类似的异常。
Cannot determine target ConnectionFactory for lookup key [null]
这是由于SimpleMessageListenerContainer
初始化要声明队列引起的,虽然我没可以在配置时进行如下配置,并指定connectionFactory:
@Bean("myListener1")
public SimpleRabbitListenerContainerFactory listenerContainerFactory1(@Qualifier("connectionFactory1")ConnectionFactory
connectionFactory) {
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
return simpleRabbitListenerContainerFactory;
}
//
@RabbitListener(queues = {TestMqConfig.QUEUE_MQ_TEST},containerFactory = "myListener1")
但是实际报错的原因是因为初始化时使用的是RabbitAdmin,这个RabbitAdmin也是用的rabbitTempla进行操作,也就是说仍然用的RoutingConnectionFactory。
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1830)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1811)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
at java.lang.Thread.run(Thread.java:745)
找到了问题所在,现在来试试怎么解决这个问题。
方案一 手动配置RabbitAdmin
@Bean
public RabbitAdmin rabbitAdmin(@Qualifier("connectionFactory1")ConnectionFactory
connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
这个方案不可行,因为初始化BlockingQueueConsumer时发现connectionFactory还是Routing的所以还是有问题。
具体原因暂时不知道,后面看到了再补充
方案二 配置lookup key
map.put("[queue.mq.test,test]",connectionFactory1);
这里直接将每个listenner监听的队列名(如果一个listenner监听了多个队列就要如上面代码所示填充多个)
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggjjga
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13