Matheus
01/09/2024, 2:51 PM
from prefect import task, flow, serve, unmapped
from prefect.runtime import task_run
import time
def generate_task_name():
    """Build a task-run name of the form ``<task name>-with-<name parameter>``.

    Reads the task name and the call parameters from ``prefect.runtime.task_run``
    and looks up the ``"name"`` entry of those parameters, so it only works for
    tasks that are called with a ``name`` argument.
    """
    return "{}-with-{}".format(task_run.task_name, task_run.parameters["name"])
@task(
    name='ext',
    log_prints=True,
    task_run_name=generate_task_name,
)
def extract_data(region, name):
    """Return a fixed sample payload paired with the given region.

    Args:
        region: Value passed through unchanged as the second tuple element.
        name: Not used in the body; it is present so that
            ``generate_task_name`` can read it from the run parameters.

    Returns:
        A tuple ``([1, 2, 3, 4], region)``.
    """
    sample_values = [1, 2, 3, 4]
    return (sample_values, region)
@task(
    name='transf',
    log_prints=True,
    task_run_name=generate_task_name,
)
def transform_data(data, name):
    """Double every value of an extracted payload, keeping its region.

    Args:
        data: Indexable pair; ``data[0]`` is an iterable of values and
            ``data[1]`` is the region.
        name: Not used in the body; it is present so that
            ``generate_task_name`` can read it from the run parameters.

    Returns:
        A tuple ``(doubled_values, region)``.
    """
    source_region = data[1]
    doubled = [value * 2 for value in data[0]]
    # Fixed three-second pause before returning.
    time.sleep(3)
    return (doubled, source_region)
@task(
    name='load',
    log_prints=True,
    task_run_name=generate_task_name,
)
def load_data(data, name):
    """Print the value it is given.

    Args:
        data: Value to print. The flow in this file passes the region taken
            from a transform result.
        name: Not used in the body; it is present so that
            ``generate_task_name`` can read it from the run parameters.
    """
    print(data)
@flow
def map_test():
    """Run extract -> transform -> load for the names 'test' and 'ball'.

    Both extract tasks are mapped over the region list first, then both
    transform tasks are mapped over the extract results. The transforms are
    requested as states so that each one can be checked individually, and
    ``load_data`` is called only for transforms that completed.
    """
    region = ['sandbox']

    extract_test = extract_data.map(region, unmapped('test'))
    extract_ball = extract_data.map(region, unmapped('ball'))

    # ``return_state`` is an option of ``.map()`` itself rather than a task
    # argument, so it is passed as a plain bool. ``unmapped()`` is meant for
    # task arguments that should not be iterated over.
    transform_test = transform_data.map(
        extract_test, unmapped('test'), return_state=True
    )
    transform_ball = transform_data.map(
        extract_ball, unmapped('ball'), return_state=True
    )

    # Load 'test' results first, then 'ball', skipping any transform whose
    # state is not completed.
    for name, states in (('test', transform_test), ('ball', transform_ball)):
        for state in states:
            if state.is_completed():
                # Element 1 of a transform result is the region.
                load_data(state.result()[1], name)
if __name__ == "__main__":
    # Build a deployment named "map_test" from the flow and pass it to serve().
    serve(map_test.to_deployment(name="map_test"))
Nate
01/09/2024, 2:52 PM
Nate
01/09/2024, 2:52 PM
Nate
01/09/2024, 2:53 PM
Nate
01/09/2024, 2:55 PM