Christopher Schmitt

02/18/2022, 3:09 PM
Hi Prefect community, I recently started using Prefect and so far I mostly like it. Sometimes I can't wrap my head around how to apply the Prefect concepts, as with the problem I'm having now. I want to write a script that queries database tables and writes the data to blob storage. The tables have to be queried with a number of different parameters (dates), so what I need is the product of tables x dates. I solved this with a task named "create_input_tuples" that is mapped over the list of dates and also receives unmapped(tables) as an argument. Inside create_input_tuples I loop over the tables and return a list of tuples (one list per date), which I then flatten.
Question 1: Is this the best approach, or are there other ways to achieve that?
After the queries have run, I filter out empty tables (which can happen) with a FilterTask and want to write the data from the remaining tables to the blob. For this I need the table_name and date (for the naming).
Question 2: Is there a way to keep track of the names and dates (also taking the filtering into account)?
Usually I had a task returning a list of tables, to which I mapped the querying task, and then I mapped the load_blob task over both the query results and the list of tables from the beginning. Without the filter this has always worked, since the table_names and the query results aligned and could be mapped to the blob task together.
To make it a little clearer (I hope) I drew this picture; I wasn't sure if the Prefect visualization was really helping. Maybe you could help me find the Prefect solution for this! Christopher

Kevin Kho

02/18/2022, 3:32 PM
Hi @Christopher Schmitt, awesome effort on the diagram! Yes to question 1: you need an intermediate task to build the cross product of tables and dates. I think what you have with the FilterTask sounds good, but you can also `raise SKIP` on a certain condition and it will propagate. That way it will always align, because there is no FilterTask that changes the shape, right?
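For reference, the tables x dates cross product discussed here can be sketched in plain Python with `itertools.product`. This is only an illustration of the logic that an intermediate task would contain, not Prefect code; the function name is borrowed from the question, and the table names and dates are made-up placeholders.

```python
from itertools import product


def create_input_tuples(tables, dates):
    """Return every (table, date) combination as one flat list."""
    return list(product(tables, dates))


# Placeholder inputs, assumed purely for illustration
pairs = create_input_tuples(
    ["orders", "customers"],
    ["2022-02-17", "2022-02-18"],
)

for table, date in pairs:
    print(table, date)
```

Because the result is already a flat list of tuples, no separate flatten step is needed before mapping over it.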

Christopher Schmitt

02/18/2022, 3:57 PM
Thanks for your answers, @Kevin Kho. My first impulse was to extend what I am returning, i.e. instead of returning only the data, returning the data, the table_name and the date in a dict. But I would have to change all the tasks involved. I was wondering whether it is possible to get the arguments that a task was called with from another task? I'm not sure the `raise SKIP` would solve it, though, or at least I'm not getting it yet. I mean, it is probably equivalent to the FilterTask, but how would I filter the list of table_names?
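The "return a dict" idea mentioned in this message could look roughly like the plain-Python sketch below. The names `query_table` and `blob_name`, the `.csv` suffix, and the fake rows are all hypothetical stand-ins, not part of the original flow. The point is that once the name and date travel with the data, filtering out empty results cannot separate them.

```python
def query_table(table_name, date):
    """Hypothetical stand-in for the real database query."""
    rows = [] if table_name == "empty_table" else [{"id": 1}]
    # Bundle the query arguments with the result
    return {"table_name": table_name, "date": date, "data": rows}


def blob_name(record):
    """Build a blob name from the metadata carried in the record."""
    return f"{record['table_name']}_{record['date']}.csv"


records = [
    query_table(table, "2022-02-17")
    for table in ["orders", "empty_table"]
]

# Filtering drops the whole record, so names and dates stay in sync
non_empty = [r for r in records if r["data"]]
names = [blob_name(r) for r in non_empty]
print(names)
```

The trade-off raised in the message still applies: every downstream task has to accept the dict instead of the bare data.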

Kevin Kho

02/18/2022, 3:59 PM
Let me draft an example to express my thoughts
👍 1
Check this:
from prefect import Flow, task
from prefect.engine.signals import SKIP

@task
def numbers():
    return [1,2]

@task
def letters():
    return ["a","b", "c"]

@task
def cross_product(numbers, letters):
    res = []
    for letter in letters:
        for num in numbers:
            res.append({"num":num, "letter":letter})
    return res

@task(nout=2)
def pull_numbers_and_letters(items):
    numbers = [x["num"] for x in items]
    letter = [x["letter"] for x in items]
    return numbers, letter

@task
def step1(item):
    if item["num"] == 1:
        raise SKIP("Skipping based on condition")
    return item

@task
def step2(item):
    return item

@task
def step3(item, number, letter):
    assert number == item["num"]
    assert letter == item["letter"]
    return item


with Flow('...') as flow:
    n = numbers()
    l = letters()
    map_items = cross_product(n,l)
    mapped_numbers, mapped_letters = pull_numbers_and_letters(map_items)
    first = step1.map(map_items)
    second = step2.map(first)
    third = step3.map(second, mapped_numbers, mapped_letters)

flow.run()
I created the cross product, made an intermediate task to pull out the inputs, and passed those inputs back in later. Because I used `SKIP` instead of `Filter`, the lists keep a fixed length throughout and my assert statements in `step3` work.
I don't know yet if this is the optimal approach, but I think it's a good place to start thinking.
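The alignment argument can be checked without Prefect. The plain-Python sketch below is only an analogy: `SKIPPED` is an assumed placeholder for a skipped mapped child, not Prefect's actual signal object. It compares dropping items (filter-style) with keeping a placeholder in the same position (skip-style).

```python
SKIPPED = None  # assumed stand-in for a skipped mapped child

items = [
    {"num": num, "letter": letter}
    for letter in ["a", "b", "c"]
    for num in [1, 2]
]
numbers = [item["num"] for item in items]

# Filter-style: the list shrinks, so positions no longer match `numbers`
filtered = [item for item in items if item["num"] != 1]

# Skip-style: the list keeps its length, with a placeholder per skipped item
skipped = [item if item["num"] != 1 else SKIPPED for item in items]

# zip() silently truncates to the shorter list, which hides the shape change
misaligned = [
    (item["num"], num)
    for item, num in zip(filtered, numbers)
    if item["num"] != num
]
aligned = all(
    item is SKIPPED or item["num"] == num
    for item, num in zip(skipped, numbers)
)

print(len(filtered), len(skipped), len(misaligned), aligned)
```

With the filter-style list, some items end up paired with the wrong number; with the skip-style list, every surviving item still sits next to its own inputs.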

Christopher Schmitt

02/18/2022, 4:31 PM
Looks good, I'll need to dig into that and understand it. Thanks a lot, @Kevin Kho!