Last active
September 7, 2025 15:29
-
-
Save 903124/2067553afa90ec771520c1512c369384 to your computer and use it in GitHub Desktop.
Updating https://github.com/AndRoo88/Baseball-Flight-Calculator to use real Statcast data as input
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Updating https://github.com/AndRoo88/Baseball-Flight-Calculator to use real Statcast data as input | |
| # Modify processing.py to replace the deprecated scipy rotation call (from_dcm) using the helpers sketched below | |
| # Here one pitch (the first valid row) is selected from each pitch type and its trajectory is calculated using different seam orientations | |
| #from scipy.spatial.transform import Rotation as R | |
| # def _create_rotation_from_matrix(matrix): | |
| # """ | |
| # Create a scipy Rotation object from a rotation matrix. | |
| # Handles compatibility between old (from_dcm) and new (from_matrix) scipy versions. | |
| # """ | |
| # try: | |
| # # Try new method first (scipy >= 1.4.0) | |
| # return R.from_matrix(matrix) | |
| # except AttributeError: | |
| # # Fall back to old method (scipy < 1.4.0) | |
| # return R.from_dcm(matrix) | |
| # def _get_rotation_vector(rotation_obj): | |
| # """ | |
| # Get rotation vector from scipy Rotation object. | |
| # Handles compatibility between old (as_rotvec) and new methods. | |
| # """ | |
| # try: | |
| # # Try current method | |
| # return rotation_obj.as_rotvec() | |
| # except AttributeError: | |
| # # Fall back if needed | |
| # return rotation_obj.as_rotvec() | |
| # Change this line to incorporate the Statcast velocity direction: | |
| #while BallState0[1] < 60.5 and BallState0[2] > 0. and t < 20: | |
| # .... | |
| import os | |
| import math | |
| import pandas as pd | |
| import numpy as np | |
| # Run from within umba/ so local imports work | |
| import processing | |
| # ---- Seam orientation scenarios (degrees) ---- | |
# Seam orientation scenarios swept for every selected pitch.
# Each entry carries a label plus the Y and Z seam angles (degrees).
DEFAULT_SEAM_SCENARIOS = [
    dict(name=label, Yangle=y_deg, Zangle=z_deg)
    for label, y_deg, z_deg in (
        ("baseline_Y0_Z0", 0.0, 0.0),
        ("Y45_Z0", 45.0, 0.0),
        ("Y0_Z45", 0.0, 45.0),
        ("Y90_Z0", 90.0, 0.0),
        ("Y0_Z90", 0.0, 90.0),
        ("Y45_Z45", 45.0, 45.0),
        ("Y-45_Z45", -45.0, 45.0),
    )
]
| # ---- Helpers from your provided initial-condition mapping ---- | |
def _value_or_default(pitch_row, key, default):
    """Return ``pitch_row[key]``, or *default* when the key is absent or NaN/None."""
    value = pitch_row.get(key, default)
    return value if pd.notna(value) else default


