#prefect-community

Michal Zawadzki

06/20/2022, 10:39 AM
Prefect 2.0: Hi, I can't seem to run a deployed flow with tags: its flow runs are never picked up by the work queue/agent that's supposed to handle that tag.

1. I created a work queue named `dev_queue` with the label `dev`, supporting all flow runners. It's the only work queue I have set up.
2. I ran an agent with `prefect agent start dev_queue`.
3. I created a deployment with the tag `dev`:
```yaml
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
  to_print: "Hello from first deployment!"
tags:
  - dev
```
I verified in the UI that the deployment has the right tag and flow runner. However, when I run the deployment, the flow run is never picked up. One suspicious thing I noticed is that the flow run doesn't inherit the `dev` tag from the deployment (although I don't know whether that's a bug or a feature). When I remove all labels from the work queue, the flow runs are picked up correctly.
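A minimal sketch of the matching behavior described above, under an assumed, simplified model (not Prefect's actual implementation): a tag-filtered work queue only picks up flow runs that carry every one of its tags, and a queue with no tags picks up everything. `FlowRunSketch` and `queue_picks_up` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class FlowRunSketch:
    """Hypothetical stand-in for a flow run; only its tags matter here."""
    name: str
    tags: Set[str] = field(default_factory=set)


def queue_picks_up(queue_tags: Set[str], run: FlowRunSketch) -> bool:
    # Assumed rule: every queue tag must appear on the run.
    # An empty tag set is a subset of anything, so it matches every run.
    return set(queue_tags) <= run.tags


# A run that did NOT inherit the deployment's "dev" tag...
untagged_run = FlowRunSketch("from-deployment")
# ...versus a run that did inherit it.
tagged_run = FlowRunSketch("from-deployment", {"dev"})

print(queue_picks_up({"dev"}, untagged_run))  # False: never picked up
print(queue_picks_up({"dev"}, tagged_run))    # True
print(queue_picks_up(set(), untagged_run))    # True: queue without labels
```

Under this model both observations line up: the untagged run is skipped by `dev_queue` and is picked up as soon as the queue's labels are removed.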

Anna Geller

06/20/2022, 11:20 AM
I wasn't able to reproduce this using Python; let me try YAML :)
```python
import platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import SubprocessFlowRunner


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello world!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, System Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello_flow():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="dev", flow=hello_flow, tags=["local"], flow_runner=SubprocessFlowRunner()
)


if __name__ == "__main__":
    hello_flow()
```
```bash
prefect work-queue create dev_queue -t dev
```
YAML:
```yaml
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
tags:
  - dev
```
Flow:
```python
import platform
from prefect import task, flow
from prefect import get_run_logger

@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello world!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: IP = %s, Python = %s, Platform type = %s, System Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


if __name__ == "__main__":
    hello()
```
This worked. Let me try with `DockerFlowRunner` then.
You may specify your `flow_location` as an absolute path.
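A minimal sketch of turning a relative `flow_location` into an absolute path before writing it into the deployment YAML, using only `pathlib`. It treats the deployment spec as a plain dict; `with_absolute_flow_location` is a hypothetical helper, not part of Prefect.

```python
from pathlib import Path
from typing import Any, Dict, Optional


def with_absolute_flow_location(
    spec: Dict[str, Any], base_dir: Optional[Path] = None
) -> Dict[str, Any]:
    """Return a copy of a deployment spec dict with an absolute flow_location.

    Relative paths are resolved against base_dir (default: the current
    working directory); every path is normalized with Path.resolve().
    """
    location = Path(spec["flow_location"]).expanduser()
    if not location.is_absolute():
        location = (base_dir or Path.cwd()) / location
    updated = dict(spec)  # leave the caller's dict untouched
    updated["flow_location"] = str(location.resolve())
    return updated


spec = {
    "name": "test_platform_flow_first_deployment",
    "flow_name": "Data Platform Demo",
    "flow_location": "./test_platform_flow.py",
}
print(with_absolute_flow_location(spec)["flow_location"])
```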

Michal Zawadzki

06/20/2022, 11:35 AM
Yes, the YAML without a flow runner worked.
The Python deployment also worked.

Anna Geller

06/20/2022, 11:38 AM
ok, so this works:
```yaml
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
tags:
  - dev
```
The only problem is that it requires remote storage (e.g. S3, GCS, or Azure Blob Storage), so you also need to provide credentials.

Michal Zawadzki

06/20/2022, 11:39 AM
I configured a default storage, so it should be good.

Anna Geller

06/20/2022, 11:43 AM
this works 🎉
```yaml
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
  config:
    volumes:
      - "/Users/anna/.aws:/root/.aws"
tags:
  - dev
```
> I configured a default storage so it should be good

Did you configure S3 as default storage? In that case, your flow run container needs credentials, and mounting `.aws` as a volume makes them available to the container.
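To make the volume entry in the YAML above concrete: a Docker volume spec is `host_path:container_path`, here the local `~/.aws` directory mapped to `/root/.aws`, which assumes the container runs as `root` so the AWS SDK looks for credentials there. A minimal sketch that builds that string for the current user; `aws_credentials_volume` is a hypothetical helper.

```python
from pathlib import Path
from typing import Optional


def aws_credentials_volume(
    home: Optional[Path] = None, container_home: str = "/root"
) -> str:
    """Build a Docker "host:container" volume spec for the ~/.aws directory.

    container_home defaults to /root on the assumption that the flow run
    container runs as root; adjust it if the image uses another user.
    """
    host_dir = (home or Path.home()) / ".aws"
    return f"{host_dir}:{container_home}/.aws"


# On macOS/Linux this prints /Users/anna/.aws:/root/.aws
print(aws_credentials_volume(Path("/Users/anna")))
```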

Michal Zawadzki

06/20/2022, 11:46 AM
Ah right, yes, it's Azure in my case, but I see what you mean. Eventually I'd use Git storage for the flow code, so it will be different anyway.
For Azure, I think I'll need to pass some env vars.
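A minimal sketch of collecting the Azure-related variables from the local environment into a mapping that could go under `flow_runner.config.env`, as in the Azure YAML in this thread. The `AZURE_` prefix filter and the `azure_env` helper are assumptions for illustration. Keep in mind that values copied this way end up in plain text in the deployment definition.

```python
import os
from typing import Dict, Mapping, Optional


def azure_env(
    environ: Optional[Mapping[str, str]] = None, prefix: str = "AZURE_"
) -> Dict[str, str]:
    """Return the environment variables whose names start with prefix."""
    source = os.environ if environ is None else environ
    return {key: value for key, value in source.items() if key.startswith(prefix)}


example = {
    "AZURE_STORAGE_CONNECTION_STRING": "xxx",
    "HOME": "/Users/anna",
}
print(azure_env(example))  # {'AZURE_STORAGE_CONNECTION_STRING': 'xxx'}
```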

Anna Geller

06/20/2022, 11:49 AM
Nice. Just for full transparency: we don't have Git storage yet, but it's on the roadmap.
Let me know if the issue persists. Specifying `config` as a dictionary and `type` as the string `"docker"` seems to work here.
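A minimal sketch of checking that shape on the parsed YAML (as a dict) before applying a deployment: `type` must be a plain string such as `"docker"` and `config` a mapping. `flow_runner_problems` is a hypothetical helper, and the set of accepted type strings is an assumption: `docker` comes from this thread, the others are the remaining Prefect 2.0 beta runner names.

```python
from typing import Any, Dict, List

# Assumed set of runner type strings; "docker" is the one used in this thread.
KNOWN_TYPES = {"universal", "subprocess", "docker", "kubernetes"}


def flow_runner_problems(deployment: Dict[str, Any]) -> List[str]:
    """Return human-readable problems with a deployment's flow_runner section."""
    runner = deployment.get("flow_runner")
    if runner is None:
        return []  # no flow_runner section: nothing to check
    if not isinstance(runner, dict):
        return ["flow_runner should be a mapping with 'type' and optional 'config'"]
    problems: List[str] = []
    runner_type = runner.get("type")
    if not isinstance(runner_type, str):
        problems.append("flow_runner.type should be a string such as 'docker'")
    elif runner_type not in KNOWN_TYPES:
        problems.append(f"unexpected flow_runner.type {runner_type!r}")
    if "config" in runner and not isinstance(runner["config"], dict):
        problems.append("flow_runner.config should be a mapping")
    return problems


ok = {"flow_runner": {"type": "docker", "config": {"volumes": ["/Users/anna/.aws:/root/.aws"]}}}
bad = {"flow_runner": {"type": "DockerFlowRunner", "config": ["volumes"]}}
print(flow_runner_problems(ok))  # []
print(flow_runner_problems(bad))
```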

Michal Zawadzki

06/20/2022, 11:57 AM
Sure. Thanks a lot 🙂 Also, the note about the Docker container needing storage credentials was great; I would have had to figure that one out right after getting this to work if you hadn't mentioned it haha
It seems Prefect doesn't expand env vars in the YAML, so I'm not sure how to let it connect to ADLS 😢 Can I somehow pass env vars to the agent as in Prefect 1.0?

Anna Geller

06/20/2022, 2:20 PM
Can you share your Azure YAML deployment configuration? I can try to replicate it with my Azure Blob Storage.

Michal Zawadzki

06/20/2022, 2:22 PM
For now I'm actually getting:
`httpx.HTTPStatusError: Client error '400 Bad Request' for url 'https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/b7d2cfdc-53b2-4794-8095-27cea72fe18c'`
However, I noticed in `prefect deployment inspect` that the env is not expanded, so I quickly checked the `deployments.py` code, and it seems env vars are not handled there.
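Since the deployment file isn't expanded, one workaround is to expand `$VAR` / `${VAR}` references yourself and point Prefect at the rendered file instead of the template. A minimal sketch using the standard library's `os.path.expandvars`, which leaves unknown variables untouched; this is a workaround, not a Prefect feature, and the rendered file will contain the secret values in plain text. `expand_env_refs` and `render_deployment` are hypothetical helpers.

```python
import os
from pathlib import Path


def expand_env_refs(text: str) -> str:
    """Expand $VAR and ${VAR} references; unknown variables are left as-is."""
    return os.path.expandvars(text)


def render_deployment(template: Path, output: Path) -> None:
    """Write a copy of a deployment YAML with environment references expanded."""
    output.write_text(expand_env_refs(template.read_text()))


os.environ["AZURE_STORAGE_CONNECTION_STRING"] = "xxx"  # example value only
print(expand_env_refs("AZURE_STORAGE_CONNECTION_STRING: ${AZURE_STORAGE_CONNECTION_STRING}"))
# AZURE_STORAGE_CONNECTION_STRING: xxx
```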

Anna Geller

06/20/2022, 2:55 PM
this has worked fine for me with Azure:
```yaml
name: dev
flow_location: /Users/anna/repos/gha/community.py
flow_name: hello
flow_runner:
  type: "docker"
  config:
    env:
      AZURE_STORAGE_CONNECTION_STRING: xxx
tags:
  - dev
```
I think you were looking at `DockerFlowRunner`, but `env` is defined in `UniversalFlowRunner`, which `DockerFlowRunner` inherits from.
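A minimal sketch of why `env` is easy to miss: a field declared on a base class doesn't appear in the subclass's own body, but it is still part of the subclass. The classes below are hypothetical dataclass stand-ins, not Prefect's actual `UniversalFlowRunner`/`DockerFlowRunner` definitions.

```python
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional


@dataclass
class UniversalRunnerSketch:
    # Settings shared by every runner type, e.g. environment variables.
    env: Dict[str, str] = field(default_factory=dict)


@dataclass
class DockerRunnerSketch(UniversalRunnerSketch):
    # Docker-specific settings only; env is inherited from the base class.
    image: Optional[str] = None
    volumes: List[str] = field(default_factory=list)


own_fields = DockerRunnerSketch.__annotations__  # only what the subclass declares
all_fields = [f.name for f in fields(DockerRunnerSketch)]
print(sorted(own_fields))  # ['image', 'volumes']
print(all_fields)          # ['env', 'image', 'volumes']

runner = DockerRunnerSketch(env={"AZURE_STORAGE_CONNECTION_STRING": "xxx"})
print(runner.env)
```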
BTW @Michal Zawadzki, the right way to create a work queue for the `DockerFlowRunner` type is:
```bash
prefect work-queue create queuename -fr docker
```
rather than
```bash
prefect work-queue create queuename -fr DockerFlowRunner
```
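A minimal sketch of the naming rule those two commands suggest, assuming the short type string is the class name without the `FlowRunner` suffix, lowercased (`DockerFlowRunner` becomes `docker`). `runner_type_string` is a hypothetical helper for producing CLI-friendly values; it is not part of Prefect.

```python
def runner_type_string(name: str) -> str:
    """Map a flow runner class name (or an already-short type) to a short type string."""
    suffix = "FlowRunner"
    if name.endswith(suffix):
        name = name[: -len(suffix)]
    return name.lower()


for name in ["DockerFlowRunner", "SubprocessFlowRunner", "docker"]:
    print(name, "->", runner_type_string(name))
```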