CA Lee
07/12/2021, 2:40 AM
Kevin Kho
Is the if statement missing an `=` there? It should be `==`, right?
Using `.run()` to use a task inside a task should work, as it just calls the underlying function. But, just as with using transform, load, and extract as plain functions, you will not get the visibility of separate tasks.
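If the steps are called as plain functions (or through `.run()`), there are no separate task runs to show which one broke. Below is a minimal plain-Python sketch of one workaround. It assumes each step takes the previous step's output. `run_steps` and the toy `extract`/`transform`/`load` functions are hypothetical, and this is not a Prefect feature.

```python
def run_steps(value, steps):
    """Run (name, func) pairs in order, feeding each result into the next step.

    Hypothetical helper: if a step raises, re-raise a RuntimeError naming
    that step, with the original exception chained as the cause.
    """
    for name, func in steps:
        try:
            value = func(value)
        except Exception as exc:
            raise RuntimeError(f"step {name!r} failed: {exc}") from exc
    return value


# Toy steps standing in for the real extract / transform / load
def extract(report):
    return {"report": report, "rows": [1, 2, 3]}


def transform(extracted):
    if not extracted["rows"]:
        raise ValueError("no rows to transform")
    return sum(extracted["rows"])


def load(total):
    return f"loaded {total}"


steps = [("extract", extract), ("transform", transform), ("load", load)]
print(run_steps("report_one", steps))  # loaded 6

# An extract that returns no rows makes the transform step fail
bad_steps = [("extract", lambda r: {"rows": []}), ("transform", transform), ("load", load)]
try:
    run_steps("report_two", bad_steps)
except RuntimeError as err:
    print(err)  # step 'transform' failed: no rows to transform
```

Keeping extract, transform and load as real tasks gives this per-step visibility without a helper like this.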
So you can actually use if-else style logic with the `case` task. You can read more on this here. For your specific use case, you could use a `case` inside the Flow to check whether `selection` is `all`: if it is, run all the reports; if not, run just the selection. This requires the list of reports to be defined beforehand, but I think you have that, so this could work.
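The all-or-one decision described here can also be written as one plain function that always returns a list of report keys. A list is the input shape that mapping works with. This is a minimal sketch under that assumption. `normalize_selection` is a hypothetical helper, not part of Prefect. In a flow, it could be wrapped with `task(...)` and given the `Parameter` as input.

```python
def normalize_selection(selection, available):
    """Turn "all", a single key, or a list of keys into a list of keys.

    Hypothetical helper: unknown keys raise KeyError up front instead of
    failing later inside a per-report step.
    """
    if selection == "all":
        keys = list(available)  # every key, in dict insertion order
    elif isinstance(selection, str):
        keys = [selection]  # a single report key
    else:
        keys = list(selection)  # already a list (or other iterable) of keys
    missing = [k for k in keys if k not in available]
    if missing:
        raise KeyError(f"unknown report keys: {missing}")
    return keys


reports = {"1": "report_one", "2": "report_two"}
print(normalize_selection("all", reports))  # ['1', '2']
print(normalize_selection("2", reports))    # ['2']
print(normalize_selection(["1"], reports))  # ['1']
```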
But actually, given that the extract-transform-load pattern is the same for every report, this seems like a prime use case for mapping. You can do something like:
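```python
from prefect import task, Flow, Parameter

# reports, extract, transform and load as defined in your script
with Flow('test_flow') as flow:
    selection = Parameter('selection', default=list(reports.keys()))
    # One lookup per selected key
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)
```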
This logic now works the same whether the flow runs one report or several, as long as your input is a list.
CA Lee
07/12/2021, 4:45 AM
CA Lee
07/12/2021, 4:46 AM
```python
from prefect import task, Flow, Parameter

reports = {
    '1': report_one,
    '2': report_two
}

@task
def extract(report_number, report):
    do_something()

@task
def transform(extracted):
    do_something()

@task
def load(transformed):
    do_something()

@task
def selector(selection):
    # Loop through all items in the dict
    if selection == 'all':
        for report_number, report in reports.items():
            extracted = extract(report_number=report_number, report=report)
            transformed = transform(extracted)
            load(transformed)
    # Select a single item from the dict
    else:
        extracted = extract(report_number=selection, report=reports[selection])
        transformed = transform(extracted)
        load(transformed)

with Flow('test_flow') as flow:
    selection = Parameter('selection', default='all')
    selector(selection=selection)
```
But when the flow is run, the error encountered is:
ValueError: Could not infer an active Flow context while creating edge to <Task: extract>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `extract.run(...)`
Have tried:
• Appending .run() to extract(), transform(), load() in selector()
• Removing the @task for the extract, transform and load functions. This works, but in case of any failure, I would not know at which step the script failed
• Could try calling the @task functions within the Flow context, but not sure how to use the Parameter without writing a function for it
```python
with Flow('test_flow') as flow:
    # How to use Parameter here to control select all or one?
    extracted = extract(report_number=report_number, report=report)
    transformed = transform(extracted)
    load(transformed)
```
Kevin Kho
CA Lee
07/12/2021, 4:48 AM
Kevin Kho
```python
from prefect import task, Flow, Parameter
import prefect

reports = {
    '1': "report_one",
    '2': "report_two"
}

@task
def extract(report_number, report):
    logger = prefect.context.get("logger")
    logger.info(f"extract number {report_number}")
    logger.info(f"extract report {report}")
    return report_number

@task
def transform(extracted):
    logger = prefect.context.get("logger")
    logger.info(f"transform number {extracted}")
    return extracted

@task
def load(transformed):
    logger = prefect.context.get("logger")
    logger.info(f"load number {transformed}")
    return transformed

with Flow('test_flow') as flow:
    selection = Parameter('selection', default=list(reports.keys()))
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)

# flow.run()  # runs every report in the default list
# Run a single report by overriding the parameter
flow.run(parameters={"selection": ["1"]})
```
CA Lee
07/15/2021, 2:15 AM
`.map()` instead of if/else (I've tried to generalize the use case in the script below):
```python
from prefect import task, Flow, Parameter, unmapped

# selection_one, selection_two, do_something and constant stand in for real objects
selections = {
    "01": selection_one,
    "02": selection_two
}

@task
def extract(arg_one, arg_two, constant):
    return do_something(arg_one, arg_two, constant)

with Flow('test_flow') as flow:
    # Get the keys of the selections dict
    selection = Parameter('selection', default=list(selections.keys()))
    # Map over those keys to get the dict values
    selected = task(lambda x: selections[x]).map(selection)
    extract.map(
        # Pass in the keys / values using .map()
        arg_one=selection,
        arg_two=selected,
        # unmapped() sends the same constant to every mapped task
        constant=unmapped(constant)
    )

# flow.run()
flow.run(parameters={"selection": ["01"]})
```
CA Lee
07/15/2021, 2:23 AM
```python
from prefect.executors import LocalDaskExecutor

# Uses the default scheduler (threads)
flow.executor = LocalDaskExecutor()
```
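Taken together, the mapped flow with `LocalDaskExecutor()` behaves roughly like the plain-Python analogy below. Mapped arguments are paired element-wise, `unmapped(...)` repeats the same value for every child, and the children can run on a local thread pool. This is only an analogy: Prefect and Dask do not use `concurrent.futures` this way. `selections`, `extract` and `constant` are stand-ins for the placeholders in the script above, and `run_mapped` is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the placeholder objects in the script above
selections = {"01": "selection_one", "02": "selection_two", "03": "selection_three"}


def extract(arg_one, arg_two, constant):
    # Stand-in for do_something(arg_one, arg_two, constant); the sleep
    # mimics I/O-bound work, where a thread pool helps most
    time.sleep(0.1)
    return f"{arg_one}:{arg_two}:{constant}"


def run_mapped(selection, constant):
    # Like task(lambda x: selections[x]).map(selection)
    selected = [selections[x] for x in selection]
    with ThreadPoolExecutor() as pool:
        # Like extract.map(arg_one=selection, arg_two=selected,
        # constant=unmapped(constant)): the two mapped lists are paired
        # position by position, and constant is repeated for every call
        futures = [
            pool.submit(extract, a, b, constant)
            for a, b in zip(selection, selected)
        ]
        # Collect results in input order
        return [f.result() for f in futures]


start = time.perf_counter()
print(run_mapped(list(selections), constant="shared"))
# ['01:selection_one:shared', '02:selection_two:shared', '03:selection_three:shared']
print(f"{time.perf_counter() - start:.2f}s")  # usually well under the 0.3s a sequential loop would take
print(run_mapped(["01"], constant="shared"))
# ['01:selection_one:shared']
```

Threads suit I/O-bound reports such as API calls or database queries. CPU-heavy transforms may gain less because of Python's GIL.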