Last active
August 24, 2018 22:06
-
-
Save dwaltrip/bd3321880180f556ba0f9d1c4962b6f7 to your computer and use it in GitHub Desktop.
"tail -f" in python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import time | |
import subprocess | |
import select | |
# Modified from: https://stackoverflow.com/a/12523371/111635 | |
def tail_file(filename, process_line):
    """Follow ``filename`` with ``tail -F`` and feed each line to a callback.

    A ``tail -F`` child process is spawned and every line it writes to its
    stdout is passed to ``process_line``. The pipe is read in binary mode, so
    each line is a ``bytes`` object that still carries its trailing newline.

    The call blocks for as long as ``tail`` keeps running. It returns only
    when the child's stdout reaches end-of-file (i.e. ``tail`` exited), and
    any exception raised by ``process_line`` propagates to the caller. In
    both cases the child process is terminated and reaped first.

    Args:
        filename: Path of the file to follow.
        process_line: Callable invoked once per line with the raw ``bytes``.

    Raises:
        TypeError: If ``process_line`` is not callable.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not callable(process_line):
        raise TypeError('`process_line` should be a callable')

    tail = subprocess.Popen(
        ['tail', '-F', filename],
        stdout=subprocess.PIPE,
        # Nothing ever reads tail's stderr; a PIPE would eventually fill up
        # and stall the child, so discard the diagnostics instead.
        stderr=subprocess.DEVNULL,
    )
    try:
        # A blocking readline delivers each line as soon as tail emits it.
        # Mixing poll() with a buffered reader (as before) either delayed
        # lines sitting in Python's buffer or blocked inside readline anyway,
        # and it spun forever on EOF once tail had exited.
        for line in iter(tail.stdout.readline, b''):
            process_line(line)
    finally:
        # Never leave an orphaned `tail` behind, whatever ended the loop.
        tail.terminate()
        tail.stdout.close()
        tail.wait()
if __name__ == '__main__':
    # Script entry point: follow the log file named by the first command-line
    # argument and echo every line as it arrives.
    if len(sys.argv) <= 1:
        print('Requires single argument, the path to the logfile')
        sys.exit(1)

    def process_line(line):
        """Print a single tailed line, prefixed with a short label."""
        print('received line:', line)

    filename = sys.argv[1]
    print('Calling `tail_file` on %s...' % filename)
    tail_file(filename, process_line)
And if you are feeling particularly fancy, and need to tail a rotating log file (as I do), here is the solution:
import os, time
def tail_rotating_file(filename, sleep=0.5):
    """Yield batches of new lines from a log file, following it across rotation.

    This is a generator. It reads ``filename`` until it runs out of data,
    yields the lines collected so far as one list, then checks whether the
    path now refers to a different file (as reported by
    ``unique_file_identifier``). If so, it switches to the new file.
    Otherwise it sleeps and tries again. It never finishes on its own; close
    the generator to release the file handle.

    Args:
        filename: Path of the log file to follow.
        sleep: Seconds to wait between polls when no new data is available.

    Yields:
        A non-empty list of lines (each as returned by ``readline``) read
        since the previous batch.

    Raises:
        FileNotFoundError: On first iteration, if ``filename`` is not an
            existing regular file.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not os.path.isfile(filename):
        raise FileNotFoundError('"%s" is not a file or is missing' % filename)

    file_id = unique_file_identifier(filename)
    line_group = []
    f = open(filename, 'r')
    try:
        while True:
            line = f.readline()
            if line:
                line_group.append(line)
                continue

            # readline() returned '' -- we are caught up, so hand over the
            # accumulated batch before looking for a rotation.
            if line_group:
                yield line_group
                line_group = []

            try:
                latest_file_id = unique_file_identifier(filename)
            except OSError:
                # The path is briefly missing while the log is being rotated;
                # keep the current handle and look again after the sleep.
                latest_file_id = file_id

            if latest_file_id != file_id:
                # The writer may have appended to the old file while this
                # generator was suspended at the yield above. Drain those
                # lines before abandoning the handle so they are not lost.
                leftover = f.readlines()
                if leftover:
                    yield leftover
                try:
                    replacement = open(filename, 'r')
                except OSError:
                    # The new file vanished again; retry on the next pass.
                    pass
                else:
                    f.close()
                    f = replacement
                    file_id = latest_file_id
                    # Start reading the new file immediately.
                    continue

            # Wait for more lines to accumulate
            time.sleep(sleep)
    finally:
        f.close()
def unique_file_identifier(filename):
    """Return the inode number that ``os.stat`` reports for ``filename``.

    Two calls return the same value only while the path refers to the same
    underlying file, which is how a rotated log is detected.
    """
    # NOTE: the original author observed `st_ino` to always be 0 on Windows,
    # where this identifier would not distinguish files -- verify on the
    # target platform before relying on it there.
    file_stats = os.stat(filename)
    return file_stats.st_ino
Note: I also yield the list of all lines read since the last sleep, instead of yielding each line one by one, as I'm doing some processing where I roll up errors/tracebacks from multiple lines into a single structured log entry.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This solution works okay.
However, it turns out there is a much better way (simpler, more elegant, & just as efficient):
Source: https://stackoverflow.com/a/1703705/111635