https://prefect.io logo
b

Braun Reyes

12/13/2019, 3:41 PM
Hello, I have gotten this error when executing a flow in fargate
Copy code
2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Beginning Flow run for 'dbt_run' 
2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Starting flow run. 
2019-12-13 1:40am	prefect.CloudFlowRunner	ERROR	 Unexpected error: TypeError("start() missing 1 required positional argument: 'self'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.7/contextlib.py", line 239, in helper
    return _GeneratorContextManager(func, args, kwds)
  File "/usr/local/lib/python3.7/contextlib.py", line 82, in __init__
    self.gen = func(*args, **kwds)
TypeError: start() missing 1 required positional argument: 'self'
This was a fargate task started by the fargatetaskenvironment
j

josh

12/13/2019, 3:46 PM
At first glance it seems like something is happening where it’s calling
your_executor.start()
instead of
your_executor().start()
. As in the executor isn’t getting initialized
b

Braun Reyes

12/13/2019, 3:55 PM
so in comparing environments remote vs fargate remote:
Copy code
# Load serialized flow from file and run it with a DaskExecutor
            flow = storage.get_flow(flow_location)
            with set_temporary_config({"engine.executor.default_class": self.executor}):
                executor = get_default_executor_class()

            executor = executor(**self.executor_kwargs)
            runner_cls = get_default_flow_runner_class()
            runner_cls(flow=flow).run(executor=executor)
fargatetask:
Copy code
# Load serialized flow from file and run it with the executor
            with open(
                prefect.context.get(
                    "flow_file_path", "/root/.prefect/flow_env.prefect"
                ),
                "rb",
            ) as f:
                flow = cloudpickle.load(f)

                runner_cls = get_default_flow_runner_class()
                executor_cls = get_default_executor_class()
                runner_cls(flow=flow).run(executor=executor_cls)
biggest different is this
Copy code
executor = executor(**self.executor_kwargs)
j

josh

12/13/2019, 4:03 PM
Have you ran into this before? I think that executor in the fargate task environment needs to be something like
executor=executor_cls()
b

Braun Reyes

12/13/2019, 4:04 PM
yup
here is what the k8s job env has
Copy code
executor_cls = get_default_executor_class()()
j

josh

12/13/2019, 4:04 PM
Yeah that needs to be updated
I’ll submit a PR
b

Braun Reyes

12/13/2019, 4:04 PM
this is the first time we have run into it
cool
I will add to our fork to unblock us
thanks!
b

Braun Reyes

12/13/2019, 5:13 PM
so in our fork I added a new flag called
enable_task_revisions
for the fargate agent and fargate task environment that will leverage task definition revisions instead of creating a new task definition family every time
it uses tags to tie the the revision to the flow version
much cleaner than adding a new prefect-task-<id> everytime
j

josh

12/13/2019, 5:16 PM
😮 amazing @Braun Reyes!
Looks clean
b

Braun Reyes

12/13/2019, 5:16 PM
we want to battle test a bit...wasn't sure if we should wait to try and PR this after its in prod a bot and just go for it now
yeah and if you unarchive an old flow it can go and fetch the original revision
guess it doesn't hurt to start getting feedback on the PR
j

josh

12/13/2019, 5:19 PM
Whatever works for you! I’m excited to try it 🙂