patkua@work

Clearing In-Memory Messages held in ActiveMQ

One of the major troubles with integration tests are the tradeoffs you get with speed and feedback. As systems get larger, the more integration tests you have, and the more chance you might preserve state. I’ve been fighting a build inherited by our team where intermittent integration tests prove troublesome. In the next series of posts, I’ll detail some of the issues we’ve found and some of the fixes.

Our current system uses JMS for messaging and one of the first stateful problems we found were messages being left over from previous test runs in our ActiveMQ instance. We had a few options for doing it out of memory including a purge script and a call to deleteAllMessages though the latter failed because we stored messages only in memory.

Instead we worked out how it stores messages, and wrote a class to go through and clean them all up.

public class ActiveMqHelper {

    public void removeAllMessages() {
        Map<String,BrokerService> brokers = BrokerRegistry
                .getInstance().getBrokers();
        try {
            for (BrokerService brokerService : brokers.values()) {
                Broker broker = brokerService.getBroker();
                new ActiveMQBrokerExtension(broker).clearAllMessages();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private class ActiveMQBrokerExtension {
        private final Broker broker;

        public ActiveMQBrokerExtension(Broker broker) {
            this.broker = broker;
        }

        public void clearAllMessages() throws Exception {
            Map<ActiveMQDestination, Destination> destinationMap 
                = broker.getDestinationMap();
            for (Destination destination : destinationMap.values()) {
                ActiveMQDestination activeMQDestination 
                        = destination.getActiveMQDestination();
                if (activeMQDestination.isTopic()) {
                    clearAllMessages((Topic)destination);
                } else if (activeMQDestination.isQueue()) {
                    clearAllMessages((Queue) destination);
                }
            }
        }

        private void clearAllMessages(Topic topic) throws IOException {
            List<Subscription> consumers = topic.getConsumers();
            for (Subscription consumer : consumers) {
                ConnectionContext consumerContext = consumer.getContext();
                MessageStore messageStore = topic.getMessageStore();
                messageStore.removeAllMessages(consumerContext);
            }
        }
        private void clearAllMessages(Queue queue) throws Exception {
            queue.purge();
        }
    }
}

Someone might one day benefit from this.

Exit mobile version