Is there a way to specify `flow_runner` in the dep...
# prefect-community
m
Is there a way to specify `flow_runner` in the deployment? I get `ValueError: Unregistered flow runner 'DockerFlowRunner'` when running `prefect deployment create my_deployment.yaml`. My deployment looks like this:
```yaml
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
  to_print: "Hello from first deployment!"
tags:
  - dev
flow_runner:
  type: DockerFlowRunner
  config:
    image: viadot:orion
```
Unfortunately the `flow_runner` config is not documented anywhere, so it's hard for me to say if I'm specifying it incorrectly or it's not supported at all.
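The wording of the error ("Unregistered flow runner") suggests that the `type` string is looked up in some registry of known runner names. The following is a minimal, hypothetical sketch of that pattern, not Prefect's actual implementation: the names `register`, `lookup`, and `_RUNNERS`, and the key `"docker"`, are all assumptions made for illustration. It only shows how a class name and a registered name can differ, which would produce exactly this kind of error.

```python
from typing import Callable, Dict, Type

# Hypothetical registry mapping a short type name to a runner class.
_RUNNERS: Dict[str, Type] = {}


def register(typename: str) -> Callable[[Type], Type]:
    """Class decorator that stores the decorated class under `typename`."""

    def decorator(cls: Type) -> Type:
        _RUNNERS[typename] = cls
        return cls

    return decorator


def lookup(typename: str) -> Type:
    """Return the class registered under `typename`, or raise ValueError."""
    try:
        return _RUNNERS[typename]
    except KeyError:
        raise ValueError(f"Unregistered flow runner {typename!r}") from None


# The key "docker" is illustrative only; it is not confirmed to match Prefect.
@register("docker")
class DockerFlowRunner:
    pass
```

With a registry like this, `lookup("docker")` succeeds while `lookup("DockerFlowRunner")` raises, so if the YAML `type` is resolved this way it would be worth checking which names are actually registered.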
a
Can you try the same using Python? much easier than YAML:
```python
import platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner


...


@flow
def hello_flow():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(name="dev", flow=hello_flow, flow_runner=DockerFlowRunner())


if __name__ == "__main__":
    hello_flow()
```
afaik, YAML is pretty much only for non-Python DevOps admins. Maybe you can try this?
```yaml
flow_runner: DockerFlowRunner
```
m
I'll check and get back to you, but I'd prefer to use YAML eventually. I don't feel comfortable storing configs in executable files; I feel like analysts will eventually abuse this somehow 😅, and YAML files are very easy and safe to parse and check for policy in CI/CD.
👀 1
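As a rough illustration of the "check for policy in CI/CD" idea, here is a minimal sketch that validates an already-parsed deployment config. It assumes the YAML file has been loaded into a plain dict by a safe parser (for example PyYAML's `yaml.safe_load`, which is not used here to keep the example dependency-free). The function name `check_deployment` and the two allow-lists are hypothetical; the keys checked are the ones from the deployment file above.

```python
from typing import Any, Dict, List

# Hypothetical policy values; adjust to your own standards.
ALLOWED_RUNNER_TYPES = {"DockerFlowRunner"}
ALLOWED_ENV_TAGS = {"dev", "prod"}


def check_deployment(config: Dict[str, Any]) -> List[str]:
    """Return a list of policy violations; an empty list means the config passes."""
    problems: List[str] = []

    # Required string fields, as used in the deployment file above.
    for key in ("name", "flow_name", "flow_location"):
        value = config.get(key)
        if not isinstance(value, str) or not value:
            problems.append(f"missing or empty '{key}'")

    # At least one environment tag must be present.
    tags = config.get("tags") or []
    if not ALLOWED_ENV_TAGS.intersection(tags):
        problems.append(f"tags must include one of: {sorted(ALLOWED_ENV_TAGS)}")

    # Accept both the mapping form and the short string form of flow_runner.
    runner = config.get("flow_runner")
    runner_type = runner.get("type") if isinstance(runner, dict) else runner
    if runner_type is not None and runner_type not in ALLOWED_RUNNER_TYPES:
        problems.append(f"flow_runner type {runner_type!r} is not allowed")

    return problems


# The deployment from the question, as a YAML parser would return it.
deployment = {
    "name": "test_platform_flow_first_deployment",
    "flow_name": "Data Platform Demo",
    "flow_location": "./test_platform_flow.py",
    "parameters": {"to_print": "Hello from first deployment!"},
    "tags": ["dev"],
    "flow_runner": {"type": "DockerFlowRunner", "config": {"image": "viadot:orion"}},
}
```

In CI, a non-empty result from `check_deployment(deployment)` would fail the build. Because the input is pure data, the check never executes user code.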
a
how would they abuse DeploymentSpec, but wouldn't abuse YAML? 🤔 it's the same config that gets sent to the backend
But I totally understand what you mean with respect to ensuring standards. Specifying deployments via Python code allows you to build an extra abstraction/function that avoids boilerplate (which YAML forces you to have). An example:
```python
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES

# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check


def set_deployment_spec(
    flow: Flow,
    deployment_name_suffix: str = "dev",
    schedule: SCHEDULE_TYPES = None,
    parameters: Dict[str, Any] = None,
    tags: List[str] = None,
) -> DeploymentSpec:
    deploy_tags = (
        [deployment_name_suffix] if tags is None else [deployment_name_suffix, *tags]
    )
    return DeploymentSpec(
        flow=flow,
        name=f"{flow.name}_{deployment_name_suffix}",
        schedule=schedule,
        tags=deploy_tags,
        parameters=parameters,
        # flow_runner=DockerFlowRunner()
    )


set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
set_deployment_spec(
    repo_trending_check,
    deployment_name_suffix="orion_dev",
    parameters=dict(repo="orion"),
)
```
🙌 1
When using the same default spec, creating a deployment is as simple as a single line that passes the flow to it, as here:
```python
set_deployment_spec(crypto_prices_etl)
```
@Marvin open "Add examples to the docs showing how to specify various Deployment attributes in a YAML config"
m
They could add custom code in their deployment.py file, something they can't do in a YAML file. Since it's a Python file, you can do anything inside it; you don't have to only have `DeploymentSpec` inside.
I like the `set_deployment_spec()`, although it seems like you then need to check that the user passed "dev" in "deployment-dev.py" and "prod" in "deployment-prod.py"?
a
it would be up to your design - it's easier to adopt the same standards if it's easy to do the right thing
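One possible design, sketched here under assumptions, is to derive the environment from the file name so that users never type it by hand. The helpers `env_from_filename` and `check_suffix` are hypothetical names, and the `deployment-<env>.py` naming scheme is taken from the file names mentioned above.

```python
import re
from pathlib import Path

# Matches file names such as "deployment-dev.py" or "deployment-prod.py".
_FILENAME_PATTERN = re.compile(r"^deployment-(?P<env>[a-z0-9_]+)\.py$")


def env_from_filename(path: str) -> str:
    """Extract the environment name from a deployment-<env>.py path."""
    match = _FILENAME_PATTERN.match(Path(path).name)
    if match is None:
        raise ValueError(f"{path!r} does not look like deployment-<env>.py")
    return match.group("env")


def check_suffix(path: str, deployment_name_suffix: str) -> None:
    """Raise ValueError if the suffix does not match the file's environment."""
    expected = env_from_filename(path)
    if deployment_name_suffix != expected:
        raise ValueError(
            f"{path!r} expects suffix {expected!r}, got {deployment_name_suffix!r}"
        )
```

Inside a deployment file, one could then pass `deployment_name_suffix=env_from_filename(__file__)` to `set_deployment_spec()`, or run `check_suffix` over the files in CI.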
m
Seems like the last piece of the puzzle is to be able to provide the image I want to use. I'm getting `httpx.HTTPStatusError: Client error '400 Bad Request'` when adding `image: "my_image:my_tag"` under the `config` key. It's interesting because I can get that same flow run manually from the agent's environment with
```python
import prefect
c = prefect.client.get_client()
run = await c.read_flow_run("0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc")
print(run.flow_runner.config.get("image"))
```
without error. Full traceback:
```
23:00:21.814 | ERROR   | prefect.engine - Engine execution of flow run '0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 985, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 130, in enter_flow_run_engine_from_subprocess
    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 95, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 190, in retrieve_flow_then_begin_flow_run
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 1204, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 233, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 736, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url '<https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc>'
For more information check: <https://httpstatuses.com/400>
01:00:22.596 | INFO    | prefect.flow_runner.docker - Flow run container 'smooth-serval' has status 'exited'
```