Source code for pyretis.simulation.scheduler

"""Main infinite-swapping scheduler loop."""

import copy

from pyretis.simulation.setup import setup_internal, setup_runner


[docs]def scheduler(config): """Run the infinite-swapping scheduler loop.""" # setup repex, runner and futures md_items, state = setup_internal(config) runner, futures = setup_runner(state) try: # submit the first number of workers while state.initiate(): # give each worker its own md_items worker_md_items = copy.deepcopy(md_items) # pick and prep ens and path for the next job worker_md_items = state.prep_md_items(worker_md_items) # submit job to scheduler futures.add(runner.submit_work(worker_md_items)) # main step loop while state.loop(): # Get futures as they are completed future = futures.as_completed() if future: worker_md_items = state.treat_output(future.result()) # submit new job if state.cstep + state.workers <= state.tsteps: # chose ens and path for the next job worker_md_items = state.prep_md_items(worker_md_items) # submit job to scheduler futures.add(runner.submit_work(worker_md_items)) # end client (normal, graceful shutdown) runner.stop() except BaseException: # On any error or interrupt (including the KeyboardInterrupt # raised from the SIGTERM handler), force-release the worker pool # so it is never orphaned, then re-raise. runner.close() raise