Hi, I’m getting an error I don’t understand, along...
# prefect-community
s
Hi, I’m getting an error I don’t understand, along with unexpected behavior:
I have a task called
build_task
which accepts 3 parameters.
Copy code
task_results = S3Result(bucket=F'mybucket-{prefect_config.RUN_ENV}', location="prefect/results/{task_name}.txt")

@task
def build_task(creds, result, datasets):
    #builds a list
    ...
    actions_data = task_results.write(task_list, **prefect.context)
    return actions_data
I execute it as follows:
Copy code
with Flow(
    PREFECT_FLOW_NAME, 
    storage=STORAGE, 
    run_config=RUN_CONFIG, 
    executor=LocalDaskExecutor(num_workers=8),
    schedule=PREFECT_FLOW_SCHEDULE,
    result=task_results
) as flow:
    creds = PrefectSecret('some_secret')
    app_creds = PrefectSecret('app_secret')
    # watermark = SnowflakeQuery()
    datasets = get_datasets(creds)
    actions_response = get_actions(creds)
    final = build_task(app_creds, actions_response, datasets)
    # more stuff...
I get the following error that I am missing 3 required positional arguments. Please note that the task fails, but then also succeeds. How might I be causing this?
Copy code
[2022-04-18 10:00:52+0545] INFO - prefect.TaskRunner | Task 'build_task': Starting task run...
[2022-04-18 10:00:52+0545] ERROR - prefect.TaskRunner | Task 'build_task': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/slloyd/projects/dwbi-orchestration/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/slloyd/projects/dwbi-orchestration/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
TypeError: build_task() missing 3 required positional arguments: 'creds', 'result', and 'datasets'
[2022-04-18 10:00:52+0545] INFO - prefect.TaskRunner | Task 'build_task': Starting task run...
[2022-04-18 10:00:53+0545] INFO - prefect.TaskRunner | Task 'build_task': Finished task run for task with final state: 'Failed'
[2022-04-18 10:01:00+0545] INFO - prefect.TaskRunner | Task 'build_task': Finished task run for task with final state: 'Success'
This is the graphviz representation. Not sure why there are two lines some places and not others.
And not sure why
build_task
is duplicated since I only call it once (and only refer to it once.
k
Hey @Stephen Lloyd, could you move some of the code to the thread when you get a chance to keep the main channel cleaner? I think you might have something like:
Copy code
with Flow(..) as flow:
    ...
    final = build_task(app_creds, actions_response, datasets)
    ...
    some_other_task(upstream_tasks=[build_task])
instead, make the upstream task
final
like:
Copy code
some_other_task(upstream_tasks=[final])
this will point to the instance of the task that you called. Otherwise, a new one will be created because the Flow block makes copies
s
I was doing things like
build_task.set_upstream('some_task')
later in my code. When I removed those, everything resolved.
k
Assign the upstream to the variable instead of the task definition and you should get the behavior you want