Mehdi Nazari

01/24/2022, 5:12 PM
Hi Community, I have a Flow configured to take in a parameter and, after some processing of that parameter, send it to a mapped task for further processing. The problem is that I’m running into a few issues when trying to debug it locally on my computer. I have been following the debugging guides and am not sure what I’m doing wrong.
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    # load parameters
    with open("parameter.json", "r") as f:
        default_param = f.read()
    # 
    json_param = Parameter("data_sources", default=default_param if default_param else "").default
    flow_parameter = extract_parameters(json_param)
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Please share your thoughts as I’m not sure where my issue is.

Kevin Kho

01/24/2022, 5:58 PM
I think the `Flow` block would only contain tasks so the `open` file should not be there. You can do:
with Flow(..) as flow:
    ...

if __name__ == "__main__":
    with open(...) as f:
        default_param = f.read()
    # parameters is a dict keyed by Parameter name
    flow.run(parameters={"data_sources": default_param})
You also can’t do `flow_parameter.sources` because this doesn’t exist until runtime, while the attribute access itself is evaluated at build time. I think this may be your bigger issue?

Mehdi Nazari

01/24/2022, 6:02 PM
Thanks. Are you suggesting I handle that differently? I need to parse the JSON parameter provided from the UI and then, once it is validated, map over it in a subsequent task.

Kevin Kho

01/24/2022, 6:03 PM
Yes, you need to pull that out in a task.

Mehdi Nazari

01/24/2022, 6:07 PM
Ok, that makes sense!
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    json_param = Parameter("data_sources", default="")
    flow_parameter = extract_parameters(json_param)
    # flow_parameter = FlowParameter(json_param)
    # flow_parameter.load_parameter()
    # flow_parameter.extract()
    # 
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Here is another issue: `flow_parameter` is treated as a task rather than the object I need to work with. Error:
[2022-01-24 10:04:19-0800] INFO - prefect | Invalid parameter! <Task: extract_parameters>!
Traceback (most recent call last):
  File "/Users/mehdinazari/Documents/workbench/work/assemigroup/integrations/prefect_pipelines/mfiles_reporting_data_import/flow.py", line 137, in <module>
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
AttributeError: 'FunctionTask' object has no attribute 'sources'

Kevin Kho

01/24/2022, 6:10 PM
Yes, that is the same reason. `with Flow()` constructs the DAG, so while it is being constructed, `flow_parameter` is still of type `Task` because the code hasn’t run until `flow.run()` is called. That is why you can’t do `flow_parameter.sources`. `flow_parameter.sources` is executed during Flow construction. You need it to execute during Flow execution. Tasks defer the execution to runtime, so this needs to be wrapped in a task.
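To make the build-time versus run-time distinction concrete, here is a minimal pure-Python sketch. It is a toy model and not how Prefect is implemented: `Deferred`, `deferred_task`, and `Params` are hypothetical names made up for illustration. Calling a wrapped function only records the call, so the placeholder it returns has no `sources` attribute until the recorded call is actually run.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List


@dataclass
class Params:
    """Hypothetical stand-in for the object returned by extract_parameters."""
    sources: List[str] = field(default_factory=list)


class Deferred:
    """Toy placeholder for a call that has been recorded but not executed."""

    def __init__(self, fn: Callable[..., Any], *args: Any) -> None:
        self.fn = fn
        self.args = args

    def run(self) -> Any:
        # Resolve upstream placeholders first, then call the real function.
        resolved = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.fn(*resolved)


def deferred_task(fn: Callable[..., Any]) -> Callable[..., Deferred]:
    """Toy decorator: calling the function records the call instead of running it."""
    def build(*args: Any) -> Deferred:
        return Deferred(fn, *args)
    return build


@deferred_task
def extract_parameters(raw: str) -> Params:
    return Params(sources=raw.split(","))


@deferred_task
def pull_out_sources(params: Params) -> List[str]:
    return params.sources


# "Build time": nothing has executed yet, so the placeholder has no .sources.
flow_parameter = extract_parameters("table_a,table_b")
try:
    flow_parameter.sources
    build_time_error = None
except AttributeError as exc:
    build_time_error = exc

# Wrapping the attribute access in another deferred call postpones it.
sources = pull_out_sources(flow_parameter)

# "Run time": the recorded calls execute and the attribute exists.
result = sources.run()
print(result)
```

The attribute access only succeeds inside `pull_out_sources`, because by the time that function body runs its argument has been resolved to a real `Params` object.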

Mehdi Nazari

01/24/2022, 6:13 PM
That makes sense too. Do you have an example or suggestion I can take a look at? I’m not sure how to overcome that!

Kevin Kho

01/24/2022, 6:17 PM
copy_table_content.map(param=task(lambda x: x.sources)(flow_parameter), upstream_tasks=[is_valid])
or
@task
def pull_out_sources(x):
    return x.sources

...
copy_table_content.map(param=pull_out_sources(flow_parameter), upstream_tasks=[is_valid])
...

Mehdi Nazari

01/24/2022, 6:35 PM
Still no luck! Local debugging is challenging with Prefect! I will need more help.

Kevin Kho

01/24/2022, 6:37 PM
What is your error?

Mehdi Nazari

01/24/2022, 6:39 PM
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Reporting-Data-Import'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Finished task run for task with final state: 'Failed'
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
And I’m not sure where it is failing!
Additionally, these are the environment variables I have set:
export PREFECT__LOGGING__LEVEL="INFO"
export PREFECT__LOGGING__FORMAT="[%(asctime)s] %(levelname)s - %(name)s | %(message)s"
export PREFECT__FLOWS__RUN_ON_SCHEDULE=false
export PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.LocalExecutor"
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false

Kevin Kho

01/24/2022, 6:42 PM
This looks to be failing inside the task. I would add logging to `copy_table_content`:
import prefect
from prefect import task

@task
def pull_out_sources(x):
    # prefect.context.logger is only available while the task is running
    prefect.context.logger.info(x)
    ...
    prefect.context.logger.info(f"succeeded for the value of {x}")
    return x.sources
something like this.
Your `case` is also not doing much:
with case(is_valid, False):
    logger.info(f"Invalid parameter! {flow_parameter}!")
That is because the `logger.info` call also runs at Flow build time, not at execution time.
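The same point can be shown in plain Python: the body of a `with` block runs as soon as the interpreter reaches it, whatever the context manager is meant to represent. The sketch below uses a hypothetical `fake_case` context manager (not Prefect's `case`) and a list standing in for the logger. It shows the message being produced immediately, while a call that is only recorded runs later, when it is explicitly executed. In a real flow, this corresponds to moving the log call into a task and calling that task inside the `case` block.

```python
from contextlib import contextmanager
from typing import Callable, Iterator, List

messages: List[str] = []                 # stands in for the logger output
pending: List[Callable[[], None]] = []   # calls recorded for "run time"


@contextmanager
def fake_case(condition: bool) -> Iterator[None]:
    """Hypothetical context manager; it cannot stop its body from running."""
    yield


def log_invalid(value: str) -> None:
    messages.append(f"Invalid parameter! {value}!")


# "Build time": the body executes right away, regardless of the argument.
with fake_case(False):
    log_invalid("logged while building")
logged_at_build = list(messages)

# Deferring the call (the role a task plays) records it without running it.
with fake_case(False):
    pending.append(lambda: log_invalid("logged while running"))
logged_after_deferral = list(messages)

# "Run time": only now does the deferred call execute.
for call in pending:
    call()
```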

Mehdi Nazari

01/24/2022, 7:02 PM
Is there a way I can connect my IDE to the local executor and step through the code? That’d give me a lot of visibility. I feel like I’m modifying code just to be able to debug it, which makes it different from what is going to production. Or are there any advanced training materials offered by Prefect?

Kevin Kho

01/24/2022, 7:40 PM
You can try doing something like this to propagate the error:
import traceback
from functools import partial, wraps

from prefect import task

def custom_task(func=None, **task_init_kwargs):
    # Support both @custom_task and @custom_task(**task_init_kwargs)
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            # Print the full traceback so the failing line is visible
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
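As a standalone illustration of what the wrapper does, here is a sketch of the same try/except pattern without the final `task(...)` call, so it runs without Prefect installed. `with_full_traceback` and `divide` are hypothetical names used only for this example. It shows that the full traceback is printed once and that `from None` suppresses the chained original exception on the re-raised error.

```python
import traceback
from functools import wraps


def with_full_traceback(func):
    """Print the full traceback, then re-raise a compact error."""
    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            # from None suppresses the chained original exception
            raise RuntimeError(type(e)) from None
    return safe_func


@with_full_traceback
def divide(a, b):
    return a / b


# Normal calls pass straight through the wrapper.
ok_result = divide(a=6, b=3)

# A failing call prints the traceback and surfaces as a RuntimeError.
try:
    divide(a=1, b=0)
    caught = None
except RuntimeError as err:
    caught = err
```

Note that the wrapper only forwards keyword arguments, so calling `divide(1, 0)` positionally would raise a `TypeError` before the wrapped function is reached.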