Skip to content

Instantly share code, notes, and snippets.

@903124
Created August 29, 2025 15:41
Show Gist options
  • Select an option

  • Save 903124/554a389307fed76841fb95d58f01f1c5 to your computer and use it in GitHub Desktop.

Select an option

Save 903124/554a389307fed76841fb95d58f01f1c5 to your computer and use it in GitHub Desktop.
import os
from typing import Dict, List
import numpy as np
import pandas as pd
# Base data directory: main() reads innings.csv and the Positional_OAA/
# sub-folder from here and writes all of its CSV outputs back into it.
DRS_DIR = os.path.join("Statcast_data", "DRS")
def list_drs_position_files(drs_dir: str) -> List[str]:
    """Collect the OAA_*.csv files found directly inside ``drs_dir``.

    File names are matched case-insensitively on the ``oaa_`` prefix and the
    ``.csv`` extension. Each entry of the result is ``drs_dir`` joined with the
    file name, and the list is returned in sorted order.
    """
    matches = [
        os.path.join(drs_dir, entry)
        for entry in os.listdir(drs_dir)
        if entry.lower().startswith("oaa_") and entry.lower().endswith(".csv")
    ]
    matches.sort()
    return matches
def read_innings(drs_dir: str) -> pd.DataFrame:
    """Load ``innings.csv`` from ``drs_dir`` and shape it for merging.

    The file stores outs per defensive position in the columns ``outs_2``
    (catcher) through ``outs_9`` (right field); each one is turned into an
    ``innings_<pos>`` column by dividing by 3. A missing outs column yields an
    all-NaN innings column.

    Returns a frame with ``player_id``, ``player_name``, ``year``, the eight
    ``innings_<pos>`` columns and ``catching_runs`` (NaN when the file has no
    such column).
    """
    frame = pd.read_csv(os.path.join(drs_dir, "innings.csv"))
    frame = frame.rename(columns={"id": "player_id", "name": "player_name"})

    # Key columns become nullable integers; unparseable entries turn into <NA>.
    for key_col in ("player_id", "year"):
        if key_col in frame.columns:
            frame[key_col] = pd.to_numeric(frame[key_col], errors="coerce").astype("Int64")

    # Outs column -> position label (scorekeeping numbers 2..9).
    position_by_outs_col: Dict[str, str] = {
        "outs_2": "C",
        "outs_3": "1B",
        "outs_4": "2B",
        "outs_5": "3B",
        "outs_6": "SS",
        "outs_7": "LF",
        "outs_8": "CF",
        "outs_9": "RF",
    }
    for outs_col, pos in position_by_outs_col.items():
        target = f"innings_{pos}"
        if outs_col not in frame.columns:
            frame[target] = np.nan
            continue
        # Three outs make one inning.
        frame[target] = pd.to_numeric(frame[outs_col], errors="coerce") / 3.0

    # catching_runs is optional in the file; fall back to NaN when absent.
    raw_catching = frame.get("catching_runs", np.nan)
    frame["catching_runs"] = pd.to_numeric(raw_catching, errors="coerce")

    output_cols = (
        ["player_id", "player_name", "year"]
        + [f"innings_{pos}" for pos in position_by_outs_col.values()]
        + ["catching_runs"]
    )
    return frame[output_cols]
