Fabrice Toussaint
08/11/2021, 11:57 AM
ValueError("Cycle found; flows must be acyclic!")? Or to rephrase the question: "Why is it cyclic at all?"
Minimum example:
from prefect import Flow, apply_map, task

@task()
def do_something_one(item):
    return item + 1

@task()
def do_something_two(item, item_one):
    return item + item_one

@task()
def do_something_three(item_combination):
    return item_combination[0] + item_combination[1]

# mapping function one
def do_something_with_one_two(item):
    result_one = do_something_one(item)
    result_two = do_something_two(item, result_one)
    return (result_one, result_two)

# mapping function two
def do_something_with_three(item):
    return do_something_three(item)

with Flow("test flow") as flow:
    l = [1, 2, 3]
    results = apply_map(do_something_with_one_two, l)
    results_results = apply_map(do_something_with_three, results)

flow.run()
The visualization can be seen below.
Billy McMonagle
08/11/2021, 3:19 PM
Nivi Mukka
08/11/2021, 3:19 PM
multiprocessing inside my task code and how is that expected to behave in Prefect+Dask?
Code looks like this inside the task:
from tqdm import tqdm
import multiprocessing
pool = multiprocessing.Pool(processes=16)
func_output = list(tqdm(pool.imap(some_func, some_functions_input)))
I am getting this error:
AssertionError: daemonic processes are not allowed to have children
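For reference, this assertion is raised by Python's multiprocessing module whenever a process marked as daemonic tries to start child processes, which suggests the task is running inside a daemonic worker process. A minimal sketch of one possible workaround, assuming the work is I/O-bound or releases the GIL, is to swap the process pool for a thread pool, since threads are not child processes. Here some_func is a hypothetical stand-in for the real function and run_in_threads is an illustrative helper name, not part of Prefect or Dask:

```python
from multiprocessing.pool import ThreadPool


def some_func(x):
    # Hypothetical stand-in for the real per-item work.
    return x * x


def run_in_threads(func, inputs, workers=4):
    # ThreadPool offers the same imap API as multiprocessing.Pool but uses
    # threads, so it can run inside a daemonic process without spawning
    # child processes.
    with ThreadPool(processes=workers) as pool:
        # imap yields results in input order.
        return list(pool.imap(func, inputs))


results = run_in_threads(some_func, range(5))
```

The tqdm wrapper from the original snippet can still be placed around pool.imap. For CPU-bound work a thread pool will not give real parallelism, so this is only a sketch of one option, not a general fix.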
YD
08/11/2021, 5:17 PM
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules import CronSchedule
from time import sleep

@task(max_retries=1, retry_delay=timedelta(minutes=10), timeout=2000)
def tast_1():
    sleep(10)
    print('Do task 1')
    return True

@task(max_retries=1, retry_delay=timedelta(minutes=10), timeout=2000)
def tast_2():
    sleep(10)
    print('Do task 2')
    return True

def main():
    schedule = CronSchedule("0 15 * * *")
    with Flow("parallel tasks", schedule=schedule) as flow:
        r1 = tast_1()
        r2 = tast_2()
    flow.register(project_name="parallel tasks")

if __name__ == "__main__":
    main()
Kyle McChesney
08/11/2021, 8:12 PM
Gustavo de Paula
08/11/2021, 8:51 PM
python >= 3.8. When running it with the local agent, it works fine, but when I try to run it with the Kubernetes agent, using an image that extends prefecthq/prefect:latest-python3.8, I get a Python version mismatch error saying that it is trying to run the flow with Python 3.7.10. Is there a way I can change this Python version?
Nadav
08/11/2021, 10:26 PM
Open-Issue Prefect Team 1
08/11/2021, 10:44 PM
YD
08/11/2021, 11:16 PM
William Grim
08/11/2021, 11:56 PM
Ivan Indjic
08/12/2021, 8:48 AM
Newskooler
08/12/2021, 9:24 AM
from prefect.executors import LocalDaskExecutor
from prefect import Flow
with Flow(name=name, executor=LocalDaskExecutor(scheduler="threads", num_workers=1)) as flow:
    ...
Do I need to use another executor, or where can I learn more about this?
Nishtha Varshney
08/12/2021, 10:50 AM
Samuel Kohlleffel
08/12/2021, 12:38 PM
flow.run(parameters=dict(rebuild="true")) but when I try to run python -m python.module --param rebuild=true the parameter is not recognized and it runs the flow with the default value. What am I doing wrong here?
marios
08/12/2021, 2:05 PM
Italo Barros
08/12/2021, 3:06 PM
Nicholas Chammas
08/12/2021, 3:10 PM
Ruslan Aliev
08/12/2021, 3:23 PM
hunter
08/12/2021, 4:32 PM
Kyle McChesney
08/12/2021, 4:43 PM
/etc/profile.d/ that exports the necessary env vars, but it does not seem to be loaded by default. Likely due to ENTRYPOINT/CMD issues (no login shell?). Wondering if anyone has found a way to do this. I am aware that I could configure the agent to pass env vars in OR update the run config for the flow with them, but for a number of reasons I am hoping to avoid this. The goal is to get env vars set somehow in docker build that are loaded through a normal “source” type process on container start.
Joe Schmid
08/12/2021, 5:30 PM
Charles Liu
08/12/2021, 6:44 PM
Anh Nguyen
08/12/2021, 8:02 PM
Nivi Mukka
08/12/2021, 9:12 PM
Lazarus service? https://docs.prefect.io/orchestration/concepts/services.html#lazarus
Danny Vilela
08/12/2021, 10:15 PM
snapshot_date = Parameter(name="snapshot_date", default=dt.datetime.today()) and pass that into another (class-based) task, how should we do that? Should we pass it to the task initialization (e.g., my_task = MyTask(snapshot_date=snapshot_date)) or task call (i.e., my_task(snapshot_date=snapshot_date))?
This is assuming that MyTask uses snapshot_date within its run method.
Aric Huang
08/12/2021, 10:28 PM
slack_notifier (https://docs.prefect.io/core/advanced_tutorials/slack-notifications.html#slack-notifications) or creating your own state handler to attach to a flow (https://docs.prefect.io/core/concepts/notifications.html#state-handlers) but don't see a straightforward way to change the Slack message based on some runtime value other than old_state and new_state. Is there any recommended way to do this? I think if there's some way for a state handler to read a Parameter value that would work for me, but not sure if that's possible.
Simon Gasse
08/13/2021, 12:48 PM
Kien Nguyen
08/13/2021, 1:22 PM
Thomas Weatherston
08/13/2021, 2:21 PM
ClientError('An error occurred (AccessDenied) when calling the PutObject operation: Access Denied')
Kien Nguyen
08/13/2021, 2:55 PM