Created
November 29, 2023 06:10
-
-
Save geovedi/613b27a57adec2bc2a606c83bad2ebde to your computer and use it in GitHub Desktop.
How to make rolling hurst function faster — TLDR: Use Numba!
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
from typing import Union, Callable | |
def generate_ohlcv_data( | |
start_date: str, end_date: str, freq: str = "D", seed: int = None | |
) -> pd.DataFrame: | |
if seed is not None: | |
np.random.seed(seed) | |
dates = pd.date_range(start=start_date, end=end_date, freq=freq) | |
open_prices = np.random.rand(len(dates)) * 100 # Random open prices | |
close_prices = ( | |
open_prices + np.random.randn(len(dates)) * 10 | |
) # Close prices near open | |
high_prices = np.maximum(open_prices, close_prices) + np.random.rand(len(dates)) * 5 | |
low_prices = np.minimum(open_prices, close_prices) - np.random.rand(len(dates)) * 5 | |
volume = np.random.randint(100, 10000, len(dates)) # Random volume | |
data = { | |
"open": open_prices, | |
"high": high_prices, | |
"low": low_prices, | |
"close": close_prices, | |
"volume": volume, | |
} | |
return pd.DataFrame(data, index=dates) | |
# smidelis -- | |
# https://discord.com/channels/700048804539400213/785934927011119124/1174743574953340940 | |
def smidelis_hurst(ts, max_lag=20): | |
"""Compute the Hurst Exponent of the time series vector ts""" | |
lags = range(2, max_lag) | |
tau = [np.std(np.subtract(ts[lag:], ts[:-lag])) for lag in lags] | |
return np.polyfit(np.log(lags), np.log(tau), 1)[0] | |
def smidelis_rolling_hurst(dataframe, window=128, max_lag=20, column="close"): | |
"""Compute rolling Hurst Exponent""" | |
hurst_exp = ( | |
dataframe[column] | |
.rolling(window=window) | |
.apply(lambda x: smidelis_hurst(x, max_lag=max_lag), raw=True) | |
) | |
return hurst_exp | |
# basic | |
def basic_hurst(ts: np.ndarray, max_lag: int = 20) -> float: | |
lags = np.arange(2, max_lag) | |
tau = [np.std(ts[lag:] - ts[:-lag]) for lag in lags] | |
tau_filtered = np.array(tau)[np.array(tau) > 0] | |
lags_filtered = lags[: len(tau_filtered)] | |
if len(tau_filtered) == 0 or np.any(tau_filtered <= 0): | |
return np.nan | |
return np.polyfit(np.log(lags_filtered), np.log(tau_filtered), 1)[0] | |
def basic_rolling_hurst( | |
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close" | |
) -> pd.Series: | |
series = dataframe[column] | |
result = np.full(len(series), np.nan) | |
for start in range(len(series) - window + 1): | |
window_ts = series.iloc[start : start + window] | |
if ( | |
not window_ts.isna().any() and np.ptp(window_ts) > 0 | |
): # Check for NaN and non-zero range | |
result[start + window - 1] = basic_hurst(window_ts.values, max_lag) | |
return pd.Series(result, index=series.index) | |
# test_01 | |
def test_01_hurst(ts: np.ndarray, max_lag: int = 20) -> float: | |
lags = np.arange(2, max_lag) | |
tau = [np.std(np.subtract(ts[lag:], ts[:-lag])) for lag in lags] | |
return np.polyfit(np.log(lags), np.log(tau), 1)[0] | |
def test_01_rolling_hurst( | |
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close" | |
) -> np.ndarray: | |
stride_size = dataframe[column].values.strides[0] | |
rolling_views = np.lib.stride_tricks.as_strided( | |
dataframe[column].values, | |
shape=(dataframe[column].size - window + 1, window), | |
strides=(stride_size, stride_size), | |
) | |
hurst_exp = np.apply_along_axis(test_01_hurst, 1, rolling_views, max_lag=max_lag) | |
return pd.Series( | |
np.concatenate([np.full(window - 1, np.nan), hurst_exp]), index=dataframe.index | |
) | |
# test_02 | |
def test_02_rolling_apply(df: pd.Series, window: int, func: Callable) -> np.ndarray: | |
result = np.full(df.size, np.nan) | |
for start in range(df.size - window + 1): | |
window_data = df.iloc[start : start + window].values | |
if not np.any(np.isnan(window_data)): | |
result[start + window - 1] = func(window_data) | |
return result | |
def test_02_rolling_hurst( | |
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close" | |
) -> pd.Series: | |
hurst_exp = test_02_rolling_apply( | |
dataframe[column], window, lambda x: test_01_hurst(x, max_lag) | |
) | |
return pd.Series(hurst_exp, index=dataframe.index) | |
# test_03 | |
def test_03_rolling_hurst( | |
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close" | |
) -> pd.Series: | |
def apply_hurst_on_window(window_data: np.ndarray) -> float: | |
if not np.isnan(window_data).any(): | |
return test_01_hurst(window_data, max_lag=max_lag) | |
else: | |
return np.nan | |
hurst_exp = ( | |
dataframe[column] | |
.rolling(window=window, min_periods=window) | |
.apply(apply_hurst_on_window, raw=True) | |
) | |
return hurst_exp | |
# optimized with numba | |
from numba import jit | |
@jit(nopython=True) | |
def fast_hurst(ts, max_lag, precomputed_log_lags): | |
tau = np.empty(max_lag - 2) | |
for idx, lag in enumerate(range(2, max_lag)): | |
lagged_diff = ts[lag:] - ts[:-lag] | |
tau[idx] = np.std(lagged_diff) | |
valid_tau = tau[tau > 0] | |
if len(valid_tau) == 0: | |
return np.nan | |
# Simple linear regression calculation -- Numba doesn't support all NumPy functions | |
n = len(valid_tau) | |
sum_x = precomputed_log_lags[:n].sum() | |
sum_y = np.log(valid_tau).sum() | |
sum_xy = (precomputed_log_lags[:n] * np.log(valid_tau)).sum() | |
sum_x_squared = (precomputed_log_lags[:n] ** 2).sum() | |
m = (n * sum_xy - sum_x * sum_y) / (n * sum_x_squared - sum_x**2) | |
return m | |
def optimized_rolling_hurst( | |
dataframe: pd.DataFrame, window: int = 128, max_lag: int = 20, column: str = "close" | |
) -> pd.Series: | |
series = dataframe[column].fillna(method="ffill") | |
result = np.full(len(series), np.nan) | |
precomputed_log_lags = np.log(np.arange(2, max_lag)) | |
for start in range(len(series) - window + 1): | |
window_ts = series.values[start : start + window] | |
if np.ptp(window_ts) > 0: | |
result[start + window - 1] = fast_hurst( | |
window_ts, max_lag, precomputed_log_lags | |
) | |
return pd.Series(result, index=dataframe.index) | |
def compare_series(series1, series2, tol=1e-5): | |
return np.isclose(series1, series2, equal_nan=True, atol=tol).all() | |
""" | |
df = generate_ohlcv_data('2021-01-01', '2021-12-31', 'D') | |
x0 = smidelis_rolling_hurst(df) | |
x1 = test_01_rolling_hurst(df) | |
x2 = test_02_rolling_hurst(df) | |
x3 = test_03_rolling_hurst(df) | |
x4 = optimized_rolling_hurst(df) | |
list(map(compare_series, [x0, x1, x2, x3, x4], [x0, x0, x0, x0, x0])) | |
In [195]: %timeit smidelis_rolling_hurst(df) | |
33.7 ms ± 24.2 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) | |
In [196]: %timeit test_01_rolling_hurst(df) | |
32.8 ms ± 45.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) | |
In [197]: %timeit test_02_rolling_hurst(df) | |
36.6 ms ± 39.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) | |
In [198]: %timeit test_03_rolling_hurst(df) | |
33.7 ms ± 48.3 µs per loop (mean ± std. dev. of 7 runs, 10 loops each) | |
In [199]: %timeit optimized_rolling_hurst(df) | |
1.9 ms ± 3.63 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each) | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment