Mathis Börner
07/24/2019, 10:29 AM
import time
import prefect
from prefect import task, Flow, Parameter
from prefect import tags
from prefect.engine import signals
from prefect.environments.storage import LocalStorage
from prefect.environments import LocalEnvironment
from prefect.utilities.tasks import pause_task
from prefect.engine.state import Resume
@task
def hold(cfg):
    """Pass ``cfg`` through unchanged after calling ``pause_task()``.

    Args:
        cfg: Value forwarded to downstream tasks.

    Returns:
        The same ``cfg`` object that was passed in.
    """
    # NOTE(review): pause_task() presumably halts the task until it is
    # resumed (e.g. via a Resume state) -- confirm against the Prefect docs.
    pause_task()
    return cfg
@task
def process(cfg):
    """Concatenate the ``key`` stored in the Prefect context with ``cfg['value']``.

    Args:
        cfg: Mapping that provides a ``'value'`` entry.

    Returns:
        The result of ``prefect.context.key + cfg['value']``.

    Raises:
        signals.FAIL: If the context lookup, the ``cfg`` lookup, or the
            addition raises any ``Exception``; the original error is
            attached as the cause.
    """
    try:
        # Fixed typo: the attribute is ``prefect.context``; ``prefect.contex``
        # always raised AttributeError, so the task could never succeed.
        return prefect.context.key + cfg['value']
    except Exception as exc:
        # ``except Exception`` (not a bare ``except``) lets KeyboardInterrupt
        # and SystemExit propagate instead of being reported as a task failure.
        raise signals.FAIL(
            "process failed: {!r}".format(exc)
        ) from exc
def prepare_flow(flow, cfg, key, name, storage):
    """Run ``flow`` once with ``key`` in the context, then store it under ``name``.

    Args:
        flow: Flow object to run and store.
        cfg: Value passed to the flow run as the ``cfg`` parameter.
        key: Value made available as ``key`` in ``prefect.context`` for the
            duration of the run.
        name: Name the flow carries while it is being added to ``storage``.
        storage: Object exposing ``add_flow(flow)``.

    Returns:
        Whatever ``storage.add_flow(flow)`` returns.
    """
    with prefect.context(key=key):
        flow.run(cfg=cfg)

    # Rename the flow only for the duration of the storage call.
    previous_name = flow.name
    flow.name = name
    try:
        storage_name = storage.add_flow(flow)
    finally:
        # Restore the caller's flow name even if add_flow raises, so the
        # flow object is never left renamed.
        flow.name = previous_name
    return storage_name
def run_flow(flow_name, storage):
    """Fetch a stored flow and run it with its 'holdpoint' tasks set to Resume.

    Args:
        flow_name: Identifier passed to ``storage.get_flow``.
        storage: Object exposing ``get_flow(flow_name)``.
    """
    stored_flow = storage.get_flow(flow_name)

    # Give every task selected by the 'holdpoint' tag a fresh Resume state.
    initial_states = {}
    for held_task in stored_flow.get_tasks(tags='holdpoint'):
        initial_states[held_task] = Resume()

    stored_flow.run(task_states=initial_states)
if __name__ == '__main__':
    # Demo script: build a flow, run it once with a context key, store it,
    # then reload it from storage and run it again with hold points resumed.
    env = LocalEnvironment()
    storage = LocalStorage('storage')

    with Flow('Using Context', environment=env) as flow:
        cfg_param = Parameter('cfg')
        # Only the hold task is created under the 'holdpoint' tag.
        with tags('holdpoint'):
            held_cfg = hold(cfg_param)
        process(held_cfg)

    cfg = {'value': 'def'}
    key = 'abc'
    flow_name = prepare_flow(
        flow=flow,
        cfg=cfg,
        key=key,
        name='test_flow',
        storage=storage,
    )

    time.sleep(5)
    run_flow(flow_name=flow_name, storage=storage)