I'm trying to figure out how to set the output of ...
# ask-community
d
I'm trying to figure out how to set the output of a particular task in one flow as a parameter in another flow and I am just not getting it. I cannot find documentation or any code examples. This is my setup:
1. I have three dependent flows: flow1 -> flow2 -> flow3
2. flow1 has around 10 tasks. One of those tasks, say the 4th one, named GenerateRandomData, randomly generates a string based upon the data sent to it. So, something like this:
random_string = GenerateRandomData(data=data)
3. random_string from flow1 will then be a parameter sent to flow2, so the parameter is dynamically set at runtime since the value of random_string is different with each execution of the three flows.
I learn best from reading code, so a basic example would be most helpful.
k
Hey @David Jenkins, in 0.15.0, there were 3 tasks added: create_flow_run, wait_for_flow_run, and get_task_run_result. You can see them here. What you want is to call create_flow_run and wait_for_flow_run. After those two, use get_task_run_result and feed in the task slug to get the output of that task. After that, it should be straightforward to pass the output of get_task_run_result as a parameter to flow2:
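from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

with Flow(...) as flow:
     flow_run_id = create_flow_run(...)
     x = wait_for_flow_run(flow_run_id)
     # task_slug is a string; check the flow run page for the exact slug
     random_string = get_task_run_result(flow_run_id=flow_run_id, task_slug="generate-random-data")

     flow2 = create_flow_run(..., parameters={"random_string": random_string})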
There is also another way to do this on Prefect Cloud using the KV Store where you can persist small values to be used in other flows or future flow runs.
d
thank you, kindly
f
@Kevin Kho I tried to follow the example you posted above, but I am still getting the following error for waiting and returning flow run results:
prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: https://api.prefect.io/

The following error messages were provided by the GraphQL server:

    GRAPHQL_PARSE_FAILED: Syntax Error: Expected Name, found (

The GraphQL query was:

    query {
            flow_run(where: { id: { _eq: FlowRunView(flow_run_id='a5be9f4c-0ebd-4685-bd80-9d04daca8a6d', name='get-notion-posts', state=<Success: "All reference tasks succeeded.">, labels=<BoxList: ['prod']>, cached_task_runs=0) } }) {
                id
                name
                flow_id
                serialized_state
                states {
                    timestamp
                    serialized_state
            }
                labels
                parameters
                context
                updated
                run_config
        }
    }

The passed variables were:

    null
The get-notion-posts flow is shown below. It returns a list of Pydantic models, each holding mainly a post_id and a scheduled_time. I can see the flow execute online as a Fargate run with no problems, and everything works as expected. However, once the main flow is supposed to retrieve the results, I get the error mentioned above.
get-notion-posts flow:
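from typing import List

import prefect
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.secrets import PrefectSecret

# Notion and NotionPost are defined elsewhere in the project (not shown)

@task(slug="get-notion-posts", result=LocalResult())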
def get_notion_posts(notion_token: str, database_url: str) -> List[NotionPost]:  # pylint: disable=C0116
    # sourcery skip: inline-immediately-returned-variable

    logger = prefect.context.get("logger")
    logger.info("Fetching Posts from Notion")

    notion = Notion(notion_token)
    notion_posts = notion.query_db(database_url)

    logger.info(f"Retrieved {len(notion_posts)} posts from Notion")

    return notion_posts


with Flow(name="get-notion-posts") as get_notion_posts_flow:

    notion_token = PrefectSecret("NOTION_TOKEN")

    DATABASE_URL = 'https://www.notion.so/c2acffa4ffdc4ebfada40a6fbaf3f4c1?v=db38b94499a64104bc43e9acf6ea05df'

    database_url = Parameter("database_url", default=DATABASE_URL)

    notion_posts = get_notion_posts(notion_token=notion_token, database_url=database_url)
In the main flow, I call this flow first to get any potential posts and then schedule them as future flow runs with create_flow_run:
with Flow("schedule-posts") as schedule_posts_flow:

    child_run_id = create_flow_run(
        flow_name="get-notion-posts",
        run_name="get-notion-posts",
        # flow_id="get-notion-posts",
        parameters={
            'database_url': 'https://www.notion.so/c2acffa4ffdc4ebfada40a6fbaf3f4c1?v=db38b94499a64104bc43e9acf6ea05df',
        },
    )

    wait_flow_run_id = wait_for_flow_run(child_run_id)

    notion_posts = get_task_run_result(flow_run_id=wait_flow_run_id, task_slug='get-notion-posts')

    print(notion_posts)

    # child_run_id = create_flow_run(
    #     flow_name="create-posts",
    #     parameters={
    #         'post_id': notion_posts.id,
    #         'user_id': '0750e9c6-84f0-4c0e-8039-45e3013f27e3',
    #     },
    #     scheduled_start_time=notion_posts.result[0].properties.scheduled_time.date.start,
    # )


state = schedule_posts_flow.run()
I am also not able to pass the notion_posts task run result into the commented-out section. I am not sure whether this is because I run it locally and it needs to run in Cloud, or because I am not using LocalResult() correctly. If you have any help or hints, I would highly appreciate it 🙏
k
Can you try changing
notion_posts = get_task_run_result(flow_run_id=wait_flow_run_id, task_slug='get-notion-posts')
to
notion_posts = get_task_run_result(flow_run_id=child_run_id, task_slug='get-notion-posts')
and see if it helps?
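A minimal sketch of why this change should matter, judging from the query in the error above: the failing query has a FlowRunView(...) repr where a flow run ID belongs, which suggests the object returned by wait_for_flow_run was passed as flow_run_id instead of the ID string. The FlowRunView class and build_query function below are hypothetical stand-ins written for illustration; they are not Prefect's actual implementation.

```python
import json
from dataclasses import dataclass


@dataclass
class FlowRunView:
    """Hypothetical stand-in for the object returned by wait_for_flow_run."""
    flow_run_id: str
    name: str


def build_query(flow_run_id) -> str:
    # Hypothetical query builder: strings are quoted as GraphQL string literals,
    # anything else is inserted through str(), which is where the repr leaks in.
    value = json.dumps(flow_run_id) if isinstance(flow_run_id, str) else str(flow_run_id)
    return "query { flow_run(where: { id: { _eq: %s } }) { id } }" % value


run_id = "a5be9f4c-0ebd-4685-bd80-9d04daca8a6d"
view = FlowRunView(flow_run_id=run_id, name="get-notion-posts")

bad_query = build_query(view)     # the object's repr ends up inside the query
good_query = build_query(run_id)  # a plain ID string produces a valid filter

print(bad_query)
print(good_query)
```

The first query contains "FlowRunView(", which matches the "Expected Name, found (" parse error; the second contains only the quoted ID.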
f
Hi @Kevin Kho, thanks for the suggestion! I changed it and got the following in the logs:
[2021-10-05 08:35:19+0200] INFO - prefect.create_flow_run | Created flow run 'get-notion-posts': https://cloud.prefect.io/felix-vemmer-account/flow-run/8c0f9024-dc43-4ad1-a6c8-5537e074bb1a
Then I get a couple of these:
[2021-10-05 08:36:02+0200] DEBUG - prefect.get_task_run_result | Waiting for flow run 8c0f9024-dc43-4ad1-a6c8-5537e074bb1a to finish before retreiving result for task run 'get-notion-posts'...
However then an exception is raised:
Traceback (most recent call last):
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
    task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
    result = TaskRunView.from_task_slug(
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
    cls._query_for_task_run(
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
    task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
  File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
    raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get-notion-posts'}}, 'flow_run_id': {'_eq': '8c0f9024-dc43-4ad1-a6c8-5537e074bb1a'}, 'map_index': {'_eq': -1}}
[2021-10-05 08:36:07+0200] DEBUG - prefect.TaskRunner | Task 'get_task_run_result': Handling state change from Running to Failed
So it seems that the correct flow_run_id is being queried, but something still goes wrong 🧐
k
When you go to the flow run page of that subflow, what is the task slug?
f
Hey Kevin, so this is what the
get-notion-posts
flow looks like:
For now, I just ran schedule_posts_flow from my local machine, since I could not get it to retrieve the results from the get-notion-posts flow and pass them to the create-posts flow.
k
Can you click into that to see the slug?
f
so I clicked on
get-notion-posts
and got this:
Could it be that LocalResult() and the result location in /Users/felixvemmer/.prefect/results/prefect-result-2021-10-05t06-36-06-099038-00-00 cause a problem, since the flow was executed in the cloud environment? Would I even need to specify a result handler for get_task_run_result? 🤔
Actually just read this for
get_task_run_result
:
Results are loaded from the Result location of the task which may not be accessible from where this task is executed. You will need to ensure results can be accessed.
I will try another result handler like S3 and let you know if anything changes! Anyway, thanks a ton for your help already!
@Kevin Kho I figured out the issue: for some reason, a copy is added to the task slug:
Now the last piece of the puzzle I am trying to solve is how to pass the output of get_task_run_result as parameters into another create_flow_run. Just like in your example, random_string would be the result object. However, do you know if it is possible to pass a dictionary or a Pydantic model as well? Or can it only be a string?
k
It just has to be JSON serializable
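A rough sketch of what that means in practice, using only the standard library: a dictionary of JSON primitives can be passed as-is, while a model object has to be converted to such a dictionary first. The NotionPost dataclass and the to_parameters helper are hypothetical stand-ins for the Pydantic model in this thread; with Pydantic, the model's own .dict() method would play the role of asdict here.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class NotionPost:
    """Hypothetical stand-in for the Pydantic model discussed in the thread."""
    post_id: str
    scheduled_time: str  # kept as an ISO 8601 string; datetime objects are not JSON serializable


def to_parameters(post: NotionPost) -> dict:
    # Convert the model into a plain dict of JSON primitives.
    params = asdict(post)
    # json.dumps raises TypeError if any value is not JSON serializable.
    json.dumps(params)
    return params


post = NotionPost(post_id="abc123", scheduled_time="2021-10-05T08:35:19+02:00")
parameters = to_parameters(post)
print(parameters)
```

The resulting dictionary is the kind of value that could go into the parameters argument of create_flow_run; passing the post object itself would fail at serialization time.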