Hi Community, I have a Flow configured to take in ...
# ask-community
m
Hi Community, I have a Flow configured to take in a parameter and, after some processing on the parameter, send it to a mapped task for further processing. The problem is that I'm running into a few issues when trying to debug it locally on my computer. I have been following the debugging guides and I'm not sure what I'm doing wrong.
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    # load parameters
    with open("parameter.json", "r") as f:
        default_param = f.read()
    # 
    json_param = Parameter("data_sources", default=default_param if default_param else "").default
    flow_parameter = extract_parameters(json_param)
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Please share your thoughts as I’m not sure where my issue is.
k
I think the `Flow` block should only contain tasks, so the `open` call should not be there. You can do:
with Flow(..) as flow:
    ...

if __name__ == "__main__":
    with open(...) as f:
        default_param = f.read()
    # parameters expects a dict keyed by Parameter name
    flow.run(parameters={"data_sources": default_param})
You also can't do `flow_parameter.sources` because this doesn't exist until runtime, while that attribute access is evaluated at build time. I think this may be your bigger issue?
m
Thanks. Are you suggesting that I handle that? I need to parse the JSON parameter provided from the UI and then, once it is validated, map over it in a subsequent task.
k
Yes, you need to pull that out in a task.
m
Ok, that makes sense!
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    json_param = Parameter("data_sources", default="")
    flow_parameter = extract_parameters(json_param)
    # flow_parameter = FlowParameter(json_param)
    # flow_parameter.load_parameter()
    # flow_parameter.extract()
    # 
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Here is another issue: `flow_parameter` is treated as a task rather than the object I need to work with. Error:
[2022-01-24 10:04:19-0800] INFO - prefect | Invalid parameter! <Task: extract_parameters>!
Traceback (most recent call last):
  File "/Users/mehdinazari/Documents/workbench/work/assemigroup/integrations/prefect_pipelines/mfiles_reporting_data_import/flow.py", line 137, in <module>
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
AttributeError: 'FunctionTask' object has no attribute 'sources'
k
Yes, that is the same reason. `with Flow()` constructs the DAG, so while it is being constructed, `flow_parameter` is still of type `Task` because the code doesn't run until `flow.run()` is called. That is why you can't do `flow_parameter.sources`: it is executed during Flow construction, and you need it to execute during Flow execution. Tasks defer execution to runtime, so this needs to be wrapped in a task.
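The build-time versus run-time distinction can be illustrated with a toy model in plain Python. This is not Prefect's implementation: `Deferred` and its `run` method are hypothetical names made up for the sketch, and `SimpleNamespace` stands in for whatever object `extract_parameters` really returns.

```python
from types import SimpleNamespace


class Deferred:
    """Toy stand-in for a task: records a call now, executes it later."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def run(self):
        # Resolve upstream Deferred arguments first, then call the function.
        resolved = [a.run() if isinstance(a, Deferred) else a for a in self.args]
        return self.func(*resolved)


def extract_parameters(raw):
    # Hypothetical: pretend the parsed parameter exposes a .sources list.
    return SimpleNamespace(sources=raw.split(","))


# "Build time": nothing has executed yet, so there is no .sources to read.
flow_parameter = Deferred(extract_parameters, "a,b,c")
try:
    flow_parameter.sources
    build_time_error = None
except AttributeError as exc:
    build_time_error = exc

# Wrapping the attribute access in another deferred step postpones it to run time.
sources = Deferred(lambda p: p.sources, flow_parameter)
result = sources.run()
print(result)
```

The attribute access only succeeds once it is itself part of the deferred graph, which is the same reason the `.sources` lookup has to live inside a task.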
m
That makes sense too. Do you have an example or suggestion I can take a look at? I'm not sure how to overcome that!
k
copy_table_content.map(param=task(lambda x: x.sources)(flow_parameter), upstream_tasks=[is_valid])
or
@task
def pull_out_sources(x):
    return x.sources

...
copy_table_content.map(param=pull_out_sources(flow_parameter), upstream_tasks=[is_valid])
...
m
Still no luck! Local debugging is challenging with Prefect! I will need more help.
k
What is your error?
m
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Beginning Flow run for 'Reporting-Data-Import'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'data_sources': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'extract_parameters': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task '<lambda>': Finished task run for task with final state: 'Success'
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Starting task run...
[2022-01-24 10:38:20-0800] INFO - prefect.TaskRunner | Task 'copy_table_content': Finished task run for task with final state: 'Failed'
[2022-01-24 10:38:20-0800] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
And I’m not sure where it is failing!
Additionally, these are my environment variables set:
export PREFECT__LOGGING__LEVEL="INFO"
export PREFECT__LOGGING__FORMAT="[%(asctime)s] %(levelname)s - %(name)s | %(message)s"
export PREFECT__FLOWS__RUN_ON_SCHEDULE=false
export PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.executors.LocalExecutor"
export PREFECT__CLOUD__USE_LOCAL_SECRETS=false
k
This looks to be failing inside the task. I would add logging to `copy_table_content`:
import prefect
from prefect import task

@task
def pull_out_sources(x):
    # Log the incoming value so the failing input shows up in the run logs
    prefect.context.logger.info(x)
    ...
    prefect.context.logger.info(f"succeeded for the value of {x}")
    return x.sources
something like this.
Your `case` is also not doing much:
with case(is_valid, False):
    logger.info(f"Invalid parameter! {flow_parameter}!")
This is because `logger.info` also runs at Flow build time, not at execution time.
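The underlying Python behavior can be shown without Prefect: the body of a `with` block is ordinary code that runs as soon as the interpreter reaches it, whatever the context manager was given. In this minimal sketch `toy_case` is a hypothetical stand-in, not Prefect's `case`, and the list `messages` replaces the logger.

```python
from contextlib import contextmanager

messages = []


@contextmanager
def toy_case(condition, value):
    # Hypothetical stand-in: like any context manager, it cannot skip the body.
    yield


is_valid = True  # pretend validation succeeded

with toy_case(is_valid, False):
    # Plain Python statement: it runs immediately, even though is_valid != False.
    messages.append("Invalid parameter!")

print(messages)
```

Only calls that build tasks inside the block can be made conditional, so the log message would need to move into a task to be skipped or run at execution time.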
m
Is there a way I can connect my IDE to the local executor and step through the code? That'd give me a lot of visibility. I feel like I'm modifying code just to be able to debug it, which makes it different from what is going to production. Or are there any advanced training materials offered by Prefect?
k
You can try doing something like this to propagate the error:
import traceback
from functools import partial, wraps

from prefect import task


def custom_task(func=None, **task_init_kwargs):
    # Support both @custom_task and @custom_task(**task_init_kwargs)
    if func is None:
        return partial(custom_task, **task_init_kwargs)

    @wraps(func)
    def safe_func(**kwargs):
        try:
            return func(**kwargs)
        except Exception as e:
            print(f"Full Traceback: {traceback.format_exc()}")
            raise RuntimeError(type(e)) from None  # from None is necessary to not log the stacktrace

    safe_func.__name__ = func.__name__
    return task(safe_func, **task_init_kwargs)

@custom_task
def abc(x):
    return x
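On the question of stepping through the code in an IDE: one option that avoids debug-only edits is to keep the real logic in plain functions and make the tasks thin wrappers around them, so the functions can be called directly under a debugger or from a test. A minimal sketch, assuming the parameter is a JSON string with a `sources` list; that shape and the names `parse_sources` and `is_valid_sources` are hypothetical, not taken from the original flow.

```python
import json


def parse_sources(raw):
    """Parse the JSON parameter and return its list of sources (may be empty)."""
    if not raw:
        return []
    data = json.loads(raw)
    return data.get("sources", [])


def is_valid_sources(sources):
    """Treat the parameter as valid when it is a non-empty list of strings."""
    return bool(sources) and all(isinstance(s, str) for s in sources)


if __name__ == "__main__":
    # Set a breakpoint here and step through without starting a flow run.
    sample = '{"sources": ["table_a", "table_b"]}'
    parsed = parse_sources(sample)
    print(parsed, is_valid_sources(parsed))
```

Inside the flow, each function would then be wrapped with `task(...)` in the same way as the `task(lambda x: x.sources)` snippet earlier in the thread, so the code that runs in production is the same code that was stepped through.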