def row_to_umba_inputs(pitch_row):
    """Convert one Statcast pitch record into the positional inputs of ``processing.umba``.

    Args:
        pitch_row: Mapping-like record (``pandas.Series`` or ``dict``) holding
            Statcast columns. Required: ``release_pos_x``, ``release_pos_z``,
            ``vx0``, ``vy0``, ``vz0``, ``active_spin`` and either
            ``release_pos_y`` or ``release_extension``.

    Returns:
        Tuple ``(x, y, z, Vtot_mph, Theta, Psi, SpinRate, TiltH, Tiltm, SpinE,
        LorR)``: release position (ft), total speed (mph), vertical and
        horizontal release angles (deg), spin rate (rpm), clock tilt as whole
        hours and minutes, spin efficiency taken from ``active_spin``, and the
        gyro-pole side (``'l'`` or ``'r'``).

    Raises:
        KeyError: If a required column is missing from ``pitch_row``.
    """
    # Positions (ft)
    x0 = pitch_row['release_pos_x']
    z0 = pitch_row['release_pos_z']
    # Use Statcast 'release_pos_y' if present; otherwise derive it from extension (ft).
    y0 = pitch_row.get('release_pos_y', np.nan)
    if pd.isna(y0):
        y0 = 60.5 - pitch_row['release_extension']

    # Velocity components (ft/s)
    vx_fts = pitch_row['vx0']
    vy_fts = pitch_row['vy0']
    vz_fts = pitch_row['vz0']

    # Total velocity (mph). Prefer the reported release speed; when it is
    # absent or NaN, fall back to the magnitude of the velocity components
    # (ft/s -> mph) instead of failing or propagating NaN.
    Vtot_mph = pitch_row.get('release_speed', np.nan)
    if pd.isna(Vtot_mph):
        Vtot_mph = math.sqrt(vx_fts**2 + vy_fts**2 + vz_fts**2) * 3600.0 / 5280.0

    # Release angles (deg)
    horizontal_speed_fts = math.sqrt(vx_fts**2 + vy_fts**2)
    Theta = math.degrees(math.atan2(vz_fts, horizontal_speed_fts))  # vertical angle
    Psi = math.degrees(math.atan2(vx_fts, vy_fts))  # horizontal angle

    # Spin rate (rpm); NaN counts as missing so the defaults actually apply.
    SpinRate = _value_or_default(
        pitch_row, 'release_spin_rate',
        _value_or_default(pitch_row, 'spin_rate', 2000.0),
    )

    # Spin axis (deg) -> clock tilt (hours, minutes).
    # One clock hour spans 30 degrees, i.e. 2 minutes per degree. Working in
    # whole minutes avoids the float truncation of the previous
    # ``int((hours % 1) * 60)`` form (e.g. 7.3 h came out as 7:17, not 7:18).
    spin_axis_deg = _value_or_default(pitch_row, 'spin_axis', 180.0)
    tilt_deg = (360.0 - spin_axis_deg + 90.0) % 360.0
    total_minutes = int(round(tilt_deg * 2.0)) % 720  # wrap 12:00 back to 0:00
    TiltH, Tiltm = divmod(total_minutes, 60)

    # Spin efficiency (%) from the active_spin column
    SpinE = float(pitch_row['active_spin'])

    # Gyro pole forward: derive a deterministic choice without user input.
    # Use handedness as proxy: R -> 'l', L -> 'r'. A missing/NaN value is
    # treated as 'R' (previously NaN was stringified and selected 'r').
    p_throws = str(_value_or_default(pitch_row, 'p_throws', 'R') or 'R').upper()
    LorR = 'l' if p_throws == 'R' else 'r'

    # y is passed as Statcast release_pos_y (or the 60.5 - extension fallback).
    # NOTE(review): the original comment says processing.PitchedBallTraj
    # applies y0 = 60.5 - y itself -- confirm against processing.py.
    return x0, y0, z0, Vtot_mph, Theta, Psi, SpinRate, TiltH, Tiltm, SpinE, LorR
def pick_one_pitch_per_type(df):
    """Return the first usable pitch of each ``pitch_type`` in ``df``.

    A row is usable when every required kinematic/spin column is non-null and
    at least one release-distance column (``release_pos_y`` or
    ``release_extension``) has a value.

    Args:
        df: DataFrame of Statcast pitches.

    Returns:
        DataFrame with at most one row per ``pitch_type`` (the first usable
        row in ``df`` order), keeping the original index labels. Empty when
        neither release-distance column exists or no row qualifies.

    Raises:
        KeyError: If one of the required columns is missing from ``df``.
    """
    required = [
        'pitch_type', 'release_pos_x', 'release_pos_z',
        'vx0', 'vy0', 'vz0', 'active_spin',
        'spin_axis', 'release_spin_rate',
    ]
    df = df.dropna(subset=required)

    # Accept release_pos_y OR release_extension. Build the mask per column so
    # a missing column simply contributes nothing; the previous
    # ``df.get(col, np.nan)`` form degraded to ``df[False]`` (a KeyError) when
    # both columns were absent.
    has_release_distance = pd.Series(False, index=df.index)
    for column in ('release_pos_y', 'release_extension'):
        if column in df.columns:
            has_release_distance |= df[column].notna()
    df = df[has_release_distance]

    # Take the first remaining row of each pitch type.
    return df.groupby('pitch_type', as_index=False).head(1)
def load_pitch_stat_csv():
    """Locate the Statcast CSV file to use as input and return its path.

    Lookup order:
      1. The preferred 2024 files (the ``active_spin`` variant first), in
         ``../Statcast_data`` and then in ``Statcast_data``.
      2. Otherwise the CSV whose filename sorts last (taken as the most
         recent), in ``Statcast_data`` and then in ``../Statcast_data``.

    Both directories are searched at every step so the result no longer
    depends on whether the script is started from the repository root or from
    a sub-directory; previously the preferred files and the fallback were
    looked up in different locations.

    Returns:
        Relative path (str) of the selected CSV file.

    Raises:
        FileNotFoundError: If no Statcast_data directory exists, or none of
            the existing ones contains a CSV file.
    """
    parent_dir = '../Statcast_data'
    local_dir = 'Statcast_data'

    # Prefer a 2024 file with active_spin if present.
    preferred_names = ('Statcast_2024_active_spin.csv', 'Statcast_2024.csv')
    for directory in (parent_dir, local_dir):
        for name in preferred_names:
            path = os.path.join(directory, name)
            if os.path.exists(path):
                return path

    # Fallback: most recent file by name. The local directory keeps priority
    # here because it was the only one the original fallback consulted.
    existing_dirs = [d for d in (local_dir, parent_dir) if os.path.isdir(d)]
    if not existing_dirs:
        raise FileNotFoundError("Statcast_data directory not found.")
    for directory in existing_dirs:
        csv_names = sorted(
            (f for f in os.listdir(directory) if f.lower().endswith('.csv')),
            reverse=True,
        )
        if csv_names:
            return os.path.join(directory, csv_names[0])
    raise FileNotFoundError("No CSV files found in Statcast_data.")
