b
Hello all, We have a situation where we are running two Snowflake "COPY INTO" statements sequentially using the prefect_snowflake package and it's taking a while. Since there is no dependency between these statements, we would like to run them in parallel. What is the best way to implement this pattern? For added context, we tried using the snowflake_multiquery function, which does run both statements, but still in sequential order. Any help is greatly appreciated. Thank you in advance!
n
hi @Bryan!
do you have a task that's responsible for running the
COPY INTO
statements? if so, it should be pretty easy to just
run_copy_into.map([args1, args2])
b
hi @Nate, I am flexible as I do not have anything running right now. So, I can set up the COPY INTOs to run as a subflow or task.
I'm not sure I understand the ".map" function. Can you point me to some docs on that?
n
great! i would write a task e.g.
run_copy_into
that uses the snowflake lib however you need to run the COPY INTO. then you can define a list of parameters that this task would accept, e.g.
Copy code
In [1]: from prefect import flow, task

In [2]: @task
   ...: def run_copy_into(sql: str):
   ...:     print(f"Running {sql}")
   ...:     return True  # placeholder; a real task would return whether the COPY INTO succeeded
   ...:

In [3]: @flow(log_prints=True)
   ...: def pipeline():
   ...:     futures = run_copy_into.map(["COPY INTO a...", "COPY INTO b..."])
   ...:     assert all(f.result() for f in futures)
   ...:

In [4]: pipeline()
16:39:16.176 | INFO    | prefect.engine - Created flow run 'laughing-mamba' for flow 'pipeline'
16:39:16.516 | INFO    | Flow run 'laughing-mamba' - Created task run 'run_copy_into-0' for task 'run_copy_into'
16:39:16.517 | INFO    | Flow run 'laughing-mamba' - Submitted task run 'run_copy_into-0' for execution.
16:39:16.529 | INFO    | Flow run 'laughing-mamba' - Created task run 'run_copy_into-1' for task 'run_copy_into'
16:39:16.529 | INFO    | Flow run 'laughing-mamba' - Submitted task run 'run_copy_into-1' for execution.
16:39:16.711 | INFO    | Task run 'run_copy_into-0' - Running COPY INTO a...
16:39:16.829 | INFO    | Task run 'run_copy_into-0' - Finished in state Completed()
16:39:16.995 | INFO    | Task run 'run_copy_into-1' - Running COPY INTO b...
16:39:17.094 | INFO    | Task run 'run_copy_into-1' - Finished in state Completed()
16:39:17.235 | INFO    | Flow run 'laughing-mamba' - Finished in state Completed('All states completed.')
Out[4]:
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `bool`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `bool`'))]
šŸ‘ 1
n
if you want to avoid mapping over some inputs,
unmapped
is your friend
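e.g. something like this (rough sketch; the role kwarg is just made up for illustration):
Copy code
from prefect import flow, task, unmapped


@task
def run_copy_into(sql: str, role: str):
    print(f"Running {sql} as {role}")


@flow
def pipeline():
    # the sql strings are mapped over; the same role is passed to every task run
    run_copy_into.map(["COPY INTO a...", "COPY INTO b..."], role=unmapped("LOADER"))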
b
I will give it a try. Thank you for the help and info. I appreciate it!
n
šŸ‘
b
@Nate, that looks like it will work quite nicely. I have a question since I am new to the whole parallel processing scenario and have little to no experience with packages like asyncio. I am assuming the
assert all(f.result() for f in futures)
is ensuring that both COPY INTOs finished successfully?
n
no worries! when you wrap a python function in a
@task
, you get a new method on that thing called
.submit
which returns a future right away (before the task runs) which you can call
.result()
on later to get the value from it
Copy code
from prefect import flow, task
from prefect.futures import PrefectFuture


@task
def passthrough(value):
    return value


@flow
def demo():
    # .submit() returns a PrefectFuture right away; .result() blocks until the task finishes
    future: PrefectFuture = passthrough.submit(42)
    assert future.result() == 42
this is nice because if you want to do stuff at the same time, you can submit a bunch of things together
.map
is another method you get on tasks, and that is just doing
submit()
for each thing in the list, so you get a list of futures back. in my original example, i was just returning
bool
from the demo task to signal "hey did this sql finish correctly?" which you'd implement in your task, and
all
is only true if every future's result turned out
True
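roughly speaking, the original example
Copy code
futures = run_copy_into.map(["COPY INTO a...", "COPY INTO b..."])
is doing something like
Copy code
futures = [run_copy_into.submit(sql) for sql in ["COPY INTO a...", "COPY INTO b..."]]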
does that help? here's another example where i have lots of comments, if that's helpful
b
Ok. I believe I understand that. What happens if one of the tasks fails out of the bunch? How is that handled?
Also, I see the use of future.wait() for the task. Is there such a thing as a wait() for the map? Meaning, I want to run all the tasks in the map and wait for all of those to finish before moving on.
n
this will wait for all of them to finish and give you all the results from your mapped tasks
Copy code
results = [f.result() for f in futures]
let me put together an example for handling failure, one minute
šŸ‘ 1
b
I think that makes sense
n
yeah basically each
some_future.result()
call is going to wait for the associated task's result, so that snippet above is doing that in a row for each task's future. now, to handle failure, you can use another thing you get when you wrap a function in a
@task
, which is a bool flag:
return_state=True
Copy code
from prefect import flow, task, unmapped


@task
def fails_above_threshold(value: int, threshold: int) -> int:
    if value > threshold:
        raise ValueError("Value is above threshold")
    return value


@flow
def my_flow():
    list_of_numbers = list(range(10))

    states = fails_above_threshold.map(
        list_of_numbers, threshold=unmapped(5), return_state=True # same threshold of 5, for each mapped task
    )

    failed_states = [state for state in states if state.is_failed()]
    print(f"Oh no! {len(failed_states)} states failed!")

    completed_states = [state for state in states if state.is_completed()]

    results = []
    for state in completed_states:
        result = state.result()
        print(f"Finished successfully with value {result}")
        results.append(result)

    return results


if __name__ == "__main__":
    assert my_flow() == [0, 1, 2, 3, 4, 5]
this example should be copy-pastable for you if you want to try it out but essentially you'll get
Copy code
Oh no! 4 states failed!
Finished successfully with value 0
Finished successfully with value 1
Finished successfully with value 2
Finished successfully with value 3
Finished successfully with value 4
Finished successfully with value 5
18:15:55.701 | INFO    | Flow run 'loutish-ringtail' - Finished in state Completed()
šŸ™ 1
even though some of those tasks fail, you can do logic on the
states
you get back - so that downstream, you can only operate on results associated with
Completed
task runs if you want
b
That's perfect. I will build some of that logic into my scenario. This is much appreciated!
catjam 1
n
glad I could help!
b
@Nate, one last question. I have all that logic you provided implemented. In my case my Task returns a dictionary of a few values. The approach you provided:
Copy code
completed_states = [state for state in states if state.is_completed()]

results = []
for state in completed_states:
    result = state.result()
    print(f"Finished successfully with value {result}")
    results.append(result)
I don't see how I can get access to the returned dict from my task. For me, the .result() is always None.
n
are you able to share your code? seems like there are two likely possibilities:
• your task is actually returning
None
• you are calling
.result()
on a
state
outside the context of a flow run where you did not persist the result
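for the second case, one option (rough sketch, assuming prefect 2.x) is to persist the result so it's readable outside the flow run:
Copy code
@task(persist_result=True)  # persists the return value so state.result() works later
def run_copy_into(sql_query: str, query_name: str) -> dict:
    ...  # same task body as before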
b
Hey @Nate, didn't mean to leave you hanging yesterday. I had to take care of a few things. Here is the gist of what I have: Flow:
Copy code
@flow(
    name="Load_Daily_Extract",
    flow_run_name=f"",
    description="Runs the daily load from AWS S3.",
    log_prints=env_vars.Print_Logging,
)
def load_daily_extract():
    function_name = load_daily_extract.__name__ + "()"
    logger = get_run_logger()
    logger.info("Starting daily data extract & load.")

    try:
        extract_start_date = set_extract_start_date()

        # get/generate list of copy into statements and metadata (i.e. {"COPY INTO...", "Policy Data"})
        statements = assemble_statements(extract_start_date)

        states = run_copy_into.map(list(statements.values()), list(statements.keys()), return_state=True)

        failed_states = [state for state in states if state.is_failed()]
        completed_states = [state for state in states if state.is_completed()]

        for state in failed_states:
            result = state.result()
            logger.error(result['description'])
            raise Exception(result)

        for state in completed_states:
            result = state.result()
            logger.info(result['description'])
Task:
Copy code
@task
def run_copy_into(sql_query: str, query_name: str) -> dict:
    logger = get_run_logger()

    try:
        result = snowflake.snowflake_execute(
            snowflake_block=env_vars.Snowflake_Connector_Block,
            database=env_vars._Db,
            query=sql_query,
            schema=env_vars.EXTRACT_SCHEMA,
            warehouse=env_vars.Snowflake_Warehouse_Daily)
                
            # print(results)
        return result

        logger.info("S3 files have been loaded.")
        logger.info("------------------------------------------------------------")
        return { "status": "success", "description": f"{query_name} from S3 have been loaded."}
    except Exception as ex:
        logger.error(
            f"ERROR: {query_name} COPY INTO failed - {str(ex)}"
        )
        return { "status": "failed", "description": f"{query_name} from S3 failed to load."}
One thing I just realized is that we have our own package where we have wrapped a couple of specific prefect_snowflake functions so that we can pass in all information in one call rather than making multiple calls. This would include providing info like warehouse, database, schema, etc. That
snowflake.snowflake_execute
call returns an object with a status and description of the execution.
I don't know if that has anything to do with it or not but it does add an extra layer.
n
hey Bryan! no worries! it looks like in the above, you have 2
return
statements in the success branch of your
try
Copy code
try:
        result = snowflake.snowflake_execute(
            snowflake_block=env_vars.Snowflake_Connector_Block,
            database=env_vars._Db,
            query=sql_query,
            schema=env_vars.EXTRACT_SCHEMA,
            warehouse=env_vars.Snowflake_Warehouse_Daily)
                
            # print(results)
        return result # FIRST

        logger.info("S3 files have been loaded.")
        logger.info("------------------------------------------------------------")
        return { "status": "success", "description": f"{query_name} from S3 have been loaded."} # SECOND
šŸ‘ 1
are you intending to
return
the result from
snowflake.snowflake_execute
?
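if you do want the status dict, something like this should work (a rough sketch of your task with the unreachable code removed; it assumes your wrapper raises on failure, otherwise check the returned status instead):
Copy code
@task
def run_copy_into(sql_query: str, query_name: str) -> dict:
    logger = get_run_logger()
    try:
        # assuming snowflake.snowflake_execute raises on failure;
        # if it only returns a status object, inspect that here instead
        snowflake.snowflake_execute(
            snowflake_block=env_vars.Snowflake_Connector_Block,
            database=env_vars._Db,
            query=sql_query,
            schema=env_vars.EXTRACT_SCHEMA,
            warehouse=env_vars.Snowflake_Warehouse_Daily)
        logger.info(f"{query_name}: S3 files have been loaded.")
        return {"status": "success", "description": f"{query_name} from S3 have been loaded."}
    except Exception as ex:
        logger.error(f"ERROR: {query_name} COPY INTO failed - {str(ex)}")
        return {"status": "failed", "description": f"{query_name} from S3 failed to load."}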
b
Good catch! I will test it but I am sure you are correct. That one is a little embarrassing.
n
no worries! sometimes you just need a second set of eyes šŸ˜„
šŸ‘ 1