Created
September 1, 2022 21:07
-
-
Save pingzh/6717ff99b4ca31d5b02161f7999a9dd8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SchedulerFramework: | |
""" | |
General Rules: | |
1. SchedulerFramework does not assume scheduler implementation | |
2. SchedulerFramework does not parse dag files | |
""" | |
def __init__(self, executor, scheduler_cls): | |
self.executor = executor | |
self.scheduler = scheduler_cls(executor=self.executor, num_runs=-1) | |
def run(self): | |
self.before_run_setup() | |
while not self.loop_stop_criteria(): | |
self.scheduler.make_scheduling_decisions() # mark tis state as scheduled | |
# | |
self.scheduler.send_queuable_tasks_to_executor() # mark tis state as queued and send to executor | |
self.heartbeat() | |
self.scheduler.process_system_events() | |
self.before_exit() | |
def before_run_setup(self): | |
self.scheduler.before_run_setup() | |
def loop_stop_criteria(self): | |
return self.scheduler.should_exit_loop() | |
def heartbeat(self): | |
self.executor.heartbeat() | |
self.scheduler.heartbeat() | |
self.update_heartbeat_time() | |
def update_heartbeat_time(self): | |
pass | |
def before_exit(self): | |
self.scheduler.before_exit() | |
class BaseScheduler: | |
def __init__(self, executor): | |
self.executor = executor | |
def before_exit(self): | |
"""Before exit clean up""" | |
pass | |
def before_run_setup(self): | |
"""Set up before the scheduling loop""" | |
pass | |
def make_scheduling_decisions(self): | |
""" | |
scheduling decision means marking state of schedulable task instances as `scheduled`, it includes: | |
1. Create new dag runs if needed | |
2. Collect schedulable task instances | |
3. Manage task instance states, e.g. marking upstream_failed, skipped | |
4. Mark their state as `scheduled` | |
""" | |
pass | |
def send_queuable_tasks_to_executor(self): | |
""" | |
This method does: | |
1. Find queuable task instances by applying dag/task concurrency limit, pool slot quota etc | |
2. Update state of queuable task instances as `queued` | |
3. Send them to the executor | |
""" | |
pass | |
def process_system_events(self): | |
""" | |
This method processes system events including 1) zombie tasks 2) executor events | |
""" | |
pass | |
def should_exit_loop(self): | |
"""When to exit the scheduling loop""" | |
pass | |
def heartbeat(self): | |
"""Heartbeat the scheduler""" | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment