# Source code for pyretis.simulation.scheduler
"""Main infinite-swapping scheduler loop."""
import copy
from pyretis.simulation.setup import setup_internal, setup_runner
def scheduler(config):
    """Run the infinite-swapping scheduler loop.

    The state, runner and futures container are built from ``config``.
    Jobs are first submitted for as long as ``state.initiate()`` is
    true. Then, while ``state.loop()`` is true, each completed job is
    handed to ``state.treat_output`` and a replacement job is submitted
    as long as ``state.cstep + state.workers`` does not exceed
    ``state.tsteps``.

    Parameters
    ----------
    config : object
        Simulation settings, passed unchanged to ``setup_internal``.
        NOTE(review): presumably the parsed settings mapping -- confirm
        against ``setup_internal``.

    Raises
    ------
    BaseException
        Anything raised while submitting or processing jobs, or by
        ``runner.stop()``, is re-raised after ``runner.close()`` has
        been called.

    """
    # Setup repex, runner and futures. This happens outside the ``try``
    # block: if it fails there is no ``runner`` yet to close.
    md_items, state = setup_internal(config)
    runner, futures = setup_runner(state)

    try:
        # Submit the first number of workers.
        while state.initiate():
            # Give each worker its own md_items, so the jobs do not
            # share (and mutate) one and the same object.
            worker_md_items = copy.deepcopy(md_items)
            # Pick and prep ens and path for the next job.
            worker_md_items = state.prep_md_items(worker_md_items)
            # Submit job to scheduler.
            futures.add(runner.submit_work(worker_md_items))

        # Main step loop.
        while state.loop():
            # Get futures as they are completed. A falsy value is
            # skipped (presumably "nothing has finished yet" -- confirm
            # against the futures implementation).
            future = futures.as_completed()
            if future:
                worker_md_items = state.treat_output(future.result())
                # Submit a new job only while the step budget allows it.
                if state.cstep + state.workers <= state.tsteps:
                    # Choose ens and path for the next job.
                    worker_md_items = state.prep_md_items(worker_md_items)
                    # Submit job to scheduler.
                    futures.add(runner.submit_work(worker_md_items))

        # End client (normal, graceful shutdown). Kept inside the
        # ``try`` so that a failing stop() still triggers close() below.
        runner.stop()
    except BaseException:
        # On any error or interrupt (including the KeyboardInterrupt
        # raised from the SIGTERM handler), force-release the worker
        # pool so it is never orphaned, then re-raise.
        runner.close()
        raise