# ask-community
g
Hi, I am facing an issue running flows in docker containers. Any leads to what I am doing conceptually wrong would be greatly appreciated. At the core of the problem seems to be that I am dynamically creating flows as such:
Copy code
@prefect.task(
    name="a_dummy_task",
)
def dummy_task():
    return 1
    
def dynamic_flow(name):
    @prefect.flow(name=name)
    def my_dynamic_flow():
        return dummy_task.submit().result()

    return my_dynamic_flow
Outside of a Docker container this works just fine:
Copy code
if __name__ == "__main__":
    static_flow()
    flow = dynamic_flow("dynamic flow test")
    flow()
On the dashboard of the local server, all looks well for both static and dynamic flow creation. However, when I run it in a Docker container and deploy it with
prefect.deploy()
Copy code
def deploy_flow(flow):
    prefect.deploy(
        flow.to_deployment(
            name=flow.name,
            job_variables={
                "image_pull_policy": "Never",
            },
        ),
        work_pool_name="docker-testing",
        build=False,
        push=False,
        image="my-dummy-docker:latest",
    )


if __name__ == "__main__":
    # works
    deploy_flow(static_flow)

    # doesn't
    flow = dynamic_flow("deployed dynamic flow test")
    deploy_flow(flow)
The dynamic flow fails whereas the static flow still works fine. I get this error message:
Copy code
RuntimeError: Function with name 'dynamic_flow.<locals>.my_dynamic_flow' not found in 'dynamic_flow_fail_minimal_example.py'.
Here is the complete code and full error traceback as a minimal reproducible example:
build_prefect_container.sh
Copy code
docker buildx build . \
    --pull \
    --platform linux/amd64 \
    --tag my-dummy-docker \
    --file ./Dockerfile \
    --no-cache
Dockerfile
Copy code
FROM prefecthq/prefect:3-python3.12
WORKDIR /home/user/code
ADD ./dynamic_flow_fail_minimal_example.py /home/user/code/dynamic_flow_fail_minimal_example.py
dynamic_flow_fail_minimal_example.py
Copy code
import prefect

@prefect.task(
    name="a_dummy_task",
    log_prints=True,
)
def dummy_task():
    return 1


@prefect.flow(name="static_flow")
def static_flow():
    return dummy_task.submit().result()


def dynamic_flow(name):
    @prefect.flow(name=name)
    def my_dynamic_flow():
        return dummy_task.submit().result()

    return my_dynamic_flow


def deploy_flow(flow):
    prefect.deploy(
        flow.to_deployment(
            name=flow.name,
            job_variables={
                "image_pull_policy": "Never",
            },
        ),
        work_pool_name="docker-testing",
        build=False,
        push=False,
        image="my-dummy-docker:latest",
    )


if __name__ == "__main__":
    # this works
    # static_flow()
    # flow = dynamic_flow("dynamic flow test")
    # flow()

    # works
    deploy_flow(static_flow)

    # doesn't
    flow = dynamic_flow("deployed dynamic flow test")
    deploy_flow(flow)
full error traceback
Copy code
Unexpected exception encountered when trying to load flow
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2538, in load_function_and_convert_to_flow
    func = import_object(entrypoint)  # pyright: ignore[reportRedeclaration]
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/importtools.py", line 174, in import_object
    obj = getattr(obj, part)
          ^^^^^^^^^^^^^^^^^^
AttributeError: 'function' object has no attribute '<locals>'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/engine.py", line 112, in <module>
    flow: "Flow[..., Any]" = load_flow(flow_run)
                             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flow_engine.py", line 150, in load_flow
    flow = run_coro_as_sync(
           ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync
    return call.result()
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper
    return await task
           ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/client/utilities.py", line 69, in wrapper
    return await func(client, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2832, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2476, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 967, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
           ^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2540, in load_function_and_convert_to_flow
    raise RuntimeError(
RuntimeError: Function with name 'dynamic_flow.<locals>.my_dynamic_flow' not found in 'dynamic_flow_fail_minimal_example.py'.
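For reference, a minimal sketch (plain Python, not Prefect's actual code) of what the import_object / getattr steps in the traceback appear to be doing: the worker re-imports the flow by walking the entrypoint's dotted path as module attributes, and a flow defined inside another function only exists in that function's locals, so the "<locals>" segment can never be resolved as an attribute.
Copy code
import importlib

# Hypothetical helper mirroring the getattr walk shown in the traceback above.
def resolve(module_name, qualname):
    obj = importlib.import_module(module_name)
    for part in qualname.split("."):
        # A module-level flow resolves in one step; a nested flow's
        # qualname contains "<locals>", which is never a real attribute.
        obj = getattr(obj, part)
    return obj

# resolve("dynamic_flow_fail_minimal_example", "static_flow")  # works
# resolve("dynamic_flow_fail_minimal_example",
#         "dynamic_flow.<locals>.my_dynamic_flow")  # AttributeError on '<locals>'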
thanks for any thoughts
b
Hey @Georg Raiser, I'd like to understand more about what you're trying to solve with a "dynamic" flow here.
g
The use case can be roughly described like this: I have a number of computations, subsets of which can be logically grouped into, let's call them, "sessions". Each session contains a (variable) number of computations, and some of those have to be run in mutually exclusive environments. Hence I have one flow per computation (let's call those sub-flows), with a single task inside each sub-flow. Those flows are then grouped by a super-flow representing the "session". My approach would be to deploy the sub-flows to their respective Docker containers, solving the environment issue. As not every session has the same requirements for computations, I was trying to generate the individual sub-flows programmatically based on the different requirements of each session.
b
Ah, yeah that is actually a common Prefect architectural pattern. To dispatch the sub-flows, you can call `run_deployment` which will provision a new runner for the sub-flow and execute the code. Take a look at this example, which I think could be adapted for your use case: https://github.com/PrefectHQ/prefect-recipes/blob/main/flows-advanced/parent-orchestrator/pokemon_weight.py
g
Thank you for your reply @Brendan Dalpe! I am still struggling here, but maybe I am missing something simple and obvious. I am now using a pattern like this:
Copy code
from prefect.deployments import run_deployment

deployment = flow.to_deployment(
    name="my-deployment",
    job_variables=dict(
        image_pull_policy="Never",
    ),
)

prefect.deploy(
    deployment,
    work_pool_name="docker-testing",
    image="my-dummy-docker:latest",
    build=False,
    push=False,
)

run_deployment(
    name=f"{flow.name}/my-deployment",
    timeout=0,
)
but it leads to the same error. I assume I have to somehow specify the arguments to
run_deployment
in a specific way? I can't figure out how.
b
@Georg Raiser you would call
run_deployment
in your
@flow
method, not in the
__main__
section. Something like this:
Copy code
import prefect

@prefect.task(
    name="a_dummy_task",
    log_prints=True,
)
def dummy_task(name):
    return name


@prefect.flow(name="session_flow")
def session_flow(session=5):
    # The "session" groups a number of computations
    for i in range(session):
        # Run each computation independently as its own deployed sub-flow run
        prefect.deployments.run_deployment(
            name="dynamic_flow/dynamic_flow",
            parameters={
                "name": i
            }
        )

@prefect.flow(name="dynamic_flow")
def dynamic_flow(name):
    return dummy_task(name)


def deploy_flow(flow):
    prefect.deploy(
        flow.to_deployment(
            name=flow.name,
            job_variables={
                "image_pull_policy": "Never",
            },
        ),
        work_pool_name="docker-testing",
        build=False,
        push=False,
        image="my-dummy-docker:latest",
    )


if __name__ == "__main__":
    deploy_flow(session_flow)
    deploy_flow(dynamic_flow)
g
Hi @Brendan Dalpe, thank you very much for your code example! It runs, but it doesn't solve my underlying issue. I have put together another example here:
Copy code
import prefect


@prefect.task(
    name="a_dummy_task",
    log_prints=True,
)
def dummy_task(some_variable):
    return some_variable


def dynamic_flow_creator(name):
    @prefect.flow(name=name)
    def dynamic_flow():
        return dummy_task("test")

    return dynamic_flow


if __name__ == "__main__":
    # at runtime, create a flow
    name = "programmatically-assigned-flow-name"
    flow = dynamic_flow_creator(name=name)

    prefect.deploy(
        flow.to_deployment(
            name=flow.name,
            job_variables={
                "image_pull_policy": "Never",
            },
        ),
        work_pool_name="docker-testing",
        build=False,
        push=False,
        image="my-dummy-docker:latest",
    )
this errors with:
Copy code
File "/usr/local/lib/python3.12/site-packages/prefect/flows.py", line 2540, in load_function_and_convert_to_flow
    raise RuntimeError(
RuntimeError: Function with name 'dynamic_flow_creator.<locals>.dynamic_flow' not found in 'dynamic_flow_slack_example.py'.
The flow is created, but when the deployment is triggered, I get the above error. And just to explain what I am trying to do here and why: I want to dynamically create flows (and tasks) at runtime. Ultimately there will be more to it than just setting the name, of course, but this serves as an example.
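One possible direction, sketched here only as an untested assumption: keep the flow function at module level (so the deployment entrypoint stays importable inside the container) and derive the dynamically named variants with Flow.with_options(name=...), which returns a renamed copy of an existing flow, instead of defining the flow inside a factory function. Whether this covers everything needed beyond renaming would need testing.
Copy code
import prefect


@prefect.task(name="a_dummy_task", log_prints=True)
def dummy_task(some_variable):
    return some_variable


# Defined at module level, so the entrypoint
# (e.g. dynamic_flow_slack_example.py:base_flow) stays importable.
@prefect.flow(name="base_flow")
def base_flow():
    return dummy_task("test")


if __name__ == "__main__":
    # Rename at runtime instead of creating a nested flow.
    flow = base_flow.with_options(name="programmatically-assigned-flow-name")

    prefect.deploy(
        flow.to_deployment(
            name=flow.name,
            job_variables={"image_pull_policy": "Never"},
        ),
        work_pool_name="docker-testing",
        build=False,
        push=False,
        image="my-dummy-docker:latest",
    )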