Stephen Lloyd
04/18/2022, 4:22 AM
I have a task, build_task, which accepts 3 parameters.
import prefect
from prefect import task
from prefect.engine.results import S3Result

task_results = S3Result(bucket=f'mybucket-{prefect_config.RUN_ENV}', location="prefect/results/{task_name}.txt")

@task
def build_task(creds, result, datasets):
    # builds a list
    ...
    actions_data = task_results.write(task_list, **prefect.context)
    return actions_data
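The location passed to S3Result above is a template. The sketch below assumes Prefect 1.x fills its placeholders with str.format-style substitution from prefect.context values. render_location is a hypothetical helper for illustration, not a Prefect function. It also shows a side effect that matters for this thread: any two task instances named build_task would render the same key, so a duplicated task would write to the same S3 object.

```python
# Template copied from the question's S3Result(location=...).
LOCATION_TEMPLATE = "prefect/results/{task_name}.txt"


def render_location(template, **context):
    # Assumption: placeholders are filled via str.format from context values;
    # extra keys (flow_name, etc.) are simply ignored by str.format.
    return template.format(**context)


# Two separate instances of the same task share the same task_name.
first = render_location(LOCATION_TEMPLATE, task_name="build_task", flow_name="demo")
second = render_location(LOCATION_TEMPLATE, task_name="build_task", flow_name="demo")
print(first)            # prefect/results/build_task.txt
print(first == second)  # True: both instances resolve to the same key
```

Adding a placeholder that differs per run (for example a timestamp, if your context provides one) would keep results from overwriting each other, but which keys are available depends on your Prefect version.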
I execute it as follows:
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.secrets import PrefectSecret

with Flow(
    PREFECT_FLOW_NAME,
    storage=STORAGE,
    run_config=RUN_CONFIG,
    executor=LocalDaskExecutor(num_workers=8),
    schedule=PREFECT_FLOW_SCHEDULE,
    result=task_results
) as flow:
    creds = PrefectSecret('some_secret')
    app_creds = PrefectSecret('app_secret')
    # watermark = SnowflakeQuery()
    datasets = get_datasets(creds)
    actions_response = get_actions(creds)
    final = build_task(app_creds, actions_response, datasets)
    # more stuff...
I get the following error saying build_task is missing 3 required positional arguments. Oddly, the task fails but then also succeeds. What could be causing this?
[2022-04-18 10:00:52+0545] INFO - prefect.TaskRunner | Task 'build_task': Starting task run...
[2022-04-18 10:00:52+0545] ERROR - prefect.TaskRunner | Task 'build_task': Exception encountered during task execution!
Traceback (most recent call last):
File "/Users/slloyd/projects/dwbi-orchestration/.venv/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/Users/slloyd/projects/dwbi-orchestration/.venv/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
TypeError: build_task() missing 3 required positional arguments: 'creds', 'result', and 'datasets'
[2022-04-18 10:00:52+0545] INFO - prefect.TaskRunner | Task 'build_task': Starting task run...
[2022-04-18 10:00:53+0545] INFO - prefect.TaskRunner | Task 'build_task': Finished task run for task with final state: 'Failed'
[2022-04-18 10:01:00+0545] INFO - prefect.TaskRunner | Task 'build_task': Finished task run for task with final state: 'Success'
I don't see why build_task is duplicated, since I only call it once (and only refer to it once).
Kevin Kho
04/18/2022, 1:44 PM
I suspect you have something like this:
with Flow(..) as flow:
    ...
    final = build_task(app_creds, actions_response, datasets)
    ...
    some_other_task(upstream_tasks=[build_task])
Instead, make final the upstream task, like:
some_other_task(upstream_tasks=[final])
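This points to the instance of the task that you actually called. Otherwise, a new one is created: calling a task inside the Flow block works on a copy, so referencing build_task itself adds the original, argument-less task to the flow as a second task.
Stephen Lloyd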
04/19/2022, 10:22 AM
I had build_task.set_upstream('some_task') later in my code. When I removed those calls, everything resolved.
Kevin Kho
04/19/2022, 1:33 PM
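The mechanism Kevin describes can be sketched with a small toy model. ToyTask and ToyFlow below are hypothetical stand-ins written for illustration, not Prefect classes; they only mimic the copy-on-call behavior described in the thread. Calling a task binds arguments to a copy, so any later reference to the original object (via upstream_tasks or set_upstream) adds a second, unbound task. When run, that second task is called with no arguments, which reproduces the "missing 3 required positional arguments" failure alongside a successful run.

```python
import copy


class ToyTask:
    """Hypothetical stand-in for a Prefect 1.x task; not Prefect's real class."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn
        self.bound_args = ()  # nothing is bound until the task is called in a flow

    def __call__(self, *args, flow):
        # Copy-on-call: the call binds arguments to a copy and adds that
        # copy to the flow; the original object stays unbound.
        new = copy.copy(self)
        new.bound_args = args
        flow.add_task(new)
        return new


class ToyFlow:
    """Hypothetical stand-in for a flow: a list of task objects plus edges."""

    def __init__(self):
        self.tasks = []
        self.edges = []

    def add_task(self, task):
        # Tasks are tracked by identity, so two copies count as two tasks.
        if not any(t is task for t in self.tasks):
            self.tasks.append(task)

    def set_dependency(self, downstream, upstream):
        # Referencing an object that is not in the flow yet adds it as a new task.
        self.add_task(upstream)
        self.add_task(downstream)
        self.edges.append((upstream, downstream))

    def run(self):
        # Call every task with whatever was bound to that particular object.
        states = []
        for t in self.tasks:
            try:
                t.fn(*t.bound_args)
                states.append((t.name, "Success"))
            except TypeError as exc:
                states.append((t.name, f"Failed: {exc}"))
        return states


def build_task(creds, result, datasets):
    return [creds, result, datasets]


def some_other_task():
    return "done"


def make_flow(upstream_is_original):
    build = ToyTask("build_task", build_task)
    other = ToyTask("some_other_task", some_other_task)
    flow = ToyFlow()
    final = build("app_creds", "actions_response", "datasets", flow=flow)
    downstream = other(flow=flow)
    # Buggy: depend on the original object. Fixed: depend on the bound copy.
    flow.set_dependency(downstream, build if upstream_is_original else final)
    return flow


for name, state in make_flow(upstream_is_original=True).run():
    print(name, state)
```

With upstream_is_original=True the toy flow holds three tasks and build_task appears twice, once succeeding and once failing with the TypeError; with False it holds two tasks and both succeed. In the real flow, the analogous fix for the set_upstream case would presumably be calling set_upstream on final rather than on build_task.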