patkua@work

The intersection of technology and leadership

Clearing In-Memory Messages held in ActiveMQ

One of the major troubles with integration tests are the tradeoffs you get with speed and feedback. As systems get larger, the more integration tests you have, and the more chance you might preserve state. I’ve been fighting a build inherited by our team where intermittent integration tests prove troublesome. In the next series of posts, I’ll detail some of the issues we’ve found and some of the fixes.

Our current system uses JMS for messaging and one of the first stateful problems we found were messages being left over from previous test runs in our ActiveMQ instance. We had a few options for doing it out of memory including a purge script and a call to deleteAllMessages though the latter failed because we stored messages only in memory.

Instead we worked out how it stores messages, and wrote a class to go through and clean them all up.

public class ActiveMqHelper {

    public void removeAllMessages() {
        Map<String,BrokerService> brokers = BrokerRegistry
                .getInstance().getBrokers();
        try {
            for (BrokerService brokerService : brokers.values()) {
                Broker broker = brokerService.getBroker();
                new ActiveMQBrokerExtension(broker).clearAllMessages();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private class ActiveMQBrokerExtension {
        private final Broker broker;

        public ActiveMQBrokerExtension(Broker broker) {
            this.broker = broker;
        }

        public void clearAllMessages() throws Exception {
            Map<ActiveMQDestination, Destination> destinationMap 
                = broker.getDestinationMap();
            for (Destination destination : destinationMap.values()) {
                ActiveMQDestination activeMQDestination 
                        = destination.getActiveMQDestination();
                if (activeMQDestination.isTopic()) {
                    clearAllMessages((Topic)destination);
                } else if (activeMQDestination.isQueue()) {
                    clearAllMessages((Queue) destination);
                }
            }
        }

        private void clearAllMessages(Topic topic) throws IOException {
            List<Subscription> consumers = topic.getConsumers();
            for (Subscription consumer : consumers) {
                ConnectionContext consumerContext = consumer.getContext();
                MessageStore messageStore = topic.getMessageStore();
                messageStore.removeAllMessages(consumerContext);
            }
        }
        private void clearAllMessages(Queue queue) throws Exception {
            queue.purge();
        }
    }
}

Someone might one day benefit from this.

5 Comments

  1. Thanks for posting this Pat!

    Anwar and I are trying to fix up some old tests at the moment that will benefit from this.

  2. Good luck with fixing them. Hope it helps you.

  3. Hey Pat,

    Having some trouble with this.

    We don’t seem to have the purge method on our Queue interface (this is the jms interface)

    Could you show us the imports please, and what version of activeMq are you using

    Thanks

  4. Here’s the imports

    import org.apache.activemq.broker.Broker;
    import org.apache.activemq.broker.BrokerRegistry;
    import org.apache.activemq.broker.BrokerService;
    import org.apache.activemq.broker.ConnectionContext;
    import org.apache.activemq.broker.region.Destination;
    import org.apache.activemq.broker.region.Queue;
    import org.apache.activemq.broker.region.Subscription;
    import org.apache.activemq.broker.region.Topic;
    import org.apache.activemq.command.ActiveMQDestination;
    import org.apache.activemq.store.MessageStore;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    

    I think the version of ActiveMQ we’re using is 5.4.2 (which looking at their website is a little out of date now)

  5. That helped.
    Thanks Pat 🙂

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

© 2019 patkua@work

Theme by Anders NorenUp ↑