Lawrence Finn
11/09/2021, 1:03 PM08:01:52.893 | Submitting flow run 'f37fba9f-4280-431d-9f43-889e53192f24'
08:01:52.894 | Completed submission of flow run 'f37fba9f-4280-431d-9f43-889e53192f24'
08:01:52.901 | Finished monitoring for late runs.
08:01:54.187 | Flow run 'f37fba9f-4280-431d-9f43-889e53192f24' exited with exception: KeyError('__main__')
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from datetime import timedelta
# note that deployment names are
# stored and referenced as '<flow name>/<deployment name>'
DeploymentSpec(
flow_location="/Users/lawrencefinn/orion_example_flow.py",
name="my-first-deployment",
parameters={"nums": [1, 2, 3, 4]},
schedule=IntervalSchedule(interval=timedelta(minutes=1)),
)
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor
@task
def say_it(stuff):
print(f"Saying {stuff}")
@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
say_it("hello")
return 0
# if __name__ == "__main__":
# my_favorite_function()
08:06:53.106 | Completed submission of flow run 'bb8c741e-da03-4746-9b7b-57970c425745'
Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")>
Traceback (most recent call last):
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/events.py", line 81, in _run
self._context.run(self._callback, *self._args)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/subprocess.py", line 73, in pipe_data_received
reader.feed_data(data)
File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/streams.py", line 472, in feed_data
assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
Anna Geller
if __name__ == "__main__":
my_favorite_function()
But when removing the executor, it works fine with the deployment.
Btw. your flow doesn’t have a flow name on the @flow decorator and your deployment specifies parameters that you don’t use in your flow - this shouldn’t be the problem here, but just to let you know.
Thanks for checking and reporting that.Lawrence Finn
11/09/2021, 1:51 PMAnna Geller
Zanie
DaskExecutor(cluster_kwargs=dict(processes=False))
-- well investigate a fix though!