Marius Haberstock
03/08/2022, 3:18 PMapply_map
. Details following in the thread 🙂iterate_files
task to decide for each file, if I call task process_file_a
or task process_file_b
.
I tried using apply_map
based on the example of the documentation (https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines)
However, the merge
function does not seem to work inside a task.
This is my current code:
with Flow() as flow:
zip_filename = Parameter("zip_filename")
files = download_zipfile(zip_filename)
infos = apply_map(iterate_files, file=files, zip_filename=unmapped(zip_filename))
upload_file.map(info=infos)
@task(log_stdout=True)
def iterate_files(file, zip_filename):
is_recfile = file[0].endswith("...")
with case(is_recfile, True):
result_1 = parse_recfile.run(file, zip_filename)
is_xmlfile = file[0] == '...'
with case(is_xmlfile, True):
result_2 = process_xml_file.run(file, zip_filename)
return merge(result_1, result_2).run()
Does anyone have a idea how to solve this? 🙂Kevin Kho
03/08/2022, 3:22 PMrun()
?Marius Haberstock
03/08/2022, 3:25 PMrun()
`Task 'iterate_files[0]': Exception encountered during task execution! Traceback (most recent call last): File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state value = prefect.utilities.executors.run_task_with_timeout( File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout return task.run(*args, **kwargs) # type: ignore File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 44, in iterate_files File "/usr/local/lib/python3.9/site-packages/prefect/core/task.py", line 633, in call new.bind( File "/usr/local/lib/python3.9/site-packages/prefect/core/task.py", line 693, in bind raise ValueError( ValueError: Could not infer an active Flow context while creating edge to <Task: parse_recfile>. This often means you called a task outside a with Flow(...)
block. If you're trying to run this task outside of a Flow context, you need to call `parse_recfile.run(...)``Kevin Kho
03/08/2022, 3:25 PMiterate_files
Marius Haberstock
03/08/2022, 3:27 PMTraceback (most recent call last):
File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 113, in <module>
infos = apply_map(iterate_files, file=files, zip_filename=unmapped(zip_filename))
File "/Users/marius/miniforge3/envs/prefect-fmc/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 131, in apply_map
res = func(*args2, **kwargs2)
File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 41, in iterate_files
is_recfile = file[0].endswith("...")
AttributeError: 'GetItem' object has no attribute 'endswith'
Kevin Kho
03/08/2022, 3:28 PMis_recfile = file[0].endswith("...")
is not a task and it executes eagerly. A task will help defer the execution
same with this one:
is_xmlfile = file[0] == '...'
Marius Haberstock
03/08/2022, 3:30 PMKevin Kho
03/08/2022, 3:32 PMMarius Haberstock
03/08/2022, 3:40 PMKevin Kho
03/08/2022, 3:41 PMMarius Haberstock
03/08/2022, 3:46 PMKevin Kho
03/08/2022, 3:48 PMAnna Geller
03/09/2022, 5:46 PM