m

    Mathis Börner

    3 years ago
    import time
    
    import prefect
    from prefect import task, Flow, Parameter
    from prefect import tags
    from prefect.engine import signals
    from prefect.environments.storage import LocalStorage
    from prefect.environments import LocalEnvironment
    from prefect.utilities.tasks import pause_task
    from prefect.engine.state import Resume
    
    
    @task
    def hold(cfg):
        pause_task()
        return cfg
    
    @task
    def process(cfg):
        try:
            return prefect.contex.key + cfg['value']
        except:
            raise signals.FAIL()
    
    def prepare_flow(flow, cfg, key, name, storage):
        with prefect.context(key=key):
            flow.run(cfg=cfg)
            previous_name, flow.name = flow.name, name
            storage_name = storage.add_flow(flow)
        flow.name = previous_name
        return storage_name
    
    
    def run_flow(flow_name, storage):
        flow_test = storage.get_flow(flow_name)
        resume_states = {t: Resume()
                         for t in flow_test.get_tasks(tags='holdpoint')}
        flow_test.run(task_states=resume_states)
    
    
    if __name__ == '__main__':
        env = LocalEnvironment()
        storage = LocalStorage('storage')
        with Flow('Using Context', environment=env) as flow:
            cfg = Parameter('cfg')
            with tags('holdpoint'):
                cfg = hold(cfg)
            process(cfg)
        cfg = {'value': 'def'}
        key = 'abc'
        flow_name = prepare_flow(flow, cfg, key, 'test_flow', storage)
        time.sleep(5)
        run_flow(flow_name, storage)
    so the problem with this flow is, that it never reaches the second flow.run, because there is for some reason no break in the first run and it gets into an infinite loop of trying to continue the flow.