def run_from_df(pitch_stat_df, seam_scenarios=DEFAULT_SEAM_SCENARIOS, out_csv="umba/seam_scenarios_results.csv"):
    """Simulate every seam scenario for one pitch per pitch type and save the results.

    Args:
        pitch_stat_df: DataFrame of Statcast pitches; one pitch per
            ``pitch_type`` is chosen via ``pick_one_pitch_per_type``.
        seam_scenarios: Iterable of dicts with ``name``, ``Yangle`` and
            ``Zangle`` keys describing the seam orientations to run.
        out_csv: Path of the CSV file to write. Its parent directory is
            created when the path has one.

    Returns:
        DataFrame with one row per (pitch, scenario). Successful runs carry
        the pitch metadata plus ``FX_ft``, ``FY_ft``, ``FZ_ft`` and ``TF_s``;
        failed runs carry an ``error`` column with the exception text.
    """
    subset = pick_one_pitch_per_type(pitch_stat_df)
    rows = []
    for _, row in subset.iterrows():
        try:
            (x, y, z, Vtot, Theta, Psi, SpinRate,
             TiltH, Tiltm, SpinE, LorR) = row_to_umba_inputs(row)
        except Exception as exc:
            # Best effort: skip rows that cannot be converted, but say so
            # instead of dropping them silently.
            print(f"Skipping pitch_type={row.get('pitch_type')!r}: could not build inputs ({exc})")
            continue

        for scen in seam_scenarios:
            Yangle = scen['Yangle']
            Zangle = scen['Zangle']
            record = {
                "pitch_type": row['pitch_type'],
                "description": scen['name'],
                "Yangle": Yangle,
                "Zangle": Zangle,
            }
            try:
                result = processing.umba(
                    x, y, z,
                    Vtot, Theta, Psi,
                    SpinRate,
                    TiltH, Tiltm,
                    SpinE,
                    Yangle, Zangle,
                    LorR,
                    0,      # i: iteration index
                    True,   # seamsOn
                    False,  # FullRot (no animation)
                )
                (pX, pY, pZ, IX, IY, IZ, DX, DY, DZ,
                 FX, FY, FZ, TF, aX, aY, aZ, TiltTime) = result
            except Exception as exc:
                # Keep going even if one scenario fails; record the reason.
                print(f"Scenario {scen['name']!r} failed for pitch_type={row['pitch_type']!r}: {exc}")
                record["error"] = str(exc)
            else:
                record.update({
                    "spin_axis": row.get('spin_axis', np.nan),
                    "active_spin": row.get('active_spin', np.nan),
                    "release_speed": row.get('release_speed', np.nan),
                    "p_throws": row.get('p_throws', np.nan),
                    "FX_ft": FX, "FY_ft": FY, "FZ_ft": FZ, "TF_s": TF,
                })
            rows.append(record)

    results = pd.DataFrame(rows)
    # os.makedirs('') raises, so only create a directory when the output path
    # actually names one (e.g. out_csv="results.csv" has none).
    out_dir = os.path.dirname(out_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    results.to_csv(out_csv, index=False)
    return results
def main():
    """Load the Statcast CSV, run the default seam scenarios and report the output.

    Raises:
        FileNotFoundError: Propagated from ``load_pitch_stat_csv`` when no
            input CSV can be located.
        ValueError: If the selected CSV has no ``active_spin`` column.
    """
    out_csv = "umba/seam_scenarios_results.csv"
    csv_path = load_pitch_stat_csv()
    wanted = [
        'pitch_type', 'release_pos_x', 'release_pos_y', 'release_pos_z',
        'release_extension', 'release_speed',
        'vx0', 'vy0', 'vz0', 'ax', 'ay', 'az',
        'release_spin_rate', 'spin_axis', 'active_spin', 'p_throws',
    ]
    # Read the header once (it used to be re-read for every candidate column)
    # and fail fast before loading the whole file.
    available = pd.read_csv(csv_path, nrows=0).columns
    if 'active_spin' not in available:
        raise ValueError("active_spin column not found in the selected Statcast CSV.")
    # Load only the needed columns; ones missing from the file are tolerated.
    df = pd.read_csv(csv_path, usecols=[c for c in wanted if c in available])
    results = run_from_df(df, DEFAULT_SEAM_SCENARIOS, out_csv=out_csv)
    print(f"Wrote {len(results)} scenario rows to {out_csv}")
# Run the seam-scenario batch only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment