Bryan
05/02/2024, 7:41 PM

Nate
05/02/2024, 9:32 PM

Nate
05/02/2024, 9:32 PM
COPY INTO statements? if so, it should be pretty easy to just run_copy_into.map([args1, args2])
Bryan
05/02/2024, 9:34 PM

Bryan
05/02/2024, 9:37 PM

Nate
05/02/2024, 9:39 PM
you can write a task run_copy_into that uses the snowflake lib however you need to run the COPY INTO, then you can define a list of parameters that this task would accept, e.g.
In [1]: from prefect import flow, task

In [2]: @task
   ...: def run_copy_into(sql: str):
   ...:     print(f"Running {sql}")
   ...:     return True  # a real task would return whether the COPY INTO actually succeeded
   ...:

In [3]: @flow(log_prints=True)
   ...: def pipeline():
   ...:     futures = run_copy_into.map(["COPY INTO a...", "COPY INTO b..."])
   ...:     assert all(f.result() for f in futures)
   ...:

In [4]: pipeline()
16:39:16.176 | INFO | prefect.engine - Created flow run 'laughing-mamba' for flow 'pipeline'
16:39:16.516 | INFO | Flow run 'laughing-mamba' - Created task run 'run_copy_into-0' for task 'run_copy_into'
16:39:16.517 | INFO | Flow run 'laughing-mamba' - Submitted task run 'run_copy_into-0' for execution.
16:39:16.529 | INFO | Flow run 'laughing-mamba' - Created task run 'run_copy_into-1' for task 'run_copy_into'
16:39:16.529 | INFO | Flow run 'laughing-mamba' - Submitted task run 'run_copy_into-1' for execution.
16:39:16.711 | INFO | Task run 'run_copy_into-0' - Running COPY INTO a...
16:39:16.829 | INFO | Task run 'run_copy_into-0' - Finished in state Completed()
16:39:16.995 | INFO | Task run 'run_copy_into-1' - Running COPY INTO b...
16:39:17.094 | INFO | Task run 'run_copy_into-1' - Finished in state Completed()
16:39:17.235 | INFO | Flow run 'laughing-mamba' - Finished in state Completed('All states completed.')
Out[4]:
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `bool`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `bool`'))]
Bryan
05/02/2024, 9:39 PM

Nate
05/02/2024, 9:40 PM
unmapped is your friend
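For context, unmapped marks an argument that should be passed whole to every mapped task run instead of being iterated over. A minimal sketch (the task copy_one and its warehouse argument are hypothetical, assuming Prefect 2.x):

from prefect import flow, task, unmapped

@task
def copy_one(sql: str, warehouse: str):
    print(f"Running {sql} on {warehouse}")

@flow
def copy_all():
    # sql varies per task run; warehouse is passed as-is to every run
    copy_one.map(["COPY INTO a...", "COPY INTO b..."], warehouse=unmapped("DAILY_WH"))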
Bryan
05/02/2024, 9:40 PM

Nate
05/02/2024, 9:40 PM

Bryan
05/02/2024, 10:37 PM
assert all(f.result() for f in futures) is ensuring that both COPY INTOs finished successfully?
Nate
05/02/2024, 10:49 PM
when you wrap a function in @task, you get a new method on that thing called .submit, which returns a future right away (before the task runs) that you can call .result() on sometime later to get the value from
from prefect import task
from prefect.futures import PrefectFuture

@task
def passthrough(value):
    return value

future: PrefectFuture = passthrough.submit(42)
assert future.result() == 42
this is nice because if you want to do stuff at the same time, you can submit a bunch of things together

.map is another method you get on tasks, and that is just doing submit() for each thing in the list, so you get a list of futures back

in my original example, i was just returning a bool from the demo task to signal "hey did this sql finish correctly?" which you'd implement in your task, and all() is only true if every future's result turned out True
Nate
05/02/2024, 10:52 PM

Bryan
05/02/2024, 10:54 PM

Bryan
05/02/2024, 10:58 PM

Nate
05/02/2024, 11:02 PM

results = [f.result() for f in futures]

let me put together an example for handling failure, one minute
Bryan
05/02/2024, 11:03 PM

Nate
05/02/2024, 11:16 PM
any some_future.result() call is going to wait for the associated task's result, so that snippet above is doing that in a row for each task's future
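In other words, submitting starts the runs and each .result() call blocks until its run finishes. A minimal sketch, with a hypothetical slow_task standing in for a long COPY INTO (assuming Prefect 2.x):

import time

from prefect import flow, task

@task
def slow_task(i: int) -> int:
    time.sleep(1)  # stand-in for a long-running COPY INTO
    return i

@flow
def gather():
    futures = [slow_task.submit(i) for i in range(3)]  # all three are submitted up front
    return [f.result() for f in futures]  # each .result() blocks until that run finishes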
now to handle failure, you can use another thing you get when you wrap a function in a @task, which is a bool flag return_state=True
from prefect import flow, task, unmapped

@task
def fails_above_threshold(value: int, threshold: int) -> int:
    if value > threshold:
        raise ValueError("Value is above threshold")
    return value

@flow
def my_flow():
    list_of_numbers = list(range(10))
    states = fails_above_threshold.map(
        list_of_numbers, threshold=unmapped(5), return_state=True  # same threshold of 5, for each mapped task
    )
    failed_states = [state for state in states if state.is_failed()]
    print(f"Oh no! {len(failed_states)} states failed!")
    completed_states = [state for state in states if state.is_completed()]
    results = []
    for state in completed_states:
        result = state.result()
        print(f"Finished successfully with value {result}")
        results.append(result)
    return results

if __name__ == "__main__":
    assert my_flow() == [0, 1, 2, 3, 4, 5]
this example should be copy-pastable for you if you want to try it out but essentially you'll get
Oh no! 4 states failed!
Finished successfully with value 0
Finished successfully with value 1
Finished successfully with value 2
Finished successfully with value 3
Finished successfully with value 4
Finished successfully with value 5
18:15:55.701 | INFO | Flow run 'loutish-ringtail' - Finished in state Completed()
Nate
05/02/2024, 11:17 PM
you can filter the states you get back - so that downstream, you can choose to operate only on results associated with Completed task runs if you want

Bryan
05/02/2024, 11:19 PM

Nate
05/02/2024, 11:19 PM

Bryan
05/03/2024, 12:15 AM

completed_states = [state for state in states if state.is_completed()]
results = []
for state in completed_states:
    result = state.result()
    print(f"Finished successfully with value {result}")
    results.append(result)

I don't see how I can get access to the returned dict from my task. For me, the .result() is always None.

Nate
05/03/2024, 12:28 AM
you might get None if:
• you are calling .result() on a state outside the context of a flow run where you did not persist the result
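If results are persisted, .result() can rehydrate the return value instead of giving None. A minimal sketch of turning persistence on, assuming Prefect 2.x where persist_result is a task setting (make_dict is a hypothetical task):

from prefect import flow, task

@task(persist_result=True)  # write the return value to result storage
def make_dict() -> dict:
    return {"status": "success"}

@flow
def my_flow():
    state = make_dict(return_state=True)
    return state.result()  # the persisted dict, not None

assert my_flow() == {"status": "success"}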
Bryan
05/03/2024, 5:35 PM

@flow(
    name="Load_Daily_Extract",
    flow_run_name=f"",
    description="Runs the daily load from AWS S3.",
    log_prints=env_vars.Print_Logging,
)
def load_daily_extract():
    function_name = load_daily_extract.__name__ + "()"
    logger = get_run_logger()
    logger.info("Starting daily data extract & load.")
    try:
        extract_start_date = set_extract_start_date()
        # get/generate list of copy into statements and metadata (i.e. {"COPY INTO...", "Policy Data"})
        statements = assemble_statements(extract_start_date)
        states = run_copy_into.map(list(statements.values()), list(statements.keys()), return_state=True)
        failed_states = [state for state in states if state.is_failed()]
        completed_states = [state for state in states if state.is_completed()]
        for state in failed_states:
            result = state.result()
            logger.error(result['description'])
            raise Exception(result)
        for state in completed_states:
            result = state.result()
            logger.info(result['description'])
Task:
@task
def run_copy_into(sql_query: str, query_name: str) -> dict:
    logger = get_run_logger()
    try:
        result = snowflake.snowflake_execute(
            snowflake_block=env_vars.Snowflake_Connector_Block,
            database=env_vars._Db,
            query=sql_query,
            schema=env_vars.EXTRACT_SCHEMA,
            warehouse=env_vars.Snowflake_Warehouse_Daily)
        # print(results)
        return result
        logger.info("S3 files have been loaded.")
        logger.info("------------------------------------------------------------")
        return { "status": "success", "description": f"{query_name} from S3 have been loaded."}
    except Exception as ex:
        logger.error(
            f"ERROR: {query_name} COPY INTO failed - {str(ex)}"
        )
        return { "status": "failed", "description": f"{query_name} from S3 failed to load."}
Bryan
05/03/2024, 5:40 PM
the snowflake.snowflake_execute call returns an object with a status and description of the execution.

Bryan
05/03/2024, 5:41 PM

Nate
05/03/2024, 6:02 PM
you have two return statements in the success branch of your try
try:
    result = snowflake.snowflake_execute(
        snowflake_block=env_vars.Snowflake_Connector_Block,
        database=env_vars._Db,
        query=sql_query,
        schema=env_vars.EXTRACT_SCHEMA,
        warehouse=env_vars.Snowflake_Warehouse_Daily)
    # print(results)
    return result  # FIRST
    logger.info("S3 files have been loaded.")
    logger.info("------------------------------------------------------------")
    return { "status": "success", "description": f"{query_name} from S3 have been loaded."}  # SECOND
Nate
05/03/2024, 6:03 PM
did you mean to return the result from snowflake.snowflake_execute?
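One possible fix, a sketch keeping Bryan's names: drop the early return and fold the execute result into the status dict, so the success branch returns exactly one thing:

try:
    result = snowflake.snowflake_execute(
        snowflake_block=env_vars.Snowflake_Connector_Block,
        database=env_vars._Db,
        query=sql_query,
        schema=env_vars.EXTRACT_SCHEMA,
        warehouse=env_vars.Snowflake_Warehouse_Daily)
    logger.info("S3 files have been loaded.")
    # one return: the status dict, with the raw execute result attached
    return { "status": "success", "description": f"{query_name} from S3 have been loaded.", "result": result }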
Bryan
05/03/2024, 6:26 PM

Nate
05/03/2024, 6:47 PM