Last active
October 22, 2020 04:50
-
-
Save copperlight/eac3e2632acf600758e981a133542c8d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bz2 | |
import fnmatch | |
import gzip | |
import os | |
import re | |
from typing import Callable, Generator, List, Optional, Pattern, Tuple | |
""" | |
Generators for building log processing pipelines. | |
Code borrowed and combined from Python Generator Hacking, with some modifications: | |
https://www.dabeaz.com/usenix2009/generators/index.html | |
Given a set of log files which may be compressed, stream all lines, filter them by pattern | |
match and transform them into dictionaries, maybe remapping some of the field values. | |
Example log line: | |
2020-10-04T19:11:45.945 DEBUG [RxComputationScheduler-1] com.amazonaws.latency: | |
ServiceName=[Amazon S3], StatusCode=[200], | |
Example usage: | |
import re | |
from logprocessing import * | |
log_format = r'.*com.amazonaws.latency: ' \ | |
r'ServiceName=\[([^\]]*)\], ' \ | |
r'StatusCode=\[([^\]]*)\], ' | |
log_pattern = re.compile(log_format) | |
columns = ( | |
'ServiceName', | |
'StatusCode' | |
) | |
remaps = [ | |
('StatusCode', int), | |
] | |
lines = lines_from_dir('*.log', '.') | |
filtered = gen_grep('com.amazonaws.latency', lines) | |
events = lines_to_events(filtered, log_pattern, columns, remaps) | |
for e in events: | |
# do something with the sequence of log events | |
pass | |
""" | |
def gen_files(pattern: str, top: str) -> Generator:
    """Yield the path of every file under ``top`` whose name matches ``pattern``.

    The directory tree rooted at ``top`` is traversed with ``os.walk``; in each
    directory the file names are filtered with ``fnmatch`` shell-style matching
    and joined onto that directory's path.
    """
    for root, _subdirs, entries in os.walk(top):
        matching = fnmatch.filter(entries, pattern)
        yield from (os.path.join(root, entry) for entry in matching)
def gen_streams(filenames: Generator) -> Generator:
    """Given a sequence of filenames, generate a sequence of open text streams.

    Files ending in ``.gz`` or ``.bz2`` are decompressed on the fly; anything
    else is opened as a plain file. Every stream is opened in text mode so that
    all of them produce ``str`` lines, regardless of compression.

    Each stream is closed as soon as the consumer asks for the next one (or
    closes this generator), so a yielded stream must be fully consumed before
    advancing.
    """
    for name in filenames:
        if name.endswith('.gz'):
            opener = gzip.open
        elif name.endswith('.bz2'):
            opener = bz2.open
        else:
            opener = open
        # gzip/bz2 default to binary mode; 'rt' keeps line types consistent
        # with plain files so str regexes work on every line.
        with opener(name, 'rt') as stream:
            yield stream
def gen_cat(sources: Generator) -> Generator:
    """Concatenate multiple iterables into a single sequence.

    Items are yielded from each source in turn, and the next source is only
    requested once the current one is exhausted.

    Delegation uses ``yield from``, so closing this generator early also
    calls ``close()`` on the source currently being read, if it has one
    (for example an open file stream).
    """
    for source in sources:
        yield from source
def lines_from_dir(pattern: str, dirname: str) -> Generator:
    """Return a generator over the lines of all matching files in a directory.

    Composes the pipeline stages: find files under ``dirname`` matching
    ``pattern``, open each one as a stream, then concatenate the streams.
    """
    return gen_cat(gen_streams(gen_files(pattern, dirname)))
def gen_grep(pattern: str, lines: Generator) -> Generator:
    """Yield only the lines in which the regex ``pattern`` is found.

    The pattern is compiled once, when iteration starts, and applied with
    ``search`` so it may match anywhere within a line.
    """
    search = re.compile(pattern).search
    yield from filter(search, lines)
def field_remap(
    events: Generator,
    remaps: List[Tuple[str, Callable]]
) -> Generator:
    """Apply conversion functions to named fields of each event dictionary.

    ``remaps`` is a list of ``(field name, function)`` pairs, applied in order.
    Each event is updated in place and the same dictionary object is yielded.
    """
    def convert(event):
        for field, func in remaps:
            event[field] = func(event[field])
        return event

    yield from map(convert, events)
def lines_to_events(
    lines: Generator,
    log_pattern: Pattern,
    columns: Tuple,
    remaps: Optional[List[Tuple[str, Callable]]] = None
) -> Generator:
    """Turn log lines into a sequence of dictionaries keyed by ``columns``.

    Each line is matched against ``log_pattern`` from its start; lines that do
    not match are dropped. The captured groups of a matching line are paired
    with ``columns`` to build a dictionary, and the resulting events are passed
    through ``field_remap`` with ``remaps`` (an empty list when none is given).
    """
    matches = (log_pattern.match(line) for line in lines)
    events = (dict(zip(columns, hit.groups())) for hit in matches if hit)
    return field_remap(events, remaps or [])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment