Joshua Greenhalgh
07/11/2022, 2:06 PM
Octopus
07/11/2022, 2:14 PM
```python
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun, wait_for_flow_run

# @task
# def wait_and_succeed(ref, action_id):
#     time.sleep(10)
#     print(f"children task success for ref {ref} and action {action_id}")
#     if action_id == "write":
#         print(f"[SUCCESS] {ref} Second level reached !!!")
#     if action_id == "delete":
#         print(f"[SUCCESS] {ref} Third level reached !!!")


@task
def call_children_flow(ref):
    print(f"{ref} ref")
    actions_id = ["read", "write", "delete"]
    for action_id in actions_id:
        # Start one child flow run per action, then block until it finishes,
        # so the actions for a given ref run strictly in order.
        start_flow_run = StartFlowRun(flow_name="Generic Children flow")
        print(f"start_flow_run {start_flow_run}")
        child_id = start_flow_run.run(
            parameters={"reference": ref, "action_id": action_id},
            project_name="Playground",
        )
        wait_for_flow_run.run(child_id)


@task
def run_action(action_id, ref):
    # Starts a single child flow run and returns its ID without waiting (not used in the flow below).
    start_flow_run = StartFlowRun(flow_name="Generic Children flow")
    print(f"start_flow_run {start_flow_run}")
    child_id = start_flow_run.run(
        parameters={"reference": ref, "action_id": action_id},
        project_name="Playground",
    )
    return child_id


with Flow("Generic Parent flow") as parent_flow:
    fake_refs = ["ref1", "ref2", "ref3"]
    call_children_flow.map(fake_refs)

if __name__ == "__main__":
    # Set the executor before registering, so it is included when the flow is
    # pickled into (default) storage at registration time.
    parent_flow.executor = LocalDaskExecutor(num_workers=20)
    parent_flow.register(project_name="Playground")
    parent_flow.run()
```
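The execution shape of this parent flow, with the refs fanned out concurrently by `map` and each mapped task walking read → write → delete strictly in order, can be sketched with the standard library alone. This is a sketch under stated assumptions, not Prefect code: a `ThreadPoolExecutor` stands in for `LocalDaskExecutor`, and the hypothetical `start_and_wait` stands in for `StartFlowRun(...).run(...)` followed by `wait_for_flow_run.run(...)`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

ACTION_IDS = ["read", "write", "delete"]


def start_and_wait(ref: str, action_id: str) -> str:
    """Hypothetical stand-in for starting a child flow run and blocking until it finishes."""
    # A real version would create the child run here and poll its state until it is final.
    return f"{ref}:{action_id}"


def call_children_flow(ref: str) -> List[str]:
    """Mirror of the mapped task: one ref, its actions run one after another."""
    completed = []
    for action_id in ACTION_IDS:
        # Each child run must finish before the next action for this ref starts.
        completed.append(start_and_wait(ref, action_id))
    return completed


def run_parent(refs: List[str], num_workers: int = 20) -> Dict[str, List[str]]:
    """Mirror of call_children_flow.map(refs): refs run concurrently, result keeps input order."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = pool.map(call_children_flow, refs)
        return dict(zip(refs, results))


if __name__ == "__main__":
    print(run_parent(["ref1", "ref2", "ref3"]))
```

One consequence of this shape: with three refs, at most three child runs are in flight at once, however large `num_workers` is, because the actions for a given ref are serialized inside a single task.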
Ievgenii Martynenko
07/11/2022, 2:16 PM
Abin Joseph
07/11/2022, 2:22 PM
Madison Schott
07/11/2022, 2:40 PM
Jehan Abduljabbar
07/11/2022, 3:37 PM
Mars
07/11/2022, 4:06 PM
Amol Shirke
07/11/2022, 5:38 PM
Michał Augoff
07/11/2022, 6:00 PM
Deployment’s KubernetesFlowRunner properties?
Joe Goldbeck
07/11/2022, 7:15 PM
Connor Parish
07/11/2022, 7:43 PM
prefect 2.0b7-python3.8. I'm trying to orchestrate the images through DeploymentSpecs with DockerFlowRunners as the flow_runners. Currently, the subflow runs on the same image as the main flow unless the subflow is deployed independently. I'd greatly appreciate any ideas or insights into whether this is currently feasible!
Vaikath Job
07/11/2022, 8:29 PM
flurven
07/11/2022, 8:34 PM
Vaikath Job
07/11/2022, 8:36 PM
Failed to load and execute flow run: ValueError('Local Secret "<prefect.client.secrets.Secret object at 0x00000221D3B7B100>" was not found.')
when trying to use an OAuth token with the Secrets API. For context, the config.toml is located on my local machine and has a section:
```toml
[context.secrets]
GITLAB="<OAuth Token>"
```
The code that registers the flow is similar to this:
```python
from prefect.client import Secret
from prefect.storage import GitLab

secret = Secret("GITLAB")
flow.storage = GitLab(host="path/to/host", repo="repo/address", path="flow/sample_flow.py", access_token_secret=secret)
flow.register(project_name="test-project-name")
```
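The error text hints at what went wrong: the secret name being looked up is the default repr of a `Secret` object, which is what you get when an object without a custom `__str__` is formatted into a string. That would be consistent with `GitLab(access_token_secret=...)` expecting the name of a secret rather than a `Secret` instance. A self-contained illustration, using a hypothetical stand-in class rather than Prefect's own:

```python
class Secret:
    """Hypothetical stand-in for a secret handle with no custom __str__ or __repr__."""

    def __init__(self, name: str) -> None:
        self.name = name


def missing_secret_message(secret_name: object) -> str:
    """Mimic an error that interpolates whatever value was passed as the secret name."""
    return f'Local Secret "{secret_name}" was not found.'


if __name__ == "__main__":
    handle = Secret("GITLAB")
    # Passing the object: its default repr ("<... Secret object at 0x...>") becomes the lookup key.
    print(missing_secret_message(handle))
    # Passing the name string: the intended key "GITLAB" is used.
    print(missing_secret_message(handle.name))
```

If that reading is right, the registration call would pass the string, `access_token_secret="GITLAB"`. The token would then still need to be resolvable wherever the flow run executes (for example, as a secret stored in Prefect Cloud, if that is the backend in use) rather than in a local config.toml.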
I assume this is happening because the config.toml is not on the K8s cluster. If so, is there a way I can attach this storage to the flow without storing OAuth tokens on the cluster itself?
Kevin Grismore
07/11/2022, 9:05 PM
Mars
07/11/2022, 9:09 PM
.env files to load secrets from os.environ after prefect module import time? I want to use a PrefectSecret instead of an EnvVarSecret in my code, and I don’t want to hack the code to switch between PrefectSecret and EnvVarSecret for local dev. Local context secrets should work well for overriding the PrefectSecret values, but they're not working the way I expect.
My debugger is telling me the context and secrets are set once, during import prefect, which means the secrets are fixed before I can load a .env file using my library of choice. The following pseudocode doesn’t work:
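```python
import environs  # .env support; the .env file is not loaded yet at this point
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("my-flow") as flow:  # placeholder flow name
    PrefectSecret("MY_SECRET")

if __name__ == "__main__":
    # For local testing
    env = environs.Env()
    env.read_env(".env")  # Load my custom PREFECT__CONTEXT__SECRETS__* variables into os.environ
    flow.run()  # Ignores the new os.environ values: the secrets were read at import time
```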
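Since the context secrets are read once at `import prefect`, one workaround is to rebuild the secrets mapping after the .env file has been loaded and pass it in explicitly. The sketch below only collects `PREFECT__CONTEXT__SECRETS__<NAME>` variables into a `{NAME: value}` dict; the helper name is hypothetical, and it assumes the variable suffix is used verbatim as the secret name (Prefect's own config loader may normalize names differently).

```python
import os
from typing import Dict, Mapping, Optional

SECRET_PREFIX = "PREFECT__CONTEXT__SECRETS__"


def secrets_from_environ(environ: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Collect PREFECT__CONTEXT__SECRETS__<NAME>=value entries into {NAME: value}.

    Reads os.environ at call time by default, so values loaded from a .env file
    after `import prefect` are picked up.
    """
    source = os.environ if environ is None else environ
    return {
        key[len(SECRET_PREFIX):]: value
        for key, value in source.items()
        # Skip unrelated variables and a bare prefix with no secret name after it.
        if key.startswith(SECRET_PREFIX) and len(key) > len(SECRET_PREFIX)
    }


if __name__ == "__main__":
    # Simulates what env.read_env(".env") would add to os.environ.
    os.environ["PREFECT__CONTEXT__SECRETS__MY_SECRET"] = "from-dotenv"
    print(sorted(secrets_from_environ()))  # print names only, never secret values
```

With Prefect 1 imported as `prefect`, the last line of the script would then become `with prefect.context(secrets=secrets_from_environ()): flow.run()`, relying on the `prefect.context(...)` context manager and on `PrefectSecret` reading local secrets from `prefect.context.secrets` during local runs.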
Laxman Singh Tomar
07/12/2022, 5:01 AM
Andreas Nigg
07/12/2022, 8:41 AM
```python
from logging import Logger

import requests
from prefect import task


@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")
    return response
```
The request itself works fine if I run it manually. However, as soon as I use Prefect 2.0 (with Prefect 2.0 Cloud) to run the flow/task, I run into the following exception.
The GET request in the task takes about 1 minute and 10 seconds to return.
The exception itself is not coming from the server or my client: I changed the requests.get() call in the task to an http.client request but still get the requests exception below, so I have a strong feeling it's somehow related to Prefect.
Exception summary:
• requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
• followed by: 103655.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.
Exception details: See in thread
Is there something wrong in how I use the requests module? Or is there a "hidden" timeout in Prefect when a Prefect-scheduled task runs for more than 1 minute?
Edit: I currently run the flow only locally, by running "python name_of_script.py".
Edit2: I'm running the Python env in WSL2.
Edit3: I use GCS storage as my default storage. Maybe this causes the problem?
Edit4: I was able to work around the issue by compressing the content of the response (with zlib) before returning it from the task. If I change the task to the following, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, so the whole flow breaks if the upload takes longer than that.
I can live with this workaround for the moment, but I'd be happy to know whether my "theory" about GCS being the problem is correct.
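```python
import zlib
from logging import Logger

import requests
from prefect import task


@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")
    # Return compressed bytes instead of the Response object, shrinking the result to persist.
    return zlib.compress(response.content)
```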
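The workaround shrinks the payload that has to be persisted as the task's result, which would be consistent with the theory that the slow step is the result upload rather than the HTTP request itself. A downstream task then has to decompress the bytes. A minimal round-trip sketch with the standard library; the function names are hypothetical and the sample payload is synthetic:

```python
import json
import zlib


def pack_result(payload: bytes, level: int = 6) -> bytes:
    """Compress raw response bytes before returning them from a task."""
    return zlib.compress(payload, level)


def unpack_result(blob: bytes) -> bytes:
    """Reverse pack_result in a downstream task."""
    return zlib.decompress(blob)


if __name__ == "__main__":
    # Synthetic stand-in for a large JSON API response; real savings depend on the data.
    payload = json.dumps([{"paper_code": "X1", "status": "active"}] * 10_000).encode("utf-8")
    blob = pack_result(payload)
    assert unpack_result(blob) == payload
    print(f"{len(payload)} bytes -> {len(blob)} bytes")
```

Returning `response.content` (plain bytes) also avoids persisting the whole `requests.Response` object, which is what the first version of the task returns.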
Emil Østergaard
07/12/2022, 10:01 AM
delete functionality in the UI, thinking it would delete all resources related to the flow_run, including the Kubernetes job etc.
It did not remove the Kubernetes job, so I removed it manually.
The issue is concurrency limits: the tasks launched by this flow have a tag with a concurrency limit.
It appears the task data associated with the deleted flow run was not removed from Prefect storage.
For instance, if I try:
```shell
prefect concurrency-limit inspect my-tag
```
It shows a bunch of active task IDs, even though nothing is running in k8s.
This causes an unfortunate issue where new flow runs for this flow never start tasks, because Prefect thinks the concurrency limit is hit due to these zombie tasks.
However, I cannot find a way to manually clean up these task IDs, which means this flow is dead.
Any help is appreciated!
Slackbot
07/12/2022, 10:16 AM
iñigo
07/12/2022, 10:31 AM
iñigo
07/12/2022, 10:35 AM
JK
07/12/2022, 11:48 AM
flow run's success or failure, without using a task? A task is susceptible to infrastructure issues (pre-emptible cloud compute nodes, etc.), so it may fail to fire.
I'm not sure whether there's a way to configure each flow to send its status to AWS SQS or GCP Pub/Sub. If not, is there an HTTP API to poll the status of all the flows in a tenant account? Thanks!
Andreas Nigg
07/12/2022, 12:50 PM
Joshua Greenhalgh
07/12/2022, 2:07 PM
Alan Ning
07/12/2022, 2:47 PM
Jacob Bedard
07/12/2022, 3:33 PM
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'snowflake\'")
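A `ModuleNotFoundError` while unpickling means the process that loads the flow could not import `snowflake`, even if the interpreter used for `flow.run()` can. One plausible cause (an assumption, not confirmed in the thread) is that the agent was started from a different Python environment. A small standard-library check to run with the same interpreter that starts the agent; the helper name is hypothetical:

```python
import importlib.util
import sys
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the module names that the current interpreter cannot find."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # find_spec("a.b") imports "a" first and raises if "a" itself is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    print("interpreter:", sys.executable)
    print("missing:", missing_modules(["prefect", "snowflake", "snowflake.connector"]))
```

If `snowflake` shows up as missing here, installing `snowflake-connector-python` (which provides `snowflake.connector`) into that environment, or starting the agent from the environment where `flow.run()` works, would be the first thing to try.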
I'm running this as a local agent. The flow runs OK when I do a flow.run() on the machine running the agent. What am I missing here?
sravani jammula
07/12/2022, 3:52 PM
{prefect.context.task_name} failed
jack
07/12/2022, 4:16 PM
Marcin Grzybowski
07/12/2022, 4:39 PM