Scott Asher

10/24/2020, 4:23 AM
I have a custom executor/environment (PipelineExecutor/PipelineEnvironment) to execute code using my on-prem scheduler. I’m seeing the following error, which I’m trying to debug:
execute() missing 1 required positional argument: 'flow_location'
Traceback (most recent call last):
  File "/usr/local/bin/prefect", line 11, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.6/dist-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 34, in flow_run
    return _execute_flow_run()
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
    raise exc
  File "/usr/local/lib/python3.6/dist-packages/prefect/cli/execute.py", line 93, in _execute_flow_run
    environment.execute(flow)
TypeError: execute() missing 1 required positional argument: 'flow_location'
[2020-10-24 04:14:03,330] INFO - agent | Process PID 23114 returned non-zero exit code
This error confuses me, primarily because I don’t see my custom executor or environment anywhere in the stack, and I don’t understand what is supposed to be passing `flow_location`. The flow runs fine locally using `flow.run()`:
[2020-10-24 04:01:44] INFO - prefect.FlowRunner | Beginning Flow run for 'test_flow'
[2020-10-24 04:01:44] INFO - prefect.TaskRunner | Task 'random_number': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'random_number': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_one': finished task run for task with final state: 'Success'
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': Starting task run...
[2020-10-24 04:01:49] INFO - prefect.TaskRunner | Task 'plus_two': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': Starting task run...
[2020-10-24 04:01:50] INFO - prefect.TaskRunner | Task 'plus_three': finished task run for task with final state: 'Success'
[2020-10-24 04:01:50] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Code is (imports removed to save space):
@task(name="random_number")
def random_number():
    time.sleep(5)
    return random.randint(0, 100)

@task(name="plus_one")
def plus_one(x):
    return x + 1

@task(name="plus_two")
def plus_two(x):
    return x + 2

@task(name="plus_three")
def plus_three(x):
    return x + 3

#with Flow('test_flow', environment=LocalEnvironment(executor=PipelineExecutor())) as flow:
flow = Flow('test_flow', storage=Local(), environment=PipelineEnvironment())
plus_three.bind(plus_one, flow=flow)
plus_two.set_upstream(plus_one, flow=flow)
plus_two.bind(plus_one, flow=flow)
plus_one.set_upstream(random_number, flow=flow)
plus_one.bind(random_number, flow=flow)
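
For reference, here is a minimal sketch of one way this kind of TypeError can appear without any custom code showing up in the stack. It does not use Prefect at all: `BaseEnvironment`, `TwoArgEnvironment`, `OneArgEnvironment`, and `run_like_cli` are hypothetical stand-ins, and the real signature of `PipelineEnvironment.execute` is not shown above, so the two-argument override is an assumption. The only thing taken from the traceback is the call shape `environment.execute(flow)`. Python raises the TypeError while binding arguments, before the method body runs, so the override itself never gets a frame in the traceback.

```python
class BaseEnvironment:
    """Hypothetical stand-in for an environment base class."""

    def execute(self, flow):
        return f"base executed {flow}"


class TwoArgEnvironment(BaseEnvironment):
    # Assumption: an override that requires a second positional
    # parameter named flow_location.
    def execute(self, storage, flow_location):
        return f"executed flow at {flow_location}"


class OneArgEnvironment(BaseEnvironment):
    # Override whose signature matches the single-argument call below.
    def execute(self, flow, **kwargs):
        return f"executed {flow}"


def run_like_cli(environment, flow):
    # Same call shape as the last frame of the traceback.
    return environment.execute(flow)


try:
    run_like_cli(TwoArgEnvironment(), "test_flow")
    error_message = None
except TypeError as exc:
    # Raised at call time; TwoArgEnvironment.execute never starts running.
    error_message = str(exc)

ok_result = run_like_cli(OneArgEnvironment(), "test_flow")
print(error_message)
print(ok_result)
```

If the real `PipelineEnvironment.execute` does take `flow_location`, comparing its signature against the `execute` method of the environment base class in the installed Prefect version would be the first thing to check. A local `flow.run()` would not hit this, since the failing call sits in the `prefect execute` CLI path shown in the traceback.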