#prefect-community

merlin

11/03/2022, 4:06 AM
Deployment Environment Variables
There's something confusing about executing flows from the command line vs. deployed flow runs in a local execution environment. Deployed flow runs are writing files to the system temp directory, rather than to the local file system relative to the flow invocation.
• Running prefect orion and flow code from a poetry-managed environment, locally
• CLI invocation of flow code runs in the local $PWD, i.e. the local Python project folder
• Deployment runs flow code in a system temp folder, like tmp_path from pytest
The confusing part was that the flow code was found via a relative path, but when writing the output to a text file on a relative path, the output files landed in an ephemeral system temporary folder. Naturally this wouldn't be an issue when writing to s3, but for development I like to work in a local environment and write files relative to my project root. I don't think this is caused by the poetry environment, because the agent is running in the same environment; the distinction in behavior is between deployed flows and flows called from the shell:
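# flow.py
from pathlib import Path

import pandas as pd
from prefect import flow, get_run_logger, task

@task
def write_output(dataframe, filepath):
    logger = get_run_logger()
    dataframe.to_csv(filepath)
    logger.debug(f"current working directory: {Path.cwd()}")

@flow
def write_dataframe():
    df = pd.DataFrame({"value": [1, 2, 3]})  # placeholder data, not from the original post
    write_output(df, 'data/csv/datafile.csv')

if __name__ == "__main__":
    write_dataframe()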

# shell invocation
poetry run python src/flow.py
    # prefect logs:
    | DEBUG   | Task run...  - current working directory: /Users/merlin/prefect-repo

# deployed flow invocation
poetry shell
prefect deployment run write_dataframe/depl-write
    # prefect logs:
    | DEBUG   | Task run...  - current working directory: /private/var/folders/gh/...4r3zq.../T/tmpXookj0s08prefect
My goal here is for my task to write to the same local file path where my project folder is stored. Is there a standard way to specify the working directory for deployed flow code on a local environment? Maybe this is managed by the config defaults (I couldn't find a likely ENV variable). My next attempt will be to specify an absolute path for the write location, but this is not ideal.
Here's my solution:
# build-file.yaml
infra_overrides:
  env.PREFECT_LOCAL_PROJECT_ROOT: /Users/merlinr/zillpo/zhl-wbr
Then I access that environment variable and prepend it to the relative paths I'm using to write output files.
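import os
from pathlib import Path
# os.environ[...] raises KeyError if the variable is missing, rather than passing None to Path
local_filepath = Path(os.environ['PREFECT_LOCAL_PROJECT_ROOT']).joinpath(filepath)
local_filepath.parent.mkdir(parents=True, exist_ok=True)
data.to_csv(local_filepath)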
This results in deployment flows writing to the expected locations.
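The approach above can be wrapped in a small helper. This is only a sketch: the function name resolve_output_path is hypothetical, PREFECT_LOCAL_PROJECT_ROOT is the custom variable from the build file above (not a built-in Prefect setting), and the fallback to the current working directory is an assumption about what is wanted when the variable is unset.

```python
import os
from pathlib import Path


def resolve_output_path(relative_path, env_var="PREFECT_LOCAL_PROJECT_ROOT"):
    """Anchor a relative output path at the project root named by env_var.

    Falls back to the current working directory when the variable is unset
    (an assumption), which is what a plain shell invocation would use.
    """
    root = Path(os.environ.get(env_var) or Path.cwd())
    target = root / relative_path
    # Create missing parent folders so the first write does not fail.
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
```

Inside the task this would be used as dataframe.to_csv(resolve_output_path('data/csv/datafile.csv')), so the same code works from the shell and from a deployment.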

Christopher Boyd

11/03/2022, 2:20 PM
Hi Merlin, This should just be a local filesystem storage block
🙏 1

merlin

11/03/2022, 5:51 PM
Thanks a lot, I hadn't gotten into this part yet. Glad to see that even while I'm testing Prefect out and learning the components one by one, it is still a useful tool in my local environment as I orchestrate the work for my current project. Not sure if this was planned, but the learning ramp is a nice gradual slope up and Prefect is useful immediately after the first 'getting started' tutorial.
🙌 1

Jarvis Stubblefield

11/06/2022, 9:25 PM
I know I’m late to the party, but you could also specify the working directory for the Process if you specified it in your deployment.

merlin

11/06/2022, 9:47 PM
thanks! so Process has a parameter for the working directory, does it limit permissions? In this scenario I would not need to specify a filesystem block?
(I'm a very noob python programmer, so I'm learning less from reading the api than I should)

Jarvis Stubblefield

11/08/2022, 2:54 AM
When the process is started, it would have whatever permissions are available to the process. Setting the working directory would ensure the process runs within the directory you expect, instead of a temporary directory. I don’t think you would need the block, but testing is the surefire way to find out.
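The effect of a working directory on relative paths can be seen with the standard library alone. This sketch uses subprocess, not Prefect's Process class, on the assumption that the two behave alike here, as described above; run_in_directory is a hypothetical helper name.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_in_directory(code, working_dir):
    """Run a Python snippet in a child process started inside working_dir."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        cwd=working_dir,  # the child's starting directory
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# The child writes to a *relative* path; the file lands under working_dir.
with tempfile.TemporaryDirectory() as project_root:
    run_in_directory(
        "from pathlib import Path; Path('out.txt').write_text('hello')",
        project_root,
    )
    wrote_into_project_root = (Path(project_root) / "out.txt").exists()
```

The child inherits the permissions of the parent process; changing the working directory does not restrict where it can write.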

merlin

11/09/2022, 6:51 PM
I can't find an example of how to use either Process or a filesystem block to set a path for writing files. Can you share a snippet?

Jarvis Stubblefield

11/10/2022, 7:54 AM
I don’t have an example I have run across for the FileSystemBlock… I do have an example for the Process… I do it upon Python API deployment. Example below… I’ve not worked out the full details yet of getting my environment set up through the Process run, but I am working to do so.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import os
import subprocess
from pathlib import Path
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect.utilities.filesystem import tmpchdir
from prefect.orion.schemas.schedules import RRuleSchedule


from ..project_setup import prepare_project_for_flows
prepare_project_for_flows()


from ppower.base.flows.log_entry import logs_for_month, get_org_name_snakecase
from ppower.base.models import Organization


ORG_IDS = [
    "55",
    "111",
    "92",
]
RRULE_MONTHLY_ON_10TH = "FREQ=MONTHLY;INTERVAL=1;BYMONTHDAY=10"


for org_id in ORG_IDS:
    org = Organization.objects.get(pk=org_id)
    org_name = get_org_name_snakecase.fn(org)

    # For this to work by picking up environment variables, deployments would need to be executed from production.
    # Otherwise, we will need a mechanism to indicate an "environment" path.
    command = ["pipenv", "run", "python", "-m", "prefect.engine"]
    # os.environ["SRCDIR"] raises KeyError if SRCDIR is unset, instead of passing None to Path.
    process = Process(command=command, working_dir=Path(os.environ["SRCDIR"]).absolute())

    # venv_path = subprocess.run(["pipenv", "--venv"], capture_output=True, text=True).stdout.strip()
    # venv_py_exec = str(Path(venv_path) / "bin" / "python")
    #
    # command = [venv_py_exec, "-m", "prefect.engine"]
    #
    # process = Process(command=command)

    deployment = Deployment.build_from_flow(
        flow=logs_for_month,
        name=f"logs_for_month_{org_name}",
        description=f"Create CSV of all Log Entries for {org.name}.",
        tags=["logs_for_month", f"{org_name}"],
        schedule=RRuleSchedule(rrule=RRULE_MONTHLY_ON_10TH),
        work_queue_name="tenzinga_django_dev",
        parameters={"org_id": org_id},
        infrastructure=process,
    )
    deployment.apply()
Ensure that you have set the environment variable (SRCDIR in the above example) before you run this deployment script. If the agent will pick it up on another server, ensure that path works on the server it’s actually going to run on. I hope that makes sense… It doesn’t feel like the best way to manage it at this point. This also hasn’t yet stopped me from needing to set the sys.path to have my Django project imports working properly.
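One way to fail early when the variable is missing is a small check at the top of the deployment script. This is only a sketch; require_env_dir is a hypothetical helper, not part of Prefect.

```python
import os
from pathlib import Path


def require_env_dir(name):
    """Return the directory named by environment variable `name`.

    Raises RuntimeError with a readable message if the variable is unset
    or does not point at an existing directory on this machine.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    path = Path(value).expanduser().absolute()
    if not path.is_dir():
        raise RuntimeError(f"{name}={value!r} is not an existing directory")
    return path
```

The result could then be passed as working_dir=require_env_dir("SRCDIR"). The directory check only covers the machine running the deployment script, so it should be dropped if the agent runs on a different server where the path differs.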
I’m excited to hear how your exploits go with working_dir 🙂
I was reading more on #prefect-community and noticed @Dan Wise mentioned using the working_dir option on a Process. He may have a good example of how he’s using it.

merlin

11/15/2022, 7:38 AM
Turns out it's really important to specify a local storage basepath if you are using local storage, otherwise your git branches become meaningless. I'm failing to build a deployment with a local file system block. First I'm running this Python to save the block to my Orion DB instance:
from pathlib import Path
from prefect.filesystems import LocalFileSystem

fs_block = LocalFileSystem(
    basepath=Path.home().joinpath('.prefect/my-local-storage')
    )
fs_block.save(name="my-local", overwrite=True)
The block is now visible in the UI. It helpfully reminds me to paste into my flow: local_file_system_block = LocalFileSystem.load("my-local"). But what do I do with this local_file_system_block variable? Anyway, continuing... Then I have a flow, deployed successfully previously, without an explicit local filesystem block. When I build a new deployment:
prefect deployment build src/trino_flows.py:bulk_extracts \
    --name DAILY-BULK-EXTRACTS \
    --work-queue prefect_all \
    --storage-block 'local-file-system/my-local'
I get a Traceback, the last few lines here:
, line 728, in build_from_flow
    await deployment.upload_to_storage(ignore_file=ignore_file)
  File "/Users/merlinr/Library/Caches/pypoetry/virtualenvs/zhl-wbr-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 576, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "/Users/merlinr/Library/Caches/pypoetry/virtualenvs/zhl-wbr-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 185, in put_directory
    shutil.copytree(
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/shutil.py", line 558, in copytree
    return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/shutil.py", line 457, in _copytree
    os.makedirs(dst, exist_ok=dirs_exist_ok)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
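The last frames of the traceback show os.makedirs being called with an empty string. The same exception can be reproduced with the standard library alone, as in this sketch; it only illustrates the final error and says nothing about why Prefect ended up passing an empty destination.

```python
import shutil
import tempfile

with tempfile.TemporaryDirectory() as src:
    try:
        # An empty destination makes copytree call os.makedirs(""),
        # which fails the same way as the last frame of the traceback.
        shutil.copytree(src, "")
        error = None
    except FileNotFoundError as exc:
        error = exc
```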

Zanie

11/15/2022, 3:45 PM
The “empty path” error you’re getting there is being addressed in https://github.com/PrefectHQ/prefect/pull/7477
And I believe there’s another PR coming that resolves other issues here, cc @Peyton Runyan
🙏 2
👍 1

Peyton Runyan

11/15/2022, 3:52 PM
Yup - PR should be up today!