Mehdi Nazari
01/24/2022, 5:12 PM
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    # load parameters
    with open("parameter.json", "r") as f:
        default_param = f.read()
    #
    json_param = Parameter("data_sources", default=default_param if default_param else "").default
    flow_parameter = extract_parameters(json_param)
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Please share your thoughts, as I’m not sure where my issue is.
Kevin Kho
The Flow block should only contain tasks, so the open(...) file read should not be there. You can do:
with Flow(..) as flow:
    ...

if __name__ == "__main__":
    with open(...) as f:
        default_param = f.read()
    # parameters expects a dict keyed by Parameter name
    flow.run(parameters={"data_sources": default_param})
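A minimal sketch of the loading step suggested above, kept outside the Flow block so it runs as ordinary Python. The helper name build_run_parameters, the early JSON validation, and the {"sources": [...]} file shape are assumptions for illustration; only the "data_sources" parameter name comes from the thread.

```python
import json
import os
import tempfile


def build_run_parameters(path, name="data_sources"):
    """Read a JSON file and return a {parameter_name: raw_json_text} dict."""
    with open(path, "r") as f:
        text = f.read()
    json.loads(text)  # assumption: fail early if the file is not valid JSON
    return {name: text}


# Demo with a throwaway file standing in for parameter.json.
# The {"sources": [...]} shape is hypothetical.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "parameter.json")
    with open(demo_path, "w") as f:
        json.dump({"sources": ["table_a", "table_b"]}, f)
    params = build_run_parameters(demo_path)
```

The resulting dict could then be handed to flow.run(parameters=params) under the if __name__ == "__main__": guard.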
You also can't use flow_parameter.sources there, because it doesn’t exist until runtime, while that expression is evaluated at build time. I think this may be your bigger issue?
Mehdi Nazari
01/24/2022, 6:02 PM
Kevin Kho
Mehdi Nazari
01/24/2022, 6:07 PM
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    json_param = Parameter("data_sources", default="")
    flow_parameter = extract_parameters(json_param)
    # flow_parameter = FlowParameter(json_param)
    # flow_parameter.load_parameter()
    # flow_parameter.extract()
    #
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Here is another issue: flow_parameter is treated as a task rather than as the object I need to work with.
Error:
[2022-01-24 10:04:19-0800] INFO - prefect | Invalid parameter! <Task: extract_parameters>!
Traceback (most recent call last):
File "/Users/mehdinazari/Documents/workbench/work/assemigroup/integrations/prefect_pipelines/mfiles_reporting_data_import/flow.py", line 137, in <module>
copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
AttributeError: 'FunctionTask' object has no attribute 'sources'
Kevin Kho
with Flow() constructs the DAG, so while it is being constructed, flow_parameter is still of type Task because the code doesn’t run until flow.run() is called. That is why you can’t do flow_parameter.sources: it is executed during Flow construction, but you need it to execute during Flow execution. Tasks defer execution to runtime, so this needs to be wrapped in a task.
Mehdi Nazari
01/24/2022, 6:13 PM
Kevin Kho
copy_table_content.map(param=task(lambda x: x.sources)(flow_parameter), upstream_tasks=[is_valid])
or
@task
def pull_out_sources(x):
    return x.sources
...
copy_table_content.map(param=pull_out_sources(flow_parameter), upstream_tasks=[is_valid])
...
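To make the build-time versus run-time distinction concrete, here is a toy model in plain Python. It is not Prefect's implementation: ToyTask, toy_task, and Params are hypothetical stand-ins, and only the names extract_parameters and pull_out_sources mirror the thread. It shows why attribute access on the task object fails while the same access wrapped in a task succeeds.

```python
class ToyTask:
    """Toy stand-in for a deferred task: it records work instead of doing it."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def run(self):
        # Resolve upstream tasks first, then execute this one.
        args = [u.run() if isinstance(u, ToyTask) else u for u in self.upstream]
        return self.fn(*args)


def toy_task(fn):
    """Calling the decorated function builds a ToyTask instead of running fn."""
    def build(*upstream):
        return ToyTask(fn, *upstream)
    return build


class Params:
    def __init__(self, sources):
        self.sources = sources


@toy_task
def extract_parameters(raw):
    return Params(sources=raw.split(","))


@toy_task
def pull_out_sources(params):
    return params.sources


# Build time: flow_parameter is a ToyTask, not a Params object,
# so it has no .sources attribute yet.
flow_parameter = extract_parameters("a,b")
build_time_has_sources = hasattr(flow_parameter, "sources")

# Wrapping the attribute access in a task defers it until run time.
sources_task = pull_out_sources(flow_parameter)
run_time_sources = sources_task.run()
```

In this sketch nothing executes until run() is called, which plays the role flow.run() plays in the thread.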
Mehdi Nazari
01/24/2022, 6:35 PM
Kevin Kho
Mehdi Nazari
01/24/2022, 6:39 PM
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Reporting-Data-Import'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Finished task run for task with final state: 'Failed'
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
And I’m not sure where it is failing!
export PREFECT__LOGGING__LEVEL="INFO"
export PREFECT__LOGGING__FORMAT="[%(asctime)s] %(levelname)s - %(name)s | %(message)s"
export PREFECT__FLOWS__RUN_ON_SCHEDULE=false
export PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.LocalExecutor"
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
Kevin Kho
You can add logs inside copy_table_content to find where it fails:
import prefect
from prefect import task

@task
def pull_out_sources(x):
    prefect.context.logger.info(x)
    ...
    prefect.context.logger.info(f"succeeded for the value of {x}")
    return x.sources
Something like this.
case is also not doing much:
with case(is_valid, False):
    logger.info(f"Invalid parameter! {flow_parameter}!")
That's because the logger.info call also runs at Flow build time, not at execution time.
Mehdi Nazari
01/24/2022, 7:02 PM
Kevin Kho
import traceback
from functools import partial, wraps

from prefect import task

def custom_task(func=None, **task_init_kwargs):
    # Support both @custom_task and @custom_task(**task_init_kwargs)
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
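The decorator pattern in custom_task (usable with or without arguments, printing the full traceback and re-raising with from None) can be tried without Prefect. A standalone sketch under stated assumptions: safe, label, double, divide, and try_divide_by_zero are hypothetical names, and unlike custom_task this version returns a plain function rather than a Prefect task.

```python
import traceback
from functools import partial, wraps


def safe(func=None, *, label="task"):
    """Decorator usable as @safe or @safe(label=...)."""
    if func is None:
        # Called with arguments only: return a decorator awaiting the function.
        return partial(safe, label=label)

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            print(f"[{label}] Full Traceback: {traceback.format_exc()}")
            # "from None" suppresses the chained original exception.
            raise RuntimeError(type(exc)) from None

    return wrapper


@safe
def double(x):
    return 2 * x


@safe(label="divide")
def divide(a, b):
    return a / b


def try_divide_by_zero():
    """Return the RuntimeError raised by the wrapper, or None if nothing was raised."""
    try:
        divide(1, 0)
    except RuntimeError as err:
        return err
    return None
```

The partial(...) branch is what lets the same function serve as both a bare decorator and a decorator factory.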