We're using ActiveMQ Artemis 2.8.1 with MQTT.
We integrated ActiveMQ Artemis with Wildfly. Consider our server connects using client id SAM
on 50 topics. On checking Artemis with JConsole we can see that each client subscription results in a queue whose name follows the pattern <client-id>_<topic>
. In my case consider topic as com/api/output
which means the subscription queue name will be SAM_com/api/output
. Likewise there will be 50 other subscription queues using the same naming pattern (i.e. SAM_<topic>
).
My findings
Based on my research each queue is used to store messages sent to that topic for each client's subscription. For example, when the same topic (e.g 1/2/3
) is subscribed to by 3 different clients (e.g A
,B
,& C
) then there will be 3 subscription queues (i.e. A_1/2/3
, B_1/2/3
, & C_1/2/3
). Therefore when a message is sent to the topic 1/2/3
Artemis will put that messages in the subscription queues A_1/2/3
,B_1/2/3
, C_1/2/3
.
Actual problem
Now same client wants to connect to broker with different client id now (e.g. TOM
). My Client initiates connection drop and Artemis also recognizes connection drop, then my client connects to broker with new client id (TOM
) for the same 50 topics. Now there will be 100 subscription queue total with each topic having 2 (i.e. one for each clientid - SAM
& TOM
). I find the reason that SAM
queues are maintained because while initiating the connection we use cleanSession
as false
. So all those subscription queues will be durable, hence the queues are maintained even if the client is disconnected.
When a message is sent to the topic it will be put in two queues (SAM
& TOM
). Our client is connected to broker with client id TOM
so the TOM
queue has consumer which results in all the TOM
queue messages being consumed by the client. However, the SAM
queue accumulates messages and eats up all JVM's heapspace until the server dies.
The purpose of durable queues is to maintain the message even when the client disconnects, but is there any way to tell ActiveMQ Artemis to purge the client's queues and messages if client doesn't show up for certain time period or to purge the messages from the client's subscription queue when client drops the connection?
Our broker.xml
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">
<name>0.0.0.0</name>
<security-enabled>true</security-enabled>
<persistence-enabled>false</persistence-enabled>
<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>ASYNCIO</journal-type>
<paging-directory>data/paging</paging-directory>
<bindings-directory>data/bindings</bindings-directory>
<journal-directory>data/journal</journal-directory>
<large-messages-directory>data/large-messages</large-messages-directory>
<journal-datasync>true</journal-datasync>
<journal-min-files>2</journal-min-files>
<journal-pool-files>10</journal-pool-files>
<journal-file-size>10M</journal-file-size>
<journal-buffer-timeout>64000</journal-buffer-timeout>
<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>4096</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->
<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->
<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->
<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>99</max-disk-usage>
<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>
<critical-analyzer-timeout>120000</critical-analyzer-timeout>
<critical-analyzer-check-period>60000</critical-analyzer-check-period>
<critical-analyzer-policy>HALT</critical-analyzer-policy>
<acceptors>
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->
<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->
<!-- MQTT Acceptor -->
<acceptor name="mqtt">tcp://0.0.0.0:${activeMQ-mqtt-port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,MQTT;useEpoll=true</acceptor>
<acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:${activeMQ-mqtts-port}?sslEnabled=true;keyStorePath=${activeMQ-keystore-path};keyStorePassword=${activeMQ-keyStore-password};protocols=CORE,MQTT</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-create-jms-queues>true</auto-create-jms-queues>
<auto-create-jms-topics>true</auto-create-jms-topics>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="tru
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…