
Wei Mei

04/01/2022, 3:30 PM
Hi! In my flow I call my tasks like this. I am experiencing a problem where the snowflake_load is running before the upload_to_s3 task.
today, dir = get_data(endpoints=api_endpoints)
upload_to_s3(today, dir, endpoints=api_endpoints)
snowflake_load(today, schema="statistics", endpoints=api_endpoints)

Kevin Kho

04/01/2022, 3:32 PM
Hey @Wei Mei, you can find the syntax to explicitly define upstream tasks here

Wei Mei

04/01/2022, 3:36 PM
this looks to have solved it!
today, dir = get_data(endpoints=api_endpoints)
upload = upload_to_s3(today, dir, endpoints=api_endpoints)
snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)
👍 1
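Why the explicit upstream matters can be sketched without Prefect at all. The snippet below is a toy model, not Prefect's scheduler: it assumes both upload_to_s3 and snowflake_load depend only on get_data through their arguments, and uses the standard library's graphlib to show that their relative order is unconstrained until an extra edge is added. The function name run_order is made up for illustration.

```python
from graphlib import TopologicalSorter


def run_order(explicit_upstream: bool) -> list[str]:
    """Return one valid execution order for the toy three-task graph."""
    # Each key maps to the set of tasks that must finish before it starts.
    graph = {
        "get_data": set(),
        "upload_to_s3": {"get_data"},    # data dependency only
        "snowflake_load": {"get_data"},  # data dependency only
    }
    if explicit_upstream:
        # Toy equivalent of snowflake_load(...).set_upstream(upload)
        graph["snowflake_load"].add("upload_to_s3")
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    # Without the extra edge, either task may come second.
    print(run_order(explicit_upstream=False))
    # With the extra edge, the graph is a chain and the order is fixed.
    print(run_order(explicit_upstream=True))
```

With the extra edge the only valid order is get_data, upload_to_s3, snowflake_load; without it, a scheduler is free to pick either of the last two first.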
Hi Kevin, I found the schematic page and it looks cool; it shows that my flow is definitely not what I thought it would be.
The endpoints=api_endpoints argument is a list of three strings that I loop over inside the tasks. I think I should be using the map feature, but I still don’t fully understand it yet.
In the schematic I see three tasks: Get Data (FunctionTask), Get Data [0] Getitem, and Get Data [1] Getitem.
In the logs it looks like the Getitems didn't do anything; the FunctionTask printed out the 3 lines of log for the 3 items in my list.

Kevin Kho

04/02/2022, 7:19 PM
Can I see the code?

Wei Mei

04/02/2022, 7:21 PM
sure!
    # returned as pandas df's
    for endpoint in endpoints:
        file = f"{endpoint}-{today}.csv"
        df = get_viewer(seg, endpoint)
        if not df.empty:
            logger.info(f"{endpoint} has data")
            df.to_csv(f"{dir}/{file}", index=False)
            logger.info(f"{file} created.")
        else:
            logger.info(f"{endpoint} is empty: {today}")

Kevin Kho

04/02/2022, 7:25 PM
Is this inside the Flow?

Wei Mei

04/02/2022, 7:27 PM
This is inside a task, get_data().
This is what's in my flow:
api_endpoints = ["site1", "site2", "site3"]
today, dir = get_data(endpoints=api_endpoints)
upload = upload_to_s3(today, dir, endpoints=api_endpoints)
snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)

Kevin Kho

04/02/2022, 7:31 PM
I think this works though, right? Or is there something wrong? I think the GetItem is a task Prefect is adding, right?

Wei Mei

04/02/2022, 7:33 PM
Yes, it's working so far.
I was just curious what the two Getitem tasks were.
Thanks for your time Kevin. My POC is running now.
thank you and everyone else for the hand holding.

Kevin Kho

04/02/2022, 7:46 PM
Prefect sometimes adds intermediate tasks to put things in a list or to unpack them. I don’t know exactly where in this case, but I think it’s related to api_endpoints being a list.
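Another plausible source, given the names "Get Data [0]" and "Get Data [1]", is the tuple unpacking in `today, dir = get_data(...)`: as far as I know, indexing a task result in Prefect 1.x is itself recorded as a small task. The snippet below is a toy model of that idea, not Prefect's implementation; the Node class and build_graph function are hypothetical names used only for illustration.

```python
class Node:
    """A toy deferred task: it records graph structure and runs nothing."""

    def __init__(self, name, registry, upstream=(), nout=None):
        self.name = name
        self.registry = registry      # shared list of every node created
        self.upstream = list(upstream)
        self.nout = nout              # number of outputs, if known up front
        registry.append(self)

    def __getitem__(self, key):
        # Indexing a deferred result cannot happen yet, so record it as a
        # new node that depends on this one.
        return Node(f"{self.name}[{key}]", self.registry, upstream=[self])

    def __iter__(self):
        # Unpacking needs a known length; without it there is nothing to yield.
        if self.nout is None:
            raise TypeError("cannot unpack a deferred result of unknown length")
        return (self[i] for i in range(self.nout))


def build_graph():
    """Mimic `today, dir = get_data(...)` and list the nodes it creates."""
    registry = []
    get_data = Node("Get Data", registry, nout=2)
    today, directory = get_data  # unpacking creates two index nodes
    return [node.name for node in registry]


if __name__ == "__main__":
    print(build_graph())
```

In this model the two extra nodes do no work of their own, which would be consistent with the Getitem tasks logging nothing while the FunctionTask logs all three endpoints.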

Wei Mei

04/02/2022, 7:50 PM
Gotcha. I was worried based on the schematic that the snowflake_load task might run before upload_to_s3 because of the way it is drawn, but the .set_upstream() should take care of that, so I won't ask about the magic behind this one 😄.

Kevin Kho

04/02/2022, 7:52 PM
Mapping fits your use case perfectly, though, so you wouldn't need to loop inside the task. You should try it out.
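The idea behind mapping can be sketched in plain Python: move the loop body into a function that handles one endpoint, then fan that function out over the list so each element gets its own independent run. This is only an analogue built on concurrent.futures, not Prefect's mapping API (in Prefect 1.x the corresponding call would, I believe, be the task's .map() method). The names fetch_one and fan_out are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_one(endpoint: str, today: str) -> str:
    """Hypothetical per-endpoint unit of work, standing in for the loop body."""
    return f"{endpoint}-{today}.csv"


def fan_out(endpoints: list[str], today: str) -> list[str]:
    """Run fetch_one once per endpoint and collect results in input order."""
    with ThreadPoolExecutor() as pool:
        # `today` is the same for every call; only `endpoint` varies.
        return list(pool.map(lambda e: fetch_one(e, today), endpoints))


if __name__ == "__main__":
    print(fan_out(["site1", "site2", "site3"], "2022-04-02"))
```

The practical difference from looping inside one task is that each endpoint succeeds, fails, and logs on its own, so one empty or broken endpoint does not hide the others.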

Wei Mei

04/02/2022, 7:54 PM
I definitely will!
Have a great weekend!