Hi! In my flow I call my tasks like this. I am exp...
# prefect-community
Hi! In my flow I call my tasks like this. I am experiencing a problem where the snowflake_load is running before the upload_to_s3 task.
Copy code
today, dir = get_data(endpoints=api_endpoints)
    upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints)
Hey @Wei Mei, you can find the syntax to explicitly define upstream tasks here
gratitude thank you 1
this looks to have solved it!
Copy code
today, dir = get_data(endpoints=api_endpoints)
    upload = upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)
👍 1
Hi kevin, I found the schematic page and it looks cool; it shows that my flow is definitely not what I thought it would be.
the endpoints=api_endpoints is a list of three strings that I use in loops. I think I should be using the map feature but I still don’t fully understand it yet.
In the tasks I see three tasks Get Data (FunctionTask), Get Data [0] Getitem, and Get Data [1] Getitem.
in the logs it looks the Getitems didnt do anything, the FunctionTask printed out the 3 lines of log for the 3 items in my list.
Can I see the code?
Copy code
# returned as pandas df's
    for endpoint in endpoints:
        file = f"{endpoint}-{today}.csv"
        df = get_viewer(seg, endpoint)
        if not df.empty:
            <http://logger.info|logger.info>(f"{endpoint} has data")
            df.to_csv(f"{dir}/{file}", index=False)
            <http://logger.info|logger.info>(f"{file} created.")
            <http://logger.info|logger.info>(f"{endpoint} is empty: {today}")
is this inside the
this is a task get_data()
this is whats in my flow:
Copy code
api_endpoints = [ "site1", "site2", "site3" ]
    today, dir = get_data(endpoints=api_endpoints)
    upload = upload_to_s3(today, dir, endpoints=api_endpoints)
    snowflake_load(today, schema="statistics", endpoints=api_endpoints).set_upstream(upload)
I think this works though right? Or is there something wrong? I think the GetItem is a task Prefect is adding right?
yes, it working so far.
was curious what the two Getitem tasks were.
Thanks for your time Kevin. My POC is running now.
thank you and everyone else for the hand holding.
Prefect adds intermediate tasks to put stuff in list or unpack them sometimes. I don’t know exactly where in this case but I think it’s related to the api-endpoints being a list
gotcha. I was worried based on the schematic that the snowflake_load task might run before the upload_to_s3 because of the way it is drawn, but the .set_upstream() should take care of that so I wont ask about the magic behind this one 😄.
Mapping perfectly fits your use case though so you don’t need to loop inside the task btw, you should try it out
i definitely will!
Have a great weekend!