Hello all. I tried using `create_flow_run.map()` w...
# prefect-community
i
Hello all. I tried using
create_flow_run.map()
with
wait_for_flow_run()
and
get_task_run_result()
but I'm getting some errors. I'm not sure if I'm doing this correctly
Here's my code
Copy code
@task(log_stdout=True)
def hello_world(user_input: str, age_input: int):
    logger = prefect.context.get("logger")
    <http://logger.info|logger.info>(f"hello {user_input}!. You're {age_input} today")

# This is the flow we can import
with Flow("dummy-flow-to-be-run-multiple-times") as dummy_flow:
    user_param = Parameter("user_input", default="world")
    age_param = Parameter("age_input", default=18)
    hw = hello_world(user_param, age_param)

dummy_flow.storage = storage
dummy_flow.register(project_name=PROJECT, build=False)

# This is a flow we use to map the parameters to different runs of the imported flow
# The parameters is a list of dictionaries where each dictionary is the set of input paramaters needed to run imported flow
with Flow("mapped_flows", 
          executor=LocalDaskExecutor()) as flow:
    parameters = [dict(user_input="Prefect", age_input=21),
                  dict(user_input="Marvin", age_input=27),
                  dict(user_input="World", age_input=12),
                  ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-flow-to-be-run-multiple-times"),
    )
    wait_for_mapped_flows = wait_for_flow_run(
        mapped_flows, raise_final_state=True, stream_logs=True
    )
    flow_result = get_task_run_result(
        mapped_flows, "hello_world-1", upstream_tasks=[wait_for_mapped_flows]
    )
Here's the error
Copy code
Task 'wait_for_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 258, in wait_for_flow_run
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 576, in from_flow_run_id
    flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/flow_run.py", line 618, in _query_for_flow_run
    result = client.graphql(flow_run_query)
  File "/usr/local/lib/python3.7/site-packages/prefect/client/client.py", line 570, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['flow_run'], 'message': 'parsing UUID failed, expected String, but encountered Array', 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
a
Can you try reregistering your flow? This often helps with such "parsing UUID" errors
your flow seems fine
i
@Anna Geller the flow is already registered. I'm guessing the problem may be with
create_flow_run.map()
. I'm not sure it generates the same output as
create_flow_run
Any ideas?
a
yes, I do have some, writing up my answer šŸ™‚
i
Okay. I'll be waiting
a
Upon closer and more thorough investigation, there are actually multiple issues with this flow: ā€¢ you are trying to get task run results even though your task doesn't return anything, so there are no results šŸ™‚ - the modified flow in the gist now returns the same string you were only logging so far ā€¢ in order to use results, and then to
get_task_run_results
, you need to configure the result on your relevant task decorator, here the
hello_world
task ā€¢ you need to explicitly provide the
Result location
because this is what the
get_task_run_result
is using to retrieve the task run results ā€¢ another issue here is with the flow idempotency key - we need to add that if you map over the same flow because the flow run ID of the child flow run is by default also the task run ID for the
create_flow_run
task causing the issue with parsing UUID you saw ā€¢ there are no results configured on the task which is why you cannot store and get task run results ā€¢ then, there is the issue that create_flow_run is a mapped task while your
wait_for_flow_run
and
get_task_run_result
are not - again, the flow in the Gist below fixes that I'm sorry that I originally just superficially looked at the flow and considered it looked fine - by looking more closely there were indeed quite many issues lurking around šŸ˜„ Here is the flow that should work: https://gist.github.com/5ab8c8e0d9cd65bc197bdb37a510f11e LMK if you have any questions about it
i
Oh wow! Thank you very much for your detailed analysis
šŸ‘ 1
So I tried to implement this but I get this error in prefect cloud
Copy code
Task 'get_task_run_result[1]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 467, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/ifeanyi/.pyenv/versions/prefect/lib/python3.7/site-packages/prefect/tasks/prefect/flow_run.py", line 233, in get_task_run_result
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 81, in get_result
    self._result = self._load_result()
  File "/usr/local/lib/python3.7/site-packages/prefect/backend/task_run.py", line 90, in _load_result
    self.state.load_result()
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/state.py", line 153, in load_result
    self._result = result_reader.read(known_location)  # type: ignore
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/local_result.py", line 83, in read
    with open(os.path.join(self.dir, location), "rb") as f:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/ifeanyi/.prefect/results/f2e547b0-30e2-4d1d-9281-924e0aeb07af'
Looks like
LocalResult()
doesn't work on the cloud
a
I updated the Gist - can you try that version? https://gist.github.com/0c922ac90e43a0581b9ec43079cdb5c2 but both should work if you start a local agent. What agent type do you use? the local result only works with local agent
i
Working with prefect cloud. Where do I confirm what agent it is?
a
in your cloud UI for example? šŸ™‚ https://cloud.prefect.io/yourteamname/agent or maybe ask someone who deployed your agent in your team? also you may check https://docs.prefect.io/orchestration/getting-started/quick-start.html
i
I can see that link but it doesn't specify the agent type
or I don't see the agent type
it does say the name is "team-name-vertex"
a
In that case the cloud URL to the agent would be:
Copy code
<https://cloud.prefect.io/team-name-vertex/agent>
but really it doesn't matter, you can go to the Agents tab in your UI