merlin
11/03/2022, 4:06 AM
tmp_path from pytest
The confusing part was that the flow code was found via a relative path, but when the task wrote its output to a text file on a relative path, the files landed in an ephemeral system temporary folder. Naturally this wouldn't be an issue when writing to S3, but for development I like to work in a local environment and write files relative to my project root.
I don't think this is caused by the poetry environment, because the agent runs in that same environment. The difference in behavior is between deployed flows and flows called from the shell:
# flow.py
import
@task
def write_output(dataframe, filepath):
    dataframe.to_csv(filepath)
    logger.debug(f"current working directory: {Path().cwd()}")
@flow
def write_dataframe():
    write_output(df, 'data/csv/datafile.csv')
# shell invocation
poetry run python src/flow.py
# prefect logs:
| DEBUG | Task run... - current working directory: /Users/merlin/prefect-repo
# deployed flow invocation
poetry shell
prefect deployment run write_dataframe/depl-write
# prefect logs:
| DEBUG | Task run... - current working directory: /private/var/folders/gh/...4r3zq.../T/tmpXookj0s08prefect
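The two logs above differ only in the working directory, and that alone changes where a relative path lands. A minimal stdlib sketch of that effect, using two throwaway temporary folders as stand-ins for the project root and the deployment's scratch folder (both stand-ins are assumptions for illustration, not Prefect behavior):

```python
import os
import tempfile
from pathlib import Path

# Same relative path as in the flow above.
relative_target = Path("data/csv/datafile.csv")
original_cwd = Path.cwd()
resolved = {}

with tempfile.TemporaryDirectory() as project_root, tempfile.TemporaryDirectory() as scratch:
    try:
        # "shell" and "deployed" are illustrative labels only.
        for label, directory in (("shell", project_root), ("deployed", scratch)):
            os.chdir(directory)
            # resolve() anchors the relative path at the current working directory.
            resolved[label] = relative_target.resolve()
    finally:
        # Restore the original working directory before the folders are removed.
        os.chdir(original_cwd)

for label, path in resolved.items():
    print(label, path)
```

The relative path is identical in both runs; only the process's working directory decides the final location.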
My goal here is for my task to write to the same local file path where my project folder is stored.
Is there a standard way to specify the working directory for deployed flow code on a local environment? Maybe this is managed by the config defaults (I couldn't find a likely ENV variable). My next attempt will be to specify an absolute path for the write location, but this is not ideal.
# build-file.yaml
infra_overrides:
  env.PREFECT_LOCAL_PROJECT_ROOT: /Users/merlinr/zillpo/zhl-wbr
Access that environment variable and prepend it to the relative paths I'm using to write output files.
local_filepath = Path(os.environ.get('PREFECT_LOCAL_PROJECT_ROOT')).joinpath(filepath)
local_filepath.parent.mkdir(parents=True, exist_ok=True)
data.to_csv(local_filepath)
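The same idea can be wrapped in a small helper so every task resolves its paths the same way. This is only a sketch: `resolve_output_path` and its `env` parameter are hypothetical names, the fallback to the current working directory is an added assumption, and `PREFECT_LOCAL_PROJECT_ROOT` appears to be a custom variable from the override above rather than a built-in Prefect setting.

```python
import os
from pathlib import Path

# Custom variable name taken from the infra override in this thread.
ROOT_ENV_VAR = "PREFECT_LOCAL_PROJECT_ROOT"


def resolve_output_path(filepath, env=None):
    """Anchor a relative path at the project root and create its parent folders."""
    env = os.environ if env is None else env
    root = env.get(ROOT_ENV_VAR)
    # Assumption: fall back to the current working directory when the
    # variable is unset, so shell invocations behave as before.
    base = Path(root) if root else Path.cwd()
    local_filepath = base.joinpath(filepath)
    local_filepath.parent.mkdir(parents=True, exist_ok=True)
    return local_filepath
```

A task could then call something like `data.to_csv(resolve_output_path('data/csv/datafile.csv'))` in both shell and deployed runs.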
This results in deployment flows writing to the expected locations.
Christopher Boyd
11/03/2022, 2:20 PM
merlin
11/03/2022, 5:51 PM
Jarvis Stubblefield
11/06/2022, 9:25 PM
Process if you specified it in your deployment.
merlin
11/06/2022, 9:47 PM
Jarvis Stubblefield
11/08/2022, 2:54 AM
merlin
11/09/2022, 6:51 PM
Jarvis Stubblefield
11/10/2022, 7:54 AM
FileSystemBlock … I do have an example for the Process … I do it upon Python API deployment. Example below… I’ve not worked out the full details yet of getting my environment setup through the Process run, but I am working to do so.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import os
import subprocess
from pathlib import Path
from prefect.deployments import Deployment
from prefect.infrastructure import Process
from prefect.utilities.filesystem import tmpchdir
from prefect.orion.schemas.schedules import RRuleSchedule
from ..project_setup import prepare_project_for_flows
prepare_project_for_flows()
from ppower.base.flows.log_entry import logs_for_month, get_org_name_snakecase
from ppower.base.models import Organization
ORG_IDS = [
    "55",
    "111",
    "92",
]
RRULE_MONTHLY_ON_10TH = "FREQ=MONTHLY;INTERVAL=1;BYMONTHDAY=10"
for org_id in ORG_IDS:
    org = Organization.objects.get(pk=org_id)
    org_name = get_org_name_snakecase.fn(org)
    # For this to work by picking up environment variables, deployments would need to be executed from production.
    # Otherwise, we will need a mechanism to indicate an "environment" path.
    command = ["pipenv", "run", "python", "-m", "prefect.engine"]
    process = Process(command=command, working_dir=Path(os.environ.get("SRCDIR")).absolute())
    # venv_path = str(subprocess.run(["pipenv", "--venv"], capture_output=True).stdout).strip()
    # venv_py_exec = Path(venv_path) + "bin/python"
    #
    # command = [venv_py_exec, "-m", "prefect.engine"]
    #
    # process = Process(command=command)
    deployment = Deployment.build_from_flow(
        flow=logs_for_month,
        name=f"logs_for_month_{org_name}",
        description=f"Create CSV of all Log Entries for {org.name}.",
        tags=["logs_for_month", f"{org_name}"],
        schedule=RRuleSchedule(rrule=RRULE_MONTHLY_ON_10TH),
        work_queue_name="tenzinga_django_dev",
        parameters={"org_id": org_id},
        infrastructure=process,
    )
    deployment.apply()
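How Prefect applies working_dir internally isn't shown in this thread. As a stdlib analogy only, launching a child process with an explicit `cwd` has the effect the script above is after: the child sees that folder as its working directory, so relative paths inside it resolve there. `child_cwd` is a hypothetical helper name, and the temporary folder stands in for SRCDIR.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def child_cwd(working_dir):
    """Run a child Python process in working_dir and return the cwd it reports."""
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        cwd=working_dir,       # the child starts in this directory
        capture_output=True,
        text=True,
        check=True,
    )
    return Path(result.stdout.strip())


with tempfile.TemporaryDirectory() as project_dir:
    reported = child_cwd(project_dir)
    # Compare resolved paths, since temp folders can sit behind symlinks.
    same_dir = reported.resolve() == Path(project_dir).resolve()

print("child ran in the requested directory:", same_dir)
```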
Ensure that you have set the environment variable (SRCDIR in the above example) before you run this deployment script. If the agent will pick it up on another server, ensure that path works on the server it’s actually going to run on. I hope that makes sense… It doesn’t feel like the best way to manage it at this point. This also hasn’t yet stopped me from needing to set the sys.path to have my Django project imports working properly.
working_dir … 🙂
working_dir method on a Process. He may have a good example of how he’s using it.
merlin
11/15/2022, 7:38 AM
from pathlib import Path
from prefect.filesystems import LocalFileSystem
fs_block = LocalFileSystem(
    basepath=Path.home().joinpath('.prefect/my-local-storage')
)
fs_block.save(name="my-local", overwrite=True)
The block is now visible in the UI. It helpfully reminds me to paste this into my flow:
local_file_system_block = LocalFileSystem.load("my-local")
But what do I do with this local_file_system_block variable? Anyway, continuing...
I also have a flow that previously deployed successfully without an explicit local filesystem block. When I build a new deployment:
prefect deployment build src/trino_flows.py:bulk_extracts \
    --name DAILY-BULK-EXTRACTS \
    --work-queue prefect_all \
    --storage-block 'local-file-system/my-local'
I get a traceback; here are the last few lines:
, line 728, in build_from_flow
    await deployment.upload_to_storage(ignore_file=ignore_file)
  File "/Users/merlinr/Library/Caches/pypoetry/virtualenvs/zhl-wbr-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/deployments.py", line 576, in upload_to_storage
    file_count = await self.storage.put_directory(
  File "/Users/merlinr/Library/Caches/pypoetry/virtualenvs/zhl-wbr-4hu7XBP5-py3.10/lib/python3.10/site-packages/prefect/filesystems.py", line 185, in put_directory
    shutil.copytree(
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/shutil.py", line 558, in copytree
    return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/shutil.py", line 457, in _copytree
    os.makedirs(dst, exist_ok=dirs_exist_ok)
  File "/opt/homebrew/Cellar/python@3.10/3.10.8/Frameworks/Python.framework/Versions/3.10/lib/python3.10/os.py", line 225, in makedirs
    mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
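The empty quotes at the end of the traceback suggest that the destination handed to `os.makedirs` was an empty string. Why the destination ended up empty is not established in this thread; the sketch below only shows, with the standard library alone, that an empty path produces this kind of error. `makedirs_error` is a hypothetical helper name.

```python
import os


def makedirs_error(path):
    """Return the exception raised by os.makedirs(path), or None on success."""
    try:
        os.makedirs(path, exist_ok=True)
    except OSError as exc:
        return exc
    return None


# An empty destination cannot be created, even with exist_ok=True.
error = makedirs_error("")
print(type(error).__name__, error)
```

If the same exception type shows up here, the thing to inspect in the deployment is whichever value becomes the copy destination, rather than the source folder.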
Zanie
11/15/2022, 3:45 PM
Peyton Runyan
11/15/2022, 3:52 PM