Jason
06/15/2022, 9:03 PMRobert Esteves
06/15/2022, 9:19 PMVictoria Alvarez
06/15/2022, 10:10 PMShaoyi Zhang
06/15/2022, 10:22 PMCA Lee
06/15/2022, 11:48 PMFaheem Khan
06/16/2022, 2:44 AMFaheem Khan
06/16/2022, 5:56 AMAmro
06/16/2022, 6:27 AMGuangSheng Liu
06/16/2022, 7:16 AMexport PREFECT__SERVICES__SERVICES__TOWEL__MAX__SCHEDULED__RUNS__PER__FLOW=1
or
export PREFECT__MAX__SCHEDULED__RUNS__PER__FLOW=1
Surya
06/16/2022, 10:27 AMRoger Webb
06/16/2022, 1:30 PMEd Burroughes
06/16/2022, 2:36 PMdef build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs):
with Flow(name, **flow_kwargs) as flow:
repeat_task_output = repeat_task()
return flow, repeat_task_output
@contextmanager
def FullRefreshFlow(name, s3_dir_prefix, **flow_kwargs):
flow = build_full_refresh_base(name, s3_dir_prefix, **flow_kwargs)
try:
yield flow
finally:
print("do something")
if __name__ == "__main__":
@task(log_stdout=True)
def some_task(repeat_task_output):
print(repeat_task_output)
with FullRefreshFlow("hello", "some_dir") as (flow, repeat_task_output):
some_task(repeat_task_output)
flow.run()
Joshua Allen
06/16/2022, 2:36 PMdocker
module for Python instead? Or is there a better way?Matthew Millendorf
06/16/2022, 3:51 PMDavid Yak
06/16/2022, 4:16 PMSlackbot
06/16/2022, 4:39 PMJosh
06/16/2022, 5:44 PMDavid Yak
06/16/2022, 5:52 PMFina Silva-Santisteban
06/16/2022, 5:59 PMXavier Babu
06/16/2022, 7:55 PMDavid Anderson
06/16/2022, 9:02 PMAirbyteConnectionTask
seeing a new error starting ~last week? all of my runs are failing with Error during execution of task: KeyError('schedule')
. haven't changed a thing in my prefect flow configuration. thought maybe it was related to an airbyte upgrade (im running the self-hosted version), but im struggling to troubleshoot. any ideas?Matt Alhonte
06/16/2022, 10:47 PMOverview
tab in the GUI for a given flow always display the actual date instead of "Last Sunday" or whatever?Matt Alhonte
06/16/2022, 11:02 PMBen Muller
06/16/2022, 11:59 PMprefect run -p flows/harness_racing_victoria/harness_racing_victoria_results.py
error trace in 🧵
Any ideas?George Shishorin
06/17/2022, 2:05 AMNo module named 'pandas'
occurred. Flow B is ok.
So the question is: what is the best practice to register flows with different environments and dependencies?
Hope for your support and thank you!Marius Haberstock
06/17/2022, 8:20 AMmarque
06/17/2022, 9:09 AMMichael Maletich
06/17/2022, 10:55 AMDung Khuc
06/17/2022, 12:05 PMTaskA: resultA -> TaskB: resultB -> TaskC: resultC
if task B fails, I want to run:
UndoTaskA (resultA)
if task C fails, I want to run:
UndoTaskB (resultB) -> UndoTaskA (result A)
Michiel Verburg
06/17/2022, 12:53 PM@task
def for_loop_B(category):
for item_i in category.items:
load_item(item_i)
store_item(item_i)
@flow
def for_loop_A()
categories = retrieve_all_categories()
for category_i in categories:
for_loop_B(category_i)
Doing the above seemed wrong, because also (by default at least) tasks or flows would fail just because one internal step failed. Additionally, the processing of the categories is fully independent. So I thought, for_loop_B
should also be a flow, and load_item
and store_item
should be tasks for example. However, I got confused because of what the docs mention: “Unlike tasks, subflows will block until completion with all task runners.“.
I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel, how can I make that happen? Also, can tasks be nested within tasks for that matter?