def read_oaa_position_file(path: str) -> pd.DataFrame:
    """Read a single OAA_<pos>.csv file and return a normalized DataFrame.

    The position label is derived from the file name (``OAA_ss.csv`` -> ``SS``),
    never from the file contents. The ``OAA_`` marker is matched
    case-insensitively so that every name accepted by
    ``list_drs_position_files`` (which lower-cases before matching) parses.

    Args:
        path: Location of the CSV file.

    Returns:
        A frame with ``player_id`` and ``year`` (nullable ``Int64``),
        ``fielding_runs_prevented`` (numeric), ``pos``, plus ``player_name``
        and ``diff_success_rate_formatted`` (numeric, trailing ``%`` removed)
        when the file provides them. Rows missing ``player_id``, ``year``,
        ``pos`` or ``fielding_runs_prevented`` are dropped, so a file whose
        name carries no recognised position suffix yields an empty frame.

    Raises:
        KeyError: If the file lacks ``player_id``, ``year`` or
            ``fielding_runs_prevented``.
    """
    df = pd.read_csv(path)

    # Derive position from the filename, ignoring case.
    # Expected patterns: OAA_1b.csv, OAA_2b.csv, OAA_3b.csv, OAA_ss.csv,
    # OAA_lf.csv, OAA_cf.csv, OAA_rf.csv
    fname = os.path.basename(path)
    _, marker, remainder = fname.lower().partition("oaa_")
    suffix = remainder.rsplit(".", 1)[0].strip() if marker else ""
    suffix_map = {
        "1b": "1B",
        "2b": "2B",
        "3b": "3B",
        "ss": "SS",
        "lf": "LF",
        "cf": "CF",
        "rf": "RF",
    }
    pos = suffix_map.get(suffix)

    # Keep only the known columns (do not rely on any in-file position column).
    col_map = {
        "last_name, first_name": "player_name",
        "player_id": "player_id",
        "year": "year",
        "fielding_runs_prevented": "fielding_runs_prevented",
        "diff_success_rate_formatted": "diff_success_rate_formatted",
    }
    existing_cols = {orig: new for orig, new in col_map.items() if orig in df.columns}
    df = df[list(existing_cols)].rename(columns=existing_cols)

    # Fail early with a readable message instead of an opaque KeyError later.
    required = ("player_id", "year", "fielding_runs_prevented")
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise KeyError(f"{path} is missing required column(s): {', '.join(missing)}")

    # Attach parsed position (None for an unrecognised suffix).
    df["pos"] = pos

    # Coerce types
    df["player_id"] = pd.to_numeric(df["player_id"], errors="coerce").astype("Int64")
    df["year"] = pd.to_numeric(df["year"], errors="coerce").astype("Int64")
    df["fielding_runs_prevented"] = pd.to_numeric(
        df["fielding_runs_prevented"], errors="coerce"
    )
    if "diff_success_rate_formatted" in df.columns:
        # pandas parses the column as numeric when no value carries a "%"; go
        # through str so the .str accessor is always available.
        rate_text = df["diff_success_rate_formatted"].astype(str).str.rstrip("%")
        df["diff_success_rate_formatted"] = pd.to_numeric(rate_text, errors="coerce")

    # Drop rows without essential fields
    df = df.dropna(subset=["player_id", "year", "pos", "fielding_runs_prevented"]).copy()
    return df
def combine_all_positions(drs_dir: str) -> pd.DataFrame:
    """Stack the normalized contents of every OAA position file in ``drs_dir``.

    When the directory holds no OAA files, an empty frame carrying the
    pipeline's column names is returned instead.
    """
    per_file: List[pd.DataFrame] = [
        read_oaa_position_file(csv_path)
        for csv_path in list_drs_position_files(drs_dir)
    ]
    if per_file:
        return pd.concat(per_file, ignore_index=True)

    empty_columns = [
        "player_id", "player_name", "year", "pos", "fielding_runs_prevented",
        "innings", "per162_runs",
    ]
    return pd.DataFrame(columns=empty_columns)
def attach_innings(oaa_df: pd.DataFrame, innings_df: pd.DataFrame) -> pd.DataFrame:
    """Attach the innings played at each row's own position.

    ``oaa_df`` is left-merged with ``innings_df`` on ``player_id`` and ``year``.
    Columns present in both frames keep their name on the OAA side and receive
    an ``_inn`` suffix on the innings side. A new ``innings`` column is then
    filled from the ``innings_<pos>`` column that matches each row's ``pos``.

    The lookup is vectorised per position instead of using a row-wise
    ``DataFrame.apply``: on an empty frame ``apply(axis=1)`` hands back a
    DataFrame, which cannot be assigned to a single column.

    Args:
        oaa_df: Rows keyed by ``player_id``, ``year`` and ``pos``.
        innings_df: Per player-year frame with ``innings_<pos>`` columns.

    Returns:
        The merged frame with a float ``innings`` column. The value is NaN when
        the position is not one of C/1B/2B/3B/SS/LF/CF/RF, when the matching
        innings column is absent, or when no innings row matched.
    """
    merged = oaa_df.merge(
        innings_df,
        on=["player_id", "year"],
        how="left",
        suffixes=("", "_inn"),
    )
    pos_to_innings_col: Dict[str, str] = {
        "C": "innings_C",
        "1B": "innings_1B",
        "2B": "innings_2B",
        "3B": "innings_3B",
        "SS": "innings_SS",
        "LF": "innings_LF",
        "CF": "innings_CF",
        "RF": "innings_RF",
    }

    innings = np.full(len(merged), np.nan, dtype=float)
    for pos, col in pos_to_innings_col.items():
        if col not in merged.columns:
            continue
        # Missing positions compare as False/NA; treat both as "no match".
        at_pos = merged["pos"].eq(pos).fillna(False).to_numpy(dtype=bool)
        if not at_pos.any():
            continue
        picked = pd.to_numeric(merged.loc[at_pos, col], errors="coerce")
        innings[at_pos] = picked.to_numpy(dtype=float, na_value=np.nan)

    merged["innings"] = innings
    return merged
