Michal Zawadzki
06/20/2022, 10:52 AMflow_runner
in the deployment? I get `ValueError: Unregistered flow runner 'DockerFlowRunner'`when running prefect deployment create my_deployment.yaml
. My deployment looks like this:
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
to_print: "Hello from first deployment!"
tags:
- dev
flow_runner:
type: DockerFlowRunner
config:
image: viadot:orion
Unfortunately the flow_runner
config is not documented anywhere so it's hard for me to say if I'm specifying it incorrectly or it's not supported at all.Anna Geller
import platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
...
@flow
def hello_flow():
hi = say_hi()
print_platform_info(wait_for=[hi])
DeploymentSpec(name="dev", flow=hello_flow, flow_runner=DockerFlowRunner())
if __name__ == "__main__":
hello_flow()
flow_runner: DockerFlowRunner
Michal Zawadzki
06/20/2022, 11:05 AMAnna Geller
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES
# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check
def set_deployment_spec(
flow: Flow,
deployment_name_suffix: str = "dev",
schedule: SCHEDULE_TYPES = None,
parameters: Dict[str, Any] = None,
tags: List[str] = None,
) -> DeploymentSpec:
deploy_tags = (
[deployment_name_suffix] if tags is None else [deployment_name_suffix, *tags]
)
return DeploymentSpec(
flow=flow,
name=f"{flow.name}_{deployment_name_suffix}",
schedule=schedule,
tags=deploy_tags,
parameters=parameters,
# flow_runner=DockerFlowRunner()
)
set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
set_deployment_spec(
repo_trending_check,
deployment_name_suffix="orion_dev",
parameters=dict(repo="orion"),
)
set_deployment_spec(crypto_prices_etl)
Marvin
06/20/2022, 11:12 AMMichal Zawadzki
06/20/2022, 11:17 AMDeploymentSpec
inside.set_deployment_spec()
although it seems like you then need to check that user passed "dev" in the "deployment-dev.py" and "prod" in "deployment-prod.py"?Anna Geller
Michal Zawadzki
06/23/2022, 11:26 PMhttpx.HTTPStatusError: Client error '400 Bad Request'
when adding image: "my_image:my_tag"
under the config
key.
It's interesting because I can get that same flow manuallly from the agent's environment with
import prefect
c = prefect.client.get_client()
run = await c.read_flow_run("0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc")
print(run.flow_runner.config.get("image"))
without error.
Full Traceback:
23:00:21.814 | ERROR | prefect.engine - Engine execution of flow run '0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc' exited with unexpected exception
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 985, in <module>
enter_flow_run_engine_from_subprocess(flow_run_id)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 130, in enter_flow_run_engine_from_subprocess
return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
return future.result()
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 95, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 190, in retrieve_flow_then_begin_flow_run
flow_run = await client.read_flow_run(flow_run_id)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 1204, in read_flow_run
response = await self._client.get(f"/flow_runs/{flow_run_id}")
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
return await self.request(
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 233, in send
response.raise_for_status()
File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 736, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url '<https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc>'
For more information check: <https://httpstatuses.com/400>
01:00:22.596 | INFO | prefect.flow_runner.docker - Flow run container 'smooth-serval' has status 'exited'