Created
February 15, 2017 13:54
-
-
Save andreiavram/eedd9b43eab6cddd5974337954fd754a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# every day I'm shovelling - script to move all messages from all queues
# to a different rabbitmq server using dynamic shovels
# needs at least rabbitmq 3.3.0 to work, that's when dynamic shovels were added
import json

import requests

try:  # Python 3
    from urllib.parse import quote
except ImportError:  # Python 2
    from urllib import quote
class RabbitMQError(Exception):
    """Raised when the RabbitMQ management API returns an unexpected response."""
class RabbitMQApiQueue:
    """Attribute-style view of one queue record.

    Every keyword argument whose name does not begin with an underscore
    becomes an instance attribute; positional arguments are ignored.
    """

    def __init__(self, *args, **kwargs):
        for field in kwargs:
            # Underscore-prefixed keys are skipped, never stored.
            if field.startswith("_"):
                continue
            setattr(self, field, kwargs[field])
class RabbitMQApi(object):
    """Small client for the RabbitMQ management HTTP API of one broker.

    Holds the connection details of a single server/vhost and uses them to
    list its queues and to declare dynamic shovels that move messages to
    another broker.
    """

    # Class-level defaults; ``__init__`` overrides the relevant ones.
    server = ""
    port = ""
    user = ""
    password = ""
    vhost = ""

    def __init__(self, server, user, password, vhost, web_port=15672, amqp_port=5672, *args, **kwargs):
        """Store the connection details.

        :param server: broker host name or IP address
        :param user: user name for both the HTTP API and AMQP
        :param password: password for ``user``
        :param vhost: virtual host to operate on (``"/"`` is accepted)
        :param web_port: port of the management HTTP API
        :param amqp_port: port of the AMQP listener
        Extra positional and keyword arguments are accepted and ignored.
        """
        self.server = server
        self.web_port = web_port
        self.amqp_port = amqp_port
        self.user = user
        self.password = password
        self.vhost = vhost
        self._queues = []

    @staticmethod
    def _quote(value):
        """Percent-encode ``value`` for use as one URL component.

        ``%`` is kept as-is so values that were already percent-encoded by
        the caller (e.g. a vhost given as ``"%2F"``) are not encoded twice.
        """
        return quote(str(value), safe="%")

    @property
    def api_base(self):
        """Base URL of the management API, with credentials embedded."""
        return "http://{}:{}@{}:{}/api".format(
            self._quote(self.user),
            self._quote(self.password),
            self.server,
            self.web_port,
        )

    @property
    def amqp_conn_string(self):
        """AMQP URI for this broker and vhost, as used in shovel definitions."""
        return "amqp://{}:{}@{}:{}/{}".format(
            self._quote(self.user),
            self._quote(self.password),
            self.server,
            self.amqp_port,
            # A raw "/" here would produce an invalid ".../" + "/" path.
            self._quote(self.vhost),
        )

    def get_urls(self, action, **kwargs):
        """Return the full API URL for ``action``.

        :param action: ``"list_queues"`` or ``"create_shovel"``
        :param kwargs: values for the URL template (``shovel_name`` for
            ``"create_shovel"``); ``vhost`` is always taken from ``self``
        :raises ValueError: if ``action`` is not a known action
        """
        templates = {
            "list_queues": "/queues/{vhost}",
            "create_shovel": "/parameters/shovel/{vhost}/{shovel_name}",
        }
        template = templates.get(action)
        if template is None:
            raise ValueError("Unknown API action: {!r}".format(action))

        kwargs['vhost'] = self.vhost
        # Each value fills exactly one path segment, so it must not contain
        # an unescaped "/" (the default vhost is literally "/").
        segments = {key: self._quote(value) for key, value in kwargs.items()}
        return self.api_base + template.format(**segments)

    def get_queues(self):
        """Fetch the queues of this vhost as ``RabbitMQApiQueue`` objects.

        :raises RabbitMQError: if the API does not answer with status 200
        """
        r = requests.get(self.get_urls("list_queues"))
        if r.status_code != 200:
            # Use the raw body: an error response is not guaranteed to be
            # JSON, and a decode failure would hide the real status.
            raise RabbitMQError("Something went wrong! Response {}, message {}".format(r.status_code, r.text))
        return [RabbitMQApiQueue(**q) for q in r.json()]

    def create_shovel(self, source_queue, destination_rabbitmq, destination_queue=None, shovel_name=None):
        """Declare a dynamic shovel from ``source_queue`` to another broker.

        :param source_queue: queue object (needs ``.name``) on this broker
        :param destination_rabbitmq: ``RabbitMQApi`` of the target broker
        :param destination_queue: target queue object; defaults to a queue
            with the same name as ``source_queue``
        :param shovel_name: shovel name; defaults to the source queue name
        :raises RabbitMQError: if the API rejects the shovel definition
        """
        if not shovel_name:
            shovel_name = source_queue.name
        destination_queue_name = destination_queue.name if destination_queue else source_queue.name

        url = self.get_urls("create_shovel", shovel_name=shovel_name)
        data = {"value": {
            "src-uri": self.amqp_conn_string,
            "src-queue": source_queue.name,
            "dest-uri": destination_rabbitmq.amqp_conn_string,
            "dest-queue": destination_queue_name,
        }}
        r = requests.put(url, json=data)
        # The API answers 201 Created for a new parameter and 204 No Content
        # when an existing one is overwritten; both mean success.
        if r.status_code not in (201, 204):
            raise RabbitMQError("Shovel {} status {}: {}".format(shovel_name, r.status_code, r.text))
        print("Shovel {} status {} CREATED".format(shovel_name, r.status_code))

    def shovel_all(self, destination_rabbitmq):
        """Create one shovel per non-auto-delete queue of this vhost.

        :param destination_rabbitmq: ``RabbitMQApi`` of the target broker
        :raises RabbitMQError: if listing queues or creating a shovel fails
        """
        self._queues = self.get_queues()
        print("TOTAL {} queues".format(len(self._queues)))
        for q in self._queues:
            if not q.auto_delete:
                self.create_shovel(q, destination_rabbitmq)
if __name__ == "__main__":
    # Example endpoints: edit the hosts, credentials and vhosts to match the
    # source and destination brokers before running. These definitions were
    # previously commented out, which made the call below raise NameError.
    src = RabbitMQApi("172.17.0.2", "guest", "guest", "test")
    dst = RabbitMQApi("172.17.0.3", "guest", "guest", "test2")
    src.shovel_all(dst)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment