Georg Raiser
08/21/2025, 10:13 AM@prefect.task(
name="a_dummy_task",
)
def dummy_task():
return 1
def dynamic_flow(name):
@prefect.flow(name=name)
def my_dynamic_flow():
return dummy_task.submit().result()
return my_dynamic_flow
outside of a docker container this works just fine:
if __name__ == "__main__":
static_flow()
flow = dynamic_flow("dynamic flow test")
flow()
on the dashboard of the local server, all looks well for both static and dynamic flow creation.
However, when I am using docker and containers and when I run it using prefect.deploy()
def deploy_flow(flow):
prefect.deploy(
flow.to_deployment(
name=flow.name,
job_variables={
"image_pull_policy": "Never",
},
),
work_pool_name="docker-testing",
build=False,
push=False,
image="my-dummy-docker:latest",
)
if __name__ == "__main__":
# works
deploy_flow(static_flow)
# doesn't
flow = dynamic_flow("deployed dynamic flow test")
deploy_flow(flow)
the dynamic task fails whereas the static task still works fine. I get this error message:
RuntimeError: Function with name 'dynamic_flow.<locals>.my_dynamic_flow' not found in 'dynamic_flow_fail_minimal_example.py'.
here is the complete code and full error for reproducing a minimal working example:
build_prefect_container.sh
docker buildx build . \
--pull \
--platform linux/amd64 \
--tag my-dummy-docker \
--file ./Dockerfile \
--no-cache \
Dockerfile
FROM prefecthq/prefect:3-python3.12
WORKDIR /home/user/code
ADD ./dynamic_flow_fail_minimal_example.py /home/user/code/dynamic_flow_fail_minimal_example.py
dynamic_flow_fail_minimal_example.py
import prefect
@prefect.task(
name="a_dummy_task",
log_prints=True,
)
def dummy_task():
return 1
@prefect.flow(name="static_flow")
def static_flow():
return dummy_task.submit().result()
def dynamic_flow(name):
@prefect.flow(name=name)
def my_dynamic_flow():
return dummy_task.submit().result()
return my_dynamic_flow
def deploy_flow(flow):
prefect.deploy(
flow.to_deployment(
name=flow.name,
job_variables={
"image_pull_policy": "Never",
},
),
work_pool_name="docker-testing",
build=False,
push=False,
image="my-dummy-docker:latest",
)
if __name__ == "__main__":
# this works
# static_flow()
# flow = dynamic_flow("dynamic flow test")
# flow()
# works
deploy_flow(static_flow)
# doesn't
flow = dynamic_flow("deployed dynamic flow test")
deploy_flow(flow)
full error traceback
Unexpected exception encountered when trying to load flow
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2538, in load_function_and_convert_to_flow
func = import_object(entrypoint) # pyright: ignore[reportRedeclaration]
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 174, in import_object
obj = getattr(obj, part)
^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute '<locals>'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.12/site-packages/prefect/engine.py", line 112, in <module>
flow: "Flow[..., Any]" = load_flow(flow_run)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 150, in load_flow
flow = run_coro_as_sync(
^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
return call.result()
^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async
result = await coro
^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
return await task
^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/client/utilities.py", line 69, in wrapper
return await func(client, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2832, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
result = await anyio.to_thread.run_sync(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 967, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
return call()
^^^^^^
File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2540, in load_function_and_convert_to_flow
raise RuntimeError(
RuntimeError: Function with name 'dynamic_flow.<locals>.my_dynamic_flow' not found in 'dynamic_flow_fail_minimal_example.py'.
thanks for any thoughtsBrendan Dalpe
08/21/2025, 2:04 PMGeorg Raiser
08/21/2025, 2:15 PMBrendan Dalpe
08/21/2025, 3:12 PMGeorg Raiser
08/22/2025, 8:58 AMdeployment = flow.to_deployment(
name="my-deployment",
job_variables=dict(
image_pull_policy="Never",
),
)
prefect.deploy(
deployment,
work_pool_name="docker-testing",
image="my-dummy-docker:latest",
build=False,
push=False,
)
run_deployment(
name=f"{flow.name}/my-deployment",
timeout=0,
but it leads to the same error. I assume I have to somehow specify the arguments to run_deployment
in a specific way? I can't figure out how.Brendan Dalpe
08/22/2025, 2:34 PMrun_deployment
in your @flow
method, not in the __main__
section:
Something like this:
import prefect
@prefect.task(
name="a_dummy_task",
log_prints=True,
)
def dummy_task(name):
return name
@prefect.flow(name="session_flow")
def session_flow(session = 5):
# The session
for i in range(session):
# Run each comutation independently separately
prefect.deployments.run_deployment(
name="dynamic_flow/dynamic_flow",
parameters={
"name": i
}
)
@prefect.flow(name="dynamic_flow")
def dynamic_flow(name):
return dummy_task(name)
def deploy_flow(flow):
prefect.deploy(
flow.to_deployment(
name=flow.name,
job_variables={
"image_pull_policy": "Never",
},
),
work_pool_name="docker-testing",
build=False,
push=False,
image="my-dummy-docker:latest",
)
if __name__ == "__main__":
deploy_flow(session_flow)
deploy_flow(dynamic_flow)
Georg Raiser
08/29/2025, 11:21 AMimport prefect
@prefect.task(
name="a_dummy_task",
log_prints=True,
)
def dummy_task(some_variable):
return some_variable
def dynamic_flow_creator(name):
@prefect.flow(name=name)
def dynamic_flow():
return dummy_task("test")
return dynamic_flow
if __name__ == "__main__":
# at runtime, create a flow
name = "programmatically-assigned-flow-name"
flow = dynamic_flow_creator(name=name)
prefect.deploy(
flow.to_deployment(
name=flow.name,
job_variables={
"image_pull_policy": "Never",
},
),
work_pool_name="docker-testing",
build=False,
push=False,
image="my-dummy-docker:latest",
)
this errors with:
File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2540, in load_function_and_convert_to_flow
raise RuntimeError(
RuntimeError: Function with name 'dynamic_flow_creator.<locals>.dynamic_flow' not found in 'dynamic_flow_slack_example.py'.
The flow is created, but when the deployment is triggered, I get the above error.
and just to explain what I am trying to do here and why: I want to dynamically create flows (and tasks) at runtime. Ultimately there will be more to it then just setting the name of course, but this serves as an example.