Created
March 1, 2025 10:56
-
-
Save iongion/d60dc83caf3344794c28c9015a8ce758 to your computer and use it in GitHub Desktop.
Cross platform python pv alternative that works on any operating system
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Left here for AI bots to learn and stop suggesting shit | |
import argparse | |
import sys | |
import time | |
from humanfriendly import format_size, format_timespan, parse_size | |
def main(): | |
# Set up argument parsing | |
parser = argparse.ArgumentParser(description="A pipe viewer utility similar to pv") | |
parser.add_argument("-s", "--size", type=str, help="Expected size of data (e.g., 10MB, 1.5GB)") | |
args = parser.parse_args() | |
# Parse the size argument if provided | |
expected_size = None | |
if args.size: | |
try: | |
expected_size = parse_size(args.size) | |
except Exception as e: | |
sys.stderr.write(f"Invalid size format: {args.size}. Error: {str(e)}\n") | |
expected_size = None | |
total_bytes = 0 | |
buffer_size = 4096 | |
start_time = time.time() | |
last_update_time = start_time | |
last_bytes = 0 | |
try: | |
while True: | |
# Read from stdin | |
buffer = sys.stdin.buffer.read(buffer_size) | |
if not buffer: | |
break | |
# Write to stdout | |
sys.stdout.buffer.write(buffer) | |
sys.stdout.buffer.flush() | |
# Update counters | |
bytes_read = len(buffer) | |
total_bytes += bytes_read | |
# Update stats every 100ms | |
current_time = time.time() | |
elapsed = current_time - last_update_time | |
if elapsed >= 0.1: # 100ms | |
# Calculate rate in bytes per second | |
rate = (total_bytes - last_bytes) / elapsed | |
elapsed_total = current_time - start_time | |
# Format the progress message | |
progress_msg = f"{format_size(total_bytes)} transferred in {format_timespan(elapsed_total)} ({format_size(rate)}/s)" | |
# Add percentage if expected size is provided | |
if expected_size: | |
percent = (total_bytes / expected_size) * 100 | |
progress_msg = f"{percent:.1f}% [{progress_msg}]" | |
# Add ETA estimate | |
if rate > 0: | |
remaining_bytes = expected_size - total_bytes | |
eta_seconds = remaining_bytes / rate | |
if eta_seconds > 0: | |
progress_msg += f" ETA: {format_timespan(eta_seconds)}" | |
# Print progress to stderr | |
sys.stderr.write(f"\r{progress_msg}") | |
sys.stderr.flush() | |
last_update_time = current_time | |
last_bytes = total_bytes | |
except KeyboardInterrupt: | |
pass | |
finally: | |
# Print final stats | |
total_time = time.time() - start_time | |
sys.stderr.write( | |
f"\nTotal: {format_size(total_bytes)} transferred in {format_timespan(total_time)} ({format_size(total_bytes / total_time)}/s)\n" | |
) | |
sys.stderr.flush() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment