# ask-community
Sergiy Krutsenko
Hi, I'm trying to create dependencies between two flows, but I'm having problems passing results from the first one to the next. Below is updated minimal code to reproduce the issue. I use a LocalAgent to run the flows via Server. All of them finish with a Success status, but the results from flow_a are not passed to flow_b. Here are the log entries:
```text
Flow_a: Created 3 chunks
Flow_b: Got result from previous flow None
```
```python
import prefect as pf
from prefect import Client, task, Parameter, Flow
from prefect.engine.results import LocalResult
from prefect.environments.storage import Local
from prefect.utilities.configuration import set_temporary_config
from prefect.tasks.prefect import FlowRunTask
from prefect.environments import LocalEnvironment
import socket

def get_server_config(server, port):
    return {
        "cloud.api": "http://{}:{}".format(server, port),
        "cloud.graphql": "http://{}:{}/graphql".format(server, port),
        "backend": "server",
    }

def get_logger():
    return pf.context.get('logger')

@task()
def create_chunks(inputs):
    logger = get_logger()
    chunks = ['a', 'b', 'c']
    logger.info('Created %d chunks', len(chunks))
    return chunks

@task()
def accept_results(result):
    logger = get_logger()
    logger.info('Got result from previous flow %s', result)
    return result

def main():
    hostname = socket.gethostname()
    labels = [hostname]
    env = LocalEnvironment(labels=labels)
    with set_temporary_config(get_server_config('xxx', '4200')):
        with Flow('flow_a', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_a',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_a:
            inputs = Parameter('inputs', required=True)
            create_chunks(inputs)

        with Flow('flow_b', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_b',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_b:
            result = Parameter('result', required=True)
            accept_results(result)

        fa = FlowRunTask(flow_name="flow_a", project_name="Test", wait=True)
        fb = FlowRunTask(flow_name="flow_b", project_name="Test", wait=True)

        with Flow('flow_c', storage=Local(), environment=env,
                  result=LocalResult(dir='c:/temp/flows/flow_c',
                                     location='{flow_run_id}_{task_name}_{map_index}.txt')
                  ) as flow_c:
            a = fa(parameters={'inputs': {}})
            b = fb(upstream_tasks=[a], parameters={'result': a.result})

        client = Client()
        id_a = client.register(flow_a, project_name="Test")
        id_b = client.register(flow_b, project_name="Test")
        id_c = client.register(flow_c, project_name="Test")
        client.create_flow_run(flow_id=id_c)

if __name__ == "__main__":
    main()
```
nicholas
Hi @Sergiy Krutsenko! Take a look at this thread; I think that'll help you resolve this issue but let me know if you have additional questions 🙂
Sergiy Krutsenko
@nicholas I checked, but it's not what I need. My question is more about what flow_b must look like to accept outputs from flow_a, and then how flow_c should use flow_a and flow_b together. When I run flow_c, only flow_a runs successfully; flow_b fails.
nicholas
I apologize @Sergiy Krutsenko, I linked the wrong thread. This one should help more!
Sergiy Krutsenko
@nicholas It does not work as expected. I can run flow_c, but flow_b receives None as the result from flow_a.
```python
with Flow('flow_c') as flow:
    a = flow_a(parameters={...})
    b = flow_b(upstream_tasks=[a], parameters={"p": a.result})
```
nicholas
@Sergiy Krutsenko have you configured some results on `flow_a`? If you wouldn't mind sharing your example code as you're working with it now, that'd be helpful 🙂
Sergiy Krutsenko
@nicholas Yes, I save results using the LocalResult class for all three flows. I run flow_c using Server, a local agent, and client.create_flow_run. The code of flow_a is not an exact copy (it's on my office PC), but it's similar to this:
```python
with Flow('flow_a', storage=Local(),
          result=LocalResult(dir='c:/temp/flow_a',
                             location='{flow_run_id}_{task_name}_{map_index}.txt')) as flow:
    input = Parameter('input_params', required=True)
    create_chunks(input)
```
nicholas
Gotcha. Are you able to share it in its entirety, or at least a minimal reproducible version?
Sergiy Krutsenko
@nicholas Hi, I updated my original post with reproducible code.
nicholas
Hi @Sergiy Krutsenko, sorry for the slow response here. I was wrong about how this is handled; it's a more general pattern we're looking into. Right now, the best way to handle this is to read the results back in from the location you've configured for them in each of your flows.
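A minimal sketch of that workaround, assuming both flows agree on one fixed result location and that the upstream value was pickled (we assume this matches `LocalResult`'s default serialization). Plain `pickle` and `pathlib` stand in for Prefect's result classes so the example runs without a Prefect server; `FIXED_LOCATION`, `write_result`, and `read_result` are hypothetical names.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical fixed location shared by flow_a (writer) and flow_b (reader).
FIXED_LOCATION = "flow_a_create_chunks.txt"


def write_result(result_dir: Path, location: str, value) -> Path:
    """Stand-in for flow_a checkpointing a task's return value to disk."""
    path = result_dir / location
    path.write_bytes(pickle.dumps(value))
    return path


def read_result(result_dir: Path, location: str):
    """Stand-in for flow_b pulling the upstream value back in by location."""
    return pickle.loads((result_dir / location).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    result_dir = Path(tmp)
    write_result(result_dir, FIXED_LOCATION, ["a", "b", "c"])  # flow_a side
    chunks = read_result(result_dir, FIXED_LOCATION)           # flow_b side

print(chunks)  # ['a', 'b', 'c']
```

Inside an actual Prefect 0.x task, the reading side would likely be something like `LocalResult(dir=...).read(location).value`; check that call against the Prefect version you run.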
Sergiy Krutsenko
@nicholas Hi, your advice works with a fixed path for the location. But the problem is: how can flow_b get the dynamic flow run ID of the flow_a run executed before it? If I run two instances of flow_c, and the flow_a run in the second instance finishes before the flow_a run in the first one, a fixed location path doesn't work: I can get a race condition.
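To make the race concrete, here is a small deterministic simulation that uses plain `pickle` files in place of Prefect results (the file name and helper functions are hypothetical). It replays the interleaving described above: the second instance's flow_a finishes first, the first instance's flow_a then overwrites the same fixed path, and the second instance's flow_b reads data that is not its own.

```python
import pickle
import tempfile
from pathlib import Path

FIXED_LOCATION = "create_chunks.txt"  # same path for every flow_a run


def write(result_dir: Path, value) -> None:
    """Simulated flow_a checkpoint to the shared fixed path."""
    (result_dir / FIXED_LOCATION).write_bytes(pickle.dumps(value))


def read(result_dir: Path):
    """Simulated flow_b read from the shared fixed path."""
    return pickle.loads((result_dir / FIXED_LOCATION).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    result_dir = Path(tmp)
    write(result_dir, ["run-2 chunks"])  # flow_a of instance 2 finishes first
    write(result_dir, ["run-1 chunks"])  # flow_a of instance 1 overwrites it
    # flow_b of instance 2 now reads the shared path and gets instance 1's data.
    seen_by_run_2 = read(result_dir)

print(seen_by_run_2)  # ['run-1 chunks']
```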
nicholas
@Sergiy Krutsenko have you tried using the `wait` kwarg on the `StartFlowRunTask`?
Sergiy Krutsenko
@nicholas Yes, it's already in the code.
nicholas
Ah, I see, I didn't understand your question. Take a look at templated result locations; I think that'll solve your race condition: https://docs.prefect.io/core/concepts/results.html#templating-result-locations
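A rough illustration of why a templated location avoids the collision: each run renders its own path from its own `flow_run_id`. Prefect fills these placeholders from its run context; this sketch only assumes plain `str.format` substitution, and the run IDs and `map_index` value are made up for illustration.

```python
# Location template copied from the flows in this thread.
LOCATION_TEMPLATE = "{flow_run_id}_{task_name}_{map_index}.txt"


def render_location(flow_run_id: str, task_name: str, map_index) -> str:
    """Fill the template placeholders with str.format."""
    return LOCATION_TEMPLATE.format(
        flow_run_id=flow_run_id, task_name=task_name, map_index=map_index
    )


# Two concurrent flow_c instances trigger two flow_a runs with different IDs.
path_run_1 = render_location("run-1-id", "create_chunks", 0)
path_run_2 = render_location("run-2-id", "create_chunks", 0)

print(path_run_1)  # run-1-id_create_chunks_0.txt
print(path_run_2)  # run-2-id_create_chunks_0.txt
```

The writes no longer collide, but the reading side still has to know the upstream run's `flow_run_id` to render the same path.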
Sergiy Krutsenko
@nicholas It doesn't. When I use the {flow_run_id} template in the file path, flow_b has no idea what the flow_run_id of flow_a is, so it can't read the location using this template. All flows are executed in separate processes, and the flow_run_id of flow_a is only known once flow_a starts. How can I pass the flow_run_id from one flow run/process to the next one? It seems Prefect has no solution for that.
nicholas
@Sergiy Krutsenko the `StartFlowRun` task returns the flow run ID; you can use it in a downstream task to fetch results:
```python
from prefect import task, Flow
from prefect.tasks.prefect import StartFlowRun

@task
def get_id(upstream):
    # State messages for this task are returned in the format:
    # <<flow run id>> finished in state << state >>
    # so we can grab the id easily from that message
    flow_run_id = upstream.state.message.split(" ", 1)[0]
    print(flow_run_id)
    return flow_run_id

with Flow("Orchestrator") as flow_c:
    a = StartFlowRun(
        project_name="<<project>>",
        wait=True,
    )(flow_name="flow_a")

    get_id(a)

# output:
# d2605193-3eb3-45ef-9976-dcbe7446186ee
```
You can reuse the `get_id` task on any `StartFlowRun` tasks to get the flow run IDs you need, and you can access the state itself for any sort of conditional logic (maybe if the run failed you don't want to access any results from it).
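Putting the two halves together: once a downstream task has the upstream flow run ID, it can re-render flow_a's templated location and read the value from there. The sketch below is a stdlib-only simulation under stated assumptions: the message format is the one described in the comment above, the state part of the message is a placeholder, values are assumed to be pickled, and `parse_state_message`, `load_upstream_result`, and the `map_index` value of `0` are hypothetical.

```python
import pickle
import tempfile
import uuid
from pathlib import Path

LOCATION_TEMPLATE = "{flow_run_id}_{task_name}_{map_index}.txt"
SEPARATOR = " finished in state "


def parse_state_message(message: str) -> tuple:
    """Split '<flow run id> finished in state <state>' into its two parts."""
    flow_run_id, _, state_text = message.partition(SEPARATOR)
    return flow_run_id, state_text


def load_upstream_result(result_dir: Path, message: str, task_name: str, map_index):
    """Rebuild flow_a's templated location from the run ID and unpickle it."""
    flow_run_id, _ = parse_state_message(message)
    location = LOCATION_TEMPLATE.format(
        flow_run_id=flow_run_id, task_name=task_name, map_index=map_index
    )
    return pickle.loads((result_dir / location).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    result_dir = Path(tmp)
    run_id = str(uuid.uuid4())  # stands in for flow_a's real flow run ID

    # Simulate flow_a checkpointing create_chunks' output at its templated path.
    written = result_dir / LOCATION_TEMPLATE.format(
        flow_run_id=run_id, task_name="create_chunks", map_index=0
    )
    written.write_bytes(pickle.dumps(["a", "b", "c"]))

    # Placeholder message; the state part is not modeled here.
    message = f"{run_id} finished in state <state>"
    chunks = load_upstream_result(result_dir, message, "create_chunks", 0)

print(chunks)  # ['a', 'b', 'c']
```

Because each flow_c instance parses the ID of its own flow_a run, two concurrent instances read two different files, which is what removes the race described earlier in the thread.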
Sergiy Krutsenko
@nicholas Your latest advice works fine for me. Thanks a lot for your help :)
nicholas
Glad you got it working!