• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spring RabbitMQ配置多数据源——使用RoutingConnectionFactory

武飞扬头像
小伙很稳健
帮助1

环境配置

spring:2.2.4.RELEASE
文档:Spring-AMQP文档翻译

RabbitMQ配置

 @Bean("connectionFactory1")
    public ConnectionFactory rabbitConnectionFactory1() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(rabbitProperties.getAddresses());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean("connectionFactory2")
    public ConnectionFactory rabbitConnectionFactory2(ConnectionNameStrategy cns) {
        ...
        return connectionFactory;
    }

    @Bean("simpleRoutingConnectionFactory")
    @Primary
    public ConnectionFactory routingConnectionFactory(@Qualifier("connectionFactory1")ConnectionFactory connectionFactory1,
                                                      @Qualifier("connectionFactory2")ConnectionFactory connectionFactory2) {
        SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
        Map<Object,ConnectionFactory> map = new HashMap<>();
        map.put("business1",connectionFactory1);
        map.put("business2",connectionFactory2);
        simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
        return simpleRoutingConnectionFactory;
    }
学新通

这里配置了两个不同的连接工厂,并且RoutingConnectionFactory的lookup key的指定为不同的业务。

SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(),routingKey);
rabbitTemplate.convertAndSend(exchange, routingKey, sendContent, (message) -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
},
correlationData);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());

源码跟踪
rabbitTemplate send方法

	public void send(final String exchange, final String routingKey,
			final Message message, @Nullable final CorrelationData correlationData)
			throws AmqpException {
		execute(channel -> {...}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
	}

obtainTargetConnectionFactory方法使用用户设置的(RabbitTemplate数量)SpEL表达式和message来计算出lookup key在查找对应的connectionFactory。这里我们没有单独配置。所以仍然得到的仍然是RoutingConnectionFactory。
接下来会执行到创建connection方法。

if (isChannelTransacted()) {
			...
}
else {
//这里usePublisherConnection默认为false
connection = ConnectionFactoryUtils.createConnection(connectionFactory,
						this.usePublisherConnection); 
}

//创建方法
	public static Connection createConnection(final ConnectionFactory connectionFactory,
			final boolean publisherConnectionIfPossible) {

		if (publisherConnectionIfPossible) {
			ConnectionFactory publisherFactory = connectionFactory.getPublisherConnectionFactory();
			if (publisherFactory != null) {
				return publisherFactory.createConnection();
			}
		}
		return connectionFactory.createConnection();
	}

//创建方法有三个实现类,这里使用AbstractRoutingConnectionFactory的
protected ConnectionFactory determineTargetConnectionFactory() {
		//从这查找到lookup key,也就是配置那里bind的。
		Object lookupKey = determineCurrentLookupKey();
		ConnectionFactory connectionFactory = null;
		if (lookupKey != null) {
			connectionFactory = this.targetConnectionFactories.get(lookupKey);
		}
		if (connectionFactory == null && (this.lenientFallback || lookupKey == null)) {
			connectionFactory = this.defaultTargetConnectionFactory;
		}
		if (connectionFactory == null) {
			throw new IllegalStateException("Cannot determine target ConnectionFactory for lookup key ["
					  lookupKey   "]");
		}
		return connectionFactory;
	}
学新通

多listener异常解决

以上源码还是很简单的。不过需要注意的是,此时项目中未配置listener。
如果配置了listener可能会看到类似的异常。
Cannot determine target ConnectionFactory for lookup key [null]
这是由于SimpleMessageListenerContainer初始化要声明队列引起的,虽然我没可以在配置时进行如下配置,并指定connectionFactory:

    @Bean("myListener1")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory1(@Qualifier("connectionFactory1")ConnectionFactory
                                                                                     connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        return simpleRabbitListenerContainerFactory;
    }
// 
    @RabbitListener(queues = {TestMqConfig.QUEUE_MQ_TEST},containerFactory = "myListener1")

但是实际报错的原因是因为初始化时使用的是RabbitAdmin,这个RabbitAdmin也是用的rabbitTempla进行操作,也就是说仍然用的RoutingConnectionFactory。

java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
	at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:120)
	at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:98)
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.createConnection(ConnectionFactoryUtils.java:214)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2095)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2068)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2048)
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueInfo(RabbitAdmin.java:407)
	at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:391)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.attemptDeclarations(AbstractMessageListenerContainer.java:1830)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.redeclareElementsIfNecessary(AbstractMessageListenerContainer.java:1811)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1337)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183)
	at java.lang.Thread.run(Thread.java:745)

找到了问题所在,现在来试试怎么解决这个问题。

方案一 手动配置RabbitAdmin

    @Bean
    public RabbitAdmin rabbitAdmin(@Qualifier("connectionFactory1")ConnectionFactory
                                               connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        return rabbitAdmin;
    }

这个方案不可行,因为初始化BlockingQueueConsumer时发现connectionFactory还是Routing的所以还是有问题。
具体原因暂时不知道,后面看到了再补充

方案二 配置lookup key

map.put("[queue.mq.test,test]",connectionFactory1);

这里直接将每个listenner监听的队列名(如果一个listenner监听了多个队列就要如上面代码所示填充多个)

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggjjga
系列文章
更多 icon
同类精品
更多 icon
继续加载