Peter Roelants

    1 year ago
    Hi Prefect, if I understand correctly, there is currently no direct way to run a flow with StartFlowRun and return a result that a downstream task can use. For example:
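    from prefect import Flow, Parameter, task
    from prefect.tasks.prefect import StartFlowRun
    
    flow_a = StartFlowRun(flow_name='flow_a', wait=True)
    
    @task
    def task_b(param_b):
        ...
    
    with Flow('flow_c') as flow:
        param_a = Parameter('param_a')
        result_a = flow_a(parameters={'param_a': param_a})
        result_b = task_b(result_a)  # desired: result_a holds flow_a's result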
    I noticed people creating some workarounds. However, I want to avoid setting a persisted result; I'd rather pass a serializable result directly to the next task. Is there a canonical way to add a result to a flow, get that result from a StartFlowRun, and use it in a following task?
    Kevin Kho

    1 year ago
    Hey @Peter Roelants, this experience is something we're working on improving, but for now there is no canonical way to do this. Persisting the result to a file is the most common approach users take.
    Peter Roelants

    1 year ago
    Is there another way besides persisting a file? I'm running flows in a distributed setting with different environments. I want to avoid adding an additional service like Redis or S3 to persist results.
    Would it be possible to somehow attach the result to the flow's success State?
    Kevin Kho

    1 year ago
    The team's best suggestion is to use PrefectResult to store the result, and then query the API to retrieve it. Note that this pushes your result to Prefect's backend, so we would see the data.
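To make that trade-off concrete: PrefectResult uses a JSON serializer by default and stores the serialized value itself as the result's `location`, so the data lives in Prefect's database rather than in external storage. Below is a stdlib-only sketch of that round trip; it illustrates the idea and is not Prefect's implementation, and `to_location` / `from_location` are hypothetical names.

```python
import json
from typing import Any


def to_location(value: Any) -> str:
    """Serialize a value the way a JSON-backed result would store it.

    The returned string *is* the data: there is no pointer to S3, Redis,
    or a file, which is why the backend can see the value.
    """
    # Raises TypeError for values that are not JSON-serializable
    return json.dumps(value)


def from_location(location: str) -> Any:
    """Recover the original value from the stored string."""
    return json.loads(location)


if __name__ == "__main__":
    loc = to_location({"rows": 3, "ok": True})
    print(loc)                 # {"rows": 3, "ok": true}
    print(from_location(loc))  # {'rows': 3, 'ok': True}
```

One consequence: only JSON-friendly values (numbers, strings, lists, dicts) survive the trip. A set or a custom object raises TypeError, and a tuple comes back as a list.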
    Peter Roelants

    1 year ago
    @Kevin Kho Would you happen to have an example somewhere of how to use PrefectResult to share results from a StartFlowRun?
    Kevin Kho

    1 year ago
    Do you mean StartFlowRun?
    Peter Roelants

    1 year ago
    Yes, I meant StartFlowRun, sorry for the confusion (edited the mistake above).
    Kevin Kho

    1 year ago
    I'll have to put together an example. I'll get back to you in a bit.
    Peter Roelants

    1 year ago
    Thank you, that would be really helpful of you.
    Kevin Kho

    1 year ago
    I may have to get back to you a bit later today
    Peter Roelants

    1 year ago
    That's ok, thanks for letting me know
    Kevin Kho

    1 year ago
    Here is an example
    import json
    
    import prefect
    from prefect import task, Flow
    from prefect.client.client import Client
    from prefect.engine.results.prefect_result import PrefectResult
    from prefect.tasks.prefect.flow_run import StartFlowRun
    
    client = Client()
    
    @task
    def abc():
        return 1
    
    @task
    def bcd(x):
        logger = prefect.context.get("logger")
        logger.info(f"Got the value {x}")
        return x
    
    @task
    def get_result(start_flow_run_output):
        logger = prefect.context.get("logger")
        # With wait=True, StartFlowRun's output stringifies to
        # "<flow_run_id> finished in state ...", so keep the first token
        flow_run_id = str(start_flow_run_output).split(" ")[0]
        logger.info(f"Got the flow run ID {flow_run_id}")
        query = """
            query {
                flow_run(where: { id: { _eq: "%s" } }) {
                    id
                    task_runs {
                        id
                        name
                        serialized_state
                    }
                }
            }
        """ % flow_run_id
        results = client.graphql(query)
        # flow_a has a single task, so the first task run is the one we want
        task_run = results["data"]["flow_run"][0]["task_runs"][0]
        # A PrefectResult stores the JSON-serialized value itself as its location
        location = task_run["serialized_state"]["_result"]["location"]
        return json.loads(location)
    
    # Every task in flow_a stores its output with PrefectResult
    with Flow("flow_a", result=PrefectResult()) as flow1:
        abc()
    
    flow1.register(project_name="omlds")
    
    test = StartFlowRun(flow_name="flow_a", project_name="omlds", wait=True)
    
    with Flow("flow_b") as flow2:
        result1 = test()
        result1_out = get_result(result1)
        bcd(result1_out)
    
    flow2.run()
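The string handling in get_result can be checked without a Prefect backend by pulling it into plain functions. This is a stdlib-only sketch: it assumes, as the split(" ")[0] trick implies, that StartFlowRun's output with wait=True stringifies to "<flow_run_id> finished in state ...", and it runs against a hand-written sample shaped like the GraphQL response in the example. `parse_flow_run_id` and `result_from_response` are hypothetical names, and the ID in the sample is made up.

```python
import json
from typing import Any, Dict


def parse_flow_run_id(start_flow_run_output: object) -> str:
    """Return the flow run ID from StartFlowRun's output (wait=True).

    Assumes the output stringifies to "<flow_run_id> finished in state ...".
    """
    text = str(start_flow_run_output).strip()
    if not text:
        raise ValueError("StartFlowRun output is empty")
    return text.split(" ", 1)[0]


def result_from_response(response: Dict[str, Any]) -> Any:
    """Decode the PrefectResult value of the first task run in the response.

    Only safe when the child flow has a single task; the order of
    task_runs is not something to rely on otherwise.
    """
    flow_runs = response["data"]["flow_run"]
    if not flow_runs:
        raise LookupError("no flow run matched the query")
    task_run = flow_runs[0]["task_runs"][0]
    location = task_run["serialized_state"]["_result"]["location"]
    # The location string is the JSON-encoded value itself
    return json.loads(location)


if __name__ == "__main__":
    run_id = parse_flow_run_id(
        "00000000-0000-0000-0000-000000000001 finished in state <Success>"
    )
    # Hypothetical response trimmed to the fields get_result reads
    sample = {
        "data": {
            "flow_run": [
                {
                    "id": run_id,
                    "task_runs": [
                        {"serialized_state": {"_result": {"location": "1"}}}
                    ],
                }
            ]
        }
    }
    print(run_id)                        # 00000000-0000-0000-0000-000000000001
    print(result_from_response(sample))  # 1
```

Keeping the parsing apart from the client.graphql call lets the brittle parts (the output string format and the nested response keys) be unit-tested offline.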
    Peter Roelants

    1 year ago
    Great, thanks, I'll try it out
    Thanks again for the example, I think I was able to get it to work. I noticed a couple of things:
    • I could get the flow run info via the client API:
    client = prefect.Client()
    flow_run_info = client.get_flow_run_info(flow_run_id)
    • It is also possible to get the result via:
    target_task_run: TaskRunInfoResult
    result = target_task_run.state.load_result(target_task_run.state._result).result
    • The order of tasks in flow_run_info.task_runs doesn't seem to be guaranteed. I'm using task slugs to identify the task whose result I want, together with this helper function to pick the right TaskRunInfoResult:
    from typing import List
    from prefect.client.client import TaskRunInfoResult
    
    def get_target_task_run_from_slug(target_slug: str, task_runs: List[TaskRunInfoResult]) -> TaskRunInfoResult:
        """Get a given task run based on the task's slug from a list of task runs."""
        # Substring match; raises IndexError if no slug contains target_slug
        target_task_runs = [t for t in task_runs if t.task_slug and target_slug in t.task_slug]
        return target_task_runs[0]
    Additionally, I found this GitHub comment relevant.
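The slug helper matches on substrings (`target_slug in t.task_slug`), so a target like "abc" would also match a task slugged "abcd-1", and it fails with a bare IndexError when nothing matches. A stricter variant is sketched below; `TaskRunStub` is a hypothetical NamedTuple standing in for TaskRunInfoResult, and only its `id` and `task_slug` fields are assumed.

```python
from typing import List, NamedTuple, Optional


class TaskRunStub(NamedTuple):
    """Stand-in for the TaskRunInfoResult fields used here (hypothetical)."""
    id: str
    task_slug: Optional[str]


def find_task_run(target_slug: str, task_runs: List[TaskRunStub]) -> TaskRunStub:
    """Return the single task run whose slug equals target_slug exactly.

    Exact matching avoids the substring pitfall where "abc" would also
    match "abcd-1"; zero or several matches raise instead of guessing.
    """
    matches = [t for t in task_runs if t.task_slug == target_slug]
    if len(matches) != 1:
        available = sorted(t.task_slug for t in task_runs if t.task_slug)
        raise LookupError(
            f"expected exactly one task run with slug {target_slug!r}, "
            f"found {len(matches)}; available slugs: {available}"
        )
    return matches[0]


if __name__ == "__main__":
    # Hypothetical slugs; the order mimics an unordered task_runs list
    runs = [TaskRunStub("run-2", "bcd-1"), TaskRunStub("run-1", "abc-1")]
    print(find_task_run("abc-1", runs).id)  # run-1
```

The trade-off is that callers must pass the full slug rather than a fragment of it, in exchange for an error message that lists what was actually available.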
    Kevin Kho

    1 year ago
    Hey, thanks for circling back with the additional info and suggestions! Glad you got it working.
    Someone tagged this thread, and I just wanted to mention that the new KV Store might be helpful for your use case.
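With a key-value store, the handoff could look like this: the child flow writes its small, JSON-serializable result under a key, and the parent reads it back once StartFlowRun returns. The sketch below uses an in-memory dict as a hypothetical stand-in for the store and does not call Prefect's KV Store API. The point it illustrates is scoping each key to one flow run, so concurrent child runs don't overwrite each other. `result_key`, `publish_result` and `fetch_result` are hypothetical names; in Prefect 1.x the child's tasks can typically read their own run ID from prefect.context.

```python
import json
from typing import Any, Dict

# Hypothetical in-memory stand-in for a key-value store; a real store
# would persist across processes and machines
_STORE: Dict[str, str] = {}


def result_key(flow_name: str, flow_run_id: str) -> str:
    """Scope the key to a single flow run so parallel runs don't collide."""
    return f"{flow_name}/{flow_run_id}/result"


def publish_result(flow_name: str, flow_run_id: str, value: Any) -> str:
    """Called at the end of the child flow: store the value as JSON."""
    key = result_key(flow_name, flow_run_id)
    _STORE[key] = json.dumps(value)
    return key


def fetch_result(flow_name: str, flow_run_id: str) -> Any:
    """Called in the parent flow after the child run has finished."""
    key = result_key(flow_name, flow_run_id)
    if key not in _STORE:
        raise KeyError(f"no result published under {key!r}")
    return json.loads(_STORE[key])


if __name__ == "__main__":
    publish_result("flow_a", "run-1", {"value": 1})
    publish_result("flow_a", "run-2", {"value": 2})
    print(fetch_result("flow_a", "run-1"))  # {'value': 1}
```

A single fixed key (say, just "flow_a") would be simpler, but two child runs finishing close together would then race, and the parent could read the wrong run's value.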