David Jenkins
09/08/2021, 7:24 PM
random_string = GenerateRandomData(data=data)
3. random_string from flow1 will then be a parameter sent to flow2, so the parameter is dynamically set at runtime since the value of random_string is different with each execution of the three flows.
I learn best from reading code, so a basic example would be most helpful.
Kevin Kho
create_flow_run, wait_for_flow_run and get_task_run_result. You can see them here. What you want is to call create_flow_run and wait_for_flow_run. After those two, use get_task_run_result and feed in the task slug to get the output of that task.
After that, it should be straightforward to pass the output of get_task_run_result as a parameter to flow2:
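from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run, get_task_run_result

with Flow("parent-flow") as flow:  # "parent-flow", "flow1" and "flow2" are placeholder names
    flow_run_id = create_flow_run(flow_name="flow1")
    wait = wait_for_flow_run(flow_run_id)
    # Pass the flow run ID itself, not the FlowRunView that wait_for_flow_run returns
    random_string = get_task_run_result(flow_run_id, "generate-random-data", upstream_tasks=[wait])
    flow2_run_id = create_flow_run(flow_name="flow2", parameters={"random_string": random_string})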
There is also another way to do this on Prefect Cloud using the KV Store, where you can persist small values to be used in other flows or future flow runs.
David Jenkins
09/09/2021, 6:05 PM
Felix Vemmer
10/04/2021, 10:25 PM
prefect.exceptions.ClientError: 400 Client Error: Bad Request for url: https://api.prefect.io/
The following error messages were provided by the GraphQL server:
GRAPHQL_PARSE_FAILED: Syntax Error: Expected Name, found (
The GraphQL query was:
query {
  flow_run(where: { id: { _eq: FlowRunView(flow_run_id='a5be9f4c-0ebd-4685-bd80-9d04daca8a6d', name='get-notion-posts', state=<Success: "All reference tasks succeeded.">, labels=<BoxList: ['prod']>, cached_task_runs=0) } }) {
    id
    name
    flow_id
    serialized_state
    states {
      timestamp
      serialized_state
    }
    labels
    parameters
    context
    updated
    run_config
  }
}
The passed variables were:
null
The get-notion-posts flow looks like this and returns a list of Pydantic models holding mainly a post_id and a scheduled_time. I can see the flow execute online as a Fargate run with no problems, and everything works as expected. However, once the main flow tries to retrieve the results, I get the error above.
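Judging from the query printed in the error, the _eq filter received the repr of a FlowRunView object where a flow run ID string belongs. In Prefect 1.x, wait_for_flow_run returns a FlowRunView, so handing its output to get_task_run_result as flow_run_id puts that object into the query. The plain-Python sketch below is not Prefect's real query builder: FlowRunViewStub, format_value and build_filter are hypothetical helpers that only show why the repr's "(" triggers "Expected Name, found (" while a quoted ID string would parse.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class FlowRunViewStub:
    """Hypothetical stand-in for the FlowRunView returned by wait_for_flow_run."""

    flow_run_id: str
    name: str


def format_value(value: Any) -> str:
    # Simplified formatting rule, not Prefect's actual query builder:
    # strings are JSON-quoted, anything else is rendered with str().
    return json.dumps(value) if isinstance(value, str) else str(value)


def build_filter(flow_run_id: Any) -> str:
    """Render a `where` clause shaped like the one in the failing query."""
    return "flow_run(where: { id: { _eq: " + format_value(flow_run_id) + " } })"


view = FlowRunViewStub("a5be9f4c-0ebd-4685-bd80-9d04daca8a6d", "get-notion-posts")

# Object passed in: its repr, including "(", lands where a value is expected.
print(build_filter(view))
# ID string passed in: a quoted string literal that GraphQL can parse.
print(build_filter(view.flow_run_id))
```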
get-notion-posts flow:
from typing import List

import prefect
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.secrets import PrefectSecret

# Notion and NotionPost come from the author's own code (not shown)

@task(slug="get-notion-posts", result=LocalResult())
def get_notion_posts(notion_token: str, database_url: str) -> List[NotionPost]:  # pylint: disable=C0116
    # sourcery skip: inline-immediately-returned-variable
    logger = prefect.context.get("logger")
    logger.info("Fetching Posts from Notion")
    notion = Notion(notion_token)
    notion_posts = notion.query_db(database_url)
    logger.info(f"Retrieved {len(notion_posts)} posts from Notion")
    return notion_posts

with Flow(name="get-notion-posts") as get_notion_posts_flow:
    notion_token = PrefectSecret("NOTION_TOKEN")
    DATABASE_URL = 'https://www.notion.so/c2acffa4ffdc4ebfada40a6fbaf3f4c1?v=db38b94499a64104bc43e9acf6ea05df'
    database_url = Parameter("database_url", default=DATABASE_URL)
    notion_posts = get_notion_posts(notion_token=notion_token, database_url=database_url)
In the main flow, I call this flow first to get any potential posts and then schedule them as future flow runs with create_flow_run:
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, get_task_run_result, wait_for_flow_run

with Flow("schedule-posts") as schedule_posts_flow:
    child_run_id = create_flow_run(
        flow_name="get-notion-posts",
        run_name="get-notion-posts",
        # flow_id="get-notion-posts",
        parameters={
            'database_url': 'https://www.notion.so/c2acffa4ffdc4ebfada40a6fbaf3f4c1?v=db38b94499a64104bc43e9acf6ea05df',
        },
    )
    wait_flow_run_id = wait_for_flow_run(child_run_id)
    notion_posts = get_task_run_result(flow_run_id=wait_flow_run_id, task_slug='get-notion-posts')
    print(notion_posts)
    # child_run_id = create_flow_run(
    #     flow_name="create-posts",
    #     parameters={
    #         'post_id': notion_posts.id,
    #         'user_id': '0750e9c6-84f0-4c0e-8039-45e3013f27e3',
    #     },
    #     scheduled_start_time=notion_posts.result[0].properties.scheduled_time.date.start,
    # )

state = schedule_posts_flow.run()
I am also not able to pass the notion_posts task run result into the commented-out section. I'm not sure whether this is because I run it locally and it needs to be run in the cloud, or whether I'm messing up or not using LocalResult() correctly.
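Inside the with Flow(...) block, notion_posts is a Prefect task rather than the list of posts, so expressions such as notion_posts.id or notion_posts.result[0] are evaluated against the task object when the flow is built (on a Prefect 1.x Task, .result is the task's Result configuration, not its data). One option is to do the unpacking inside a task. The sketch below shows only that unpacking logic in plain Python. NotionPost is a hypothetical dataclass reduced to the post_id and scheduled_time fields mentioned above, build_child_runs is a hypothetical helper, and the sample post and time are made up. It also checks that each parameters dict survives a JSON round-trip, since flow run parameters are sent to the backend as JSON.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Tuple


@dataclass
class NotionPost:
    """Hypothetical stand-in for the Pydantic model, reduced to the two fields mentioned."""

    post_id: str
    scheduled_time: str  # assumed to be an ISO 8601 string


def build_child_runs(
    posts: List[NotionPost], user_id: str
) -> Tuple[List[Dict[str, Any]], List[datetime]]:
    """Return one parameters dict and one start time per post, in matching order."""
    parameters = [{"post_id": p.post_id, "user_id": user_id} for p in posts]
    start_times = [datetime.fromisoformat(p.scheduled_time) for p in posts]
    return parameters, start_times


posts = [NotionPost("post-1", "2021-10-06T09:00:00+02:00")]
params, times = build_child_runs(posts, user_id="0750e9c6-84f0-4c0e-8039-45e3013f27e3")

# Flow run parameters travel as JSON, so each dict should survive a round-trip.
assert json.loads(json.dumps(params)) == params
print(params)
print(times)
```

In the flow, this logic could sit in a @task declared with nout=2 so its two outputs can be unpacked, and the resulting lists could feed create_flow_run.map(...) with flow_name wrapped in unmapped("create-posts"); treat that wiring as a suggestion rather than a verified recipe. For a whole Pydantic model, converting it to a plain dict first (.dict() in Pydantic v1, model_dump() in v2) keeps the parameters JSON-friendly.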
If you have any help or hints, I would highly appreciate it 🙏
Kevin Kho
notion_posts = get_task_run_result(flow_run_id=wait_flow_run_id, task_slug='get-notion-posts')
to
notion_posts = get_task_run_result(flow_run_id=child_run_id, task_slug='get-notion-posts')
and see if it helps?
Felix Vemmer
10/05/2021, 6:39 AM
[2021-10-05 08:35:19+0200] INFO - prefect.create_flow_run | Created flow run 'get-notion-posts': https://cloud.prefect.io/felix-vemmer-account/flow-run/8c0f9024-dc43-4ad1-a6c8-5537e074bb1a
Then I get a couple of these:
[2021-10-05 08:36:02+0200] DEBUG - prefect.get_task_run_result | Waiting for flow run 8c0f9024-dc43-4ad1-a6c8-5537e074bb1a to finish before retreiving result for task run 'get-notion-posts'...
However then an exception is raised:
Traceback (most recent call last):
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 859, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/utilities/executors.py", line 445, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 207, in get_task_run_result
task_run = flow_run.get_task_run(task_slug=task_slug, map_index=map_index)
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 697, in get_task_run
result = TaskRunView.from_task_slug(
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 283, in from_task_slug
cls._query_for_task_run(
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 305, in _query_for_task_run
task_runs = TaskRunView._query_for_task_runs(where=where, **kwargs)
File "/Users/felixvemmer/.pyenv/versions/3.9.2/envs/social-notion/lib/python3.9/site-packages/prefect/backend/task_run.py", line 369, in _query_for_task_runs
raise ValueError(
ValueError: No task runs found while querying for task runs where {'task': {'slug': {'_eq': 'get-notion-posts'}}, 'flow_run_id': {'_eq': '8c0f9024-dc43-4ad1-a6c8-5537e074bb1a'}, 'map_index': {'_eq': -1}}
[2021-10-05 08:36:07+0200] DEBUG - prefect.TaskRunner | Task 'get_task_run_result': Handling state change from Running to Failed
So it seems that the correct flow_run_id is being looked up, but something still goes wrong 🧐
Kevin Kho
Felix Vemmer
10/05/2021, 4:16 PM
The get-notion-posts flow looks like:
Felix Vemmer
10/05/2021, 4:17 PM
The schedule_posts_flow I have only run from my local machine so far, because I could not get it to retrieve the results from the get-notion-posts flow and pass them to the create-posts flow.
Kevin Kho
Felix Vemmer
10/05/2021, 5:38 PM
get-notion-posts and got this:
Felix Vemmer
10/05/2021, 5:43 PM
LocalResult() and the result location in /Users/felixvemmer/.prefect/results/prefect-result-2021-10-05t06-36-06-099038-00-00 cause a problem, since it was executed in the cloud environment? Would I even need to specify a result handler for get_task_run_result? 🤔
Felix Vemmer
10/05/2021, 6:06 PM
get_task_run_result:
Results are loaded from the Result location of the task which may not be accessible from where this task is executed. You will need to ensure results can be accessed.
I will try another result handler like S3 and let you know if anything changes! Anyway, thanks a ton for your help already!
Felix Vemmer
10/05/2021, 7:38 PM
Felix Vemmer
10/05/2021, 7:45 PM
get_task_run_result, as parameters into another create_flow_run. So, just like in your example, the random_string would be the result object.
However, do you know if it is possible to pass a dictionary or a Pydantic model as well? Or can it only be a string?
Kevin Kho