https://prefect.io logo
Title
a

Akshay Verma

08/20/2019, 9:50 AM
Hi, I am trying the following flow as an short example of what I want:
# Flow #########################
    @task
    def generate_context(wfr_df, lvnr, tol):
        print(wfr_df, lvnr, tol)
        return wfr_df, tol

    with Flow('TDBB') as flow:
        wfr_df = Parameter("wfr_df")
        modeldef = Parameter("modeldef")
        tol = Parameter("tol")
        lvnr = Parameter("lvnr")

        context_level = generate_context(wfr_df, lvnr, tol)
        context_level_ml = context_level[0]
        context_level_ci = context_level[1]
        print(context_level_ml)
        print(context_level_ci)

    flow.run(parameters=dict(
        wfr_df=mls,
        modeldef=("tx", "ty", "mwx", "mwy", "rwx", "rwy", "mx", "my", "rx", "ry"),
        tol=1e6,
        lvnr=3,
    ))
I am getting the following error :
Flow.run received the following unexpected parameters: modeldef
when I check
self.parameters()
from
flow.py
I have the following response:
{<Parameter: tol>, <Parameter: lvnr>, <Parameter: wfr_df>}
Can anyone point out what I am doing wrong here?
j

Jeremiah

08/20/2019, 1:44 PM
Hi @Akshay Verma, the problem is that your
modeldef
Parameter isn't being used in your flow. When using the functional API, Tasks (including Parameters) are only added to the flow when they are either called as functions or used as the input to another task. If you want to add your
modeldef
parameter, just call it (
modeldef()
) or use it as an input for another one of your tasks. Then it'll be a valid parameter of the flow!
a

Akshay Verma

08/21/2019, 7:06 AM
Thanks, fixed that. But I am getting warnings in the log that I do not understand.
[2019-08-21 07:04:41,631] INFO - prefect.FlowRunner | Beginning Flow run for 'TDBB'
[2019-08-21 07:04:41,632] INFO - prefect.FlowRunner | Starting flow run.
[2019-08-21 07:04:41,663] INFO - prefect.TaskRunner | Task 'tol': Starting task run...
[2019-08-21 07:04:41,664] WARNING - prefect.TaskRunner | Task 'tol': can't use cache because it is now invalid
[2019-08-21 07:04:41,666] INFO - prefect.TaskRunner | Task 'tol': finished task run for task with final state: 'Success'
j

Jeremiah

08/21/2019, 2:13 PM
Ah yes, sorry, that log is a bug that snuck into the 0.6.0 release. It prints even when there is no issue. I believe it’s been fixed if you upgrade to 0.6.1, or you can simply disregard it.
a

Akshay Verma

08/22/2019, 1:47 PM
Thanks, fixed that.
I tried switching fake function with the actual one by adding the processing part by doing the following:
@task
    def generate_context(wfr_df, lvnr, tol):
        print(wfr_df, lvnr, tol)
        mloflv, cid = split_clv(wfr_df, lvnr, tol)
        return wfr_df, tol
This causes the following in the logging:
[2019-08-22 13:46:45,468] INFO - prefect.TaskRunner | Task 'generate_context': Starting task run...
[2019-08-22 13:46:45,470] INFO - prefect.TaskRunner | Unexpected error: AttributeError("'int' object has no attribute 'copy'")
What is the meaning of the second logging line?
j

Jeremiah

08/22/2019, 3:59 PM
Usually that line is reporting an error that took place when you ran your task