https://prefect.io logo
Title
a

Adam Gold

02/20/2023, 3:58 PM
Hello everyone! Prefect looks awesome — but I’ve been having some issues with flow runs. I am running the agent as a kubernetes pod with
prefect agent start --pool "$PREFECT_ENV" --work-queue default
. 1. It takes really long for the flow to be submitted. Notice the time here is more than 30 seconds for the task to be created, before even running:
15:25:43.676 | INFO    | prefect.agent - Submitting flow run '0dda37d3-87e4-46e2-9266-920e7dae9113'
15:25:44.499 | INFO    | prefect.infrastructure.process - Opening process 'congenial-falcon'...
15:25:44.998 | INFO    | prefect.agent - Completed submission of flow run '0dda37d3-87e4-46e2-9266-920e7dae9113'
<frozen runpy>:128: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
15:26:01.483 | INFO    | Flow run 'congenial-falcon' - Downloading flow code from storage at '/app'
15:26:16.402 | INFO    | Flow run 'congenial-falcon' - Created task run 'return_value-0' for task 'return_value'
2. It downloads the code for every flow, making the pod go out of memory very quickly:
Pod ephemeral local storage usage exceeds the total limit
I am probably missing something here, but would love some help 🙏
t

Thet Naing

02/20/2023, 4:02 PM
How did you define the flow entrypoint? My flows always say
Importing flow code from 'flows/flow_file.py:flow_function'
. It seems you are passing in an entire directory? That may be what's causing issue #2 here
You can deploy your flow with
prefect deployment build flows/flow_file.py:flow_function
Where the flow code looks like this:
# flows/flow_file.py
from prefect import flow

@flow
def flow_function():
    # do stuff
    
if __name__ == "__main__":
    flow_function()
a

Adam Gold

02/20/2023, 4:18 PM
Thanks @Thet Naing! We deploy flows programmatically:
async def _base_build_from_flow(flow_function):
    return await Deployment.build_from_flow(
        flow=flow_function,
        name=settings.PREFECT_ENV,
        work_pool_name=settings.PREFECT_ENV,
        work_queue_name="default",
        apply=True,
        tags=[settings.PREFECT_ENV],
    )
When inspecting the flows with
poetry run prefect deployment inspect test-flow/prod
I get:
'path': '/app',
    'entrypoint': 'src/correlation/flows/prefect_test.py:test_flow',