Created
December 31, 2014 05:29
-
-
Save theanti9/344b878aeab3e0953699 to your computer and use it in GitHub Desktop.
A durable (non-thread-safe) queue class backed by a commit log.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import struct | |
import Queue | |
class QueueItem(object):
    """One message record, as held in memory and as written to the commit log.

    Attributes:
        msg_id: Integer identifier of the message.
        data: Payload bytes, or None when the record carries no payload.
        removed: True when this record marks the message as removed.
    """

    def __init__(self, msg_id, data, removed=False):
        self.msg_id = msg_id
        self.data = data
        self.removed = removed

    def serialize(self):
        """Return the binary form of this record.

        Layout (little-endian): int32 payload length, int32 message id,
        one bool "removed" flag, then the payload bytes when the length is
        non-zero. A payload of None is written with length 0.
        """
        payload = self.data
        size = 0 if payload is None else len(payload)
        if size <= 0:
            # Header-only record: nothing follows the 9-byte header.
            return struct.pack("<2i?", size, self.msg_id, self.removed)
        layout = "<2i?" + "%ds" % size
        return struct.pack(layout, size, self.msg_id, self.removed, payload)
class CommitLog(object):
    """Append-only binary log of queue records stored in a single file.

    Each record is a fixed-size header (little-endian int32 payload length,
    int32 message id, bool removed flag) followed by the payload bytes when
    the length is greater than zero.
    """

    # Header layout shared by the reader; must match QueueItem.serialize().
    _HEADER_FORMAT = "<2i?"
    _HEADER_SIZE = struct.calcsize(_HEADER_FORMAT)

    def __init__(self, file_path):
        """Remember the log location; the file is not touched until open()."""
        self.file_path = file_path
        self.log_handle = None

    def open(self):
        """Open the log file for binary appending.

        Raises:
            Exception: if a handle is already held by this instance.
        """
        if self.log_handle is not None:
            raise Exception("Commit Log already open or improperly closed.")
        self.log_handle = open(self.file_path, "ab")

    def close(self):
        """Flush and close the log file; a no-op when the log is not open."""
        handle = self.log_handle
        if handle is None:
            return
        # Drop the reference first so the instance can be reopened even if
        # flushing fails, and always release the file descriptor.
        self.log_handle = None
        try:
            handle.flush()
        finally:
            handle.close()

    def commit(self, queue_item):
        """Append ``queue_item.serialize()`` to the log and flush it.

        Returns:
            True on success; False if writing failed for any reason
            (including the log not being open). The error is printed.
        """
        try:
            self.log_handle.write(queue_item.serialize())
            self.log_handle.flush()
            return True
        except Exception as e:
            # Callers rely on a boolean result rather than an exception.
            print(e)
            return False

    def replay(self):
        """Yield a QueueItem for every complete record in the log, in order.

        Yields nothing when the log file does not exist. Reading stops at
        the first incomplete record (short header or short payload), which
        is what a write interrupted part-way leaves at the end of the file.
        """
        if not os.path.isfile(self.file_path):
            return
        with open(self.file_path, "rb") as read_only:
            while True:
                header = read_only.read(self._HEADER_SIZE)
                if len(header) < self._HEADER_SIZE:
                    # End of file, or a torn header at the tail.
                    break
                data_len, msg_id, removed = struct.unpack(
                    self._HEADER_FORMAT, header)
                data = None
                if data_len > 0:
                    data = read_only.read(data_len)
                    if len(data) < data_len:
                        # Torn payload at the tail: ignore the partial record
                        # instead of failing the whole replay.
                        break
                yield QueueItem(msg_id, data, removed)
class DurableQueue(object):
    """FIFO queue whose contents are rebuilt from a commit log on entry.

    Intended to be used as a context manager: entering replays the log into
    the in-memory queue and opens the log for appending; exiting closes it.
    Messages stay in the log until done() records their removal.
    """

    def __init__(self, q_name, commit_log_path="."):
        """Set up an empty queue whose log file is ``commit_log_path/q_name``."""
        self.q_name = q_name
        self.commit_log_path = os.path.join(commit_log_path, q_name)
        self.commit_log = None
        self.q = Queue.Queue()
        self.next_id = 1

    def _construct(self):
        """Replay the commit log and load the surviving messages into the queue.

        Messages are enqueued in ascending id order. ``next_id`` is moved past
        the highest id found in the log (removed records included) so that
        new messages never reuse an id that already appears in the log.
        """
        self.commit_log = CommitLog(self.commit_log_path)
        items = {}
        highest_id = 0
        for item in self.commit_log.replay():
            highest_id = max(highest_id, item.msg_id)
            if item.removed:
                # A removal for an id we never saw is silently ignored.
                # Need to log this probably?
                items.pop(item.msg_id, None)
            else:
                items[item.msg_id] = item
        for key in sorted(items):
            self.q.put_nowait(items[key])
        # Never move backwards, and always start after the last logged id.
        self.next_id = max(self.next_id, highest_id + 1)

    def put(self, data):
        """Log a new message carrying ``data`` and enqueue it.

        Returns:
            True when the message was both logged and enqueued, else False.
        """
        msg_id = self.next_id
        self.next_id += 1
        item = QueueItem(msg_id, data)
        try:
            if not self.commit_log.commit(item):
                return False
        except Exception:
            return False
        try:
            self.q.put_nowait(item)
            return True
        except Exception:
            # Failed to put it in the queue: strike it from the commit log by
            # appending a removal record, so a later replay does not bring
            # back a message the caller was told had failed.
            self.done(msg_id)
            return False

    def pop(self):
        """Return the next queued item without blocking, or None when empty.

        The item remains in the commit log until done() is called for it.
        """
        try:
            return self.q.get_nowait()
        except Queue.Empty:
            return None

    def done(self, msg_id):
        """Append a removal record for ``msg_id`` to the commit log.

        Returns:
            True when the record was written, else False.
        """
        try:
            return bool(self.commit_log.commit(QueueItem(msg_id, None, True)))
        except Exception:
            return False

    def __enter__(self):
        self._construct()
        self.commit_log.open()
        return self

    def __exit__(self, *args, **kwargs):
        self.commit_log.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment