Another orion question, im now playing with deploy...
# prefect-server
l
Another orion question, im now playing with deployments but the runs dont seem to be executing correctly. I’m seeing
Copy code
08:01:52.893 | Submitting flow run 'f37fba9f-4280-431d-9f43-889e53192f24'
08:01:52.894 | Completed submission of flow run 'f37fba9f-4280-431d-9f43-889e53192f24'
08:01:52.901 | Finished monitoring for late runs.
08:01:54.187 | Flow run 'f37fba9f-4280-431d-9f43-889e53192f24' exited with exception: KeyError('__main__')
my deployment:
Copy code
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule
from datetime import timedelta


# note that deployment names are 
# stored and referenced as '<flow name>/<deployment name>'
DeploymentSpec(
    flow_location="/Users/lawrencefinn/orion_example_flow.py",
    name="my-first-deployment",
    parameters={"nums": [1, 2, 3, 4]}, 
    schedule=IntervalSchedule(interval=timedelta(minutes=1)),
)
my flow:
Copy code
from prefect import flow, task
from prefect.executors import DaskExecutor, SequentialExecutor

@task
def say_it(stuff):
    print(f"Saying {stuff}")

@flow(executor=DaskExecutor(cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}))
def my_favorite_function():
    say_it("hello")
    return 0

# if __name__ == "__main__":
#     my_favorite_function()
I also see things like
Copy code
08:06:53.106 | Completed submission of flow run 'bb8c741e-da03-4746-9b7b-57970c425745'
Exception in callback SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")
handle: <Handle SubprocessStreamProtocol.pipe_data_received(1, b"EXCEPTION 8...XCEPTION 81\n")>
Traceback (most recent call last):
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/subprocess.py", line 73, in pipe_data_received
    reader.feed_data(data)
  File "/Users/lawrencefinn/.pyenv/versions/3.8.5/lib/python3.8/asyncio/streams.py", line 472, in feed_data
    assert not self._eof, 'feed_data after feed_eof'
AssertionError: feed_data after feed_eof
a
I tried to reproduce - using the same flow file you shared, I also got KeyError(’__main__‘) when a flow is scheduled and it uses Dask executor. I tried it with both, once with and once without main:
Copy code
if __name__ == "__main__":
    my_favorite_function()
But when removing the executor, it works fine with the deployment. Btw. your flow doesn’t have a flow name on the @flow decorator and your deployment specifies parameters that you don’t use in your flow - this shouldn’t be the problem here, but just to let you know. Thanks for checking and reporting that.
l
ah dask causes the issue, got it
👍 1
does orion get rid of the agent model?
a
Agents still exist in Orion, but they are no longer required to run your flows. Agents will only be required for automatically scheduled/API-triggered deployment runs.
z
This will work if you disable subprocesses for the dask workers
DaskExecutor(cluster_kwargs=dict(processes=False))
-- well investigate a fix though!
upvote 1