Skip to content

Instantly share code, notes, and snippets.

View kylegallatin's full-sized avatar

Kyle Gallatin kylegallatin

View GitHub Profile
import sys
import grpc
def get_grpc_connection(host_port: str, secure: bool):
    """Open a gRPC channel to *host_port*.

    When ``secure`` is True the channel is created with default TLS
    credentials; otherwise a plaintext (insecure) channel is returned.
    The channel is constructed lazily — no connection is attempted here.
    """
    if not secure:
        return grpc.insecure_channel(host_port)
    return grpc.secure_channel(host_port, grpc.ssl_channel_credentials())
@kylegallatin
kylegallatin / train_online_model.py
Created December 21, 2022 13:14
Training a logistic regression model with River
from river import linear_model, compose, preprocessing, metrics
# Build an online logistic-regression pipeline: inputs are standardised
# before being fed to the classifier.
scaler = preprocessing.StandardScaler()
classifier = linear_model.LogisticRegression()
model = compose.Pipeline(scaler, classifier)

# Evaluate the streaming model with area under the ROC curve.
metric = metrics.ROCAUC()
import pickle
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
# Generate a small synthetic binary-classification dataset: 100 samples
# with 3 features, 2 of them informative (no redundant/repeated columns).
dataset_params = dict(
    n_samples=100,
    n_features=3,
    n_informative=2,
    n_classes=2,
    n_redundant=0,
    n_repeated=0,
)
x, y = make_classification(**dataset_params)
import time
class PerformanceMonitor:
    """Accumulates per-request inference latencies for later inspection."""

    def __init__(self):
        # Chronological list of recorded prediction times.
        self.inference_times: list = []

    def record(self, prediction_time: float) -> None:
        """Append one latency measurement to the running history."""
        self.inference_times.append(prediction_time)
def load_latest_model() -> sklearn.pipeline.Pipeline:
    """Load the most recently registered model from the metadata store.

    Reads ``metadata_store.csv``, selects the row whose ``timestamp`` is
    the maximum, and deserialises the pipeline stored at that row's
    ``model_path``.
    """
    metadata = pd.read_csv("metadata_store.csv")
    newest_row = metadata.query("timestamp == timestamp.max()")
    latest_model_path = newest_row["model_path"].values[0]
    return joblib.load(latest_model_path)
def model_predict(user_id: str) -> float:
    """Score a single user with the most recent model.

    Fetches the user's features from the feature store, drops the
    ``target`` entry, and returns the model's prediction for the
    remaining feature vector.
    """
    model = load_latest_model()
    features = feature_store.get_user_feature(user_id)
    feature_vector = [
        value for name, value in features.items() if name != "target"
    ]
    return model.predict([feature_vector])
class TestExample:
    """Minimal pytest smoke test verifying the test harness runs."""

    def test_assertion(self):
        # The original `assert True==True` is a redundant comparison to a
        # boolean literal (PEP 8 / flake8 E712); a bare `assert True` is
        # the idiomatic always-passing placeholder.
        assert True
import time
import joblib
import csv
from datetime import datetime
import pandas as pd
import logging
from typing import Iterable, Callable, Dict
import sklearn.pipeline
from sklearn.pipeline import make_pipeline
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
## logistic regression parameter config
# Hyper-parameters for sklearn's LogisticRegression: L2 regularisation
# with default strength C=1.0 and up to 100 solver iterations.
# NOTE: the original literal was missing its closing brace (a syntax
# error as written); closed here.
parameters = {
    "penalty": "l2",
    "C": 1.0,
    "max_iter": 100,
}
import csv
import joblib
import sklearn.pipeline
from datetime import datetime
## helper function to save a model/pipeline
def save_pipeline(pipeline: sklearn.pipeline.Pipeline, ts: datetime = None) -> str:
    """Serialise *pipeline* to disk and return the path it was written to.

    Args:
        pipeline: fitted sklearn pipeline to persist.
        ts: timestamp embedded in the filename; defaults to
            ``datetime.now()``. (The original code performed no None
            check, so the default produced a literal ``pipeline-None.pkl``
            filename — every unstamped save overwrote the last.)

    Returns:
        The filesystem path of the dumped pickle.
    """
    if ts is None:
        ts = datetime.now()
    # str(datetime) contains a space between date and time; replace it so
    # the filename has no spaces.
    model_path = f"data/pipeline-{ts}.pkl".replace(" ", "-")
    joblib.dump(pipeline, model_path)
    return model_path
from typing import Iterable, Callable, Dict
class SuperSimpleFeatureStore:
def __init__(self, dataframe: pd.DataFrame):
self.dataframe = dataframe
self.feature_dict = dataframe.set_index("user_id").T.to_dict()
def register_feature(self, name: str, feature_definition: Callable) -> None:
for key in self.feature_dict: