# ask-community
b
Hello, I have gotten this error when executing a flow in fargate
Copy code
2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Beginning Flow run for 'dbt_run' 
2019-12-13 1:40am	prefect.CloudFlowRunner	INFO	 Starting flow run. 
2019-12-13 1:40am	prefect.CloudFlowRunner	ERROR	 Unexpected error: TypeError("start() missing 1 required positional argument: 'self'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 400, in get_flow_run_state
    with executor.start():
  File "/usr/local/lib/python3.7/contextlib.py", line 239, in helper
    return _GeneratorContextManager(func, args, kwds)
  File "/usr/local/lib/python3.7/contextlib.py", line 82, in __init__
    self.gen = func(*args, **kwds)
TypeError: start() missing 1 required positional argument: 'self'
This was a Fargate task started by the FargateTaskEnvironment
j
At first glance it seems like it's calling your_executor.start() instead of your_executor().start(), as in the executor isn't getting initialized
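That matches the traceback exactly. A minimal sketch of the class-vs-instance distinction (not Prefect's actual executor code, just an illustration):
Copy code
# Minimal sketch, not Prefect's actual executor code.
class MyExecutor:
    def start(self):
        print("executor started")

MyExecutor().start()  # works: the executor is instantiated first

try:
    MyExecutor.start()  # called on the class itself, no instance
except TypeError as err:
    print(err)  # start() missing 1 required positional argument: 'self'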
b
so comparing the environments, remote vs fargate. remote environment:
Copy code
# Load serialized flow from file and run it with a DaskExecutor
flow = storage.get_flow(flow_location)
with set_temporary_config({"engine.executor.default_class": self.executor}):
    executor = get_default_executor_class()

executor = executor(**self.executor_kwargs)
runner_cls = get_default_flow_runner_class()
runner_cls(flow=flow).run(executor=executor)
fargate task environment:
Copy code
# Load serialized flow from file and run it with the executor
with open(
    prefect.context.get(
        "flow_file_path", "/root/.prefect/flow_env.prefect"
    ),
    "rb",
) as f:
    flow = cloudpickle.load(f)

    runner_cls = get_default_flow_runner_class()
    executor_cls = get_default_executor_class()
    runner_cls(flow=flow).run(executor=executor_cls)
biggest difference is this
Copy code
executor = executor(**self.executor_kwargs)
j
Have you run into this before? I think the executor in the fargate task environment needs to be something like
executor=executor_cls()
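i.e. roughly like this for the fargate block (just a sketch reusing the names from the snippet above; the actual PR may look different):
Copy code
# Sketch of the fargate block with the executor instantiated before it is
# handed to the runner (same names as the snippet above; the real PR may differ).
import cloudpickle
import prefect
from prefect.engine import get_default_executor_class, get_default_flow_runner_class

with open(
    prefect.context.get("flow_file_path", "/root/.prefect/flow_env.prefect"), "rb"
) as f:
    flow = cloudpickle.load(f)

    runner_cls = get_default_flow_runner_class()
    executor_cls = get_default_executor_class()
    runner_cls(flow=flow).run(executor=executor_cls())  # instantiate, then pass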
b
yup
here is what the k8s job env has
Copy code
executor_cls = get_default_executor_class()()
j
Yeah that needs to be updated
I’ll submit a PR
b
this is the first time we have run into it
cool
I will add to our fork to unblock us
thanks!
b
so in our fork I added a new flag called
enable_task_revisions
for the fargate agent and fargate task environment that will leverage task definition revisions instead of creating a new task definition family every time
it uses tags to tie the revision to the flow version
much cleaner than adding a new prefect-task-<id> every time
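roughly what that looks like against the ECS API (hedged sketch, the family name, image, and tag keys are made up; the boto3 calls themselves are standard):
Copy code
import boto3

ecs = boto3.client("ecs")

flow_id = "4f5c3a2e"   # made-up flow identifier
flow_version = 7       # made-up flow version

# Re-registering into the same family bumps the revision instead of creating
# a brand new prefect-task-<id> family for every flow version.
response = ecs.register_task_definition(
    family="prefect-dbt-run",            # one family per flow, reused across versions
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",
    memory="512",
    containerDefinitions=[
        {
            "name": "flow",
            "image": "my-registry/prefect-flow:latest",
            "essential": True,
        }
    ],
    # tags tie this revision back to the flow version so an unarchived flow
    # can find its original revision later
    tags=[
        {"key": "prefect:flow-id", "value": flow_id},
        {"key": "prefect:flow-version", "value": str(flow_version)},
    ],
)
print(response["taskDefinition"]["taskDefinitionArn"])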
j
😮 amazing @Braun Reyes!
Looks clean
b
we want to battle test it a bit... wasn't sure if we should wait to PR this until after it's been in prod a bit, or just go for it now
yeah and if you unarchive an old flow it can go and fetch the original revision
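something like this under the hood (sketch only, find_revision_for_version is a made-up helper and the tag keys match the sketch above, not the fork's actual code):
Copy code
# Sketch only: walk the family's revisions and match on the flow-version tag.
def find_revision_for_version(ecs, family, flow_version):
    arns = ecs.list_task_definitions(familyPrefix=family, sort="DESC")["taskDefinitionArns"]
    for arn in arns:
        tags = ecs.list_tags_for_resource(resourceArn=arn)["tags"]
        if {"key": "prefect:flow-version", "value": str(flow_version)} in tags:
            return arn
    return None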
guess it doesn't hurt to start getting feedback on the PR
j
Whatever works for you! I’m excited to try it 🙂