def compute_per162(merged_df: pd.DataFrame) -> pd.DataFrame:
    """Scale fielding runs to a 162-game (1458-inning) rate per row.

    Works on a copy of ``merged_df``. ``innings`` is coerced to numeric and
    non-positive values are blanked to NaN so the division never hits zero;
    ``per162_runs`` is ``fielding_runs_prevented / innings * 1458``.
    """
    innings_per_162 = 1458.0  # 162 games x 9 innings

    result = merged_df.copy()
    result["innings"] = pd.to_numeric(result["innings"], errors="coerce")

    # Zero or negative innings would give inf or nonsense rates; mark as missing.
    non_positive = result["innings"] <= 0
    result.loc[non_positive, "innings"] = np.nan

    runs_per_inning = result["fielding_runs_prevented"] / result["innings"]
    result["per162_runs"] = runs_per_inning * innings_per_162
    return result
def build_position_shift_matrix(
    per_pos_df: pd.DataFrame, include_c: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Build harmonic-mean weighted position-shift matrices from same-player deltas.

    Rows are grouped per player (``player_id`` and ``player_name``). Within a
    player, every row at position i is paired with every row at position j
    (i != j):
        d = per162(j, row_b) - per162(i, row_a)
        w = 2 * inn_i(row_a) * inn_j(row_b) / (inn_i(row_a) + inn_j(row_b))
    The weighted mean of d (weights w) is aggregated over all players and all
    such pairs. Pairs are not filtered by season, so two positions played in
    the same season are paired as well.

    Diagonal cells carry a delta of 0, a weight equal to the total innings at
    that position and a count equal to the number of rows at that position.

    Positions considered: 1B, 2B, 3B, SS, LF, CF, RF.
    If include_c=True, C is included as well (first in the ordering).

    Args:
        per_pos_df: Frame with ``player_id``, ``player_name``, ``pos``,
            ``per162_runs`` and ``innings``. Rows with a missing ``pos``,
            ``per162_runs`` or ``innings``, or with non-positive innings,
            are ignored.
        include_c: Whether catcher is part of the matrix.

    Returns:
        ``(deltas, counts, weight_sums)``, each indexed by ``from_pos`` (rows)
        and ``to_pos`` (columns). ``deltas`` is NaN for off-diagonal cells
        without any pair.
    """
    positions: List[str] = (["C"] if include_c else []) + ["1B", "2B", "3B", "SS", "LF", "CF", "RF"]

    # Keep only valid rows
    df = per_pos_df.dropna(subset=["pos", "per162_runs", "innings"]).copy()
    df = df[df["innings"] > 0]

    deltas = pd.DataFrame(index=positions, columns=positions, dtype=float)
    counts = pd.DataFrame(index=positions, columns=positions, dtype="Int64")
    weight_sums = pd.DataFrame(index=positions, columns=positions, dtype=float)

    # Initialize accumulators
    total_weighted_sum: Dict[tuple, float] = {(i, j): 0.0 for i in positions for j in positions}
    total_weight: Dict[tuple, float] = {(i, j): 0.0 for i in positions for j in positions}
    total_count: Dict[tuple, int] = {(i, j): 0 for i in positions for j in positions}

    # dropna=False keeps players whose id or name is missing as their own group.
    for _, g in df.groupby(["player_id", "player_name"], dropna=False):
        pos_groups: Dict[str, pd.DataFrame] = {p: sub for p, sub in g.groupby("pos")}
        for i in positions:
            gi = pos_groups.get(i)
            if gi is None:
                continue
            vi = gi["per162_runs"].to_numpy(dtype=float)
            wi = gi["innings"].to_numpy(dtype=float)
            for j in positions:
                if i == j:
                    # Self deltas set to 0 with weight equal to sum of innings at the position
                    total_weight[(i, j)] += float(wi.sum())
                    total_count[(i, j)] += int(gi.shape[0])
                    continue
                gj = pos_groups.get(j)
                if gj is None:
                    continue
                vj = gj["per162_runs"].to_numpy(dtype=float)
                wj = gj["innings"].to_numpy(dtype=float)
                # Pairwise deltas via outer difference and harmonic mean weights
                # Shape: (len(gi), len(gj))
                diff = vj[None, :] - vi[:, None]
                denom = wi[:, None] + wj[None, :]
                with np.errstate(divide="ignore", invalid="ignore"):
                    w = (2.0 * wi[:, None] * wj[None, :]) / denom
                # Filter valid weights
                mask = np.isfinite(w) & (w > 0)
                if not np.any(mask):
                    continue
                total_weighted_sum[(i, j)] += float(np.sum(diff[mask] * w[mask]))
                total_weight[(i, j)] += float(np.sum(w[mask]))
                total_count[(i, j)] += int(mask.sum())

    for i in positions:
        for j in positions:
            wsum = total_weight[(i, j)]
            # No weight: the diagonal is 0 by definition, anything else is unknown.
            deltas.loc[i, j] = total_weighted_sum[(i, j)] / wsum if wsum > 0 else (0.0 if i == j else np.nan)
            counts.loc[i, j] = total_count[(i, j)]
            weight_sums.loc[i, j] = wsum

    for matrix in (deltas, counts, weight_sums):
        matrix.index.name = "from_pos"
        matrix.columns.name = "to_pos"
    return deltas, counts, weight_sums
def solve_position_values_from_deltas(deltas: pd.DataFrame, weights: pd.DataFrame | None = None) -> pd.DataFrame:
"""Solve for per-position values x such that x_j - x_i ~= deltas[i,j] (weighted LS).
If weights is provided, each equation for pair (i,j) is weighted by sqrt(weights[i,j]).
Returns a DataFrame [pos, solved_value] where values are centered to mean 0.
"""
positions = list(deltas.index)
pos_to_idx: Dict[str, int] = {p: i for i, p in enumerate(positions)}
rows: List[List[float]] = []
rhs: List[float] = []
scales: List[float] = []
for i, pi in enumerate(positions):
for j, pj in enumerate(positions):
if j <= i:
continue
val = deltas.loc[pi, pj]
if pd.isna(val):
continue
w = None
if weights is not None:
try:
w = float(weights.loc[pi, pj])
except Exception:
w = None
scale = np.sqrt(max(w, 0.0)) if (w is not None) else 1.0
row = [0.0] * len(positions)
row[pos_to_idx[pi]] = -1.0
row[pos_to_idx[pj]] = 1.0
rows.append(row)
rhs.append(float(val))
scales.append(scale)
if not rows:
return pd.DataFrame({"pos": positions, "solved_value": [np.nan] * len(positions)})
A = np.asarray(rows)
b = np.asarray(rhs)
s = np.asarray(scales)
if s.ndim == 1:
s = s.reshape(-1, 1)
A = A * s
b = b * s.ravel()
x, *_ = np.linalg.lstsq(A, b, rcond=None)
x = x - np.mean(x)
return pd.DataFrame({"pos": positions, "solved_value": x})
def main(*, include_c: bool = False) -> None:
    """Run the full pipeline and write its CSV outputs into ``DRS_DIR``.

    Steps: read ``innings.csv`` from ``DRS_DIR`` and the OAA position files from
    ``DRS_DIR/Positional_OAA``, attach innings, compute per-162 run values,
    build the position shift matrices and solve for per-position values.

    Files written (all inside ``DRS_DIR``):
        DRS_per162_player_position.csv
        DRS_position_shift_matrix_mean_delta.csv
        DRS_position_shift_matrix_counts.csv
        DRS_position_shift_matrix_weight_sums.csv
        DRS_position_values_solved.csv
        DRS_position_values_solved_1Banchored.csv

    Args:
        include_c: When True, catcher rows synthesized from ``innings.csv``
            (``catching_runs`` and ``innings_C``) are appended and C becomes
            part of the shift matrix. Defaults to False.
    """
    oaa_dir = os.path.join(DRS_DIR, "Positional_OAA")
    innings_df = read_innings(DRS_DIR)
    oaa_df = combine_all_positions(oaa_dir)
    if oaa_df.empty:
        # Report the directory that was actually searched.
        print(f"No OAA position data found in {oaa_dir}")
        return

    merged = attach_innings(oaa_df, innings_df)
    per162_df = compute_per162(merged)

    # Optionally append catcher rows synthesized from innings.csv (catching_runs + innings_C)
    if include_c:
        c_base = innings_df[[
            "player_id", "player_name", "year", "catching_runs", "innings_C"
        ]].copy()
        c_base = c_base.dropna(subset=["player_id", "year", "catching_runs", "innings_C"]).copy()
        c_base = c_base[c_base["innings_C"] > 0]
        if not c_base.empty:
            catcher_oaa_like = pd.DataFrame({
                "player_id": c_base["player_id"].astype("Int64"),
                "player_name": c_base["player_name"],
                "year": c_base["year"].astype("Int64"),
                "pos": "C",
                "fielding_runs_prevented": pd.to_numeric(c_base["catching_runs"], errors="coerce"),
            })
            catcher_merged = attach_innings(catcher_oaa_like, innings_df)
            catcher_per162 = compute_per162(catcher_merged)
            # Keep same essential columns
            catcher_per162 = catcher_per162[[
                "player_id", "player_name", "year", "pos",
                "fielding_runs_prevented", "innings", "per162_runs",
            ]]
            per162_df = pd.concat([per162_df, catcher_per162], ignore_index=True)

    # Retain essential columns and order
    per162_df = per162_df[[
        "player_id",
        "player_name",
        "year",
        "pos",
        "fielding_runs_prevented",
        "innings",
        "per162_runs",
    ]].sort_values(["player_name", "year", "pos"]).reset_index(drop=True)
    out_per162_path = os.path.join(DRS_DIR, "DRS_per162_player_position.csv")
    per162_df.to_csv(out_per162_path, index=False)

    shift_matrix, counts, weight_sums = build_position_shift_matrix(per162_df, include_c=include_c)
    out_matrix_path = os.path.join(DRS_DIR, "DRS_position_shift_matrix_mean_delta.csv")
    out_counts_path = os.path.join(DRS_DIR, "DRS_position_shift_matrix_counts.csv")
    out_weights_path = os.path.join(DRS_DIR, "DRS_position_shift_matrix_weight_sums.csv")
    shift_matrix.to_csv(out_matrix_path)
    counts.to_csv(out_counts_path)
    weight_sums.to_csv(out_weights_path)

    # Solve per-position values from deltas (weighted least squares)
    solved = solve_position_values_from_deltas(shift_matrix, weight_sums)
    out_solved_path = os.path.join(DRS_DIR, "DRS_position_values_solved.csv")
    solved.to_csv(out_solved_path, index=False)

    # Also write a copy shifted so that 1B = 0; left unshifted if 1B is absent.
    solved_anchored = solved.copy()
    anchor = solved.loc[solved["pos"] == "1B", "solved_value"]
    if not anchor.empty:
        solved_anchored["solved_value"] = solved_anchored["solved_value"] - float(anchor.iloc[0])
    out_solved_anchored_path = os.path.join(DRS_DIR, "DRS_position_values_solved_1Banchored.csv")
    solved_anchored.to_csv(out_solved_anchored_path, index=False)

    # Also print a quick summary
    print(f"Wrote per-player per-position per-162 runs to: {out_per162_path}")
    print(f"Wrote position shift mean delta matrix to: {out_matrix_path}")
    print(f"Wrote position shift counts matrix to: {out_counts_path}")
    print(f"Wrote position shift weight sums to: {out_weights_path}")
    print(f"Wrote solved position values (mean-centered) to: {out_solved_path}")
    print(f"Wrote solved position values anchored at 1B=0 to: {out_solved_anchored_path}")
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment