|
import re |
|
import requests |
|
import hashlib |
|
import json |
|
import subprocess |
|
from datetime import datetime, UTC |
|
from urllib.parse import quote |
|
from pathlib import Path |
|
from collections import OrderedDict |
|
|
|
|
|
def setup_directories(document_id):
    """Create the per-document directory structure under ``files/``.

    Args:
        document_id: Identifier used as the sub-directory name (stringified).

    Returns:
        Tuple ``(base_dir, doc_dir)`` as ``Path`` objects; both directories
        exist when this returns.
    """
    base_dir = Path("files")
    doc_dir = base_dir / str(document_id)
    # parents=True creates base_dir and doc_dir in a single call.
    doc_dir.mkdir(parents=True, exist_ok=True)
    return base_dir, doc_dir
|
|
|
def initialize_workspace():
    """Create initial directory structure and sample input file.

    Returns True when files/input.json already exists (workspace ready),
    False when the sample was just created (user must edit it first) or
    when setup failed.
    """
    try:
        workspace = Path("files")
        workspace.mkdir(exist_ok=True)

        input_file = workspace / "input.json"
        if input_file.exists():
            return True

        # Seed a sample input.json so the user has a template to edit.
        with open(input_file, 'w') as handle:
            json.dump(["607369", "607370"], handle, indent=2)

        print("\nInitialized workspace:")
        print("- Created 'files' directory")
        print("- Created sample input.json")
        print("\nPlease update files/input.json with your document IDs and run the script again")
        return False
    except Exception as e:
        print(f"Error initializing workspace: {str(e)}")
        return False
