Skip to content

Instantly share code, notes, and snippets.

@mlui
Created February 8, 2011 23:14

Revisions

  1. mlui created this gist Feb 8, 2011.
    102 changes: 102 additions & 0 deletions AutoRetryConnectionFactory.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,102 @@
    /*
    * Copyright 2002-2010 the original author or authors.
    *
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */

    package com.shopzilla.amqp.core;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ShutdownListener;
    import com.rabbitmq.client.ShutdownSignalException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
    import org.springframework.beans.factory.annotation.Required;

    import java.io.IOException;

    /**
    * @author Mark Lui
    * @since Jan 5, 2011
    */
    public class AutoRetryConnectionFactory extends SingleConnectionFactory {
    private static final Log LOG = LogFactory.getLog(AutoRetryConnectionFactory.class);

    private long retryDelay;

    private AutoRetryMessageListenerContainer container;

    public AutoRetryConnectionFactory(String hostName) {
    super(hostName);
    }

    @Override
    protected void prepareConnection(Connection con) throws IOException {
    //Add a listener whenever the connection to RabbitMQ breaks
    con.addShutdownListener(new AutoRetryShutdownListener(container, retryDelay, this));
    }

    static class AutoRetryShutdownListener implements ShutdownListener {

    private long retryDelay;
    private AutoRetryMessageListenerContainer container;
    private SingleConnectionFactory connectionFactory;

    public AutoRetryShutdownListener(AutoRetryMessageListenerContainer container, long retryDelay,
    SingleConnectionFactory connectionFactory) {
    this.container = container;
    this.retryDelay = retryDelay;
    this.connectionFactory = connectionFactory;
    }

    public void shutdownCompleted(ShutdownSignalException cause) {
    //Need to check reason to determine if reconnection logic should run
    String exceptionMessage = cause.getMessage();
    //If clean connection shutdown do not run reconnection code
    if (exceptionMessage.indexOf("clean connection shutdown") < 0) {
    boolean containerDown = true;

    while (containerDown) {
    try {
    //Container must be shutdown to allow a restart
    if (container != null) {
    container.shutdown();
    container.start();
    } else {
    connectionFactory.resetConnection();
    }
    containerDown = false;
    } catch (Throwable ex) {
    LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay));
    try {
    Thread.sleep(retryDelay);
    } catch (InterruptedException e) {
    //do nothing
    }
    }
    }
    }
    }
    }

    public void setContainer(AutoRetryMessageListenerContainer container) {
    this.container = container;
    }

    @Required
    public void setRetryDelay(long retryDelay) {
    this.retryDelay = retryDelay;
    }

    }
    127 changes: 127 additions & 0 deletions AutoRetryMessageListenerContainer.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,127 @@
    /*
    * Copyright 2002-2010 the original author or authors.
    *
    * Licensed under the Apache License, Version 2.0 (the "License");
    * you may not use this file except in compliance with the License.
    * You may obtain a copy of the License at
    *
    * http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */

    package com.shopzilla.amqp.core;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Required;
    import org.springframework.util.ReflectionUtils;

    import java.lang.reflect.Field;
    import java.util.LinkedList;
    import java.util.List;

    /**
    * @author Mark Lui
    * @since Jan 5, 2011
    */
    public class AutoRetryMessageListenerContainer extends SimpleMessageListenerContainer {
    private static final Log LOG = LogFactory.getLog(AutoRetryMessageListenerContainer.class);

    private long retryDelay;
    private List<Queue> targetQueue;
    private RabbitAdmin amqpAdmin;

    public AutoRetryMessageListenerContainer() {
    super();
    }

    public AutoRetryMessageListenerContainer(AutoRetryConnectionFactory connectionFactory) {
    super(connectionFactory);
    connectionFactory.setContainer(this);
    }

    /**
    * Capture exceptions from property set if RabbitMQ is down on startup. Will retry until reconnect is possible.
    */
    @Override
    public void afterPropertiesSet() {
    boolean started = false;
    while (!started) {
    try {
    super.afterPropertiesSet();
    for(Queue queue: targetQueue) {
    amqpAdmin.declareQueue(queue);
    }
    started = true;
    } catch (Throwable ex) {
    LOG.warn(String.format("Problem connecting with RabbitMQ waiting %d ms", retryDelay));
    try {
    Thread.sleep(retryDelay);
    } catch (InterruptedException e) {
    //do nothing
    }
    }
    }
    }

    /**
    * Augmented shutdown code to fix issues with current RabbitMQ container.
    * This may be unnecessary in later versions of Spring AMQP support after 1.0.0.M1
    */
    @Override
    public void shutdown() {
    ((SingleConnectionFactory)getConnectionFactory()).resetConnection();
    super.shutdown();
    try {
    Field consumersField = ReflectionUtils.findField(this.getClass(), "consumers");
    ReflectionUtils.makeAccessible(consumersField);
    consumersField.set(this, null);
    Field channelsField = ReflectionUtils.findField(this.getClass(), "channels");
    ReflectionUtils.makeAccessible(channelsField);
    channelsField.set(this, null);
    } catch (IllegalAccessException ex) {
    throw new RuntimeException(ex);
    }
    }


    @Required
    public void setRetryDelay(long retryDelay) {
    this.retryDelay = retryDelay;
    }

    public void setTargetQueue(Queue queue) {
    if(targetQueue == null) {
    targetQueue = new LinkedList<Queue>();
    }
    targetQueue.add(queue);
    }

    public void setTargetQueueList(List<Queue> queues) {
    targetQueue = queues;
    }

    @Required
    public void setAmqpAdmin(RabbitAdmin amqpAdmin) {
    this.amqpAdmin = amqpAdmin;
    }

    /**
    * Wrapper final method isRunning to allow for unit testing
    * @return boolean is container is running
    */
    public boolean isEnabled() {
    return super.isRunning();
    }

    }