Alejandro Sanchez Losa
12/22/2021, 8:51 PM
Lucas Hosoya
12/22/2021, 9:27 PM
Danny Vilela
12/22/2021, 9:34 PM
@task-decorated function with max_retries=8, retry_delay=dt.timedelta(minutes=15). However, I know that for certain kinds of errors, I’d actually want it to wait 30 minutes (or even an hour). Is there a way to implement this? I know I can probably just check for that exception then time.sleep for the extra time (for example, to wait for an hour I’d catch the exception, time.sleep for 45 minutes, then raise the error so that retry_delay kicks in), but I’m wondering if there’s a cleaner way to approach this.
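The sleep-then-re-raise workaround described here might look roughly like the sketch below. It is plain Python so that it runs without Prefect; `SlowDownError`, `call_with_extra_wait`, and the one-hour target are hypothetical names and values chosen for illustration, and only `max_retries=8` and the 15-minute `retry_delay` come from the message.

```python
import datetime as dt
import time

# From the message: the retry delay configured on the task.
RETRY_DELAY = dt.timedelta(minutes=15)
# Assumption: the total wait wanted for the special kind of error.
LONG_WAIT = dt.timedelta(hours=1)


class SlowDownError(Exception):
    """Hypothetical error type that should wait longer before a retry."""


def call_with_extra_wait(fn, sleep=time.sleep):
    """Run fn; on SlowDownError, sleep the extra time and re-raise.

    Re-raising lets the configured retry_delay supply the remaining
    15 minutes, for a total wait of LONG_WAIT.
    """
    try:
        return fn()
    except SlowDownError:
        extra = LONG_WAIT - RETRY_DELAY  # 45 minutes
        sleep(extra.total_seconds())
        raise


# In the flow, this logic would sit inside the body of the task declared with
# @task(max_retries=8, retry_delay=dt.timedelta(minutes=15)).
```

The sleep blocks whatever process is running the task for the full 45 minutes, which is part of why it reads as a workaround.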
The above feels like a code smell but I’m not sure how I’d otherwise set context-specific retry delays on a task 🤔
Leon Kozlowski
12/22/2021, 9:44 PM
prefect.context information at time of failure? For example scheduled_start_time?
Alejandro Sanchez Losa
12/22/2021, 10:59 PM
Alejandro Sanchez Losa
12/22/2021, 11:06 PM
Ryan Sattler
12/23/2021, 5:24 AM
Suresh R
12/23/2021, 7:37 AM
Alejandro Sanchez Losa
12/23/2021, 10:31 AM
Henrietta Salonen
12/23/2021, 1:23 PM
Jason Raede
12/23/2021, 4:09 PM
src/tasks and the tasks need access to stuff in src/utils
• src
  ◦ flows
    ▪︎ my_flow.py
    ▪︎ my_other_flow.py
  ◦ tasks
    ▪︎ shared_task_1.py
    ▪︎ shared_task_2.py
  ◦ utils
    ▪︎ shared_lib_1.py
Thanks!
Leon Kozlowski
12/23/2021, 9:07 PM
chelseatroy
12/23/2021, 9:24 PM
PrefectSecret('PRIVATE_KEY_PASS_PHRASE').encode() because the encode method is on str and PrefectSecret returns a PrefectSecret. Anyone know how to do this?
Chun Shen Wong
12/24/2021, 5:03 AM
Anyelin Calderon
12/24/2021, 12:24 PM
Dohyeok Kim
12/24/2021, 4:36 PM
Hammad Ahmed
12/24/2021, 7:53 PM
Jason Raede
12/24/2021, 9:08 PM
from prefect import Flow, task

# Object is the poster's placeholder for any class with a `foo` attribute.

@task
def task1():
    return Object(foo='bar')

@task
def task2(foo: str):
    ...  # Do something

@task
def getfoo(o: Object):
    return o.foo

with Flow('my-flow'):
    task1_result = task1()
    # This fails
    task2(task1_result.foo)
    # This works, but is annoying
    task2(getfoo(task1_result))
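The first call fails because, inside the `with Flow(...)` block, `task1()` returns a task placeholder rather than the `Object` itself, so `.foo` is looked up before anything has run. One way to make the working pattern less repetitive is a single generic getter that is written once and reused for any attribute. The sketch below is plain Python; `get_attr` is a hypothetical helper name, and wrapping it with `@task` as in the snippet above is assumed rather than shown.

```python
from types import SimpleNamespace


def get_attr(obj, name):
    """Return obj.<name>.

    Meant to be wrapped as a task so the lookup happens at run time,
    once the upstream result actually exists.
    """
    return getattr(obj, name)


# Stand-in for the Object(foo='bar') result from the snippet above.
result = SimpleNamespace(foo="bar")
print(get_attr(result, "foo"))
```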
tas
12/25/2021, 1:53 PM
Amruth VVKP
12/26/2021, 6:40 PM
Andrey Tatarinov
12/26/2021, 8:25 PM
Varuna Bamunusinghe
12/27/2021, 4:32 AM
Akharin Sukcharoen
12/27/2021, 6:38 AM
M. Siddiqui
12/27/2021, 12:46 PM
Eduardo Apolinario
12/27/2021, 11:52 PM
Jason Raede
12/28/2021, 1:58 AM
LocalAgent can run using the LocalDaskExecutor? It’s parallelizing stuff fine locally when I just run flow.run but on a server running a local agent it looks like it’s going one task at a time (I’m just watching the logs in Prefect Cloud). I’m setting flow.executor before registering it, if that matters. I don’t see any indication in Prefect Cloud of what the flow’s executor is.
Andrea Nerla
12/28/2021, 12:53 PM
Tom Shaffner
12/28/2021, 6:34 PM
Bradley Hurley
12/28/2021, 8:58 PM
microk8s for local development alongside localstack. Ideally, this would allow us to port our existing prefect deployment to k8
Chris McLaughlin
12/28/2021, 9:48 PM
Chris McLaughlin
12/28/2021, 9:48 PM
Kevin Kho
12/28/2021, 9:51 PM
from prefect.tasks... import SomeTask

some = SomeTask(state_handlers=[mystatehandler])
with Flow(...) as flow:
    some(x=1, y=2)
Chris McLaughlin
12/28/2021, 9:52 PM
Kevin Kho
12/28/2021, 9:54 PM
@task(). If your task has inputs, you can still stick them in there.
some = SomeTask(x=1, y=2, state_handlers=[mystatehandler], trigger=...)
with Flow(...) as flow:
    some()
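Neither snippet shows `mystatehandler` itself. In Prefect 1.x a state handler is a callable that receives the task, the old state, and the new state, and returns the state to use. A minimal sketch follows, assuming that signature; the body (recording failures in a list) is purely illustrative, and the `FakeState` and `FakeTask` stand-ins are hypothetical classes that exist only so the example runs without Prefect.

```python
failures = []


def mystatehandler(task, old_state, new_state):
    """Record failed transitions and pass the new state through unchanged."""
    if new_state.is_failed():
        failures.append(getattr(task, "name", repr(task)))
    return new_state


class FakeState:
    """Stand-in for a Prefect state; only is_failed() is modeled."""

    def __init__(self, failed):
        self._failed = failed

    def is_failed(self):
        return self._failed


class FakeTask:
    """Stand-in for a task object; only the name is modeled."""

    name = "some"


# Simulate one transition into a failed state.
state = mystatehandler(FakeTask(), FakeState(False), FakeState(True))
print(failures)
```

Returning `new_state` unchanged keeps the handler purely observational.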