Scott Asher
10/24/2020, 4:23 AM
execute() missing 1 required positional argument: 'flow_location'
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 11, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 34, in flow_run
    return _execute_flow_run()
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
    raise exc
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 93, in _execute_flow_run
    environment.execute(flow)
TypeError: execute() missing 1 required positional argument: 'flow_location'
[2020-10-24 04:14:03,330] INFO - agent | Process PID 23114 returned non-zero exit code
This error confuses me, primarily because I don’t see my custom executor anywhere in the stack, and I don’t understand what is supposed to be passing `flow_location` to `execute()`.
The code runs fine when run locally using `flow.run()`:
[2020-10-24 04:01:44] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
[2020-10-24 04:01:44] INFO - prefect.TaskRunner | Task 'random_number': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'random_number': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': Starting task run...
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Code is (imports removed to save space):
@task(name="random_number")
def random_number():
    time.sleep(5)
    return random.randint(0, 100)
@task(name="plus_one")
def plus_one(x):
    return x + 1
@task(name="plus_two")
def plus_two(x):
    return x + 2
@task(name="plus_three")
def plus_three(x):
    return x + 3
#with Flow('test_flow', environment=LocalEnvironment(executor=PipelineExecutor())) as flow:
flow = Flow('test_flow', storage=Local(), environment=PipelineEnvironment())
plus_three.bind(plus_one, flow=flow)
plus_two.set_upstream(plus_one, flow=flow)
plus_two.bind(plus_one, flow=flow)
plus_one.set_upstream(random_number, flow=flow)
plus_one.bind(random_number, flow=flow)
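For reference, here is a minimal sketch of one way the TypeError in the traceback could arise. It assumes (hypothetically, since the class is not shown here) that `PipelineEnvironment` overrides `execute()` with a required `flow_location` parameter, as an environment written against an older signature might, while the CLI calls `environment.execute(flow)` with a single argument. The classes below are plain-Python stand-ins, not Prefect's actual API.

```python
import inspect


class BaseEnvironment:
    """Hypothetical stand-in for a base environment (not Prefect's class)."""

    def execute(self, flow):
        return f"ran {flow}"


class PipelineEnvironment(BaseEnvironment):
    """Assumed shape of the custom environment: execute() also requires flow_location."""

    def execute(self, flow, flow_location):
        return f"ran {flow} from {flow_location}"


def run_flow(environment, flow):
    # Mirrors the call on the last traceback frame: environment.execute(flow)
    return environment.execute(flow)


error_message = None
try:
    run_flow(PipelineEnvironment(), "test_flow")
except TypeError as exc:
    # The caller supplied only `flow`, so `flow_location` is reported missing.
    error_message = str(exc)
    print(error_message)

# List the parameters the override requires (excluding self), to compare
# against what the caller actually passes.
required = [
    name
    for name, param in inspect.signature(PipelineEnvironment.execute).parameters.items()
    if name != "self" and param.default is inspect.Parameter.empty
]
print(required)
```

If that assumption holds, it would also explain why `flow.run()` works locally: in this sketch, as in the local run, nothing calls the environment's `execute()` unless the flow is launched through the CLI path shown in the traceback.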