Last active
October 30, 2023 09:43
-
-
Save perillo/816876d348be36b7727c9cc8c96a53f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from math import nan | |
import sys | |
from garmin_fit_sdk import Decoder, Stream | |
import numpy as np | |
from numpy.lib.stride_tricks import sliding_window_view | |
def load(path): | |
"""load loads a FIT file. | |
Returns the FIT messages and errors. | |
""" | |
stream = Stream.from_file(path) | |
decoder = Decoder(stream) | |
ok = decoder.check_integrity() | |
messages, errors = decoder.read() | |
if not ok: | |
# Ensure that an integrity error is always reported. | |
errors.insert(0, f"{path}: integrity error") | |
return messages, errors | |
def average(x): | |
"""Returns the average of the array elements, ignoring NaNs.""" | |
return np.nanmean(x) | |
def average_nonzero(x): | |
"""Returns the average of the array elements, ignoring zeroes.""" | |
return np.nanmean(x[np.nonzero(x)]) | |
def moving_average(x, window, rate=1): | |
"""Return the moving average (aka rolling average) of array x, using the | |
specified window, as seconds. | |
The array x is assumed to be sampled at a constant rate, in seconds. The | |
sampling rate rate is measured in hertz. | |
NaNs are ignored when computing the average value of each sliding window. | |
""" | |
windowsize = window * rate | |
if x.size < windowsize <= 0: | |
return 0 | |
return np.nanmean(sliding_window_view(x, window_shape=window), axis=1) | |
def normalized_power(x, rate=1): | |
"""Returns the normalized power. | |
The sampling rate is measured in hertz. | |
""" | |
rolling = moving_average(x, 30, rate) | |
tmp = np.power(rolling, 4) | |
avg = np.average(tmp) | |
return pow(avg, 1 / 4) | |
def main(): | |
messages, errors = load(sys.argv[1]) | |
records = messages["record_mesgs"] | |
size = len(records) | |
cadence_array = np.empty(size) | |
heart_rate_array = np.empty(size) | |
power_array = np.empty(size) | |
speed_array = np.empty(size) | |
temperature_array = np.empty(size) | |
for i, record in enumerate(records): | |
cadence_array[i] = record.get("cadence", nan) | |
heart_rate_array[i] = record.get("heart_rate", nan) | |
power_array[i] = record.get("power", nan) | |
speed_array[i] = record.get("speed", nan) | |
temperature_array[i] = record.get("temperature", nan) | |
print("avg_cadence:", average_nonzero(cadence_array)) | |
print("avg_heart_rate:", average(heart_rate_array)) | |
print("avg_power:", average(power_array)) | |
print("avg_speed:", average(speed_array)) | |
print("agv_temperature:", average(temperature_array)) | |
print("normalized power:", normalized_power(power_array)) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment