Hi! In my flow I call my tasks like this. I am exp...
# prefect-community
w
Hi! In my flow I call my tasks like this. I am experiencing a problem where the snowflake_load is running before the upload_to_s3 task.
Copy code
today, dir = get_data(endpoints=api_endpoints)
    upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints)
k
Hey @Wei Mei, you can find the syntax to explicitly define upstream tasks here
gratitude thank you 1
w
this looks to have solved it!
Copy code
today, dir = get_data(endpoints=api_endpoints)
    upload = upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)
👍 1
Hi kevin, I found the schematic page and it looks cool; it shows that my flow is definitely not what I thought it would be.
the endpoints=api_endpoints is a list of three strings that I use in loops. I think I should be using the map feature but I still don’t fully understand it yet.
In the tasks I see three tasks Get Data (FunctionTask), Get Data [0] Getitem, and Get Data [1] Getitem.
in the logs it looks the Getitems didnt do anything, the FunctionTask printed out the 3 lines of log for the 3 items in my list.
k
Can I see the code?
w
sure!
Copy code
# returned as pandas df's
    for endpoint in endpoints:
        file = f"{endpoint}-{today}.csv"
        df = get_viewer(seg, endpoint)
        if not df.empty:
            <http://logger.info|logger.info>(f"{endpoint} has data")
            df.to_csv(f"{dir}/{file}", index=False)
            <http://logger.info|logger.info>(f"{file} created.")
            
        else:
            <http://logger.info|logger.info>(f"{endpoint} is empty: {today}")
k
is this inside the
Flow
?
w
this is a task get_data()
this is whats in my flow:
Copy code
api_endpoints = [ "site1", "site2", "site3" ]
    today, dir = get_data(endpoints=api_endpoints)
    upload = upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)
k
I think this works though right? Or is there something wrong? I think the GetItem is a task Prefect is adding right?
w
yes, it working so far.
was curious what the two Getitem tasks were.
Thanks for your time Kevin. My POC is running now.
thank you and everyone else for the hand holding.
k
Prefect adds intermediate tasks to put stuff in list or unpack them sometimes. I don’t know exactly where in this case but I think it’s related to the api-endpoints being a list
w
gotcha. I was worried based on the schematic that the snowflake_load task might run before the upload_to_s3 because of the way it is drawn, but the .set_upstream() should take care of that so I wont ask about the magic behind this one 😄.
k
Mapping perfectly fits your use case though so you don’t need to loop inside the task btw, you should try it out
w
i definitely will!
Have a great weekend!