<@ULVA73B9P> how can i configure pull_steps for de...
# marvin-in-the-wild
t
@Marvin how can i configure `pull_steps` for deploying a flow when using the `.deploy` method on a flow?
m
To configure `pull_steps` for deploying a flow with the `.deploy` method, you'll need to define these steps in the `prefect.yaml` file or using code.

### Example `pull_steps` Configuration

Here are some typical steps for the `pull` section:

- Set Working Directory:

```yaml
  pull:
    - prefect.deployments.steps.set_working_directory:
        directory: /path/to/working_directory
```
- Clone a Git Repository:

```yaml
  pull:
    - prefect.deployments.steps.git_clone:
        repository: https://github.com/org/repo.git
        credentials: "{{ prefect.blocks.github-credentials.my-credentials }}"
```
- Pull from Cloud Storage:

```yaml
  pull:
    - prefect_aws.deployments.steps.pull_from_s3:
        bucket: my-bucket
        folder: "{{ push-code.folder }}"
        credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}"
```
### Using the `.deploy` Method with `pull_steps`

If you are using Prefect's `.deploy` method in code, you can define the steps programmatically:

```python
from prefect.deployments import Deployment
from prefect.deployments.steps import set_working_directory, git_clone

flow = your_flow_definition_here

deployment = Deployment.build_from_flow(
    flow=flow,
    name="example-deployment",
    pull_steps=[
        set_working_directory(directory="/path/to/working_directory"),
        git_clone(repository="https://github.com/org/repo.git", credentials="my-credentials-block")
    ]
)
deployment.apply()
```
### References

For more detailed information:
- Prefect Deployment Guide

The use of `Deployment.build_from_flow` is deprecated in favor of using the `flow.deploy` method. Here is how you can update your code:

### Example Using `flow.deploy`

1. Define Your Flow
2. Replace `build_from_flow` with `deploy`

#### Original Code

```python
from prefect.deployments import Deployment
from prefect.deployments.steps import set_working_directory, git_clone

flow = your_flow_definition_here

deployment = Deployment.build_from_flow(
    flow=flow,
    name="example-deployment",
    pull_steps=[
        set_working_directory(directory="/path/to/working_directory"),
        git_clone(repository="https://github.com/org/repo.git", credentials="my-credentials-block")
    ]
)
deployment.apply()
```
#### Updated Code

```python
from prefect import flow

@flow(log_prints=True)
def your_flow_definition_here():
    # Your flow logic
    pass

if __name__ == "__main__":
    your_flow_definition_here.deploy(
        name="example-deployment",
        work_pool_name="my-work-pool",
        pull_steps=[
            {"set_working_directory": {"directory": "/path/to/working_directory"}},
            {"git_clone": {"repository": "https://github.com/org/repo.git", "credentials": "my-credentials-block"}}
        ]
    )
```
### Key Adjustments

- Work Pool Name: Replace `infrastructure` with `work_pool_name`.
- From Source: Use `flow.from_source` if your flow is stored remotely (see the combined sketch below).
- Job Variables: Replace `infra_overrides` with `job_variables`.

### References

- Prefect Upgrade Guide - Agents to Workers
- Prefect Deployment Guide

By following these updates, your deployment should align with the newer practices recommended by Prefect.
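Putting those adjustments together, a rough sketch of the updated pattern (the repo URL, entrypoint, work pool, and image below are illustrative placeholders, not values from this thread):

```python
from prefect import flow

if __name__ == "__main__":
    # Load the flow from remote storage; from_source's inputs become the
    # deployment's pull step when .deploy() is called.
    flow.from_source(
        source="https://github.com/org/repo.git",           # placeholder repo
        entrypoint="flows/my_flow.py:your_flow_definition",  # placeholder path:function
    ).deploy(
        name="example-deployment",
        work_pool_name="my-work-pool",
        job_variables={"image": "my-registry/my-image:dev"},  # formerly infra_overrides
    )
```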
t
@Marvin `Deployment` has this deprecation warning:

```python
@deprecated_class(
    start_date="Mar 2024",
    help="Use `flow.deploy` to deploy your flows instead."
    " Refer to the upgrade guide for more information:"
    " https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.",
)
```

It says to use the `flow.deploy` method now but that doesn't have any option for specifying `pull_steps`. What else can i do? You can specify `job_variables` but that's not where you specify `pull_steps`?
m
To configure `pull_steps` when using the `flow.deploy` method, you should define these steps in the `prefect.yaml` file, as the `flow.deploy` method itself does not directly accept `pull_steps`.

### Steps to Configure `pull_steps`

1. Define the Pull Steps in `prefect.yaml`: Ensure your `prefect.yaml` includes the `pull` section with the required steps:
```yaml
pull:
  - prefect.deployments.steps.set_working_directory:
      directory: /path/to/working_directory
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/org/repo.git
      credentials: "{{ prefect.blocks.github-credentials.my-credentials }}"
  - prefect_aws.deployments.steps.pull_from_s3:
      bucket: my-bucket
      folder: "{{ push-code.folder }}"
      credentials: "{{ prefect.blocks.aws-credentials.dev-credentials }}"
```
2. Use `flow.deploy` in Your Script: Update your script to use `flow.deploy`, specifying the `work_pool_name` and other necessary parameters:
```python
from prefect import flow

@flow(log_prints=True)
def your_flow_definition_here():
    # Your flow logic
    pass

if __name__ == "__main__":
    your_flow_definition_here.deploy(
        name="example-deployment",
        work_pool_name="my-work-pool",
        # No direct pull_steps parameter, as this is handled via prefect.yaml
    )
```
### References

For more detailed information:
- Prefect Deployment Guide
- Prefect Upgrade Guide - Agents to Workers

By leveraging the `prefect.yaml` for `pull_steps` and using `flow.deploy` in your script, you can effectively configure your deployments as needed.
t
@Marvin If you're using the `flow.deploy` method you usually just run the python script like `python my_flow.py`, then what would be the new deployment mechanism that involves the `prefect.yaml` too?
m
To leverage the `prefect.yaml` file with the `flow.deploy` method and manage your deployments effectively, you should follow these steps:

### 1. Initialize the `prefect.yaml` File

The `prefect.yaml` file describes your deployment settings, build steps, and environment preparation instructions. You can initialize this file by running:

```bash
prefect init
```
This command will set up the `prefect.yaml` file in your directory.

### 2. Configure the `prefect.yaml` File

Define your deployment configurations within the `prefect.yaml` file. Here, you can specify `pull_steps`, `work_pool_name`, and other necessary settings. For example:
```yaml
name: example-deployment
version: 1

work_pool:
  name: my-work-pool
  work_queue_name: null

deployments:
  - name: buy-deploy
    description: "Deployment for buy flow"
    work_pool_name: my-dev-work-pool
    job_variables:
      image: "my-registry/my-image:dev"
    pull:
      - prefect.deployments.steps.set_working_directory:
          directory: /path/to/working_directory
      - prefect.deployments.steps.git_clone:
          repository: https://github.com/org/repo.git
          credentials: "{{ prefect.blocks.github-credentials.my-credentials }}"

  - name: sell-deploy
    description: "Deployment for sell flow"
    work_pool_name: my-dev-work-pool
    job_variables:
      image: "my-registry/my-image:dev"
```
### 3. Use `flow.deploy` in Your Python Script

You will still use the `flow.deploy` method within your Python script, but the detailed deployment configurations will be managed via the `prefect.yaml` file. For example:
```python
from prefect import flow

@flow(log_prints=True)
def your_flow_definition_here():
    # Your flow logic
    pass

if __name__ == "__main__":
    your_flow_definition_here.deploy(
        name="example-deployment",
        work_pool_name="my-work-pool",
        # No direct pull_steps parameter, as this is handled via prefect.yaml
    )
```
### 4. Run Your Deployment

To deploy your flow along with the configurations in the `prefect.yaml`, you simply run:

```bash
python your_flow_script.py
```

or, for deploying directly using the Prefect CLI:

```bash
prefect deploy --name example-deployment
```
### References

- Prefect Deployment Guide
- Prefect Upgrade Guide - Agents to Workers

By organizing your deployments with the `prefect.yaml` file alongside the `flow.deploy` method, you can ensure that `pull_steps` and other configurations are correctly applied without cluttering your Python scripts.
k
flow.from_source().deploy() where the inputs to from_source are used to construct your pull step
t
Thanks Kevin 💪 If i'm using this at the moment to pull the code from github:
```python
Flow.from_source(  # type: ignore[reportReturnType]
        source=GitRepository(
            url=constants.GITHUB_REPO_URL,
            branch=branch,
            credentials={"access_token": Secret.load("github-read-access-token")},
        ),
        entrypoint=entrypoint,
    )
```

What kind of storage object should i use to pull from github with custom pull steps?
k
hm I'm not sure exactly what you mean by custom pull steps?
t
I want to be able to pull my code from storage, then run a bash script / bash command before the flow is imported etc
k
ahh then you probably want to customize the `command` job variable on the deployment
if you've never done that before, leaving it empty means the command is `prefect flow-run execute`
so whatever you customize it to, it should end with `&& prefect flow-run execute`
t
Thanks! So to the job_variables dict i would just add this key and value?
```python
{
  "command": "<custom_command> && prefect flow-run execute"
}
```
k
yep!
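Put together, a sketch of how that might look in a `.deploy()` call (the flow, names, image, and the command before `&&` are illustrative placeholders):

```python
from prefect import flow


@flow(log_prints=True)
def my_flow():
    pass


if __name__ == "__main__":
    my_flow.deploy(
        name="example-deployment",
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",  # an existing image
        build=False,
        push=False,
        job_variables={
            # Whatever runs first, keep `&& prefect flow-run execute` at the end,
            # otherwise the worker never actually starts the flow run.
            "command": "bash ./scripts/post_clone_setup.sh && prefect flow-run execute",
        },
    )
```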
t
I found my kubernetes pod would never start when i use a custom command but i've realised that what i really need is a step that i can run immediately after the clone step
```
INFO    | prefect.deployment - Cloned repository ... into ...
```
Do you have any suggestions on how i could do that using prefect build from source and the .deploy approach?
k
you're depending on a file being present from the cloned repo for that step?
t
pretty much yea 👍
k
you could do an on_running hook that runs a function before your flow code executes
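For what it's worth, a rough sketch of that idea (assuming a Prefect version that supports the `on_running` flow hook; the script path is a placeholder):

```python
import subprocess

from prefect import flow


def install_internal_packages(flow, flow_run, state):
    # Runs when the flow run enters the Running state, before the flow body executes.
    subprocess.run(["bash", "./scripts/install_editable_packages.sh"], check=True)


@flow(on_running=[install_internal_packages])
def my_flow():
    ...
```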
t
it's re-installing my deps again because they're packages in my monorepo and we use editable installs for development. I'm trying to avoid re-building and pushing an 8 GB image every time someone makes a change to an internal package
k
hm our other deployment method, the yaml file one, lets you define arbitrary pull steps
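For reference, a sketch of what that can look like in `prefect.yaml` — for example a `run_shell_script` step slotted in right after the clone (the repo, credentials block, and install command are placeholders):

```yaml
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/org/repo.git
      credentials: "{{ prefect.blocks.github-credentials.my-credentials }}"
  - prefect.deployments.steps.run_shell_script:
      script: pip install -e ./packages/my_pkg
      stream_output: true
```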
t
thanks i'll check out the on_running hook 👌 I'm specifically looking to slot this in between the clone step and when prefect uses importlib to load all of the code
k
I do think the command one should work, I've seen it done to install packages before
t
Yea sorry to be a pain 😬 we've doubled down on the deploy approach and i've built some thin wrappers around them to manage how my teams deploy flows. I think the command one might not work still as the install i do basically links to the latest code pulled so it would need to run after the clone step 🤔
Is there any way to combine the `prefect.yaml` deployment and `.deploy` approaches? e.g. just use the `prefect.yaml` for defining the extra pull steps?
k
I don't think so
what if the command had an env var in it, and that env var was an authed url to your private repo?
t
Thanks i'll have a look at that too and see if i can figure out why my container immediately crashes when i use a custom command 👍
I ended up creating a custom deploy function which did the job:
```python
async def deploy_with_custom_post_clone_pull_steps(
    flow: Flow,
    name: str,
    image: str,
    work_pool: infra.PrefectWorkpool = infra.PrefectWorkpool.CPU,
    description: str | None = None,
    paused: bool | None = None,
    job_variables: dict = {},
    post_clone_pull_steps: list | None = None,
    **deploy_kwargs,
) -> UUID:

    if post_clone_pull_steps is None:
        post_clone_pull_steps = []

    work_pool_name = work_pool.value

    # Build a RunnerDeployment object without registering it with the API yet
    runner_deployment = await flow.to_deployment(
        name=name,
        description=description,
        work_pool_name=work_pool_name,
        paused=paused,
        job_variables=job_variables,
        **deploy_kwargs,
    )

    async with get_client() as client:
        assert runner_deployment.flow_name is not None
        flow_id = await client.create_flow_from_name(runner_deployment.flow_name)

        create_payload = dict(
            flow_id=flow_id,
            name=runner_deployment.name,
            work_queue_name=runner_deployment.work_queue_name,
            work_pool_name=work_pool_name,
            version=runner_deployment.version,
            paused=runner_deployment.paused,
            schedules=runner_deployment.schedules,
            parameters=runner_deployment.parameters,
            description=runner_deployment.description,
            tags=runner_deployment.tags,
            path=runner_deployment._path,
            entrypoint=runner_deployment.entrypoint,
            storage_document_id=None,
            infrastructure_document_id=None,
            parameter_openapi_schema=runner_deployment._parameter_openapi_schema.dict(),
            enforce_parameter_schema=runner_deployment.enforce_parameter_schema,
        )

        if work_pool_name:
            create_payload["job_variables"] = runner_deployment.job_variables
            if image:
                create_payload["job_variables"]["image"] = image
            create_payload["path"] = (
                None if runner_deployment.storage else runner_deployment._path
            )
            create_payload["pull_steps"] = (
                [
                    runner_deployment.storage.to_pull_step(),
                    *post_clone_pull_steps,
                ]
                if runner_deployment.storage
                else []
            )

        try:
            deployment_id = await client.create_deployment(**create_payload)  # type: ignore
        except Exception as exc:
            raise DeploymentApplyError(
                f"Error while applying deployment: {str(exc)}"
            ) from exc

        console = Console()
        if PREFECT_UI_URL:
            message = (
                "\nYou can also run your flow via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_id}[/]\n"
            )
            console.print(message, soft_wrap=True)

        return deployment_id
```
Is there anywhere it would be possible for me to put in a feature request to allow passing `pull_steps` to the `Flow.deploy` or `deploy` methods?