CA Lee
07/12/2021, 2:40 AM
Kevin Kho
`if` statement missing a `=` there? It should be `==`, right?
Using `.run()` to use a task inside a task should work, as it just calls the underlying function. Similar to using `transform`, `load`, and `extract` as plain functions, you will not get the visibility.
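To illustrate why calling `.run()` loses visibility, here is a plain-Python sketch that does not use Prefect at all. `MiniTask` and its `calls` list are hypothetical stand-ins for a task wrapper and for whatever an orchestrator records. In Prefect, calling a task inside a flow builds the graph rather than executing immediately, so this is a simplification of that behaviour.

```python
class MiniTask:
    """Hypothetical stand-in for a task wrapper (not Prefect's Task class)."""

    def __init__(self, fn):
        self.fn = fn
        self.calls = []  # what the "orchestrator" gets to see

    def __call__(self, *args, **kwargs):
        # Tracked path: record the call, then run the function.
        self.calls.append((args, kwargs))
        return self.fn(*args, **kwargs)

    def run(self, *args, **kwargs):
        # Untracked path: only the underlying function, nothing recorded.
        return self.fn(*args, **kwargs)


@MiniTask
def transform(x):
    return x.upper()


tracked = transform("report_one")        # recorded in transform.calls
untracked = transform.run("report_two")  # not recorded anywhere
```

Both calls return a result, but only the first leaves a trace, which is the trade-off described above.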
So you can actually use an `if-else` type of logic with the `case` task. You can read more on this here. For your specific use case, you could do a `case` inside the Flow to see if `selection` is `all`; if it is, run all the reports. If not, just run the selection. This requires the list of reports to be defined beforehand, but I think you have that, so this could work.
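The branch described here can be sketched in plain Python, without Prefect. `resolve_selection` is a hypothetical helper name, and the `reports` dict holds placeholder strings. It only shows the decision itself: `all` expands to every predefined report, and anything else must be a single known key.

```python
def resolve_selection(selection, reports):
    """Return the list of report keys to run for a given selection."""
    if selection == "all":
        # Requires the reports to be defined beforehand.
        return list(reports)
    if selection not in reports:
        raise KeyError(f"unknown report: {selection!r}")
    return [selection]


reports = {"1": "report_one", "2": "report_two"}  # placeholder values

all_keys = resolve_selection("all", reports)
one_key = resolve_selection("2", reports)
```

Returning a list in both branches means whatever runs afterwards does not need to know which branch was taken.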
But actually, given that the `extract`-`transform`-`load` pattern is the same, this seems like a prime use case for mapping. You can do something like:
with Flow('test_flow') as flow:
    # Use a list for the default so the Parameter holds a plain List
    selection = Parameter('selection', default=list(reports.keys()))
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)
This logic is now agnostic to whether the flow is running one or more reports, as long as your input is a List.
CA Lee
07/12/2021, 4:45 AM
CA Lee
07/12/2021, 4:46 AM
from prefect import task, Flow, Parameter
reports = {
    '1': report_one,
    '2': report_two
}
@task
def extract(report_number, report):
    do_something()
@task
def transform(extracted):
    do_something()
@task
def load(transformed):
    do_something()
@task
def selector(selection):
    # Loop through all items in the dict
    if selection == 'all':
        for report_number, report in reports.items():
            extracted = extract(report_number=report_number, report=report)
            transformed = transform(extracted)
            load(transformed)
    # Select a single item from the dict
    else:
        extracted = extract(report_number=selection, report=reports[selection])
        transformed = transform(extracted)
        load(transformed)
with Flow('test_flow') as flow:
    selection = Parameter('selection', default='all')
    selector(selection=selection)
But when the flow is run, the error encountered is:
ValueError: Could not infer an active Flow context while creating edge to <Task: extract>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `extract.run(...)`
Have tried:
• Appending `.run()` to `extract()`, `transform()`, `load()` in `selector()`
• Removing the `@task` for the extract, transform and load functions. This works, but in case of any failure, I would not know at which step the script failed
• Could try calling the `@task` functions within the Flow context, but not sure how to use the Parameter without writing a function for it
with Flow('test_flow') as flow:
    # How to use Parameter here to control select all or one?
    extracted = extract(report_number=report_number, report=report)
    transformed = transform(extracted)
    load(transformed)
Kevin Kho
CA Lee
07/12/2021, 4:48 AM
Kevin Kho
from prefect import task, Flow, Parameter
import prefect
reports = {
    '1': "report_one",
    '2': "report_two"
}
@task
def extract(report_number, report):
    logger = prefect.context.get("logger")
    logger.info(f"extract number {report_number}")
    logger.info(f"extract report {report}")
    return report_number
@task
def transform(extracted):
    logger = prefect.context.get("logger")
    logger.info(f"transform number {extracted}")
    return extracted
@task
def load(transformed):
    logger = prefect.context.get("logger")
    logger.info(f"load number {transformed}")
    return transformed
with Flow('test_flow') as flow:
    selection = Parameter('selection', default=list(reports.keys()))
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)
# flow.run()
flow.run(parameters={"selection": ["1"]})
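As a rough mental model of what the mapped flow above computes, here is a plain-Python sketch with no Prefect involved. `run_pipeline` is a hypothetical name, and the three functions mirror the return values of the tasks in the snippet with the logging left out. Each `.map()` call corresponds roughly to one list comprehension over the previous step's results.

```python
reports = {"1": "report_one", "2": "report_two"}


def extract(report_number, report):
    return report_number


def transform(extracted):
    return extracted


def load(transformed):
    return transformed


def run_pipeline(selection):
    # Roughly: task(lambda x: reports[x]).map(selection)
    values = [reports[key] for key in selection]
    # Roughly: extract.map(selection, values), pairing items by position
    extracted = [extract(k, v) for k, v in zip(selection, values)]
    # Roughly: transform.map(extracted) followed by load.map(transformed)
    transformed = [transform(e) for e in extracted]
    return [load(t) for t in transformed]


single = run_pipeline(["1"])
everything = run_pipeline(list(reports.keys()))
```

The same function handles one report or all of them, which is why the input only has to be a list.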
CA Lee
07/15/2021, 2:15 AM
`.map()` instead of if/else (I've tried to generalize use cases in the script below):
from prefect import task, Flow, Parameter, unmapped
selections = {
    "01": selection_one,
    "02": selection_two
}
@task
def extract(arg_one, arg_two, constant):
    return do_something(arg_one, arg_two, constant)
with Flow('test_flow') as flow:
    # Get the keys of the selections dict
    selection = Parameter('selection', default=list(selections.keys()))
    # Map over those keys to get the dict values
    selected = task(lambda x: selections[x]).map(selection)
    extract.map(
        # Pass in the keys / values using .map()
        arg_one=selection,
        arg_two=selected,
        # Sends the same input (constant) to all mapped tasks;
        # passed by keyword, since a positional argument cannot follow keyword arguments
        constant=unmapped(constant)
    )
# flow.run()
flow.run(parameters={"selection": ["01"]})
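To show what the unmapped argument does, here is a plain-Python sketch without Prefect. The values in `selections` and the `"prod"` constant are made-up placeholders, and `itertools.repeat` stands in for `unmapped`: the mapped arguments advance item by item while the constant is handed unchanged to every call.

```python
from itertools import repeat


def extract(arg_one, arg_two, constant):
    return f"{constant}:{arg_one}:{arg_two}"


selections = {"01": "selection_one", "02": "selection_two"}  # placeholders
keys = list(selections.keys())
values = [selections[k] for k in keys]
constant = "prod"  # hypothetical shared input

# keys and values are iterated in step; the constant is repeated for each pair.
results = [extract(k, v, c) for k, v, c in zip(keys, values, repeat(constant))]
```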
CA Lee
07/15/2021, 2:23 AM
from prefect.executors import LocalDaskExecutor
# Uses the default scheduler (threads)
flow.executor = LocalDaskExecutor()
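As a loose analogy for running mapped work on threads, here is a standard-library sketch. It uses `concurrent.futures` rather than Dask, so it is not how `LocalDaskExecutor` is implemented; the `extract` function and the three-item `selection` are placeholders. It only shows that independent items can run concurrently while the results come back in input order.

```python
from concurrent.futures import ThreadPoolExecutor


def extract(report_number):
    return f"extracted {report_number}"


selection = ["01", "02", "03"]

# Each item is independent, so a thread pool can process several at once.
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(extract, selection))  # order follows the input
```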