Matt Delacour
02/14/2023, 3:07 PM
my_project.git
/src/
/src/dbt/
/src/dbt/Earthfile # Equivalent to Dockerfile
/src/dbt/my_prefect_job.py
/src/dbt/...
And my Earthfile is like
FROM prefecthq/prefect:2.7-python3.10
WORKDIR /code
...
COPY . .
And so my_prefect_job.py is under /code/my_prefect_job.py in my Docker image and under /src/dbt/ in my GitLab repository.
But once deployed, the run outputs the following:
Downloading flow code from storage at 'src/dbt'
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 879, in exec_module
File "<frozen importlib._bootstrap_external>", line 1016, in get_code
File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/code/prefect_job.py'
And so I don't understand why it cannot find my flow in GitLab under the right subdirectory, nor the flow in the Docker container under the right path 🤷
I did not find any good example of a GitHub/GitLab subdirectory storage strategy.
Thanks in advance 🙏
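A minimal sketch of one way to deploy a flow that lives in a repository subdirectory, assuming the build script is run from the repository root so that Prefect derives an entrypoint with the src/dbt/ prefix; the storage block name, flow name, and import path below are illustrative, not taken from this thread.

from prefect.deployments import Deployment
from prefect.filesystems import GitHub  # the prefect-gitlab collection provides a similar repository block

# Hypothetical flow defined in src/dbt/my_prefect_job.py; importing it this way
# assumes src/ and src/dbt/ are importable packages.
from src.dbt.my_prefect_job import my_flow

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="dbt-job",
    storage=GitHub.load("my-repo"),  # block pointing at the repository root
    skip_upload=True,                # the code already lives in the repository
)

if __name__ == "__main__":
    deployment.apply()

Built this way, the recorded entrypoint should look roughly like src/dbt/my_prefect_job.py:my_flow, so the path the agent downloads and the path it imports from line up.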
Leon Kozlowski
02/14/2023, 4:12 PM
KubernetesJob blocks to run flows on k8s - is there a benefit to saving one rather than having what looks to be a temporary block called anonymous-<UUID>?
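For reference, a short hedged sketch of saving a named KubernetesJob block so deployments can reuse it by name instead of generating an anonymous-<UUID> block on each build; the image, namespace, and block name are illustrative.

from prefect.infrastructure import KubernetesJob

# Create the block once and save it under a stable name.
k8s_job = KubernetesJob(
    image="prefecthq/prefect:2.7-python3.10",
    namespace="prefect",
)
k8s_job.save("my-k8s-job", overwrite=True)

# Deployments can then reference it with KubernetesJob.load("my-k8s-job").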
Siva Balusu
02/14/2023, 4:27 PM
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 879, in exec_module
File "<frozen importlib._bootstrap_external>", line 1016, in get_code
File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmppy44ikxrprefect/sample_flow.py'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 266, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 186, in load_flow_from_flow_run
flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.10/site-packages/prefect/flows.py", line 772, in load_flow_from_entrypoint
flow = import_object(entrypoint)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 195, in import_object
module = load_script_as_module(script_path)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 158, in load_script_as_module
raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'sample_flow.py' encountered an exception: FileNotFoundError(2, 'No such file or directory')
Here is the deployment file:
from sample_flow import start_sample_flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

deployment = Deployment.build_from_flow(
    flow=start_sample_flow,
    name="sample-pipeline",
    version="2",
    tags=["VA_PIPELINE"],
    storage=GitHub.load("dev"),
    skip_upload=True,
    infra_overrides={"env": {"PREFECT_LOGGING_LEVEL": "DEBUG"}},
    path="cloud/prefectflows",
)

if __name__ == "__main__":
    deployment.apply()
Any help here is much appreciated.
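One small debugging step that may help here, reusing the names from the snippet above: print what the deployment actually resolved before applying it, since at run time the flow is imported from the entrypoint relative to the code pulled from storage.

from sample_flow import start_sample_flow
from prefect.deployments import Deployment
from prefect.filesystems import GitHub

deployment = Deployment.build_from_flow(
    flow=start_sample_flow,
    name="sample-pipeline",
    storage=GitHub.load("dev"),
    path="cloud/prefectflows",
    skip_upload=True,
)

# If entrypoint does not point at the flow file as it sits under `path` in the
# repository, the agent fails with the FileNotFoundError shown above.
print(deployment.path)
print(deployment.entrypoint)

deployment.apply()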
Brandon Reid
02/14/2023, 4:40 PM
Leon Kozlowski
02/14/2023, 6:28 PM
I'm getting a MappingLengthMismatch with the following traceback:
Parameters for map must all be the same length. Got lengths: {'args': 0, 'kwargs': 2}
However, all of the parameters for the task do have the same length.
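For context, a hedged sketch of the most common cause of this error: .map() treats every argument as an iterable to zip together, so static values have to be wrapped in unmapped() to be excluded from the length check (the task and values below are illustrative).

from prefect import flow, task, unmapped

@task
def add(x, y):
    return x + y

@flow
def my_flow():
    xs = [1, 2, 3]
    # Without unmapped(), 10 would be treated as another iterable to zip with xs
    # and the reported lengths would not match.
    add.map(xs, unmapped(10))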
Constantino Schillebeeckx
02/14/2023, 7:55 PM
automation
Matthew Scanlon
02/14/2023, 8:09 PM
Meng Si
02/14/2023, 8:57 PM
nick vazquez
02/14/2023, 10:42 PM
Ronil
02/14/2023, 10:56 PM
Chandan Purbia
02/15/2023, 5:04 AM
Samuel Bunce
02/15/2023, 8:40 AM
Agent stopped!
- is there any reason why agents would just be stopping? I thought they were meant to run indefinitely. I do not know where to access any logging to see why the agent was stopped.
Tolga Karahan
02/15/2023, 10:05 AM
Nic
02/15/2023, 10:59 AM
Mark NS
02/15/2023, 12:56 PM
Sean Davis
02/15/2023, 3:07 PM
TOMAS IGNACIO ACUÑA RUZ
02/15/2023, 4:53 PM
TOMAS IGNACIO ACUÑA RUZ
02/15/2023, 4:54 PM
Tomás Emilio Silva Ebensperger
02/15/2023, 6:51 PM
Dhruv
02/15/2023, 10:50 PM
Federico Zambelli
02/15/2023, 11:03 PM
target functionality that seemed to fit my case, but it doesn't exist in Prefect 2.0. I tried in so many different ways but I can't figure out how. The reason I'm saving locally as an intermediate step is that I run OOM otherwise.
My code (simplified) goes like this:
import requests
import pandas as pd

from prefect import flow, task

@task
def download(url, filename):
    file_path = f'/absolute/path/to/{filename}.csv'
    with open(file_path, 'wb') as file:
        res = requests.get(url + filename)
        file.write(res.content)
    return file_path

@task
def write_to_bq(file_path):
    df = pd.read_csv(file_path)
    df.to_gbq(...)

@flow
def download_all(url_list):
    paths = []
    for url, filename in url_list:  # assumes url_list holds (base_url, filename) pairs
        file_path = download(url, filename)
        paths.append(file_path)
    return paths

@flow
def upload_all(paths):
    for path in paths:
        write_to_bq(path)

@flow
def main(url_list):
    paths = download_all(url_list)
    upload_all(paths)
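Prefect 2 has no direct equivalent of Prefect 1's target, but a hedged sketch of the closest built-in mechanism is input-based task caching: if download already ran with the same arguments inside the expiration window, the cached return value (the local file path) is reused instead of re-downloading. This assumes the file at that path still exists.

from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def download(url, filename):
    # same body as above: fetch the file and return its local path
    ...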
Tolga Karahan
02/16/2023, 5:55 AM
Tim Galvin
02/16/2023, 6:30 AM
Task run '511abff8-faa3-4efc-94e6-f4be435db16e' received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state
As far as I can tell it is inconsistent (the workflow sometimes works, sometimes does not).
I am running a DaskTaskRunner backed by a SLURMCluster. The stage that is crashing is trying to read in a set of large-ish files, and I believe the GIL is not being released as the data is being accessed. At the moment there is a high load on the disk I/O, and my best guess is that the dask nanny is somehow failing a health check, and Prefect in turn is causing some round-about error like this.
Any ideas that make more sense?
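For context, a hedged sketch of the kind of setup described, a DaskTaskRunner backed by a dask-jobqueue SLURMCluster; every value in cluster_kwargs and adapt_kwargs is illustrative, not taken from this thread.

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        # Spins up a SLURM-managed Dask cluster for the duration of the flow run.
        cluster_class="dask_jobqueue.SLURMCluster",
        cluster_kwargs={"cores": 16, "memory": "64GB", "walltime": "01:00:00"},
        adapt_kwargs={"minimum": 1, "maximum": 10},
    )
)
def process_files():
    ...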
Mahesh Kumar
02/16/2023, 7:06 AM
Can't connect to Orion API at https://<url>/api. Check that it's accessible from your machine.
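A quick, hedged way to check which API URL the client is actually trying to reach when this error appears:

from prefect.settings import PREFECT_API_URL

# Should print the /api endpoint of the server or Cloud workspace; if it is
# empty or wrong, the client cannot reach the Orion API.
print(PREFECT_API_URL.value())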
Tolga Karahan
02/16/2023, 7:27 AM
Ghassan Hallaq
02/16/2023, 9:13 AM
Justin Trautmann
02/16/2023, 9:46 AM
The PreventRedundantTransitions orchestration policy prevents a repeated task execution: the task remains in status Running when the instance is terminated, and a new replacement instance will try to start the task from scratch, including proposing the state Running again, which leads to:
Task run <task_run_id> received abort during orchestration: This run cannot transition to the RUNNING state from the RUNNING state. Task run is in RUNNING state
I feel like this policy has a legitimate reason for preventing multiple agents from attempting to orchestrate the same run, but it also catches the case where the same agent tries to execute the same task multiple times, which should be allowed imo.
Anyone having experience with Prefect + Ray on AWS spot and ideas on how to handle this case?
PS: @Tim Galvin not sure about Dask and SLURM but maybe your issue is somewhat related
Kolapo Obajuluwa
02/16/2023, 5:32 PM
Kolapo Obajuluwa
02/16/2023, 5:34 PM
Chris Reuter
02/16/2023, 6:00 PM