Suresh R
07/11/2022, 10:07 AM

Ievgenii Martynenko
07/11/2022, 10:14 AM

Ievgenii Martynenko
07/11/2022, 10:30 AM

Bogdan Serban
07/11/2022, 10:51 AM
I am passing an ML model to a mapped task through an unmapped call, and I am receiving a warning related to the size of the ML model that is being shared in the task graph (more details in the thread).
Is there an issue if I am receiving that error message? And is there a better way to share that ML model across the tasks?
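For context, my setup looks roughly like this (simplified sketch; load_model, score, and the flow name are placeholders for my actual code):

from prefect import Flow, task, unmapped

@task
def load_model():
    # placeholder for loading the (large) ML model
    return {"weights": [0.1, 0.2, 0.3]}

@task
def score(record, model):
    # placeholder for applying the model to one record
    return sum(model["weights"]) * record

with Flow("ml-scoring") as flow:
    model = load_model()
    records = [1, 2, 3]
    # the model result is passed unmapped, so all mapped task runs share it
    scores = score.map(records, model=unmapped(model))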

Mathieu Cayssol
07/11/2022, 11:20 AM

Jan
07/11/2022, 11:56 AM
from prefect import task, flow
import time

@flow
def mijn_flow():
    return "just a string"

print(mijn_flow())
The output is:
13:54:51.255 | INFO | prefect.engine - Created flow run 'brainy-parakeet' for flow 'mijn-flow'
13:54:51.271 | INFO | Flow run 'brainy-parakeet' - Using task runner 'ConcurrentTaskRunner'
13:54:51.302 | WARNING | Flow run 'brainy-parakeet' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
13:54:51.443 | INFO | Flow run 'brainy-parakeet' - Finished in state Completed()
Completed()
Can you point me in the right direction?
I'm trying to access the return object of my flow (which is a string in this case).
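Is something like this the intended way to get the value out (assuming the flow call in the 2.0 beta returns the final State, which state.result() then unwraps)?

from prefect import flow

@flow
def mijn_flow():
    return "just a string"

state = mijn_flow()     # in the beta this is the final State, e.g. Completed()
print(state.result())   # unwraps the State -> "just a string"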
07/11/2022, 12:58 PMRuntimeError(f"Cluster failed to start: {e}") from e RuntimeError: Cluster failed to start: 503, message='Service Unavailable', url=URL('<http://proxy-ip-here>:proxy-port-here')
Any ideas on how to fix this? I've tried adding a NO_PROXY env variable alongside HTTPS_PROXY but it doesn't work.
I am using Prefect 1.2
Thanks!Black Spy

Black Spy
07/11/2022, 1:06 PM

Binoy Shah
07/11/2022, 1:42 PM
#dagster-* channels compared to other communities, and it shows in the recommendations chart below in light of our existing infra setup.
But I wanted to add more “fairness” to the evaluation ratings, and I'd highly appreciate some constructive feedback from the community on where I can improve this rating.

Florian Guily
07/11/2022, 1:55 PM

Joshua Greenhalgh
07/11/2022, 2:06 PM

Octopus
07/11/2022, 2:14 PM
from prefect import Flow, Parameter, task, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import StartFlowRun
import time
from datetime import timedelta

# @task
# def wait_and_succeed(ref, action_id):
#     time.sleep(10)
#     print(f"children task success for ref {ref} and action {action_id}")
#     if action_id == "write":
#         print(f"[SUCCESS] {ref} Second level reached !!!")
#     if action_id == "delete":
#         print(f"[SUCCESS] {ref} Third level reached !!!")

@task
def call_children_flow(ref):
    print(f"{ref} ref")
    actions_id = ["read", "write", "delete"]
    for action_id in actions_id:
        start_flow_run = StartFlowRun(flow_name="Generic Children flow")
        print(f"start_flow_run {start_flow_run}")
        child_id = start_flow_run.run(parameters={
            "reference": ref,
            "action_id": action_id,
        }, project_name="Playground")
        wait_for_flow_run.run(child_id)

@task
def run_action(action_id, ref):
    start_flow_run = StartFlowRun(flow_name="Generic Children flow")
    print(f"start_flow_run {start_flow_run}")
    child_id = start_flow_run.run(parameters={
        "reference": ref,
        "action_id": action_id,
    }, project_name="Playground")
    return child_id

with Flow("Generic Parent flow") as parent_flow:
    fake_refs = ["ref1", "ref2", "ref3"]
    call_children_flow.map(fake_refs)

if __name__ == "__main__":
    parent_flow.register(project_name="Playground")
    parent_flow.executor = LocalDaskExecutor(num_workers=20)
    parent_flow.run()

Ievgenii Martynenko
07/11/2022, 2:16 PM

Abin Joseph
07/11/2022, 2:22 PM

Madison Schott
07/11/2022, 2:40 PM

Jehan Abduljabbar
07/11/2022, 3:37 PM

Mars
07/11/2022, 4:06 PM

Amol Shirke
07/11/2022, 5:38 PM

Michał Augoff
07/11/2022, 6:00 PM
Deployment's KubernetesFlowRunner properties?

Joe Goldbeck
07/11/2022, 7:15 PM

Connor Parish
07/11/2022, 7:43 PM
I'm on prefect 2.0b7-python3.8. I'm trying to orchestrate the images through DeploymentSpecs with DockerFlowRunners as the flow_runners. Currently what happens is that the sub flow runs on the same image as the main flow unless the sub flow is deployed independently. Would greatly appreciate any ideas or insights into current feasibility!
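Roughly, my deployments look like this (simplified; the flow_location paths and image names are placeholders, based on the 2.0b7 DeploymentSpec / DockerFlowRunner interface as I understand it):

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner

# each flow gets its own deployment, pinned to its own image
DeploymentSpec(
    flow_location="./main_flow.py",
    name="main-flow",
    flow_runner=DockerFlowRunner(image="my-registry/main-flow:latest"),
)

DeploymentSpec(
    flow_location="./sub_flow.py",
    name="sub-flow",
    flow_runner=DockerFlowRunner(image="my-registry/sub-flow:latest"),
)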

Vaikath Job
07/11/2022, 8:29 PM

flurven
07/11/2022, 8:34 PM

Vaikath Job
07/11/2022, 8:36 PM
I'm getting the error
Failed to load and execute flow run: ValueError('Local Secret "<prefect.client.secrets.Secret object at 0x00000221D3B7B100>" was not found.')
when trying to use an OAuth token with the Secrets API, i.e. the config.toml is located on my local machine and has a section:
[context.secrets]
GITLAB="<OAuth Token>"
The code that registers the flow is similar to this:
secret = Secret("GITLAB")
flow.storage = GitLab(host="path/to/host", repo="repo/address", path="flow/sample_flow.py", access_token_secret=secret)
flow.register(project_name="test-project-name")
I assume this is happening because the config.toml is not on the K8s cluster. If this is the case, is there a way I can attach this storage to the flow without storing OAuth tokens on the cluster itself?
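Should the storage reference the secret by name instead, roughly like this (continuing from the snippet above where flow is already defined, and assuming access_token_secret takes the secret's name as a string, with the GITLAB secret made available wherever the flow is loaded, e.g. as a PREFECT__CONTEXT__SECRETS__GITLAB environment variable on the agent/job or as a Cloud secret)?

from prefect.storage import GitLab

flow.storage = GitLab(
    host="path/to/host",
    repo="repo/address",
    path="flow/sample_flow.py",
    access_token_secret="GITLAB",  # the secret's name, not a Secret object
)
flow.register(project_name="test-project-name")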

Kevin Grismore
07/11/2022, 9:05 PM

Mars
07/11/2022, 9:09 PM
Is there a way to use .env files to load secrets from os.environ after prefect module import time? I want to use a PrefectSecret instead of an EnvVarSecret in my code, and I don't want to hack the code between Prefect/EnvVar for local dev. Local context secrets should work well for overriding the PrefectSecret values, but it's not working the way I expect.
My debugger is telling me the context and secrets are set once, during import prefect, which means the secrets are fixed before I can load a dotenv file using my library of choice. The following pseudocode doesn't work:
import prefect
import environs  # .env support. .env not loaded yet.
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("my-flow") as flow:
    PrefectSecret("MY_SECRET")

if __name__ == "__main__":
    # For local testing
    env = environs.Env()
    env.read_env(".env")  # Load my custom PREFECT__CONTEXT__SECRETS into os.environ
    flow.run()  # Ignores the new os.environ values
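The only workaround I can see is loading the .env before prefect is ever imported, which is exactly the kind of reordering hack I'd like to avoid (sketch; MY_SECRET is a placeholder):

import environs  # load .env *before* prefect builds its config/context

env = environs.Env()
env.read_env(".env")  # PREFECT__CONTEXT__SECRETS__MY_SECRET is now in os.environ

import prefect
from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("my-flow") as flow:
    PrefectSecret("MY_SECRET")

if __name__ == "__main__":
    flow.run()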

Laxman Singh Tomar
07/12/2022, 5:01 AM

Andreas Nigg
07/12/2022, 8:41 AM
I have the following task:
@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")
    return response
The request itself works fine if I run it manually. However, as soon as I use prefect 2.0 (with prefect 2.0 cloud) to run the flow/task, I run into the following exception.
The GET request in the task takes about 1 minute and 10 seconds to return.
The exception itself is not coming from the server or my client --> I changed my requests.get() call in the task to an http.client request but still get the request exception below, so I have the strong feeling it's somehow related to prefect.
Exception summary:
• requests.exceptions.ConnectionError: ('Connection aborted.', timeout('The write operation timed out'))
• followed by: 10:36:55.875 | ERROR | Flow run 'chocolate-starling' - Crash detected! Request to https://api-beta.prefect.io/api/accounts/bd169b15-9cf0-41df-9e46-2233ca3fcfba/workspaces/f507fe51-4c9f-400d-8861-ccfaf33b13e4/task_runs/29d89dc3-4d92-4c69-a143-44f164303819/set_state timed out.
Exception details: see in thread.
Is there something wrong in how I use the requests module? Or is there a "hidden" timeout for prefect when a prefect-scheduled task runs for more than 1 minute?
Edit: I currently run the flow only locally by running "python name_of_script.py".
Edit 2: I'm running the Python env in WSL2.
Edit 3: I use GCS storage as my default storage. Maybe this causes the problem?
Edit 4: I was able to work around the issue by zipping the content of the response before returning it from my task. So if I change my task to the following, it works. To me it really looks as if the upload to GCS has a timeout of 1 minute, and therefore the whole flow breaks if the upload takes longer than that minute.
I can live with this workaround for the moment; however, I'd be happy to know whether my "theory" about GCS being the problem is correct.
@task(name="get_subscriptions",
      retries=2,
      retry_delay_seconds=5)
def get_subscriptions(paper_code, logger: Logger):
    response = requests.get("my_url")
    return zlib.compress(response.content)
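For completeness, the downstream task then just decompresses the payload again before using it; parse_subscriptions here is only an illustrative name:

import zlib

from prefect import task

@task(name="parse_subscriptions")
def parse_subscriptions(compressed_payload):
    # undo the zlib.compress from the workaround above (illustrative task)
    return zlib.decompress(compressed_payload).decode("utf-8")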

Emil Østergaard
07/12/2022, 10:01 AM
I deleted a flow run using the delete functionality in the UI, thinking it would delete all resources related to the flow run, including the kubernetes job etc.
It did not remove the kubernetes job, so I removed this manually.
The issue is concurrency limits: the tasks launched by this flow have a tag with a concurrency limit.
It appears the task data associated with the deleted flow run was not removed from prefect storage.
For instance, if I try:
prefect concurrency-limit inspect my-tag
it shows a bunch of active task ids, even though nothing is running in k8s.
This causes an unfortunate issue where any new flow runs for this flow will never start tasks, because prefect thinks the concurrency limit is hit, due to these zombie tasks.
However, I cannot seem to find a way to manually clean up these task ids, which means this flow is dead.
Any help is appreciated!
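Would deleting and recreating the limit be an acceptable way to clear the stuck slots? Something like this (the 10 here is just a placeholder for our real limit):
prefect concurrency-limit delete my-tag
prefect concurrency-limit create my-tag 10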

Slackbot
07/12/2022, 10:16 AM

Slackbot
07/12/2022, 10:16 AM