Patrick Tan
02/18/2022, 4:37 PMDaniel Nilsen
02/18/2022, 4:44 PMt1 = task1()
t2 = task2(t2)
while condition:
t3 = task3(a)
t4 = task4(t3)
t5 = task5(t4)
Kevin Kho
02/18/2022, 4:46 PMluther1337
02/18/2022, 6:08 PMdocker run
. however, when i deploy the flows to kube and use prefect cloud to trigger a run, i get the following error:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'my_package\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
i'm using GCS as the storage. i'm also wondering if/why i need to use GCS as a storage -- doesn't the execution environment have access to the flows? why do they even need to be pickled?
thanks in advance! 🙂Florentino Bexiga
02/18/2022, 6:44 PMJacqueline Riley Garrahan
02/18/2022, 6:52 PMprefect.tasks.prefect.create_flow_run
to kick off some tasks. I've noticed that this isn't returning an id for the flow run as documented and instead returning a Task
object. Any advice on how to access the ids of created runs?Chris Reuter
02/18/2022, 7:45 PMhttps://youtu.be/4hIqYhRf6JY▾
Richard Hughes
02/18/2022, 8:28 PMWilliam Grim
02/18/2022, 8:39 PMFailed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n AttributeError("Can\'t get attribute \'create_params_file\' on <module \'our_filename.py\' from \'/our_filename.py\'>")')
The signature of create_params_file
, which is not a task but a method that can be called looks like:
def create_params_file(base_filename: str, **kwargs) -> str:
Henning Holgersen
02/18/2022, 9:15 PMDexter Antonio
02/18/2022, 9:36 PMMY_RESULTS = S3Result(bucket='my_bucket_without_s3_prefix',location='my_output_folder')
prefect.config.flows.checkpointing = True
!export PREFECT__FLOWS__CHECKPOINTING=true
with Flow("please work", result=MY_RESULTS) as f:
t1 = my_task()
state = f.run()
!aws s3 ls <s3://my_bucket_name_witohut_s3_prefix/my_output_folder> # nothing is here
Is there something obvious, which I am missing?Brian Lorenz
02/19/2022, 12:27 AMline 282, in get_profile_context raise MissingContextError("No profile context found.")
Any suggestions on how to fix this?Heeje Cho
02/19/2022, 1:07 AMcreate_flow_run
to create a persistent scheduled flow run? Not a flow that runs only ones at a scheduled time but a flow that runs at intervals?Aric Huang
02/19/2022, 1:38 AMmap
- With the following sample flow, I was expecting the mapped task f
to run concurrently with wait
because there are no dependencies and LocalDaskExecutor
is being used. However, the behavior I see is that only `wait`'s mapped tasks get executed, so f
is not executed until all the wait
tasks return.
from prefect import Flow, task
import time
from prefect.executors import LocalDaskExecutor
@task
def f(x):
return x*2
@task
def wait(x):
time.sleep(x)
with Flow("test") as flow:
a = list(range(4))
wait.map(a)
result = f.map(a)
flow.executor = LocalDaskExecutor()
Kivanc Yuksel
02/19/2022, 3:18 PMtarget
, however, from time to time I want to re-run these tasks without manually deleting target files. Is there a way to "force" re-run for such tasks?Dexter Antonio
02/19/2022, 5:58 PMOmar Sultan
02/19/2022, 8:55 PMSamay Kapadia
02/20/2022, 2:35 PMpip install prefect[azure]
on my m1 mac 😞Brian Lorenz
02/21/2022, 1:09 AMMax Lei
02/21/2022, 4:39 AMAntonio Manuel BR
02/21/2022, 7:47 AMGuillaume Latour
02/21/2022, 9:20 AMMichael Hadorn
02/21/2022, 11:15 AMDotan Asselmann
02/21/2022, 12:09 PMiñigo
02/21/2022, 12:15 PMLucas Hosoya
02/21/2022, 12:54 PMArnaldo Russo
02/21/2022, 2:02 PMnew_flow_context=prefect.context.get('config')
Tomek Florek
02/21/2022, 2:29 PMMarwan Sarieddine
02/21/2022, 3:19 PMAqib Fayyaz
02/21/2022, 3:28 PMAqib Fayyaz
02/21/2022, 3:28 PMAnna Geller
02/21/2022, 4:14 PMKubernetesAgent
on GKE
• as well as setting up Server with Helm on GKE
and I remember we managed to do it (both), right? did something happen with your setup and you have to start from scratch?Aqib Fayyaz
02/21/2022, 4:15 PMAnna Geller
02/21/2022, 4:21 PMAqib Fayyaz
02/21/2022, 4:26 PMAnna Geller
02/21/2022, 4:31 PMAqib Fayyaz
02/22/2022, 11:29 AMhttps://www.youtube.com/watch?v=EwsMecjSYEU&t=2792s▾
Anna Geller
02/22/2022, 11:42 AMKubernetesAgent
with Prefect Cloud and once when you were setting up Server on GKE with helm chart and I remember you got it workingAqib Fayyaz
02/22/2022, 11:45 AMAnna Geller
02/22/2022, 11:52 AMAqib Fayyaz
02/22/2022, 11:53 AMAnna Geller
02/22/2022, 12:04 PMAqib Fayyaz
02/22/2022, 12:10 PMAnna Geller
02/22/2022, 12:19 PMAqib Fayyaz
02/22/2022, 12:29 PMAnna Geller
02/22/2022, 12:32 PMAqib Fayyaz
02/22/2022, 12:50 PMAnna Geller
02/22/2022, 12:54 PMAqib Fayyaz
02/22/2022, 12:58 PMAnna Geller
02/22/2022, 12:59 PMAqib Fayyaz
02/22/2022, 1:14 PMAnna Geller
02/22/2022, 1:24 PMAqib Fayyaz
02/22/2022, 3:32 PMCMD ["python3", "/usr/app/feat_post_flow_local.py"]
and it works i mean it runs the flow and flow does its job and sends the result to shared storage as well.
Server is also deployed on gke using helm chart so my question is can this server interact with the flow stored on gke as servcie?Anna Geller
02/23/2022, 9:41 AMAqib Fayyaz
02/23/2022, 9:45 AMAnna Geller
02/23/2022, 9:46 AMAqib Fayyaz
02/23/2022, 9:47 AMAnna Geller
02/23/2022, 9:52 AMAqib Fayyaz
02/23/2022, 9:59 AMAnna Geller
02/23/2022, 10:05 AMAqib Fayyaz
02/23/2022, 11:20 AMAnna Geller
02/23/2022, 11:22 AMAqib Fayyaz
02/23/2022, 11:25 AMAnna Geller
02/23/2022, 11:45 AM