# ask-community
p
Hi Prefect, if I understand correctly, there is currently no direct way of running a flow with `StartFlowRun` and returning a result to be used by the next task. E.g.:
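from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun

# Task that starts a run of the registered flow "flow_a" and waits for it to finish
flow_a = StartFlowRun(flow_name='flow_a', wait=True)

@task
def task_b(param_b):
    ...

with Flow('flow_c') as flow:
    param_a = Parameter('param_a')
    result_a = flow_a(parameters={'param_a': param_a})
    result_b = task_b(result_a)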
I have noticed people creating some workarounds. However, I want to avoid persisting the result and would rather pass a serializable result directly to the next task. I was wondering: is there a canonical way to add a result to a flow, get that result from a `StartFlowRun`, and use it in a following task?
k
Hey @Peter Roelants, this experience is something we’re working on improving, but for now there is no canonical way to do this. Persisting the file is the most common way users approach this.
p
Is there another way besides persisting a file? I'm running flows in a distributed setting with different environments. I want to avoid adding an additional service like Redis or S3 to persist results.
Would it be possible to somehow attach the result to the flow's success State?
k
The team's best suggestion is to use `PrefectResult` to store the result; you would then need to query for it to retrieve it. Note that this pushes your result to Prefect, so we would be able to see the data.
p
@Kevin Kho Would you happen to have an example somewhere of how to use `PrefectResult` to share results from a `StartFlowRun`?
k
Do you mean `StartFlowRun`?
p
Yes, I meant `StartFlowRun`, sorry for the confusion (I edited the mistake above).
k
I'll have to make an example. Will get back to you in a bit
p
Thank you, that would be really helpful of you.
k
I may have to get back to you a bit later today
p
That's ok, thanks for letting me know
k
Here is an example
import prefect
from prefect import task, Flow
from prefect.engine.results.prefect_result import PrefectResult
from prefect.tasks.prefect.flow_run import StartFlowRun
from prefect.client.client import Client

client = Client()

@task
def abc():
    return 1

@task
def bcd(x):
    logger = prefect.context.get("logger")
    logger.info(f"Got the value {x}")
    return x

@task
def get_result(flow_run_id):
    logger = prefect.context.get("logger")
    # Keep only the leading ID in case the upstream value carries extra text
    flow_run_id = str(flow_run_id).split(" ")[0]
    logger.info(f"Got the flow run ID {flow_run_id}")
    # Fetch the serialized state of every task run in that flow run
    query = """
    query {
        flow_run(where: { id: { _eq: \"""" + flow_run_id + """\" } }) {
            id
            task_runs {
                id
                name
                serialized_state
            }
        }
    }
    """
    results = client.graphql(query)
    # Take the first task run; flow_a contains a single task (abc).
    # With PrefectResult, "location" holds the JSON-serialized value itself.
    out = results["data"]["flow_run"][0]["task_runs"][0]["serialized_state"]["_result"]["location"]
    return out

with Flow("flow_a") as flow1:
    abc()

# Store task results in the Prefect backend so they can be queried later
flow1.result = PrefectResult()
flow1.register("omlds")

# Start flow_a from project "omlds" and wait for it to finish
test = StartFlowRun("flow_a", "omlds", wait=True)

with Flow("flow_b") as flow2:
    result1 = test()
    result1_out = get_result(result1)
    bcd(result1_out)

flow2.run()
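The example above picks `task_runs[0]`, which only works because `flow_a` has a single task. Below is a minimal stdlib-only sketch of picking the task run by name instead and decoding the stored value. It assumes the response has the same shape as the query above and that `PrefectResult` (with its default JSON serializer) stores the value itself as a JSON string in `location`. The function name `extract_task_result` and the sample response are hypothetical.

```python
import json
from typing import Any, Dict


def extract_task_result(response: Dict[str, Any], task_name: str) -> Any:
    """Return the decoded result of the single task run named `task_name`.

    Assumes the response shape of the flow_run query:
    data -> flow_run[0] -> task_runs[] -> serialized_state -> _result -> location.
    """
    task_runs = response["data"]["flow_run"][0]["task_runs"]
    # Select by name rather than by position, since list order is not guaranteed.
    matches = [tr for tr in task_runs if tr["name"] == task_name]
    if len(matches) != 1:
        raise ValueError(f"Expected one task run named {task_name!r}, found {len(matches)}")
    location = matches[0]["serialized_state"]["_result"]["location"]
    # Assumption: PrefectResult's default JSON serializer wrote json.dumps(value) here.
    return json.loads(location)


# Hand-written response with the same shape (hypothetical IDs and values).
fake_response = {
    "data": {
        "flow_run": [
            {
                "id": "flow-run-id",
                "task_runs": [
                    {"id": "tr-2", "name": "other", "serialized_state": {"_result": {"location": "\"x\""}}},
                    {"id": "tr-1", "name": "abc", "serialized_state": {"_result": {"location": "1"}}},
                ],
            }
        ]
    }
}

print(extract_task_result(fake_response, "abc"))  # 1
```

Mapped tasks produce several task runs with the same name, so the sketch raises rather than silently picking one; selecting by task slug is an alternative.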
p
Great, thanks, I'll try it out
Thanks again for the example, I think I was able to get it to work. I noticed a couple of things:
• I could get the flow run info via the client API:
import prefect

client = prefect.Client()
# flow_run_id: the ID of the flow run started by StartFlowRun
flow_run_info = client.get_flow_run_info(flow_run_id)
• It is also possible to get the result via:
from prefect.client.client import TaskRunInfoResult

target_task_run: TaskRunInfoResult  # one entry from flow_run_info.task_runs
result = target_task_run.state.load_result(target_task_run.state._result).result
• The order of tasks in `flow_run_info.task_runs` doesn't seem to be guaranteed. I'm using task slugs to identify the task I want the result from, together with this helper function to get the right `TaskRunInfoResult`:
from typing import List

from prefect.client.client import TaskRunInfoResult

def get_target_task_run_from_slug(target_slug: str, task_runs: List[TaskRunInfoResult]) -> TaskRunInfoResult:
    """Get a given task based on the task's slug from a list of task runs."""
    target_task_runs = list(filter(lambda t: t.task_slug and target_slug in t.task_slug, task_runs))
    return target_task_runs[0]
Additionally, I also found this GitHub comment to be relevant.
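The slug helper above matches with `target_slug in t.task_slug`, so a slug that is a prefix of another slug can match more than one run. Below is a minimal sketch of exact matching, assuming only the `id` and `task_slug` fields matter. `TaskRunStub` is a hypothetical stand-in for `TaskRunInfoResult` so the snippet runs without Prefect, and the `"abc-1"`/`"abc-10"` slugs are made-up values chosen to show the collision.

```python
from typing import List, NamedTuple, Optional


class TaskRunStub(NamedTuple):
    """Hypothetical stand-in for TaskRunInfoResult with only the fields used here."""
    id: str
    task_slug: Optional[str]


def get_task_run_by_exact_slug(target_slug: str, task_runs: List[TaskRunStub]) -> TaskRunStub:
    """Return the single task run whose slug equals `target_slug` exactly."""
    matches = [t for t in task_runs if t.task_slug == target_slug]
    if len(matches) != 1:
        raise LookupError(f"Expected one task run with slug {target_slug!r}, found {len(matches)}")
    return matches[0]


runs = [TaskRunStub("a", "abc-10"), TaskRunStub("b", "abc-1"), TaskRunStub("c", None)]

# Substring matching also hits "abc-10", and it comes first in this list.
substring_hits = [t.id for t in runs if t.task_slug and "abc-1" in t.task_slug]
print(substring_hits)  # ['a', 'b']
print(get_task_run_by_exact_slug("abc-1", runs).id)  # b
```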
k
Hey thanks for circling back and the additional info and suggestions! Glad you got it working.
Someone tagged this, and I just wanted to mention that the new KV store might be helpful for your use case.