#prefect-community

Jeff Kehler

04/04/2022, 6:18 AM
I've created a flow that triggers other flows using the `create_flow_run` function. I would like to use a Parameter to configure the top-level flow, but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?

Anna Geller

04/04/2022, 9:08 AM
It's definitely possible. If you need an example, this worked well for me. Child flow:
import platform
import prefect
from prefect import Flow, Parameter, task


CHILD_FLOW_NAME = "child_flow_example"


@task(log_stdout=True)
def hello_world_child(x: str):
    print(f"Hello {x} from {CHILD_FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(CHILD_FLOW_NAME) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world_child(user_input)
Parent flow, overriding the child's default "Marvin" with "world" via a Parameter task in the parent flow:
import os
import subprocess
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
CHILD_FLOW_NAME = "child_flow_example"
PREFECT_PROJECT_NAME = "community"


@task(log_stdout=True)
def hello_world_parent():
    print(f"Hello from {PARENT_FLOW_NAME}!")
    return PARENT_FLOW_NAME


with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    param = Parameter("user_name", default="world")
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input=param),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
    first_child_flowrunview = wait_for_flow_run(
        first_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the first subflow"),
    )

if __name__ == "__main__":
    dir_path = os.path.dirname(os.path.realpath(__file__))
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p {dir_path}", shell=True,
    )
    subprocess.run(
        f"prefect run --name {PARENT_FLOW_NAME} --project {PREFECT_PROJECT_NAME}",
        shell=True,
    )
    subprocess.run(
        "prefect agent local start", shell=True,
    )

Kevin Kho

04/04/2022, 1:50 PM
Are you trying to pass the parameters into `create_flow_run`? Or are you trying to add conditional logic?

Jeff Kehler

04/05/2022, 1:07 AM
@Kevin Kho My intention is to create a backfill Flow where I pass a start date and an end date, and then create a flow run for each date between start and end using the `create_flow_run` function.
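As a rough illustration of the date handling described above, here is a minimal sketch in plain Python (no Prefect calls) that turns a start and end date into one UTC time window per day. The name `daily_windows` is hypothetical and not part of Prefect; the `min_time`/`max_time` naming only mirrors the parameters that appear later in this thread.

```python
from datetime import date, datetime, time, timedelta, timezone


def daily_windows(start_date: date, end_date: date):
    """Return one (min_time, max_time) pair per day, oldest first, both ends inclusive."""
    if end_date < start_date:
        raise ValueError("end_date must not be before start_date")
    windows = []
    day = start_date
    while day <= end_date:
        # Each window spans the whole UTC day, from 00:00:00 to 23:59:59.999999.
        min_time = datetime.combine(day, time.min, tzinfo=timezone.utc)
        max_time = datetime.combine(day, time.max, tzinfo=timezone.utc)
        windows.append((min_time, max_time))
        day += timedelta(days=1)
    return windows


windows = daily_windows(date(2022, 4, 1), date(2022, 4, 3))
for min_time, max_time in windows:
    print(min_time.isoformat(), "-", max_time.isoformat())
```

Each pair could then be passed as the parameters of one child flow run, in order.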

Kevin Kho

04/05/2022, 2:10 AM
Oh ok yeah, I think Anna’s suggestion is what you need

Anna Geller

04/05/2022, 10:35 AM
@Jeff Kehler we have a full example for using parametrized flow-of-flows for backfilling here

Jeff Kehler

04/07/2022, 2:12 AM
@Anna Geller thanks for the example. That helps clarify a few things, especially regarding using `.map` on the `create_flow_run` function. However, in my situation I would like to backfill the dates sequentially, and I believe `.map` doesn't support that?

Kevin Kho

04/07/2022, 2:18 AM
Ah, I think you are saying that map can create multiple runs, but the order is not sequential, right? That is right. If you want dynamic sequential execution, you need to look into looping.

Jeff Kehler

04/07/2022, 9:59 AM
I couldn't get this to work. I tried using the LOOP feature to iterate through a list of dates from `start_date` to `end_date`. That part works fine. However, only the first flow run from the `create_flow_run` function executes and the rest appear to be completely ignored, even though the console shows them being created (apparently all with the same flow run ID).
Ok, I got it to work. I had to set a unique `idempotency_key` for each run and also had to remove the `raise_final_state=True` part. Now it's running the flows sequentially as I wanted.
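A minimal sketch of why a unique key per date matters, under the assumption (as far as I recall from Prefect 1.x) that `create_flow_run` falls back to the current task run ID when no `idempotency_key` is given, so every iteration of a looping task would reuse the same key. Nothing below calls Prefect: `FakeBackend`, `idempotency_key_for`, and `backfill_sequentially` are hypothetical stand-ins that only imitate the "same key, same run" behavior.

```python
from datetime import date


class FakeBackend:
    """Stand-in for the Prefect API: the same key always maps to the same run."""

    def __init__(self):
        self.runs_by_key = {}

    def create_run(self, day, key):
        # A repeated key returns the existing run instead of creating a new one.
        return self.runs_by_key.setdefault(key, f"run-{len(self.runs_by_key) + 1}")


def idempotency_key_for(parent_run_id: str, day: date) -> str:
    """Build a key that is unique per date but stable if the parent task retries."""
    return f"{parent_run_id}:{day.isoformat()}"


def backfill_sequentially(days, parent_run_id, create_run, wait_for_run):
    """Create one run per day and wait for it before starting the next."""
    finished = []
    for day in days:
        run_id = create_run(day, idempotency_key_for(parent_run_id, day))
        wait_for_run(run_id)  # blocks until the child run is done
        finished.append(run_id)
    return finished


days = [date(2022, 4, 1), date(2022, 4, 2), date(2022, 4, 3)]

# Unique key per date: three distinct runs, created one after another.
backend = FakeBackend()
unique_runs = backfill_sequentially(
    days, "parent-run", backend.create_run, lambda run_id: None
)

# Same key for every date: the backend keeps handing back the first run.
shared = FakeBackend()
same_key_runs = [shared.create_run(day, "parent-run") for day in days]

print(unique_runs)
print(same_key_runs)
```

In a real Prefect 1.x flow, the two callables would correspond to `create_flow_run` and `wait_for_flow_run`; how they are invoked from inside a looping task is not shown here.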

Anna Geller

04/07/2022, 10:20 AM
Nice work! Re `idempotency_key`, this makes sense, but you shouldn't have to remove `raise_final_state`.

Jeff Kehler

04/07/2022, 10:30 AM
@Anna Geller I just tried adding `raise_final_state=True` back in again and it's back to the prior behavior. It only runs the first flow and then considers itself finished, without running any subsequent dates:
[2022-04-07 17:28:36+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'Backfill Mongo->BigQuery'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Creating flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]' for flow 'Mongo->BigQuery'...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Created flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': <http://localhost:8080/default/flow-run/ada29a87-91f0-4b59-9161-4977b24fe36e>
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Scheduled>: Flow run scheduled.
[2022-04-07 17:28:44+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Submitted>: Submitted for execution
[2022-04-07 17:28:46+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Running>: Running flow.
[2022-04-07 17:29:35+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Success>: All reference tasks succeeded.
[2022-04-07 17:29:37+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Finished task run for task with final state: 'Success'
[2022-04-07 17:29:37+0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
For this example I set `start_date=2022-04-01` and `end_date=2022-04-03`. So this should generate one flow run for each date from the 1st to the 3rd of April, three in total. It only ran the 1st and then exited.

Anna Geller

04/07/2022, 10:36 AM
can you share your code?

Jeff Kehler

04/07/2022, 10:41 AM
I can't share the repository or all the glue code. But this is the Flow that I am triggering from the screenshot code I shared above.
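# Imports below assume Prefect 1.x; they were not part of the original snippet.
from datetime import datetime, timedelta

import prefect
from prefect import Flow, Parameter, unmapped
from prefect.core.parameter import DateTimeParameter
from prefect.executors import LocalDaskExecutor

# MONGO_COLLECTIONS and MongoCollectionToBigQuery are defined elsewhere in the project (not shared here).

with Flow("Mongo->BigQuery") as flow: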
    min_time = DateTimeParameter(
        "min_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=0, second=0, microsecond=0
        ),
    )
    max_time = DateTimeParameter(
        "max_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=59, second=59, microsecond=999999
        ),
    )
    mongo_server_tag = Parameter("mongo_server_tag", required=True)
    mongo_url = Parameter("mongo_url", required=True)
    mongo_database = Parameter("mongo_database", required=True)
    mongo_database_subtask = Parameter("mongo_database_subtask", required=True)

    # Create a mapping of tasks for all collections in main Mongo database
    params = MONGO_COLLECTIONS["main"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", None) for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", None) for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

    # Create a mapping of tasks for all collections in SubTask Mongo database
    params = MONGO_COLLECTIONS["subtask"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", "created") for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", "updated") for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database_subtask),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

flow.executor = LocalDaskExecutor()
Where `MongoCollectionToBigQuery` is a class-based Task that is subclassed from `prefect.Task`.

Anna Geller

04/07/2022, 10:59 AM
I meant the parent flow
Some comments to improve your flow code. Doing this is dangerous: required parameters should have a default value, otherwise you can't schedule this flow:
mongo_server_tag = Parameter("mongo_server_tag", required=True)
I would recommend mapping with a functional API task rather than with a class. Mapping with the imperative API is confusing because mapping requires stateless functions, whereas classes are stateful, and you may even need to add the `mapped=True` argument. It's your choice, though.