Created
October 9, 2012 20:37
-
-
Save AnIrishDuck/3861254 to your computer and use it in GitHub Desktop.
Some useful context manager utilities for Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Utility methods for context managers. License is BSD 3-Clause. """ | |
from collections import defaultdict | |
from unittest import TestCase | |
class nested(object):
    """
    Combine several context managers into a single, re-usable one.

    This context manager is different from ``contextlib.nested``.
    ``contextlib.nested`` is one-shot: once instantiated it can only be used
    in a ``with`` statement once. This manager can be re-used any number of
    times, so the following code works with this incarnation of ``nested``,
    but not with ``contextlib.nested`` ::

        n = nested(a, b, c)
        with n:
            pass
        with n:
            pass

    The managers are entered left to right and exited right to left, the way
    nested ``with`` statements behave. ``with n as values`` binds ``values``
    to a list holding the result of each manager's ``__enter__``.
    """

    def __init__(self, *nest):
        # The wrapped managers, in the order they are to be entered.
        self.nest = nest

    def __enter__(self):
        """
        Enter every wrapped manager in order and return their values as a list.

        If one of the managers fails to enter, the managers entered before it
        are exited (innermost first) with the exception details, and the
        exception is re-raised.
        """
        import sys  # local import so this block carries its own dependency

        entered = []
        values = []
        try:
            for mgr in self.nest:
                # Look up __exit__ before entering, as ``with`` does, so a
                # manager lacking it fails before its __enter__ has run.
                leave = mgr.__exit__
                values.append(mgr.__enter__())
                entered.append(leave)
        except BaseException:
            exc_info = sys.exc_info()
            # ``with`` will not call our __exit__ when __enter__ raises, so
            # release whatever was already acquired ourselves.
            for leave in reversed(entered):
                leave(*exc_info)
            raise
        # Kept in entry order; __exit__ walks it backwards.
        self.exits = entered
        return values

    def __exit__(self, et, ev, tb):
        """
        Exit the wrapped managers in reverse entry order.

        Returns True when one of the managers suppressed the exception. Once
        an inner manager has suppressed it, the outer managers are exited with
        ``(None, None, None)``.
        """
        suppressed = False
        for leave in reversed(self.exits):
            if leave(et, ev, tb):
                suppressed = True
                et = ev = tb = None
        return suppressed
def ContextCase(cm):
    """
    Build a ``TestCase`` subclass whose fixtures are driven by ``cm``.

    The returned class enters ``cm`` in ``setUp`` (after the parent ``setUp``
    has run; the value returned by ``cm.__enter__()`` is discarded) and exits
    it in ``tearDown`` (after the parent ``tearDown`` has run), always passing
    ``(None, None, None)`` to ``cm.__exit__``.
    """
    # tearDown never has exception details to hand to the manager.
    no_exception = (None, None, None)

    class _Case(TestCase):

        def setUp(self):
            parent = super(_Case, self)
            parent.setUp()
            cm.__enter__()

        def tearDown(self):
            parent = super(_Case, self)
            parent.tearDown()
            cm.__exit__(*no_exception)

    return _Case
class Ordering(object):
    """
    Records a transitive "less than" relation between hashable objects.

    Two maps of sets are maintained: ``higher`` maps an object to every object
    known to be greater than it, and ``lower`` maps an object to every object
    known to be less than it. ``acquired`` starts out as an empty list.
    """

    def __init__(self):
        self.lower = defaultdict(set)
        self.higher = defaultdict(set)
        self.acquired = []

    def update_set(self, m, to_update, to_add):
        """
        Add ``to_add``, and everything ``m`` already holds for ``to_add``, to
        the set stored in ``m`` under ``to_update``.
        """
        bucket = m[to_update]
        bucket.add(to_add)
        bucket.update(m[to_add])

    def update(self, low, high):
        """ Indicate that in this ordering, low is less than high. """
        above, below = self.higher, self.lower

        # ``high`` (and all that is above it) is now above ``low`` and above
        # everything already known to be below ``low``.
        self.update_set(above, low, high)
        for smaller in below[low]:
            self.update_set(above, smaller, high)

        # Mirror image: ``low`` (and all that is below it) is now below
        # ``high`` and below everything already known to be above ``high``.
        self.update_set(below, high, low)
        for bigger in above[high]:
            self.update_set(below, bigger, low)
class OrderedManager(object):
    """
    :class:`OrderedManager` inheritors must be entered in the same ``order``
    every time. ``order`` is an :class:`Ordering` that records all managers
    "greater" and "less than" any given manager.

    Every time this manager is acquired, it declares that it must be invoked
    after all currently active managers. This dynamically creates a total order
    of all managers that are entered in an interleaved fashion.

    Instances are stored in sets and used as dictionary keys by ``order``, so
    they must be hashable.
    """

    def __init__(self, order):
        self.order = order

    def __enter__(self):
        """
        Record this manager as active, after checking the entry order.

        Raises :class:`EntryOrderError` when a currently active manager has
        previously been recorded as one that must come after this manager.
        Returns ``None``.
        """
        order = self.order
        greater = order.higher[self]
        if order.acquired:
            conflicts = greater & set(order.acquired)
            if conflicts:
                raise EntryOrderError(self, greater, conflicts)
        # Every manager that is active right now ranks below this one.
        for lower in order.acquired:
            order.update(lower, self)
        order.acquired.append(self)

    def __exit__(self, et, ev, tb):
        """
        Remove this manager from the list of active managers.

        The most recent record of *this* manager is removed, not simply the
        top of the stack, so releasing managers out of LIFO order does not
        drop another manager's record. Nothing happens if this manager is not
        recorded as active.
        """
        acquired = self.order.acquired
        # Search from the top: the usual LIFO release matches immediately.
        for index in range(len(acquired) - 1, -1, -1):
            if acquired[index] is self:
                del acquired[index]
                return
class EntryOrderError(RuntimeError):
    """ Invalid attempt to acquire manager out of order. """

    # First line of the message when no class docstring is available:
    # ``python -OO`` strips docstrings, and a subclass that does not define
    # its own docstring has ``__doc__`` set to None.
    _HEADLINE = " Invalid attempt to acquire manager out of order. "

    def __init__(self, mgr, not_after, conflicts):
        # mgr: the manager whose acquisition was rejected (stored as ``bad``).
        # not_after: the managers that ``mgr`` may not be acquired after.
        # conflicts: the members of ``not_after`` that caused the rejection.
        self.not_after, self.conflicts = not_after, conflicts
        self.bad = mgr
        # The class docstring doubles as the headline of the message; fall
        # back to the stored copy instead of failing on ``None + "\n"``.
        headline = self.__doc__ or self._HEADLINE
        msg = (headline + "\n" +
               "Bad Manager: {0.bad}\n"
               "Cannot acquire after: {0.not_after}\n"
               "Conflicts with: {0.conflicts}").format(self)
        super(EntryOrderError, self).__init__(msg)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment