Michal Zawadzki
06/20/2022, 10:52 AMflow_runner
in the deployment? I get `ValueError: Unregistered flow runner 'DockerFlowRunner'`when running prefect deployment create my_deployment.yaml
. My deployment looks like this:
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
to_print: "Hello from first deployment!"
tags:
- dev
flow_runner:
type: DockerFlowRunner
config:
image: viadot:orion
Unfortunately the flow_runner
config is not documented anywhere so it's hard for me to say if I'm specifying it incorrectly or it's not supported at all.Anna Geller
06/20/2022, 10:56 AMimport platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
...
@flow
def hello_flow():
hi = say_hi()
print_platform_info(wait_for=[hi])
DeploymentSpec(name="dev", flow=hello_flow, flow_runner=DockerFlowRunner())
if __name__ == "__main__":
hello_flow()
flow_runner: DockerFlowRunner
Michal Zawadzki
06/20/2022, 11:05 AMAnna Geller
06/20/2022, 11:08 AMfrom prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES
# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check
def set_deployment_spec(
flow: Flow,
deployment_name_suffix: str = "dev",
schedule: SCHEDULE_TYPES = None,
parameters: Dict[str, Any] = None,
tags: List[str] = None,
) -> DeploymentSpec:
deploy_tags = (
[deployment_name_suffix] if tags is None else [deployment_name_suffix, *tags]
)
return DeploymentSpec(
flow=flow,
name=f"{flow.name}_{deployment_name_suffix}",
schedule=schedule,
tags=deploy_tags,
parameters=parameters,
# flow_runner=DockerFlowRunner()
)
set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
set_deployment_spec(
repo_trending_check,
deployment_name_suffix="orion_dev",
parameters=dict(repo="orion"),
)
set_deployment_spec(crypto_prices_etl)
Marvin
06/20/2022, 11:12 AMMichal Zawadzki
06/20/2022, 11:17 AMDeploymentSpec
inside.set_deployment_spec()
although it seems like you then need to check that user passed "dev" in the "deployment-dev.py" and "prod" in "deployment-prod.py"?Anna Geller
06/20/2022, 11:30 AMMichal Zawadzki
06/23/2022, 11:26 PMhttpx.HTTPStatusError: Client error '400 Bad Request'
when adding image: "my_image:my_tag"
under the config
key.
It's interesting because I can get that same flow manuallly from the agent's environment with
import prefect
c = prefect.client.get_client()
run = await c.read_flow_run("0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc")
print(run.flow_runner.config.get("image"))
without error.
Full Traceback:
23:00:21.814 | ERROR | prefect.engine - Engine execution of flow run '0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc' exited with unexpected exception
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 985, in <module>
enter_flow_run_engine_from_subprocess(flow_run_id)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 130, in enter_flow_run_engine_from_subprocess
return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 95, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 190, in retrieve_flow_then_begin_flow_run
flow_run = await client.read_flow_run(flow_run_id)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 1204, in read_flow_run
response = await self._client.get(f"/flow_runs/{flow_run_id}")
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 233, in send
response.raise_for_status()
File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 736, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url '<https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc>'
For more information check: <https://httpstatuses.com/400>
01:00:22.596 | INFO | prefect.flow_runner.docker - Flow run container 'smooth-serval' has status 'exited'