|
|
|
def calculate_sha256(filename): |
|
"""Calculate SHA256 hash of a file""" |
|
sha256_hash = hashlib.sha256() |
|
with open(filename, 'rb') as f: |
|
for byte_block in iter(lambda: f.read(4096), b''): |
|
sha256_hash.update(byte_block) |
|
return sha256_hash.hexdigest() |
|
|
|
def get_attestation(file_hash):
    """Get attestation from SimpleProof API.

    Args:
        file_hash: Hex SHA256 digest of the document to look up.

    Returns:
        Decoded JSON response dict from the API (schema defined by
        SimpleProof; callers check for a "status" field).

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    url = "https://app.simpleproof.com/api/proof/attestation"
    payload = {
        "category": "p-0097",  # user ID
        "hash": file_hash,
        "num": 0
    }
    # Timeout prevents the batch run from hanging forever on a stalled API.
    response = requests.post(url, json=payload, timeout=30)
    return response.json()
|
|
|
def get_download_url(attestation):
    """Get download URL for OTS file.

    Args:
        attestation: Attestation dict from ``get_attestation``; must contain
            "space", "prefix", "otsName" and "otsSize".

    Returns:
        Signed download URL string for the .ots proof file.

    Raises:
        KeyError: when the attestation lacks a required field (callers
            catch this explicitly).
        requests.RequestException: on network failure or timeout.
    """
    url = "https://app.simpleproof.com/api/proof/download-url"
    payload = {
        "space": attestation["space"],
        "key": f"{attestation['prefix']}{attestation['otsName']}",
        "size": attestation["otsSize"]
    }
    # Timeout prevents the batch run from hanging forever on a stalled API.
    response = requests.post(url, json=payload, timeout=30)
    return response.json()["url"]
|
|
|
def verify_timestamp_with_blockchain(pdf_file, ots_file, doc_dir):
    """Verify timestamp against blockchain using OTS CLI and mempool.space API.

    Runs ``ots --no-bitcoin verify`` on the file pair, extracts the
    (merkle root, block height) attestations from its report, then
    cross-checks the merkle root against the block data served by
    mempool.space.

    Args:
        pdf_file: ``Path`` to the stamped PDF.
        ots_file: ``Path`` to the matching .ots proof file.
        doc_dir: Directory where the raw OTS report is saved for auditing.

    Returns:
        Tuple ``(is_valid, block_height, block_hash, block_timestamp)``;
        the last three are None when verification fails.
    """
    try:
        # The ots CLI writes its human-readable report to stderr.
        cmd = ['ots', '--no-bitcoin', 'verify', '-f', str(pdf_file), str(ots_file)]
        result = subprocess.run(cmd, capture_output=True, text=True)

        # Save the complete OTS output alongside the document for auditing.
        ots_result_path = doc_dir / f"{pdf_file.stem}-ots-result.txt"
        with open(ots_result_path, 'w') as f:
            f.write(result.stderr)

        merkle_roots = re.findall(r"merkleroot (\w{64})", result.stderr)
        block_heights = re.findall(r"Bitcoin block (\d+) has", result.stderr)

        if not merkle_roots or not block_heights:
            return False, None, None, None

        # NOTE(review): the loop returns on its first iteration either way,
        # so only the first (height, merkleroot) attestation is checked —
        # presumably one valid Bitcoin attestation is considered sufficient.
        for height, merkleroot in zip(block_heights, merkle_roots):
            try:
                # Timeouts keep a stalled mempool.space from hanging the batch.
                response = requests.get(
                    f"https://mempool.space/api/block-height/{height}",
                    timeout=30)
                block_hash = response.text.strip()

                block_data = requests.get(
                    f"https://mempool.space/api/block/{block_hash}",
                    timeout=30).json()
                if merkleroot != block_data['merkle_root']:
                    return False, None, None, None

                return True, int(height), block_hash, block_data['timestamp']

            except Exception:
                return False, None, None, None

    except Exception:
        return False, None, None, None
|
|
|
|
|
def process_document(document_id):
    """Process a document through the entire verification workflow.

    Downloads the PDF from the transparency portal, hashes it, fetches the
    SimpleProof attestation, downloads the OTS proof, and verifies it
    against the Bitcoin blockchain.

    Args:
        document_id: Transparency-portal document ID (str or int).

    Returns:
        Result dict (verification outcome, or a PENDING status record when
        the document has not been stamped yet), or None when any step fails.
    """
    print(f"\nProcessing document {document_id}...")

    try:
        # Setup directories
        base_dir, doc_dir = setup_directories(document_id)

        # Download document
        print("→ Downloading document...")
        pdf_url = f"https://www.transparencia.gob.sv/institutions/capres/documents/{document_id}/download"
        response = requests.get(pdf_url, timeout=60)
        # Without this check an HTTP error page would be saved and hashed as
        # if it were the document, yielding a bogus attestation lookup.
        response.raise_for_status()
        pdf_path = doc_dir / f"{document_id}.pdf"
        with open(pdf_path, 'wb') as f:
            f.write(response.content)
        print("✓ Document downloaded")

        # Calculate hash
        print("→ Calculating SHA256...")
        file_hash = calculate_sha256(pdf_path)
        print(f"✓ SHA256: {file_hash}")

        # Get attestation
        print("→ Getting attestation...")
        attestation = get_attestation(file_hash)
        print("✓ Attestation received")

        # A pending attestation means the document exists but has not been
        # stamped yet; record that and bail out without failing the batch.
        if attestation.get("status") == "document.status.pending":
            print("\n! Document attestation is pending")
            print("→ Document has not been stamped yet")
            result_data = {
                "sha256": file_hash,
                "status": "PENDING",
                "last_checked": datetime.now(UTC).isoformat()
            }
            return result_data

        # Download OTS file
        print("→ Downloading OTS file...")
        try:
            ots_url = get_download_url(attestation)
        except KeyError as e:
            print("\nError: Invalid attestation response")
            print(f"Missing field: {str(e)}")
            print("Attestation response content:")
            print(json.dumps(attestation, indent=2))
            raise Exception(f"Invalid attestation response - missing {str(e)} field") from e
        except Exception as e:
            print("\nError: Failed to get download URL")
            print("Attestation response content:")
            print(json.dumps(attestation, indent=2))
            raise Exception("Failed to process attestation response") from e

        ots_path = doc_dir / f"{document_id}.ots"
        response = requests.get(ots_url, timeout=60)
        response.raise_for_status()
        with open(ots_path, 'wb') as f:
            f.write(response.content)
        print("✓ OTS file downloaded")

        # Verify timestamp against the Bitcoin blockchain
        print("→ Verifying blockchain timestamp...")
        is_valid, block_height, block_hash, block_time = verify_timestamp_with_blockchain(pdf_path, ots_path, doc_dir)
        verification_status = "PASS" if is_valid else "FAIL"
        print(f"✓ Blockchain verification: {verification_status}")

        # Create result data
        result_data = {
            "file_name": attestation["srcName"],
            "sha256": file_hash,
            "ots_verification": verification_status,
            "block_height": block_height,
            "block_hash": block_hash,
            "block_time": block_time
        }

        # Add SimpleProof URL only if verification passed
        if verification_status == "PASS":
            simpleproof_url = f"https://verify.simpleproof.com/SP/p-0097/{file_hash}"
            result_data["simpleproof-url"] = simpleproof_url

        print("✓ Verification complete")
        return result_data

    except Exception as e:
        # Errors raised with an explicit cause were already reported in
        # detail above, so print the short form only.
        if hasattr(e, '__cause__') and e.__cause__ is not None:
            print(f"Error processing document {document_id}: {str(e)}")
        else:
            print(f"\nError processing document {document_id}:")
            print(f"Error type: {type(e).__name__}")
            print(f"Error message: {str(e)}")
            if isinstance(e, requests.exceptions.RequestException):
                # request/response can be None on connection-level failures;
                # the previous unguarded access could itself raise here.
                if e.request is not None:
                    print(f"URL: {e.request.url}")
                print(f"Status code: {e.response.status_code if e.response else 'No response'}")
        return None
|
|
|
def batch_process_documents():
    """Process all documents from input.json and update verified.json, maintaining input order.

    Reads document IDs from files/input.json, skips any already recorded as
    PASS in files/verified.json, verifies the rest via ``process_document``,
    and rewrites verified.json after every success so partial progress
    survives interruption. The output always follows input.json order.
    """
    # Initialize workspace if needed
    if not initialize_workspace():
        return

    try:
        # Load input documents while preserving order
        with open("files/input.json", 'r') as f:
            document_ids = json.load(f)

        # Initialize or load verified.json
        verification_path = Path("files/verified.json")

        # Results keyed by document ID; OrderedDict kept for explicitness.
        verification_data = OrderedDict()

        # Carry forward only previously successful (PASS) verifications so
        # pending/failed documents are retried on the next run.
        if verification_path.exists():
            with open(verification_path, 'r') as f:
                existing_verifications = json.load(f)
            for doc_id, data in existing_verifications.items():
                if data.get('ots_verification') == 'PASS':
                    verification_data[doc_id] = data

        # Process each document in order
        print(f"Found {len(document_ids)} documents to process")
        documents_to_process = []

        # First, check which documents need processing
        for document_id in document_ids:
            if (document_id in verification_data and
                verification_data[document_id].get('ots_verification') == 'PASS'):
                print(f"Skipping document {document_id} - already verified successfully")
            else:
                documents_to_process.append(document_id)

        print(f"\nDocuments requiring verification: {len(documents_to_process)}")

        # Process only the documents that need verification
        for document_id in documents_to_process:
            result = process_document(document_id)
            if result and result.get('ots_verification') == 'PASS':
                verification_data[document_id] = result

                # Ensure order matches input.json after each verification
                ordered_verification = OrderedDict()
                for doc_id in document_ids:
                    if doc_id in verification_data:
                        ordered_verification[doc_id] = verification_data[doc_id]

                # Save after each successful verification, with correct order
                with open(verification_path, 'w') as f:
                    json.dump(ordered_verification, f, indent=2)
            elif result:
                # Document was processed but didn't pass verification
                status = result.get('status', 'FAIL')
                if status == 'PENDING':
                    print(f"Document {document_id} is pending stamping - not added to verified.json")
                else:
                    print(f"Document {document_id} failed verification - not added to verified.json")

        # Final order check and correction
        final_verification = OrderedDict()
        for doc_id in document_ids:
            if doc_id in verification_data:
                final_verification[doc_id] = verification_data[doc_id]

        # Save final ordered version
        with open(verification_path, 'w') as f:
            json.dump(final_verification, f, indent=2)

        print("\nBatch processing complete")
        successfully_verified = len(final_verification)
        print(f"Documents with successful verification: {successfully_verified}")
        print(f"Documents pending or failed: {len(document_ids) - successfully_verified}")
        print(f"Results saved to {verification_path} in original input order")

        # Verify final order matches input.json
        with open(verification_path, 'r') as f:
            final_data = json.load(f)
        final_order = list(final_data.keys())
        # Loop variable renamed from `id` to avoid shadowing the builtin.
        input_order = [doc_id for doc_id in document_ids if doc_id in final_data]
        if final_order == input_order:
            print("✓ Final verified.json order matches input.json")
        else:
            print("! Warning: Final order verification failed")

    except Exception as e:
        print(f"Error in batch processing: {str(e)}")
|
|
|
# Script entry point: run the full batch verification workflow.
if __name__ == "__main__":
    batch_process_documents()