Ram Vuppaladadiyam
01/25/2023, 4:20 PM
1) a >> b >> c >> d. After the flow has run, what is the best way to run tasks b to d?
2) On a related note, in the following thread, it seems that there was a PR for flow restarts. Has there been any follow-up there by any chance? Can you also clarify the difference between a flow retry and a flow restart? https://discourse.prefect.io/t/how-can-i-restart-a-flow-in-prefect-2-0/1332/9
3) After a flow run is canceled, what's the best way to resume the flow run? When we cancel a flow run, it seems that the flow run is indeed canceled. On the other hand, any tasks that were previously running are interrupted but left in the running state. So, when we attempt to retry the flow, those tasks that are still in the running state cannot be restarted.
4) What is the purpose behind deleting a task run? How should we use this feature?
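Regarding (1), a plain-Python illustration of the idea of making the b-to-d tail independently re-runnable (none of this is Prefect API — in Prefect 2 the analogous move would be wrapping the tail in a subflow; all names below are hypothetical):

```python
# Plain-Python sketch (not Prefect API): factor the b >> c >> d tail into its
# own callable so it can be re-run on its own from a's saved result.
def task_a():
    return "a"

def task_b(x):
    return x + "b"

def task_c(x):
    return x + "c"

def task_d(x):
    return x + "d"

def tail_b_to_d(a_result):
    # the re-runnable portion: b >> c >> d
    return task_d(task_c(task_b(a_result)))

def full_flow():
    return tail_b_to_d(task_a())

# re-run just b..d given a's previously computed result
rerun = tail_b_to_d("a")
```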
Thanks for your help!
Vishnu Duggirala
01/25/2023, 4:25 PM
Client Error '404 Not Found' for the url, but this happened only once before my flows resumed normal operation.
Stephen Herron
01/25/2023, 5:04 PM
deployment run state in addition to flow run state and work queue health?
I have a flow trigger_dbt in multiple deployments and I only want to trigger for specific conditions (i.e. the deployment conditions, specific params, etc.).
Ethienne Marcelin
01/25/2023, 5:23 PM
Blake Stefansen
01/25/2023, 9:44 PM
Mike Logaciuk
01/25/2023, 10:50 PM
Zachary Lee
01/25/2023, 10:53 PM
2.7.8) and I'm having some trouble with a 'permission denied' error when my pod (run via KubernetesJob) tries to run my deployment (using an s3 storage block). The short version of the error is:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
  ...
  File "/opt/env/lib/python3.9/site-packages/s3fs/core.py", line 1141, in _get_file
    with open(lpath, "wb") as f0:
PermissionError: [Errno 13] Permission denied: '/workdir/./src/hvac/__init__.py'
I can get it to go away if I give that file rw permissions for every user before it gets uploaded to s3, but it seems strange that I should have to do this, so I feel like I must be missing something...
Any help is appreciated, but also I'm just looking to understand better what Prefect is actually doing when it spins up my pod. I understand that it downloads code from the storage block (S3 in my case), but I'm fuzzy on a few details:
• What determines where the code gets downloaded to on the pod? Is it the .path of the Deployment?
• Why does Prefect need to attempt to write to the file in my error? My Deployment's .entrypoint is a completely different file (containing only my simple test flow) which has no outside dependencies beyond Prefect. My understanding was that other files would only be read if they were imported in the entrypoint file?
• What user on the pod is executing this code that is downloading from storage, retrieving the flow, etc.?
Thanks for any info you can give!
Samuel Kohlleffel
01/25/2023, 11:37 PM
httpx.HTTPStatusError: Client error '422 Unprocessable Entity' for url '***/deployments/'
For more information check: <https://httpstatuses.com/422>
A version upgrade to Prefect 2.7.9 appears to have solved the issue. However, it's strange that our CI/CD for deployments started failing seemingly out of nowhere. Does anyone know the cause of this issue?
Tomás Emilio Silva Ebensperger
01/26/2023, 1:30 AM
jack
01/26/2023, 3:28 AM
Stefan
01/26/2023, 9:42 AM
Emma Rizzi
01/26/2023, 10:06 AM
Oria
01/26/2023, 10:59 AM
Matheus Rocha
01/26/2023, 12:58 PM
task_a >> task_b
task_b >> [task_c, task_d] >> task_e
Jens
01/26/2023, 2:15 PM
class TestClass:
    def __init__(self, atr_a, atr_b):
        self.atr_a = atr_a
        self.atr_b = atr_b

    @task()
    def test_method(self, text):
        texts = self.atr_a(text)
        ...
...
Then I have another script which defines my flow:
@flow()
def my_test_flow():
    test_class = TestClass(atr_a=..., atr_b=...)
    test_class.test_method(text=text)
If I run this example I get the error: Function 'test_method' has signature 'self, text' but received args: (....... )
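The error is consistent with the decorator replacing the method with a task object that is not a bound method, so self is never filled in automatically. Below is an illustrative sketch with a stand-in decorator (NOT Prefect code; `task`, `Wrapper`, and all names are hypothetical) showing the usual workaround: keep the task at module level and pass the instance in explicitly.

```python
# Stand-in "task" decorator (NOT Prefect): like Prefect's, it replaces the
# function with a wrapper object. A wrapper object is not a bound method, so
# `self` is not supplied automatically when called via an instance.
def task(fn):
    class Wrapper:
        def __call__(self, *args, **kwargs):
            # the wrapper forwards to the raw function, whose first
            # parameter must be supplied by the caller
            return fn(*args, **kwargs)
    return Wrapper()

# Workaround pattern: module-level task that takes the object as a parameter.
@task
def test_method(obj, text):
    return obj.transform(text)

class TestClass:
    def __init__(self, transform):
        self.transform = transform

instance = TestClass(transform=str.upper)
result = test_method(instance, "hello")  # works: instance passed explicitly
```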
Is there a way I can call a task inside a class?
Pierre Vogler-Finck
01/26/2023, 2:16 PM
Crash detected! Execution was cancelled by the runtime environment.
and then:
Crash detected! Execution was interrupted by an unexpected exception: httpx.HTTPStatusError: Server error '500 Internal Server Error' for url 'http://[our instance url]/api/task_runs/'
For more information check: <https://httpstatuses.com/500>
A minimal running example (in Python 3.10 with prefect 2.7.9):
import time
from prefect import unmapped, flow, task

@task(name="Sleeper {id}")
def sleeper(id, seconds_to_wait: int):
    print(f"Start Sleeper {id}")
    time.sleep(seconds_to_wait)
    print(f"End Sleeper {id}")

@flow(name="Prefect 2 concurrency checker")
def flow(n_runs: int = 128, seconds_to_wait: int = 2):
    res = sleeper.map(list(range(n_runs)), seconds_to_wait=unmapped(seconds_to_wait))
    return res

if __name__ == "__main__":
    res = flow()
    print("done")
In Prefect 1 we had a way to limit the number of task run creations, but AFAIK that's not available in Prefect 2 (note: we've tried tag-limiting, but that doesn't help, because it's the creation of the many tasks that kills it, not their actual run; we've also tried an async approach, which fails in the same way).
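As a generic illustration of bounding how many task runs exist at once (plain Python, not Prefect API; `run_in_batches` and `worker` are hypothetical names), one option is to chunk the mapped inputs and submit one chunk at a time:

```python
# Plain-Python sketch: submit mapped work in bounded chunks, so at most
# `batch_size` task runs are created (and in flight) at any one time.
def chunked(items, batch_size):
    """Yield successive slices of `items`, each of length <= batch_size."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

def run_in_batches(worker, inputs, batch_size=16):
    results = []
    for batch in chunked(inputs, batch_size):
        # In a real flow this is where one would map/submit the batch and
        # wait for it to finish before creating the next batch of task runs.
        results.extend(worker(x) for x in batch)
    return results

out = run_in_batches(lambda x: x * 2, list(range(10)), batch_size=4)
```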
My questions to you:
1- Have you ever experienced this too?
2- Did you find a workaround?
3- If not, do you have an idea of how to manage this?
Andrew Lawlor
01/26/2023, 2:43 PM
Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}])
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
    state = self.client.set_task_run_state(
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1622, in set_task_run_state
    result = self.graphql(
  File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 465, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
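For transient API timeouts like the 'Operation timed out' above, the generic pattern is retry with exponential backoff. A plain-Python sketch (not a Prefect API; where to attach it, e.g. a state handler, depends on your setup):

```python
import time

# Generic retry-with-backoff sketch for transient API errors.
def call_with_retries(fn, attempts=3, base_delay=1.0, retry_on=(TimeoutError,)):
    """Call fn(), retrying on the given exceptions with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt))

# Demo: a call that times out twice, then succeeds on the third try.
calls = []
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise TimeoutError("Operation timed out")
    return "ok"

result = call_with_retries(flaky, attempts=5, base_delay=0.0)
```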
Kelby
01/26/2023, 3:30 PM
project
    flows
        flow1.py
        task1.py
    common
        db.py
        utils.py
I can get that to work if I have the proper PYTHONPATH in place, but that seems wrong because the flow/task code is run from the /tmp directory, while the common code would be running in the original source location.
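One common workaround is to put the project root on sys.path at the top of the flow module, so `common` stays importable wherever the flow file is copied to. A sketch (paths, layout, and `add_project_root` are illustrative, not Prefect API):

```python
# Illustrative sketch: prepend the project root to sys.path so `common`
# remains importable even when the flow file runs from a /tmp copy.
# All paths here are hypothetical.
import os
import sys

def add_project_root(flow_file, levels_up=2):
    """Walk `levels_up` directories from the flow file and put that root on sys.path."""
    root = os.path.abspath(flow_file)
    for _ in range(levels_up):
        root = os.path.dirname(root)
    if root not in sys.path:
        sys.path.insert(0, root)
    return root

# e.g. at the top of flows/flow1.py:
#     add_project_root(__file__)   # then `import common.db` works
root = add_project_root("/tmp/project/flows/flow1.py")
```

The cleaner long-term fix is usually to make `common` an installable package in the runtime image, so no path manipulation is needed.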
Any hints? Thanks!
Tumi Tran
01/26/2023, 3:54 PM
David Elliott
01/26/2023, 4:26 PM
read_task_runs() endpoint and count the number of task runs returned? And RE the max limit of 200 task runs per API call, am I right to loop over it in the way that I'm doing in the thread, or is there a neater pattern? Thanks!
Ram Vuppaladadiyam
01/26/2023, 4:26 PM
a >> b >> c >> d. After the flow has run, what is the best way to run tasks b to d?
Mia
01/26/2023, 4:46 PM
KeyError: "No class found for dispatch key 'gitlab-repository' in registry for type 'Block'."
Am I missing a step somewhere?
Triet Le
01/26/2023, 5:03 PM
prefect_bigquery lib, what should I fill in the location param if I've specified the region for my GCP project as asia-southeast1?
I've tried leaving it as the default ("US"), and filling it as None, Asia, ASIA, and asia-southeast1, but it keeps showing the same error as below:
File "<my_path_to_venv>/lib/python3.10/site-packages/prefect_gcp/bigquery.py", line 223, in bigquery_create_table
    client = gcp_credentials.get_bigquery_client(project=project, location=location)
AttributeError: 'SecretDict' object has no attribute 'get_bigquery_client'
Aaron Guan
01/26/2023, 5:12 PM
Guy Altman
01/26/2023, 6:30 PM
Tony Alfonse
01/26/2023, 7:09 PM
Adam Eury
01/26/2023, 8:41 PM
prefect.exceptions.UpstreamTaskError: Upstream task run 'None' did not reach a 'COMPLETED' state.
It doesn't appear to happen consistently, so it has been difficult to debug. We are currently on Prefect 2.1.1.
Jack
01/26/2023, 9:45 PM
Timo Vink
01/26/2023, 10:10 PM
persist_result=True programmatically in Prefect 2? e.g. initialize a local flow run with persisted state/results from some remote flow run?
Use case: a flow fails in a remote environment a few hours into a run. It'd be nice if a developer could run that flow locally in their debugger, but initialize the flow using the persisted results from the remote run to skip straight to the tasks that failed.
Leo Kacenjar
01/26/2023, 10:19 PM
prefecthq/prefect:2-latest
Thanks!