alex
09/18/2020, 12:50 AM
results = []
for op in operations:
    op = task(op)
    most_recent = op
    # .. a few more conditional tasks defined at compile time
    if do_recovery:
        recovery_task = recover(trigger=any_failed_else_raise_skip_signal)
        most_recent = recovery_task
    results.append(most_recent)

backup_task = Backup('mydatabasename', upstream_tasks=[results], skip_on_upstream_skip=False)
mapped = Aggregate.map(
    results,
    target=unmapped('mydatabasename')
)
mapped.set_upstream([backup_task])
As you can see from the image, instead of getting one list I end up with two, and because of this my Aggregate task is being skipped even though it is downstream of a task with skip_on_upstream_skip=False. Is there a way to cleanly resolve this (i.e. only get one list)?
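For reference, skip_on_upstream_skip=False means a task still runs even when an upstream task skips; a minimal sketch of that behavior, assuming Prefect 0.x semantics and hypothetical task names:

from prefect import Flow, task
from prefect.engine.signals import SKIP

@task
def skipped_task():
    raise SKIP("nothing to do")

@task(skip_on_upstream_skip=False)
def always_runs():
    print("ran despite the upstream skip")

with Flow("skip-demo") as f:
    # always_runs executes even though its only upstream task skips
    always_runs(upstream_tasks=[skipped_task])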
nicholas
09/18/2020, 2:13 PM
op, most_recent, and result
alex
09/18/2020, 4:22 PM
import copy
import datetime
import prefect
from prefect import Flow, task
from prefect.triggers import _get_all_states_as_set
class Feed:
    def __init__(self, name, do_alpha, raise_an_err=None):
        self.name = name
        self.do_alpha = do_alpha
        self.do_bravo = True
        self.date = datetime.datetime.today().strftime("%Y%m%d")
        self.raise_an_err = raise_an_err

    def update_date(self, datenew):
        self.date = datenew

    def get_collection(self):
        return self.name + "_collection_" + self.date
@task(log_stdout=True)
def do_alpha(feed):
    print(feed.get_collection() + " did alpha")
    return feed

@task(log_stdout=True)
def do_bravo(feed):
    if feed.raise_an_err:
        raise Exception(feed.get_collection() + " could not do bravo")
    print(feed.get_collection() + " did bravo")
    return feed

@task(log_stdout=True)
def refresh_date(feed):
    feed.update_date(prefect.context.today_nodash)
    return feed

@task(log_stdout=True)
def aggregate(feed):
    print(f"aggregated {feed.get_collection()}")

@task(log_stdout=True)
def prepare_for_aggregation():
    print("prepared")
def run_on_any_failed_or_skip_trigger(upstream_states):
    """
    If any of the upstream tasks failed, run this task.
    If not, raise a SKIP signal.
    """
    if any(s.is_failed() for s in _get_all_states_as_set(upstream_states)):
        return True
    else:
        raise prefect.engine.signals.SKIP()

@task(log_stdout=True, trigger=run_on_any_failed_or_skip_trigger)
def failure_recovery(feed):
    print(f"recovered {feed.get_collection()}")
    return feed

@task(log_stdout=True)
def copy_task(o):
    return copy.deepcopy(o)
feed_a, feed_b = Feed("a", do_alpha=False), Feed("b", do_alpha=True, raise_an_err=True)
params = {
    feed_a: {"do_alpha": feed_a.do_alpha, "do_bravo": feed_a.do_bravo},
    feed_b: {"do_alpha": feed_b.do_alpha, "do_bravo": feed_b.do_bravo},
}
feeds = [feed_a, feed_b]
recovery_enabled = True
def get_flow():
    with Flow("Demo Flow") as f:
        results = []
        for feed in feeds:
            run_params = params[feed]
            original_feed = copy_task(feed)
            feed = refresh_date(feed)
            if run_params["do_alpha"]:
                feed = do_alpha(feed)
            if run_params["do_bravo"]:
                feed = do_bravo(feed)
            # some more similar logic (do_charlie etc..)
            if recovery_enabled:
                # can't just pass feed: if any of the above tasks failed,
                # feed's value will be a FailedTrigger(?)
                feed = failure_recovery(original_feed, upstream_tasks=[feed])
            results.append(feed)

        # wait till all feeds have run
        prep = prepare_for_aggregation(
            upstream_tasks=[results],
            # want to make sure we still run if we skipped failure recovery
            task_args=dict(skip_on_upstream_skip=False),
        )
        agg = aggregate.map(results)
        agg.set_upstream([prep])
    return f

if __name__ == "__main__":
    flow = get_flow()
    # flow.run()
    flow.register('cf')
"""
This used to be a cli application before I transitioned to prefect.
Feeds contain information about how to run them and are defined at compile time (ie. they won't be changed through
eg.parameters, and also provide a method used by all tasks: get_collection, which is based on the current date,
so that needs to be injected every time a flow runs, otherwise it will still use the
same date every day.
At the end, I want to backup my final collection and insert data from all feed collections into it.
"""
nicholas
09/18/2020, 5:14 PM
results = []
for feed in feeds:
    run_params = params[feed]
    original_feed = copy_task(feed)
    feed = refresh_date(feed)
    if run_params["do_alpha"]:
        feed = do_alpha(feed)
    if run_params["do_bravo"]:
        feed = do_bravo(feed)
    # some more similar logic (do_charlie etc..)
    if recovery_enabled:
        # can't just pass feed: if any of the above tasks failed,
        # feed's value will be a FailedTrigger(?)
        feed = failure_recovery(original_feed, upstream_tasks=[feed])
    results.append(feed)
This section of your flow context is appending to results at build time. What this means is that it would look something like this:
# results
[<Task: failure_recovery>, <Task: failure_recovery>]
This means that downstream tasks that try to access that list at runtime are working with Task objects, instead of the results of your tasks as you expect.
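To illustrate the build-time/runtime split with a minimal sketch (hypothetical double task): calling a task inside the Flow block binds dependencies and returns a Task object, not a value.

from prefect import Flow, task

@task
def double(x):
    return x * 2  # runtime logic: only executes during a flow run

with Flow("build-vs-run") as demo:
    result = double(2)  # build time: returns a Task, not 4

print(result)  # <Task: double> -- a Task object, not 4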
Typically the way I think about building flows is that anything in the with Flow() as flow:
block should read as if you're describing the structure of your application; anything in a task block should read as describing runtime logic.
So you might build your flow something like this:
import datetime

from prefect import Flow, task, unmapped

@task
def get_feeds():
    return [
        {"name": "a", "do_alpha": False},
        {"name": "b", "do_alpha": False, "raise_an_err": True},
    ]

@task
def get_collection(feed, date):
    return feed["name"] + "_collection_" + date

@task
def get_formatted_date():
    return datetime.datetime.today().strftime("%Y%m%d")

@task(log_stdout=True)
def do_alpha(feed, collection):
    print(collection + " did alpha")
    return feed

@task(log_stdout=True)
def do_bravo(feed, collection):
    if feed.get("raise_an_err"):
        raise Exception(collection + " could not do bravo")
    print(collection + " did bravo")
    return feed

@task
def filter_feeds(feeds, ref):
    return [feed for feed in feeds if feed["name"] == ref]

@task
def merge_feeds(*args):
    feeds = []
    for feed in args:
        feeds.extend(feed)
    return feeds

with Flow("Demo flow") as flow:
    # get the feeds we want to iterate over
    feeds = get_feeds()
    # get the formatted date
    # (note that this is also in the flow run context that you can access in a task with prefect.context)
    today = get_formatted_date()
    collections = get_collection.map(feeds, unmapped(today))
    # create branches from your feeds that you perform some operations on
    alpha_feeds = filter_feeds(feeds, "alpha")
    bravo_feeds = filter_feeds(feeds, "bravo")
    # merge the data sets back together
    merged_feeds = merge_feeds(alpha_feeds, bravo_feeds)
    # prepare_for_aggregation(upstream_tasks=[merged_feeds])
    # etc etc
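To see the difference end to end, the sketch above could be run locally and its final state inspected; a hypothetical usage, assuming Prefect 0.x's flow.run:

if __name__ == "__main__":
    state = flow.run()
    # at runtime the task states hold real values, not Task objects
    print(state.result[merged_feeds].result)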
alex
09/22/2020, 4:53 PM
nicholas
09/22/2020, 5:05 PM