Mark Reyes
06/13/2024, 6:04 AMMarvin
06/13/2024, 6:05 AMpython
from prefect import task, flow
@task
def my_task(x):
    return x * 2
@task
def aggregate(results):
    return sum(results)
@flow
def my_flow():
    # Map the my_task over a list of items
    mapped_results = my_task.map([1, 2, 3, 4])
    
    # Wait for the mapped tasks to complete and aggregate their results
    aggregated_result = aggregate(
        wait_for=[r for r in mapped_results]
    )
    
    return aggregated_result.result()
if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}").mapwait_forMark Reyes
06/13/2024, 6:07 AMmapped_resultsMarvin
06/13/2024, 6:07 AMmapped_resultspython
from prefect import task, flow
@task
def my_task(x):
    return x * 2
@flow
def my_flow():
    # Map the my_task over a list of items
    mapped_results = my_task.map([1, 2, 3, 4])
    
    # Wait for the mapped tasks to complete
    for result in mapped_results:
        final_state = result.wait()
        print(f"Task completed with result: {final_state.result()}")
if __name__ == "__main__":
    my_flow().map.wait()Mark Reyes
06/13/2024, 6:07 AMMarvin
06/13/2024, 6:07 AMpython
from prefect import task, flow
@task
def my_task(x):
    return x * 2
@flow
def my_flow():
    # Map the my_task over a list of items
    input_data = [1, 2, 3, 4]
    mapped_results = my_task.map(input_data)
    
    # List to store results in order
    results = []
    # Wait for each task to complete and collect results in input order
    for result in mapped_results:
        final_state = result.wait()
        results.append(final_state.result())
    
    return results
if __name__ == "__main__":
    result = my_flow()
    print(f"Final Result: {result}").wait()Mark Reyes
06/13/2024, 6:30 AMMarvin
06/13/2024, 6:30 AMsubmitwaitpython
from prefect import task, flow
@task
def task_a():
    # Simulated task logic
    return "Result from task_a"
@task
def task_b():
    # Simulated task logic
    return "Result from task_b"
@flow
def my_flow():
    # Submit tasks to run concurrently
    future_a = task_a.submit()
    future_b = task_b.submit()
    
    # Wait for tasks to complete and retrieve results
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()
    
    return result_a, result_b
if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}").submit().wait().result()task_atask_bMark Reyes
06/13/2024, 6:32 AMMarvin
06/13/2024, 6:33 AMpython
from prefect import task, flow
@task
def task_a(arg):
    # Simulated task logic
    return f"Result from task_a with arg: {arg}"
@task
def task_b(arg):
    # Simulated task logic
    return f"Result from task_b with arg: {arg}"
@flow
def my_flow():
    # Define arguments for each task
    arg_a = 1
    arg_b = 2
    # Submit tasks with arguments to run concurrently
    future_a = task_a.submit(arg_a)
    future_b = task_b.submit(arg_b)
    
    # Wait for tasks to complete and retrieve results
    result_a = future_a.wait().result()
    result_b = future_b.wait().result()
    
    return result_a, result_b
if __name__ == "__main__":
    result = my_flow()
    print(f"Final Results: {result}").submit().wait()task_atask_b