Hi, I am trying the following flow as an short exa...
# ask-community
a
Hi, I am trying the following flow as a short example of what I want:
# Flow #########################
    @task
    def generate_context(wfr_df, lvnr, tol):
        print(wfr_df, lvnr, tol)
        return wfr_df, tol

    with Flow('TDBB') as flow:
        wfr_df = Parameter("wfr_df")
        modeldef = Parameter("modeldef")
        tol = Parameter("tol")
        lvnr = Parameter("lvnr")

        context_level = generate_context(wfr_df, lvnr, tol)
        context_level_ml = context_level[0]
        context_level_ci = context_level[1]
        print(context_level_ml)
        print(context_level_ci)

    flow.run(parameters=dict(
        wfr_df=mls,
        modeldef=("tx", "ty", "mwx", "mwy", "rwx", "rwy", "mx", "my", "rx", "ry"),
        tol=1e6,
        lvnr=3,
    ))
I am getting the following error:
Flow.run received the following unexpected parameters: modeldef
When I check self.parameters() in flow.py, I get the following response:
{<Parameter: tol>, <Parameter: lvnr>, <Parameter: wfr_df>}
Can anyone point out what I am doing wrong here?
j
Hi @Akshay Verma, the problem is that your modeldef Parameter isn't being used in your flow. When using the functional API, Tasks (including Parameters) are only added to the flow when they are either called as functions or used as the input to another task. If you want to add your modeldef parameter, just call it (modeldef()) or use it as an input for another one of your tasks. Then it'll be a valid parameter of the flow!
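The registration rule described above can be pictured with a toy model in plain Python. This is only a sketch: ToyFlow, ToyTask and ToyParameter are hypothetical stand-ins written for illustration, not Prefect's actual classes or implementation. It shows that merely constructing a parameter inside the with-block leaves it out of the flow, while calling it (or passing it to a task) adds it.

```python
# Toy model only: these classes are hypothetical stand-ins, not Prefect's code.
class ToyFlow:
    _active = None  # the flow whose `with` block is currently open

    def __init__(self, name):
        self.name = name
        self.tasks = set()

    def __enter__(self):
        ToyFlow._active = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow._active = None

    def parameter_names(self):
        return {t.name for t in self.tasks if isinstance(t, ToyParameter)}


class ToyTask:
    def __init__(self, name):
        self.name = name  # constructing a task does NOT register it

    def __call__(self, *upstream):
        # Calling a task registers it, plus every task passed in as input.
        flow = ToyFlow._active
        flow.tasks.add(self)
        flow.tasks.update(upstream)
        return self


class ToyParameter(ToyTask):
    pass


generate_context = ToyTask("generate_context")

with ToyFlow("before") as before:
    wfr_df = ToyParameter("wfr_df")
    modeldef = ToyParameter("modeldef")  # created but never used
    generate_context(wfr_df)

with ToyFlow("after") as after:
    wfr_df = ToyParameter("wfr_df")
    modeldef = ToyParameter("modeldef")
    modeldef()  # calling it registers it
    generate_context(wfr_df)

print(sorted(before.parameter_names()))
print(sorted(after.parameter_names()))
```

In the "before" flow, modeldef is missing from the registered parameters, which mirrors why passing a value for it at run time is rejected as unexpected.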
a
Thanks, fixed that. But I am getting warnings in the log that I do not understand.
[2019-08-21 07:04:41,631] INFO - prefect.FlowRunner | Beginning Flow run for 'TDBB'
[2019-08-21 07:04:41,632] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-21 07:04:41,663] INFO - prefect.TaskRunner | Task 'tol': Starting task run...
[2019-08-21 07:04:41,664] WARNING - prefect.TaskRunner | Task 'tol': can't use cache because it is now invalid
[2019-08-21 07:04:41,666] INFO - prefect.TaskRunner | Task 'tol': finished task run for task with final state: 'Success'
j
Ah yes, sorry, that log is a bug that snuck into the 0.6.0 release. It prints even when there is no issue. I believe it’s been fixed if you upgrade to 0.6.1, or you can simply disregard it.
a
Thanks, fixed that.
I tried replacing the placeholder function with the actual one by adding the processing step, as follows:
    @task
    def generate_context(wfr_df, lvnr, tol):
        print(wfr_df, lvnr, tol)
        mloflv, cid = split_clv(wfr_df, lvnr, tol)
        return wfr_df, tol
This produces the following log output:
[2019-08-22 13:46:45,468] INFO - prefect.TaskRunner | Task 'generate_context': Starting task run...
[2019-08-22 13:46:45,470] INFO - prefect.TaskRunner | Unexpected error: AttributeError("'int' object has no attribute 'copy'")
What does the second log line mean?
j
Usually that line is reporting an error that took place when you ran your task.
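One way to track down an error like this is to run the task's logic as a plain function, outside the flow, so that Python prints the full traceback. The sketch below assumes a hypothetical stand-in for split_clv (the real one is not shown in the thread) that calls .copy() on its first argument. It only demonstrates how an int reaching such a call produces the logged message; which argument is actually the int in the real code is not known from the thread.

```python
import traceback


def split_clv(wfr_df, lvnr, tol):
    # Hypothetical stand-in: the real split_clv is not shown in the thread.
    working = wfr_df.copy()  # fails if wfr_df is an int
    return working, lvnr


def generate_context(wfr_df, lvnr, tol):
    # Same body as the task, but without the @task decorator.
    mloflv, cid = split_clv(wfr_df, lvnr, tol)
    return wfr_df, tol


message = None
trace = ""
try:
    # An int is passed where something with a .copy() method was expected.
    generate_context(3, 3, 1e6)
except AttributeError as exc:
    message = str(exc)
    trace = traceback.format_exc()

print(message)  # the same text that appears in the log line
print(trace)    # the traceback names the function where the call failed
```

The traceback points at the exact line that called .copy(), which makes it easier to see which argument arrived with the wrong type.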