# ask-community
CA Lee
Hello, trying to find a better way to use parameters for control flow using a dictionary (code in thread)
Kevin Kho
Hey @CA Lee, thanks for the detailed writeup! Could you move the code block to this thread just so we don't crowd the main channel too much? There are a couple of options here, but before we go there: is your `if` statement missing an `=`? It should be `==`, right?

Using `.run()` to call a task inside another task should work, as it just calls the underlying function. As with using `transform`, `load`, and `extract` as plain functions, though, you will not get the visibility of separate task runs. Instead, you can get `if`/`else`-type logic with the `case` task. You can read more on this here. For your specific use case, you could use a `case` inside the Flow to check whether `selection` is `all`: if it is, run all the reports; if not, just run the selection. This requires the list of reports to be defined beforehand, but I think you have that, so this could work. But actually, given that the `extract`-`transform`-`load` pattern is the same, this seems like a prime use case for mapping. You can do something like:
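```python
from prefect import task, Flow, Parameter

# `reports` is the report_number -> report dict defined in your script
with Flow('test_flow') as flow:
    # Pass a list rather than the dict_keys view
    selection = Parameter('selection', default=list(reports.keys()))
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)
```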
This logic is now agnostic to whether the flow is running one report or several, as long as your input is a list.
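For readers new to mapping, here is a rough plain-Python picture of what the mapped flow computes: one extract, transform, load chain per selected key. It is only a sketch under assumptions: no Prefect is involved, and the `reports` values and step bodies are hypothetical stand-ins. In the real flow each step of each chain is its own task run, so you keep per-step visibility.

```python
# Hypothetical stand-ins for the real reports and ETL steps (no Prefect here).
reports = {"1": "report_one", "2": "report_two"}

def extract(report_number, report):
    # Placeholder: a real extract step would pull data for this report.
    return f"{report_number}:{report}"

def transform(extracted):
    return extracted.upper()

def load(transformed):
    return f"loaded {transformed}"

def run_pipeline(selection):
    """Rough equivalent of the mapped flow: one ETL chain per selected key."""
    values = [reports[key] for key in selection]                    # task(lambda x: reports[x]).map(selection)
    extracted = [extract(k, v) for k, v in zip(selection, values)]  # extract.map(selection, values)
    transformed = [transform(e) for e in extracted]                 # transform.map(extracted)
    return [load(t) for t in transformed]                           # load.map(transformed)

print(run_pipeline(list(reports)))  # ['loaded 1:REPORT_ONE', 'loaded 2:REPORT_TWO']
print(run_pipeline(["1"]))          # ['loaded 1:REPORT_ONE']
```

The same function handles one key or many without any branching, which is the point of replacing the `if`/`else` with mapping.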
CA Lee
Sure, one sec, I'll move the code
```python
from prefect import task, Flow, Parameter

reports = {
    '1': report_one,
    '2': report_two
}

@task
def extract(report_number, report):
    do_something()

@task
def transform(extracted):
    do_something()

@task
def load(transformed):
    do_something()

@task
def selector(selection):

    # Loop through all items in the dict
    if selection == 'all':
        for report_number, report in reports.items():
            extracted = extract(report_number=report_number, report=report)
            transformed = transform(extracted)
            load(transformed)

    # Select a single item from the dict
    else:
        extracted = extract(report_number=selection, report=reports[selection])
        transformed = transform(extracted)
        load(transformed)

with Flow('test_flow') as flow:
    selection = Parameter('selection', default='all')
    selector(selection=selection)
```
But when the flow is run, the error encountered is:
```
ValueError: Could not infer an active Flow context while creating edge to <Task: extract>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `extract.run(...)`
```
Have tried:
• Appending `.run()` to `extract()`, `transform()`, and `load()` in `selector()`
• Removing the `@task` decorator from the extract, transform, and load functions. This works, but in case of any failure, I would not know at which step the script failed
• Could try calling the `@task` functions within the Flow context, but I'm not sure how to use the Parameter there without writing a function for it:
```python
with Flow('test_flow') as flow:
    # How to use Parameter here to control select all or one?
    extracted = extract(report_number=report_number, report=report)
    transformed = transform(extracted)
    load(transformed)
```
Kevin Kho
Thanks!
CA Lee
No worries. I'll take a bit of time to digest your comments, @Kevin Kho, and will reach out if I need further help. Thank you for taking the time!
Kevin Kho
Of course! Just respond here and I'll see it tomorrow.
Just a note: make sure you map over lists. Mapping over a string will also "work" because a string is iterable, but it will run letter by letter.
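The letter-by-letter behavior mirrors Python's built-in `map()`. The sketch below shows it with a key like `"01"`, and adds a hypothetical `normalize_selection` helper (not part of Prefect) that turns `"all"`, a single key, or a list of keys into the list the mapped tasks expect. Wrapping such a helper in `@task` and mapping over its output is one possible way to keep an `"all"` option; treat that wiring as an untested assumption.

```python
def show(x):
    return f"run {x}"

# A bare string is iterated character by character...
print(list(map(show, "01")))    # ['run 0', 'run 1']
# ...while a list is iterated item by item.
print(list(map(show, ["01"])))  # ['run 01']

def normalize_selection(selection, available):
    """Hypothetical helper: map "all", one key, or a list of keys to a list of keys."""
    if selection == "all":
        return list(available)
    # Wrap a single key so it is not split into characters.
    keys = [selection] if isinstance(selection, str) else list(selection)
    unknown = [k for k in keys if k not in available]
    if unknown:
        raise KeyError(f"unknown selection(s): {unknown}")
    return keys

selections = {"01": "selection_one", "02": "selection_two"}
print(normalize_selection("all", selections))   # ['01', '02']
print(normalize_selection("01", selections))    # ['01']
print(normalize_selection(["02"], selections))  # ['02']
```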
I just tested this. This is fully working code (had to edit a bit):
```python
from prefect import task, Flow, Parameter
import prefect

reports = {
    '1': "report_one",
    '2': "report_two"
}

@task
def extract(report_number, report):
    logger = prefect.context.get("logger")
    logger.info(f"extract number {report_number}")
    logger.info(f"extract report {report}")
    return report_number

@task
def transform(extracted):
    logger = prefect.context.get("logger")
    logger.info(f"transform number {extracted}")
    return extracted

@task
def load(transformed):
    logger = prefect.context.get("logger")
    logger.info(f"load number {transformed}")
    return transformed


with Flow('test_flow') as flow:
    selection = Parameter('selection', default=list(reports.keys()))
    values = task(lambda x: reports[x]).map(selection)
    extracted = extract.map(selection, values)
    transformed = transform.map(extracted)
    load.map(transformed)

# flow.run()
flow.run(parameters={"selection": ["1"]})
```
CA Lee
@Kevin Kho has been really helpful on this one. If anyone else is looking to parameterize a selection using a dict, consider Prefect's `.map()` instead of if/else (I've tried to generalize the use case in the script below):
```python
from prefect import task, Flow, Parameter, unmapped

# selection_one, selection_two, do_something and constant are
# placeholders for your own objects
selections = {
    "01": selection_one,
    "02": selection_two
}

@task
def extract(arg_one, arg_two, constant):
    return do_something(arg_one, arg_two, constant)

with Flow('test_flow') as flow:

    # Get the keys of the selections dict
    selection = Parameter('selection', default=list(selections.keys()))

    # Map over those keys to get the dict values
    selected = task(lambda x: selections[x]).map(selection)

    extract.map(
        # Pass in the keys / values using .map()
        arg_one=selection,
        arg_two=selected,
        # Send the same input (constant) to every mapped task run;
        # it must be passed by keyword because it follows keyword arguments
        constant=unmapped(constant),
    )

# flow.run()
flow.run(parameters={"selection": ["01"]})
```
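Conceptually, `.map()` pairs up the mapped arguments index by index, while `unmapped()` hands the same value to every call. The plain-Python sketch below uses a hypothetical `map_with_unmapped` helper (not Prefect's implementation) to show that pairing; requiring equal-length mapped inputs is this sketch's own choice, and `extract` here is a stand-in for the placeholder `do_something` call.

```python
def extract(arg_one, arg_two, constant):
    # Hypothetical stand-in for do_something(arg_one, arg_two, constant).
    return (arg_one, arg_two, constant)

def map_with_unmapped(fn, mapped_kwargs, unmapped_kwargs):
    """Call fn once per index of the mapped lists; unmapped values are
    passed unchanged to every call (roughly what unmapped() expresses)."""
    lengths = {len(values) for values in mapped_kwargs.values()}
    if len(lengths) != 1:
        raise ValueError("this sketch requires mapped arguments of equal length")
    (n,) = lengths
    results = []
    for i in range(n):
        kwargs = {name: values[i] for name, values in mapped_kwargs.items()}
        kwargs.update(unmapped_kwargs)  # same constant for every call
        results.append(fn(**kwargs))
    return results

selections = {"01": "selection_one", "02": "selection_two"}
keys = list(selections)
values = [selections[k] for k in keys]
print(map_with_unmapped(extract, {"arg_one": keys, "arg_two": values}, {"constant": "shared"}))
# [('01', 'selection_one', 'shared'), ('02', 'selection_two', 'shared')]
```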
Your flow can also run the mapped tasks in parallel after adding the two lines below. Amazing:
```python
from prefect.executors import LocalDaskExecutor

# Uses the default scheduler (threads)
flow.executor = LocalDaskExecutor()
```
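A thread-based scheduler helps most when each mapped task spends its time waiting on I/O (API calls, database queries, file transfers). As a rough standard-library analogy, not Prefect's implementation, `concurrent.futures.ThreadPoolExecutor` shows the same idea: independent per-key work running on a pool of threads. The `etl` function and its `sleep` are hypothetical stand-ins for real I/O.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def etl(key):
    # Hypothetical per-report work; sleep stands in for I/O such as an API call.
    time.sleep(0.2)
    return f"loaded {key}"

keys = ["01", "02", "03", "04"]

start = time.perf_counter()
sequential = [etl(k) for k in keys]
seq_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map returns results in input order, like a mapped task's children.
    parallel = list(pool.map(etl, keys))
par_time = time.perf_counter() - start

print(sequential == parallel)  # True
print(f"sequential {seq_time:.2f}s, threaded {par_time:.2f}s")
```

For CPU-bound work, CPython threads give little speedup because of the GIL, so thread-based parallelism pays off mainly for I/O-bound steps.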