Skip to content

Instantly share code, notes, and snippets.

@cnolanminich
cnolanminich / concurrent_airflow_dagruns.py
Created October 20, 2025 18:23
concurrent_airflow_dagruns
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.settings import Session
from airflow.models import DagRun
from datetime import timedelta
import pandas as pd
import numpy as np
# Constants for flexibility
@cnolanminich
cnolanminich / jenkinsfile
Created October 17, 2025 20:14
example hybrid jenkinsfile
pipeline {
agent any
environment {
// Dagster Cloud Configuration
DAGSTER_CLOUD_ORGANIZATION = 'your-org-name' // Replace with your Dagster+ org name
DAGSTER_CLOUD_API_TOKEN = credentials('dagster-cloud-api-token') // Jenkins credential ID
DAGSTER_BUILD_STATEDIR = "${WORKSPACE}/build_state"
// AWS ECR Configuration
@cnolanminich
cnolanminich / high_velocity_for_current_customers_or_active_poc.sql
Created August 27, 2025 13:57
high velocity credits for current POC customers / dagster+ customers
with daily as(
select
ds::date as reporting_date,
sum(standard_credits) as daily_standard_credits,
sum(high_velocity_credits) as daily_high_velo_credits,
daily_standard_credits + daily_high_velo_credits as total_credits_daily
from purina.product.usage_metrics_daily
where usage_metrics_daily.organization_id = {org_id here}
group by ds::date
)
@cnolanminich
cnolanminich / simple_databricks_asset.py
Created July 23, 2025 17:28
simple_databricks_asset
# hooli-ml/databricks_mlops/dagster_pipeline.py
import dagster as dg
from hooli_ml.defs.resources import get_env, MLWorkflowConfig, DatabricksResource
from databricks.sdk.service import jobs
NOTEBOOK_ROOT_PATH = "/Users/[email protected]/.bundle/databricks_mlops/dev/files/"
@dg.asset(kinds={"databricks", "feature_engineering"})
def feature_engineering_pickup(context: dg.AssetExecutionContext, databricks_resource: DatabricksResource):
run_id = databricks_resource.run_and_stream_notebook_logs_sdk(
@cnolanminich
cnolanminich / automation_asset.py
Created June 5, 2025 18:35
automation conditions that update once per period once all upstreams are materialized, and then again if any upstreams are materialized within that timeframe
# Runs every 5 minutes for demonstration purposes ("*/5 * * * *" fires on
# every minute divisible by 5, not at 5 minutes past the hour).
# NOTE(review): presumably this condition holds once the asset has been
# newly updated since the most recent cron tick -- confirm against the
# Dagster `AutomationCondition.since` semantics.
daily_success_condition = (
dg.AutomationCondition.newly_updated()
.since(dg.AutomationCondition.on_cron("*/5 * * * *"))
)
custom_condition = (
dg.AutomationCondition.on_cron("*/5 * * * *") # Runs every 5 minutes (demo schedule; daily at 9 AM would be "0 9 * * *")
| (
dg.AutomationCondition.any_deps_updated() # When any dependency updates
@cnolanminich
cnolanminich / historical_concurrency.sql
Last active October 1, 2025 14:15
Get historical concurrency for Dagster OSS instance
with seconds_in_day as (
select generate_series(0, 86400 - 1) as sec
)
, base_days as (
select generate_series(current_date - 7,current_date,'1 day'::interval) as days
),
days_and_seconds as (
select
base_days.days,
base_days.days + (seconds_in_day.sec || ' seconds')::interval as start_bucket,
@cnolanminich
cnolanminich / credits.sql
Created May 14, 2025 16:35
Dagster OSS Credit estimate
with events as (select distinct
DATE_FORMAT(timestamp, '%Y-%m') as event_month,
dagster_event_type,
coalesce(run_id, '||', step_key) as step_id,
count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
@cnolanminich
cnolanminich / dbt_sources_as_external_assets.py
Created January 16, 2025 16:59
example of building dbt external assets from dbt sources
import json
import textwrap
from typing import Any, Mapping, List, Tuple
from dagster import (
AutomationCondition,
AssetKey,
BackfillPolicy,
DailyPartitionsDefinition,
job,
op,
@cnolanminich
cnolanminich / deployment
Created January 7, 2025 21:15
Dagster GitHub Action with releases for prod and dev for pushes to main
name: Dagster Cloud Hybrid Deployment
on:
push: # For full deployment
branches:
- "main"
- "master"
pull_request: # For branch deployments
types: [opened, synchronize, reopened, closed]
release:
types: [published]
@cnolanminich
cnolanminich / postgres_resource.py
Created December 11, 2024 20:39
example postgres resource
import pandas as pd
from dagster import asset, ResourceParam
from sqlalchemy import create_engine
# Define the PostgreSQL resource
class PostgresResource:
def __init__(self, connection_string):
self.connection_string = connection_string
def get_connection(self):