Last active
August 22, 2024 05:43
-
-
Save Forpetesake2/03c6bab6c9aa24ef3fe39c669c9d65f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import json | |
import argparse | |
# Replace with your actual Shotstack API key | |
SHOTSTACK_API_KEY = "your_shotstack_api_key" | |
async def push_mp4_to_shotstack(session, url): | |
"""Push an mp4 URL to the Shotstack API for processing.""" | |
print(f"Pushing URL to Shotstack: {url}") | |
payload = { | |
"url": url, | |
"outputs": { | |
"renditions": [ | |
{ | |
"fixOffset": True | |
} | |
] | |
} | |
} | |
headers = { | |
'cache-control': 'no-cache', | |
'content-type': 'application/json', | |
'x-api-key': SHOTSTACK_API_KEY | |
} | |
async with session.post( | |
"https://api.shotstack.io/ingest/v1/sources", | |
headers=headers, | |
json=payload | |
) as response: | |
response_data = await response.json() | |
source_id = response_data["data"]["id"] | |
print(f"Shotstack Source ID: {source_id}") | |
return source_id | |
async def fetch_rendition_url(session, source_id): | |
"""Check the status of the rendition and return the URL once ready.""" | |
url = f"https://api.shotstack.io/ingest/v1/sources/{source_id}" | |
headers = { | |
'cache-control': 'no-cache', | |
'content-type': 'application/json', | |
'x-api-key': SHOTSTACK_API_KEY | |
} | |
print(f"Checking status for Source ID: {source_id}") | |
while True: | |
async with session.get(url, headers=headers) as response: | |
response_data = await response.json() | |
try: | |
rendition_status = response_data["data"]["attributes"]["outputs"]["renditions"][0]["status"] | |
print(f"Current rendition status for Source ID {source_id}: {rendition_status}") | |
if rendition_status == "ready": | |
rendition_url = response_data["data"]["attributes"]["outputs"]["renditions"][0]["url"] | |
print(f"Rendition URL is ready: {rendition_url}") | |
return rendition_url | |
except KeyError as e: | |
print(f"Error: Expected key is missing in the response. {e}") | |
print("Full response data:") | |
print(json.dumps(response_data, indent=4)) | |
raise | |
print("Rendition not ready yet. Waiting 5 seconds before checking again...") | |
await asyncio.sleep(5) | |
async def update_json_with_new_urls(session, original_json): | |
"""Process video clips in the JSON and update their URLs with new Shotstack URLs.""" | |
tasks = [] | |
for track in original_json["timeline"]["tracks"]: | |
for clip in track["clips"]: | |
if clip["asset"]["type"] == "video": | |
print(f"Processing clip with source: {clip['asset']['src']}") | |
task = asyncio.create_task(process_clip(session, clip)) | |
tasks.append(task) | |
await asyncio.gather(*tasks) | |
return original_json | |
async def process_clip(session, clip): | |
"""Push a video clip's URL to Shotstack and update the clip with the new URL.""" | |
source_id = await push_mp4_to_shotstack(session, clip["asset"]["src"]) | |
new_url = await fetch_rendition_url(session, source_id) | |
clip["asset"]["src"] = new_url | |
print(f"Updated clip source to new URL: {new_url}") | |
async def main(input_file, output_file): | |
"""Main function to handle reading, processing, and saving the JSON file.""" | |
print(f"Reading input JSON file: {input_file}") | |
with open(input_file, 'r') as f: | |
original_json = json.load(f) | |
async with aiohttp.ClientSession() as session: | |
print("Updating JSON with new URLs...") | |
updated_json = await update_json_with_new_urls(session, original_json) | |
print(f"Writing updated JSON to file: {output_file}") | |
with open(output_file, 'w') as f: | |
json.dump(updated_json, f, indent=4) | |
print(f"Updated JSON saved to {output_file}") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='Process MP4 files in a JSON through the Shotstack API.') | |
parser.add_argument('input_file', type=str, help='Path to the input JSON file') | |
parser.add_argument('output_file', type=str, help='Path to save the updated JSON file') | |
args = parser.parse_args() | |
asyncio.run(main(args.input_file, args.output_file)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment