Skip to content

Instantly share code, notes, and snippets.

@chawza
Created April 23, 2026 01:38
Show Gist options
  • Select an option

  • Save chawza/2237ca3959e952099ca7386ba2c55725 to your computer and use it in GitHub Desktop.

Select an option

Save chawza/2237ca3959e952099ca7386ba2c55725 to your computer and use it in GitHub Desktop.
Batch log FileHandler vs WatchedFileHanlder vs MemoryHandler + WatchedFileHandler

200 logs

> uv run test_logging.py
Generating 200 identical JSON payloads for fairness...

Payload size: ~373 bytes per log.

len(payloads) = 200payloads generated.

Starting benchmarks...

--- Standard FileHandler ---
  Loop Time (App Delay) : 0.0053 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0053 seconds

--- WatchedFileHandler ---
  Loop Time (App Delay) : 0.0065 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0065 seconds

--- MemoryHandler (Batch: 1000) -> WatchedFileHandler ---
  Loop Time (App Delay) : 0.0027 seconds
  Final Flush Time      : 0.0046 seconds
  Total Execution Time  : 0.0073 seconds

Cleaning up log files...
Done.

400 logs

> uv run test_logging.py
Generating 400 identical JSON payloads for fairness...

Payload size: ~375 bytes per log.

len(payloads) = 400payloads generated.

Starting benchmarks...

--- Standard FileHandler ---
  Loop Time (App Delay) : 0.0025 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0025 seconds

--- WatchedFileHandler ---
  Loop Time (App Delay) : 0.0032 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0032 seconds

--- MemoryHandler (Batch: 1000) -> WatchedFileHandler ---
  Loop Time (App Delay) : 0.0009 seconds
  Final Flush Time      : 0.0021 seconds
  Total Execution Time  : 0.0029 seconds

Cleaning up log files...
Done.

1000 logs

> uv run test_logging.py
Generating 1000 identical JSON payloads for fairness...

Payload size: ~374 bytes per log.

len(payloads) = 1000payloads generated.

Starting benchmarks...

--- Standard FileHandler ---
  Loop Time (App Delay) : 0.0178 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0178 seconds

--- WatchedFileHandler ---
  Loop Time (App Delay) : 0.0264 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0264 seconds

--- MemoryHandler (Batch: 1000) -> WatchedFileHandler ---
  Loop Time (App Delay) : 0.0201 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0201 seconds

Cleaning up log files...
Done.

2000 logs

> uv run test_logging.py
Generating 2000 identical JSON payloads for fairness...

Payload size: ~376 bytes per log.

len(payloads) = 2000payloads generated.

Starting benchmarks...

--- Standard FileHandler ---
  Loop Time (App Delay) : 0.0254 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0254 seconds

--- WatchedFileHandler ---
  Loop Time (App Delay) : 0.0433 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0433 seconds

--- MemoryHandler (Batch: 1000) -> WatchedFileHandler ---
  Loop Time (App Delay) : 0.0362 seconds
  Final Flush Time      : 0.0000 seconds
  Total Execution Time  : 0.0362 seconds

Cleaning up log files...
Done.
import time
import logging
import logging.handlers
import time
import json
import random
import uuid
import os
def generate_payloads(n):
print(f"Generating {n} identical JSON payloads for fairness...")
payloads =[]
for _ in range(n):
# Realistic push notification payload size and structure
data = {
"request_id": str(uuid.uuid4()),
"user_id": random.randint(10000, 99999),
"device_token": "token_" + "".join(random.choices("abcdef0123456789", k=64)),
"payload": {
"title": "New Message",
"body": "You have a new message from a user in your network.",
"action": "OPEN_CHAT",
"badge": 1
},
"status": random.choice(["success", "failed", "timeout", "rate_limited"]),
"response_time_ms": random.randint(10, 500),
"timestamp": time.time()
}
payloads.append(json.dumps(data))
return payloads
def run_test(test_name, handler, payloads):
# Setup Logger
logger = logging.getLogger(test_name)
logger.setLevel(logging.INFO)
logger.propagate = False
# Ensure no duplicate handlers
if logger.hasHandlers():
logger.handlers.clear()
# Standardize formatting
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# If it's a MemoryHandler, set the formatter on the target, not the buffer
if isinstance(handler, logging.handlers.MemoryHandler):
handler.target.setFormatter(formatter)
else:
handler.setFormatter(formatter)
logger.addHandler(handler)
# --- START BENCHMARK ---
start_time = time.perf_counter()
# 1. Measure Loop Time (How long the 'business logic' takes)
for msg in payloads:
logger.info(msg)
loop_end_time = time.perf_counter()
# 2. Force a final flush and measure Total Time
handler.flush()
if isinstance(handler, logging.handlers.MemoryHandler):
handler.target.flush()
total_end_time = time.perf_counter()
# --- END BENCHMARK ---
# Cleanup
handler.close()
loop_time = loop_end_time - start_time
total_time = total_end_time - start_time
flush_time = total_end_time - loop_end_time
print(f"--- {test_name} ---")
print(f" Loop Time (App Delay) : {loop_time:.4f} seconds")
print(f" Final Flush Time : {flush_time:.4f} seconds")
print(f" Total Execution Time : {total_time:.4f} seconds\n")
return total_time
def main(NUM_PAYLOADS = 200, WRITE_BATCH = 1000):
# Log files
file_fh = "test_fh.log"
file_wfh = "test_wfh.log"
file_mem = "test_mem_wfh.log"
# Pre-generate datasets so JSON stringification time doesn't affect the benchmark
payloads = generate_payloads(NUM_PAYLOADS)
print(f"\nPayload size: ~{len(payloads[0])} bytes per log.\n")
print(f'{len(payloads) = }payloads generated.\n')
print("Starting benchmarks...\n")
time.sleep(2)
# 1. Standard FileHandler
fh = logging.FileHandler(file_fh, mode='w')
run_test("Standard FileHandler", fh, payloads)
time.sleep(2)
# 2. WatchedFileHandler
wfh = logging.handlers.WatchedFileHandler(file_wfh, mode='w')
run_test("WatchedFileHandler", wfh, payloads)
time.sleep(2)
# 3. MemoryHandler targeting a WatchedFileHandler
target_wfh = logging.handlers.WatchedFileHandler(file_mem, mode='w')
mem_handler = logging.handlers.MemoryHandler(
capacity=WRITE_BATCH,
flushLevel=logging.ERROR, # Only flush on error OR when capacity is reached
target=target_wfh
)
run_test(f"MemoryHandler (Batch: {WRITE_BATCH}) -> WatchedFileHandler", mem_handler, payloads)
# Cleanup test files (Comment this out if you want to inspect the actual log files)
print("Cleaning up log files...")
for f in [file_fh, file_wfh, file_mem]:
if os.path.exists(f):
os.remove(f)
print("Done.")
if __name__ == "__main__":
main(2000, 1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment