This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from collections import namedtuple | |
| import re | |
| import sys | |
| SIGNALFX_API = 'https://app.signalfx.com/#/' | |
| RES_TYPE_RE= re.compile(r"^(?P<type>\S+?)\..*:$") | |
| # STATES |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from md5 import md5 | |
| class FakeHashRing(object): | |
| def __init__(self, nodes): | |
| self.nodes = list(nodes) | |
| def _hash(self, key): | |
| return int(md5(str(key).encode('utf-8')).hexdigest(), 16) % len(self.nodes) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from job_class import Job | |
| def delete_expired_stuff(time): | |
| print "Delete expired stuff before %s" % time | |
| class CleanupJob(Job): | |
| # configuration: run at 5am | |
| run_at = '05:00' | |
| # implementation: nuke expired sessions |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class storelast(object): | |
| def __init__(self,source): | |
| self.source = source | |
| def next(self): | |
| item = self.source.next() | |
| self.last = item | |
| return item | |
| def __iter__(self): | |
| return self |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def trace(source): | |
| for item in source: | |
| print item | |
| yield item | |
| r404 = trace(r for r in log if r['status'] == 404) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| lines = (for line in open("run/foo/access-log")) | |
| # PYTHON 3 ONLY!!! | |
| for i,line in enumerate(lines): | |
| print line | |
| if i == 10: lines.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| def follow(thefile): | |
| thefile.seek(0,2) | |
| while True: | |
| line = thefile.readline() | |
| if not line: | |
| time.sleep(0.1) | |
| continue | |
| yield line |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| wwwlog = open("access-log") | |
| total = 0 | |
| for line in wwwlog: | |
| bytestr = line.rsplit(None,1)[1] | |
| if bytestr != '-': | |
| total += int(bytestr) | |
| print "Total", total |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # raise exception | |
| mock = Mock(side_effect=KeyError('foo')) | |
| mock() | |
| # KeyError: 'foo' | |
| # return value based on argument | |
| values = {'a': 1, 'b': 2, 'c': 3} | |
| def side_effect(arg): | |
| return values[arg] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| test_params = { | |
| 'empty_line': ('', {}), | |
| 'get_ok': ('GET 200', {'request': 'GET', 'status': '200'}), | |
| 'get_not_found': ('GET 404', {'request': 'GET', 'status': '404'}), | |
| } | |
| @pytest.mark.parametrize('line,expected', test_params.values(), ids=test_params.keys()) | |
| def test_decode(self, line, expected): | |
| assert Decoder().decode(line) == expected | |
NewerOlder