-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Milestone
Description
Version 6.2.1
POM dependencies
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>6.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.2.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
Bean definition
/**
 * Declares the MQTT v5 inbound channel adapter under the bean name {@code adapter}.
 *
 * <p>The return type is the concrete adapter class (not {@code MessageProducerSupport})
 * so that injection points typed as {@code Mqttv5PahoMessageDrivenChannelAdapter}
 * can be satisfied by type as well as by bean name.
 *
 * @param mqttConnectionOptions connection options handed to the adapter constructor
 * @param mqttClientId base client id; the suffix {@code "_consumer"} is appended to it
 * @return the adapter, constructed with {@code mqttProperties.receiveTopicsName()} and
 *         wired to {@code inboundChannel} as its output channel
 */
@Bean(name = "adapter")
public Mqttv5PahoMessageDrivenChannelAdapter mqttInbound(MqttConnectionOptions mqttConnectionOptions,
        String mqttClientId) {
    Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(
            mqttConnectionOptions, mqttClientId.concat("_consumer"),
            mqttProperties.receiveTopicsName());
    // NOTE(review): connectComplete(...) looks like a connection callback rather than a
    // configuration setter - confirm that invoking it here is intentional.
    adapter.connectComplete(true);
    adapter.setPayloadType(String.class);
    adapter.setManualAcks(false);
    adapter.setOutputChannel(inboundChannel);
    return adapter;
}
Bean injection and usage
// Inbound MQTT adapter, injected via @Resource.
@Resource
private Mqttv5PahoMessageDrivenChannelAdapter adapter;

/**
 * Subscribes to a topic by delegating to the injected adapter's {@code addTopic}.
 *
 * @param topic the topic name passed through to the adapter
 */
@Override
public void subscribe(String topic) {
    this.adapter.addTopic(topic);
}
Describe the bug
When the code above runs, adapter.addTopic(topic) throws an IndexOutOfBoundsException:
Caused by: java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266)
at java.base/java.util.Objects.checkIndex(Objects.java:359)
at java.base/java.util.ArrayList.get(ArrayList.java:427)
at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1276)
at org.eclipse.paho.mqttv5.client.MqttAsyncClient.subscribe(MqttAsyncClient.java:1205)
at org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter.addTopic(Mqttv5PahoMessageDrivenChannelAdapter.java:279)
at org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter.addTopic(AbstractMqttMessageDrivenChannelAdapter.java:293)
at com.uatair.airport.driver.mqtt.service.impl.MqttTopicServiceImpl.subscribe(MqttTopicServiceImpl.java:20)
at com.uatair.airport.driver.mqtt.status.StatusHandler.productStatusTopo(StatusHandler.java:52)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169)
at org.springframework.integration.handler.support.IntegrationInvocableHandlerMethod.doInvoke(IntegrationInvocableHandlerMethod.java:45)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1086)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:569)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:482)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:360)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
... 67 more
According to the stack trace, the IndexOutOfBoundsException is raised from Mqttv5PahoMessageDrivenChannelAdapter at line 279:
this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived).waitForCompletion(getCompletionTimeout());
Lines 273 to 289 in 0c83eaf
| @Override | |
| public void addTopic(String topic, int qos) { | |
| this.topicLock.lock(); | |
| try { | |
| super.addTopic(topic, qos); | |
| if (this.mqttClient != null && this.mqttClient.isConnected()) { | |
| this.mqttClient.subscribe(new MqttSubscription(topic, qos), this::messageArrived) | |
| .waitForCompletion(getCompletionTimeout()); | |
| } | |
| } | |
| catch (MqttException ex) { | |
| throw new MessagingException("Failed to subscribe to topic " + topic, ex); | |
| } | |
| finally { | |
| this.topicLock.unlock(); | |
| } | |
| } |
That method is called from AbstractMqttMessageDrivenChannelAdapter at line 293:
Lines 281 to 299 in 0c83eaf
| /** | |
| * Add a topic (or topics) to the subscribed list (qos=1). | |
| * @param topics The topics. | |
| * @throws MessagingException if the topics is already in the list. | |
| * @since 4.1 | |
| */ | |
| @ManagedOperation | |
| public void addTopic(String... topics) { | |
| validateTopics(topics); | |
| this.topicLock.lock(); | |
| try { | |
| for (String t : topics) { | |
| addTopic(t, 1); | |
| } | |
| } | |
| finally { | |
| this.topicLock.unlock(); | |
| } | |
| } |
To Reproduce
Configure the adapter bean as shown above, then call adapter.addTopic(topic).
Expected behavior
adapter.addTopic(topic) should subscribe to the topic successfully, without throwing an exception.