I'm getting the following error when trying to use an OAuth token with the Secrets API:

```
Failed to load and execute flow run: ValueError('Local Secret "<prefect.client.secrets.Secret object at 0x00000221D3B7B100>" was not found.')
```

The config.toml is located on my local machine and has a section:

```
[context.secrets]
GITLAB = "<OAuth Token>"
```

The code that registers the flow is similar to this:

```python
secret = Secret("GITLAB")
flow.storage = GitLab(
    host="path/to/host",
    repo="repo/address",
    path="flow/sample_flow.py",
    access_token_secret=secret,
)
flow.register(project_name="test-project-name")
```

I assume this is happening because the config.toml is not on the K8s cluster. If this is the case, is there a way I can attach this storage to the flow without storing OAuth tokens on the cluster itself?
Is there a way to have Prefect read `.env` files to load secrets from, rather than fixing them at module import time? I want to use a `PrefectSecret` instead of an `EnvVarSecret` in my code, and I don't want to hack the code between Prefect/EnvVar for local dev. Local context secrets should work well for overriding the `PrefectSecret` values, but it's not working the way I expect. My debugger is telling me that the context and secrets are set once, during `import prefect`, which means the secrets are fixed before I can load a dotenv file using my library of choice. The following pseudocode doesn't work:
```python
import prefect
import environs  # .env support. .env not loaded yet.
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("my-flow") as flow:
    PrefectSecret("MY_SECRET")

if __name__ == "__main__":
    # For local testing
    env = environs.Env()
    env.read_env(".env")  # Load my custom PREFECT__CONTEXT__SECRETS into os.environ
    flow.run()  # Ignores new os.environ
```
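One workaround that should sidestep this (a sketch, assuming Prefect 1.x reads `PREFECT__CONTEXT__SECRETS__*` environment variables when it builds its config and context at import time) is to load the `.env` file *before* `prefect` is ever imported:

```python
# Load the .env before importing prefect, so the PREFECT__CONTEXT__SECRETS__*
# variables are already in os.environ when prefect builds its context.
import environs

env = environs.Env()
env.read_env(".env")

import prefect  # noqa: E402  (deliberately imported after the env is loaded)
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("my-flow") as flow:
    PrefectSecret("MY_SECRET")

if __name__ == "__main__":
    flow.run()  # secrets from .env are now visible in prefect.context
```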
The request itself works fine if I run it manually. However, as soon as I use Prefect 2.0 (with Prefect 2.0 Cloud) to run the flow/task, I run into the following exception. The GET request in the task takes about 1 minute and 10 seconds to return. The exception itself is not coming from the server or my client: I changed my requests.get() call in the task to an http.client request but still get the request exception below, so I have the strong feeling it's somehow related to Prefect.

Exception summary:
• requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
• followed by: 10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.

Exception details: See in thread

Is there something wrong in how I use the requests module? Or is there a "hidden" timeout in Prefect when a Prefect-scheduled task runs for more than 1 minute?

Edit: I currently run the flow only locally, by running "python name_of_script.py".
Edit 2: I'm running the Python env in WSL2.
Edit 3: I use GCS storage as my default storage. Maybe this causes the problem?
Edit 4: I was able to work around the issue by zipping the content of the response before returning it from my task. So if I change my task from the first version below to the second, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, and the whole flow breaks if the upload takes longer than that. I can live with this workaround for the moment, but I'd be happy to know if my "theory" about GCS being the problem is correct.
@task(name="get_subscriptions", retries=2, retry_delay_seconds=5) def get_subscriptions(paper_code, logger: Logger): response = requests.get("my_url") return response
@task(name="get_subscriptions", retries=2, retry_delay_seconds=5) def get_subscriptions(paper_code, logger: Logger): response = requests.get("my_url") return zlib.compress(response.content)
I deleted a flow run using the delete functionality in the UI, thinking it would delete all resources related to the flow_run, including the Kubernetes job etc. It did not remove the Kubernetes job, so I removed that manually. The issue is concurrency limits: the tasks launched by this flow have a tag with a concurrency limit, and it appears the task data associated with the deleted flow run was not removed from Prefect storage. For instance, if I try:

prefect concurrency-limit inspect my-tag

it shows a bunch of active task IDs, even though nothing is running in k8s. This causes an unfortunate issue where any new flow runs for this flow will never start tasks, because Prefect thinks the concurrency limit is hit due to these zombie tasks. However, I cannot seem to find a way to manually clean up these task IDs, which means this flow is dead. Any help is appreciated!
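A possible way to unstick this (a hedged suggestion, not from the thread): the Prefect 2 CLI can delete and recreate a concurrency limit, which discards its recorded active slots. Assuming the tag is my-tag and the original limit was 10:

prefect concurrency-limit delete my-tag
prefect concurrency-limit create my-tag 10

New flow runs should then acquire slots against the fresh, empty limit.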
Is there a way to monitor a flow run's success or failure without using a task? Using a task is susceptible to infra issues (pre-emptible cloud compute nodes, etc.) and hence may miss firing. Not sure if there's a way to configure each flow to send a status into AWS SQS or GCP Pub/Sub? Or, if not, is there an HTTP API to poll for the status of all flows in a tenant account? Thanks!