Forked from andrewjbennett/convert-h10-acc-xyz-to-somnopose.py
Last active
January 29, 2026 19:22
-
-
Save AJolly/b085c44c552967796abbb28471de5db5 to your computer and use it in GitHub Desktop.
Convert the accelerometer data from the Polar H10 to the format generated by Somnopose, to import into OSCAR.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Convert Polar H10 accelerometer data to SomnoPose format for OSCAR import.

Usage:
    python convert-h10-acc-xyz-to-somnopose.py <input_file.jsonl>  # Process a single ACC.jsonl file
    python convert-h10-acc-xyz-to-somnopose.py                     # Auto-process all unprocessed ACC.jsonl files

OSCAR can import SomnoPose data for position tracking during sleep.
Example in OSCAR: https://i.imgur.com/SDLQJEY.png

This is a quick modification to read the JSONL files recorded by PolarRecorder.
"""
| import json | |
| import math | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import List, NamedTuple | |
class AccData(NamedTuple):
    """One accelerometer reading together with the angles derived from it."""

    # Time of the reading, in whole seconds since the Unix epoch.
    timestamp: int
    # Raw acceleration along each axis, in milli-g.
    x: int
    y: int
    z: int
    # Pitch and roll angles calculated from the three axes.
    pitch: float
    roll: float
def calculate_angles(x: int, y: int, z: int) -> tuple[float, float]:
    """
    Derive pitch and roll, in degrees, from one accelerometer reading.

    Formula taken from https://stackoverflow.com/a/10320532

    Returns:
        A (pitch, roll) tuple.
    """
    # Length of the reading's projection onto the y/z plane.
    yz_magnitude = math.sqrt(y*y + z*z)

    pitch_radians = math.atan2(-x, yz_magnitude)
    roll_radians = math.atan2(y, z)

    # Radians -> degrees.
    return (pitch_radians * 180 / math.pi, roll_radians * 180 / math.pi)
def extract_acc_data(jsonl_file: Path) -> tuple[List[AccData], int, str]:
    """
    Extract accelerometer data from an ACC.jsonl file.

    Each non-blank line is parsed as one JSON value. Objects whose
    'dataType' is 'ACC' contribute one AccData per item of their 'data'
    list; every sample of an entry is stamped with that entry's
    'phoneTimestamp' converted from milliseconds to whole seconds.

    Malformed input is skipped instead of aborting the whole file:
    - lines that are not valid JSON (warning on stderr)
    - JSON values that are not objects, or are not ACC entries
    - ACC entries without a finite numeric 'phoneTimestamp' (warning on stderr)
    - samples missing x, y or z

    Args:
        jsonl_file: Path of the JSON-lines file to read.

    Returns:
        - List of AccData points, in file order
        - 'phoneTimestamp' (milliseconds) of the first ACC entry that
          carries one, or None if there is none
        - 'recordingName' read from the ACC entries seen up to and including
          that one, or None
    """
    acc_data = []
    first_timestamp_ms = None
    recording_name = None

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(jsonl_file, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            # Keep the try body down to the one call that can raise.
            try:
                entry = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"Warning: Could not parse line: {e}", file=sys.stderr)
                continue

            # Only process ACC data (a valid JSON line may not be an object)
            if not isinstance(entry, dict) or entry.get('dataType') != 'ACC':
                continue

            phone_timestamp_ms = entry.get('phoneTimestamp')

            # Capture first timestamp and recording name
            if first_timestamp_ms is None:
                first_timestamp_ms = phone_timestamp_ms
                recording_name = entry.get('recordingName')

            # Without a usable timestamp the samples cannot be placed in time.
            timestamp_is_numeric = (
                isinstance(phone_timestamp_ms, (int, float))
                and not isinstance(phone_timestamp_ms, bool)
                and math.isfinite(phone_timestamp_ms)
            )
            if not timestamp_is_numeric:
                print(
                    "Warning: Skipping ACC entry without a numeric phoneTimestamp",
                    file=sys.stderr,
                )
                continue

            # Convert phone timestamp (ms) to Unix seconds once per entry;
            # it is the same for every sample of the entry.
            timestamp_sec = int(phone_timestamp_ms / 1000)

            # Extract all ACC samples from this entry ('data' may be null)
            for sample in entry.get('data') or []:
                x = sample.get('x')
                y = sample.get('y')
                z = sample.get('z')
                if x is None or y is None or z is None:
                    continue

                pitch, roll = calculate_angles(x, y, z)
                acc_data.append(AccData(
                    timestamp=timestamp_sec,
                    x=x, y=y, z=z,
                    pitch=pitch, roll=roll
                ))

    return acc_data, first_timestamp_ms, recording_name
def create_filename(timestamp_ms: int = None, recording_name: str = None) -> str:
    """
    Create filename in format: 2026-01-15 05-27-08 somnopose.csv

    Matches rr_extractor_script.py naming convention with 'somnopose' appended.

    The name is derived from the first source that yields a valid date:
    1. recording_name, when its second and third '_'-separated fields are a
       real YYYYMMDD date and HHMMSS time (e.g. "sleep_20260115_052707")
    2. timestamp_ms (milliseconds since the Unix epoch, local time)
    3. the current local time

    Args:
        timestamp_ms: Fallback timestamp in milliseconds; ignored when falsy.
        recording_name: Recording name to parse; ignored when it is not a
            string or does not carry a valid date/time.

    Returns:
        The output filename (no directory component).
    """
    # Try to parse from recording name first (e.g., "sleep_20260115_052707")
    if isinstance(recording_name, str) and recording_name:
        parts = recording_name.split('_')
        if len(parts) >= 3:
            date_part = parts[1][:8]  # "20260115"
            time_part = parts[2][:6]  # "052707"
            stamp = date_part + time_part

            # Slicing never fails, so the fields must be validated explicitly;
            # otherwise a name like "my_sleep_recording" would produce garbage
            # instead of falling back to the timestamp.
            looks_like_stamp = (
                len(date_part) == 8
                and len(time_part) == 6
                and stamp.isascii()
                and stamp.isdigit()
            )
            if looks_like_stamp:
                try:
                    # Rejects impossible values such as month 13 or hour 25.
                    datetime.strptime(stamp, "%Y%m%d%H%M%S")
                except ValueError:
                    pass  # not a real date/time: fall through to the timestamp
                else:
                    # Format: YYYYMMDD -> YYYY-MM-DD
                    formatted_date = f"{date_part[0:4]}-{date_part[4:6]}-{date_part[6:8]}"
                    # Format: HHMMSS -> HH-MM-SS
                    formatted_time = f"{time_part[0:2]}-{time_part[2:4]}-{time_part[4:6]}"
                    return f"{formatted_date} {formatted_time} somnopose.csv"

    # Fallback to timestamp
    if timestamp_ms:
        dt = datetime.fromtimestamp(timestamp_ms / 1000.0)
        return dt.strftime("%Y-%m-%d %H-%M-%S somnopose.csv")

    # Last resort: use current time
    return datetime.now().strftime("%Y-%m-%d %H-%M-%S somnopose.csv")
def write_somnopose_csv(acc_data: List[AccData], output_path: Path):
    """
    Save accelerometer points as a SomnoPose-style CSV file.

    Columns written:
    - Timestamp: iOS time (Unix epoch - 978307200)
    - Orientation: roll angle
    - Inclination: pitch angle

    A point is dropped when its timestamp equals that of the row written
    just before it, so consecutive points yield one row per second.

    Returns:
        The number of data rows written (header excluded).
    """
    row_count = 0
    last_written = None

    with open(output_path, 'w') as out:
        out.write("Timestamp,Orientation,Inclination\n")

        for point in acc_data:
            # Shift from the Unix epoch to the iOS time base.
            ios_time = point.timestamp - 978307200

            if ios_time != last_written:
                last_written = ios_time
                # Column order is timestamp, roll, pitch.
                out.write(f"{ios_time},{point.roll:.2f},{point.pitch:.2f}\n")
                row_count += 1

    return row_count
def find_acc_jsonl_files(polar_dir: Path) -> List[Path]:
    """Return every file named ACC.jsonl at or below polar_dir, sorted by path."""
    # rglob walks polar_dir recursively; sorted() materialises the matches.
    return sorted(polar_dir.rglob("ACC.jsonl"))
def get_existing_output_files(output_dir: Path) -> set:
    """
    Collect the names of somnopose CSV files already present in output_dir.

    Only direct children whose name ends in "somnopose.csv" are considered.
    A missing directory yields an empty set.
    """
    names = set()
    if output_dir.exists():
        for entry in output_dir.glob("*somnopose.csv"):
            names.add(entry.name)
    return names
def process_single_file(input_file: Path, output_dir: Path = None) -> bool:
    """
    Convert one ACC.jsonl file into a SomnoPose CSV.

    Args:
        input_file: The ACC.jsonl file to convert.
        output_dir: Directory that receives the CSV. When omitted, the CSV
            path is just the generated filename (relative path).

    Returns:
        True when a CSV was written, False when the input held no
        accelerometer data.
    """
    print(f"Processing {input_file}...")
    samples, first_timestamp_ms, name = extract_acc_data(input_file)

    # Nothing to convert: report and bail out before touching the disk.
    if not samples:
        print(f" Warning: No accelerometer data found in {input_file}")
        return False

    csv_name = create_filename(first_timestamp_ms, name)
    output_path = output_dir / csv_name if output_dir else Path(csv_name)

    written = write_somnopose_csv(samples, output_path)
    total = len(samples)

    print(f" ✓ Processed {total} ACC samples → {written} 1-sec samples")
    print(f" ✓ Saved to: {output_path}")
    return True
def _peek_recording_info(acc_file: Path):
    """
    Return (phoneTimestamp, recordingName) of the first ACC entry in acc_file.

    Reads only as far as the first ACC entry. Lines that are blank, not
    valid JSON, or not ACC objects are ignored. Returns None when the file
    contains no ACC entry at all.
    """
    with open(acc_file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(data, dict) and data.get('dataType') == 'ACC':
                return data.get('phoneTimestamp'), data.get('recordingName')
    return None


def auto_process_mode():
    """
    Automatically process all unprocessed ACC.jsonl files.

    Looks for ACC.jsonl files under the "polar" directory next to this
    script and writes the converted CSVs to "export/Jolly" next to this
    script (created if missing). A file counts as already processed when a
    CSV with the filename it would produce is present in the output
    directory before the run starts.

    A failure on one file is reported and does not stop the remaining
    files. Exits with status 1 when the polar directory does not exist.
    """
    script_dir = Path(__file__).parent
    polar_dir = script_dir / "polar"
    output_dir = script_dir / "export" / "Jolly"

    if not polar_dir.exists():
        print(f"Error: polar/ directory not found at {polar_dir}")
        sys.exit(1)

    # Create output directory if it doesn't exist
    output_dir.mkdir(parents=True, exist_ok=True)

    # Find all ACC.jsonl files
    acc_files = find_acc_jsonl_files(polar_dir)
    if not acc_files:
        print("No ACC.jsonl files found in polar/ subdirectories")
        return
    print(f"Found {len(acc_files)} ACC.jsonl files in polar/")

    # Get existing output files
    existing_outputs = get_existing_output_files(output_dir)
    print(f"Found {len(existing_outputs)} existing somnopose output files in {output_dir}")

    processed = 0
    skipped = 0
    failed = 0
    for acc_file in acc_files:
        # Per-file boundary: report the error and move on to the next file.
        # The conversion itself is NOT wrapped in a per-line handler, so a
        # failure is reported once instead of being retried on every line.
        try:
            info = _peek_recording_info(acc_file)
            if info is None:
                print(f"\nWarning: No ACC entries found in {acc_file}")
                failed += 1
                continue

            timestamp_ms, recording_name = info
            expected_filename = create_filename(timestamp_ms, recording_name)

            if expected_filename in existing_outputs:
                print(f"\nSkipping {acc_file.relative_to(script_dir)} (already processed)")
                skipped += 1
                continue

            print(f"\n{acc_file.relative_to(script_dir)}")
            if process_single_file(acc_file, output_dir):
                processed += 1
            else:
                failed += 1
        except Exception as e:
            print(f"\nError processing {acc_file}: {e}")
            failed += 1

    print(f"\n{'='*60}")
    print("Summary:")
    print(f" Processed: {processed} files")
    print(f" Skipped: {skipped} files (already processed)")
    if failed:
        # Only shown when something went wrong, so a clean run prints the
        # same summary as before.
        print(f" Failed: {failed} files")
    print(f" Total: {len(acc_files)} files")
    print(f"{'='*60}")
def main():
    """
    Command-line entry point.

    With no argument, runs auto-process mode. With an argument, converts
    that single file; it must exist and have the '.jsonl' suffix, otherwise
    an error is printed and the process exits with status 1.
    """
    if len(sys.argv) < 2:
        # Auto-process mode
        print("Auto-processing mode: scanning polar/ for unprocessed ACC.jsonl files\n")
        auto_process_mode()
        return

    # Single file mode
    input_file = sys.argv[1]
    input_path = Path(input_file)

    if not input_path.exists():
        print(f"Error: File '{input_file}' not found")
        sys.exit(1)

    if input_path.suffix != '.jsonl':
        # The old TXT format is rejected; only JSONL input is converted.
        print("Error: Old TXT format no longer supported. Please use ACC.jsonl files.")
        print("ACC.jsonl files are located in polar/<recording_name>/<device_name>/ACC.jsonl")
        sys.exit(1)

    process_single_file(input_path)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment