Michal Zawadzki
06/20/2022, 10:39 AM
1. I created a work queue dev_queue with the label dev and supporting all flow runners. It's the only work queue I have set up.
2. I ran an agent with prefect agent start dev_queue
3. I created a deployment with the tag `dev`:
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
  to_print: "Hello from first deployment!"
tags:
  - dev
I verified in the UI that it has the right tag and flow runner.
However, when I run the deployment, the flow run is never picked up.
One suspicious thing I noticed is that the flow run doesn't inherit the dev
tag from the deployment (although I don't know if it's a bug or a feature).
When I remove all labels from the work queue, the flow runs are picked up correctly.
Anna Geller
06/20/2022, 11:20 AM
import platform

from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello world!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, System Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello_flow():
    hi = say_hi()
    # wait_for makes the second task run only after say_hi has finished
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="dev", flow=hello_flow, tags=["local"], flow_runner=SubprocessFlowRunner()
)

if __name__ == "__main__":
    hello_flow()
prefect work-queue create dev_queue -t dev
YAML:
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
tags:
  - dev
flow:
import platform

from prefect import task, flow
from prefect import get_run_logger


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello world!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, System Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello():
    hi = say_hi()
    # wait_for makes the second task run only after say_hi has finished
    print_platform_info(wait_for=[hi])


if __name__ == "__main__":
    hello()
Michal Zawadzki
06/20/2022, 11:35 AM
Anna Geller
06/20/2022, 11:38 AM
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
tags:
  - dev
Michal Zawadzki
06/20/2022, 11:39 AM
Anna Geller
06/20/2022, 11:43 AM
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
  config:
    volumes:
      - "/Users/anna/.aws:/root/.aws"
tags:
  - dev
"I configured a default storage so it should be good"
You configured S3 as default storage? In that case, your flow run container needs credentials, and attaching .aws as a volume makes those available to the container.
Michal Zawadzki
06/20/2022, 11:46 AM
Anna Geller
06/20/2022, 11:49 AM
Michal Zawadzki
06/20/2022, 11:57 AM
Anna Geller
06/20/2022, 2:20 PM
Michal Zawadzki
06/20/2022, 2:22 PM
httpx.HTTPStatusError: Client error '400 Bad Request' for url 'https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/b7d2cfdc-53b2-4794-8095-27cea72fe18c'
prefect deployment inspect
that the env is not expanded, so I quickly checked the deployments.py
code and it seems env vars are not handled there.
Anna Geller
06/20/2022, 2:55 PM
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
  config:
    env:
      AZURE_STORAGE_CONNECTION_STRING: xxx
tags:
  - dev
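Since the thread reports that env references in the deployment are not expanded, one possible workaround is to resolve them yourself before the values go into the flow runner config. Below is a minimal sketch of that idea using only the standard library; `expand_env` is a hypothetical helper, not part of Prefect, and the `${...}` placeholder syntax is an assumption about how the value was written.

```python
import os
from typing import Dict


def expand_env(env: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of env with $VAR / ${VAR} references resolved from os.environ.

    Hypothetical helper for illustration only; references to variables that
    are not set are left unchanged by os.path.expandvars.
    """
    return {key: os.path.expandvars(value) for key, value in env.items()}


if __name__ == "__main__":
    # "xxx" is the same stand-in value used in the YAML above, not a real secret.
    os.environ["AZURE_STORAGE_CONNECTION_STRING"] = "xxx"
    flow_runner_env = {
        "AZURE_STORAGE_CONNECTION_STRING": "${AZURE_STORAGE_CONNECTION_STRING}"
    }
    print(expand_env(flow_runner_env))
```

The expanded mapping would then be what you write under `env:` in the deployment, so the container receives the literal value rather than the placeholder.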
prefect work-queue create queuename -fr docker
rather than
prefect work-queue create queuename -fr DockerFlowRunner
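To illustrate why the flow runner value passed to `-fr` matters, here is a rough sketch of how a work queue filter could decide whether a deployment's runs are picked up. This is not Prefect's implementation: `QueueFilter`, `Deployment`, and `matches` are hypothetical names, and the rules (an empty filter means no restriction, tags match if at least one is shared, the runner type is compared as an exact string) are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class QueueFilter:
    """Hypothetical stand-in for a work queue's filter settings."""
    tags: List[str] = field(default_factory=list)
    flow_runner_types: List[str] = field(default_factory=list)


@dataclass
class Deployment:
    """Hypothetical stand-in for the deployment fields the filter looks at."""
    tags: List[str]
    flow_runner_type: str


def matches(queue: QueueFilter, deployment: Deployment) -> bool:
    # Assumption: an empty filter list places no restriction on that field.
    tags_ok = not queue.tags or any(t in deployment.tags for t in queue.tags)
    # Assumption: the runner type is compared as an exact string.
    runner_ok = (
        not queue.flow_runner_types
        or deployment.flow_runner_type in queue.flow_runner_types
    )
    return tags_ok and runner_ok


if __name__ == "__main__":
    deployment = Deployment(tags=["dev"], flow_runner_type="docker")
    good_queue = QueueFilter(tags=["dev"], flow_runner_types=["docker"])
    bad_queue = QueueFilter(tags=["dev"], flow_runner_types=["DockerFlowRunner"])
    print(matches(good_queue, deployment))  # filter value equals the deployment's type
    print(matches(bad_queue, deployment))   # class name does not equal the type key
```

Under these assumptions, a queue filtered on the class name `DockerFlowRunner` never matches a deployment whose runner type is the key `docker`, which is consistent with the fix described above.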