Jeff Kehler
04/04/2022, 6:18 AM
create_flow_run function. I would like to use a Parameter to configure the top-level flow, but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?
Anna Geller
04/04/2022, 9:08 AM
import platform
import prefect
from prefect import Flow, Parameter, task

CHILD_FLOW_NAME = "child_flow_example"


@task(log_stdout=True)
def hello_world_child(x: str):
    print(f"Hello {x} from {CHILD_FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(CHILD_FLOW_NAME) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world_child(user_input)
Parent flow overriding "Marvin" with "world" via a Parameter task in the parent flow:
import os
import subprocess
from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
CHILD_FLOW_NAME = "child_flow_example"
PREFECT_PROJECT_NAME = "community"


@task(log_stdout=True)
def hello_world_parent():
    print(f"Hello from {PARENT_FLOW_NAME}!")
    return PARENT_FLOW_NAME


with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    param = Parameter("user_name", default="world")
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input=param),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
    first_child_flowrunview = wait_for_flow_run(
        first_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the first subflow"),
    )

if __name__ == "__main__":
    dir_path = os.path.dirname(os.path.realpath(__file__))
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p {dir_path}", shell=True,
    )
    subprocess.run(
        f"prefect run --name {PARENT_FLOW_NAME} --project {PREFECT_PROJECT_NAME}",
        shell=True,
    )
    subprocess.run(
        "prefect agent local start", shell=True,
    )
Kevin Kho
04/04/2022, 1:50 PM
create_flow_run? Or are you trying to add conditional logic?
Jeff Kehler
04/05/2022, 1:07 AM
create_flow_run function.
Kevin Kho
04/05/2022, 2:10 AM
Anna Geller
04/05/2022, 10:35 AM
Jeff Kehler
04/07/2022, 2:12 AM
.map on the create_flow_run function. However, in my situation I would like to backfill the dates sequentially, and I believe .map doesn't support that?
Kevin Kho
04/07/2022, 2:18 AM
Jeff Kehler
04/07/2022, 9:59 AM
start_date to end_date. That part works fine. However, only the first flow from the create_flow_run function executes, and the rest appear to be completely ignored (despite the console showing them being created, though apparently with the same flow ID).
idempotency_key for each and also had to remove the raise_final_state=True part. Now it's running the flows sequentially, as I wanted.
Anna Geller
04/07/2022, 10:20 AM
idempotency_key, this makes sense, but you shouldn't have to remove raise_final_state
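The fix discussed above, one idempotency_key per date, can be sketched without Prefect itself. This is a minimal sketch, not Jeff's actual code: the helper names daily_windows and backfill_run_specs and the "backfill" key prefix are hypothetical, and the min_time/max_time parameter names are borrowed from the child flow posted later in this thread. Each resulting dict would then feed one create_flow_run call (its idempotency_key and parameters arguments), so every date gets a distinct flow run.

```python
from datetime import date, datetime, time, timedelta, timezone


def daily_windows(start_date, end_date):
    """Yield a (min_time, max_time) UTC pair for each day, both ends inclusive."""
    day = start_date
    while day <= end_date:
        min_time = datetime.combine(day, time.min, tzinfo=timezone.utc)
        max_time = datetime.combine(day, time.max, tzinfo=timezone.utc)
        yield min_time, max_time
        day += timedelta(days=1)


def backfill_run_specs(start_date, end_date, prefix="backfill"):
    """Build one spec per day, each with its own idempotency key.

    The key format is an assumption for illustration; any value that
    differs between dates would serve the same purpose.
    """
    specs = []
    for min_time, max_time in daily_windows(start_date, end_date):
        specs.append(
            {
                "idempotency_key": f"{prefix}-{min_time.date().isoformat()}",
                "parameters": {
                    "min_time": min_time.isoformat(),
                    "max_time": max_time.isoformat(),
                },
            }
        )
    return specs


# Same range as in the thread: 1st to 3rd of April 2022.
specs = backfill_run_specs(date(2022, 4, 1), date(2022, 4, 3))
for spec in specs:
    print(spec["idempotency_key"], spec["parameters"])
```

Because these keys are derived only from the date, running the same backfill again would reuse them. Appending a run-specific suffix is one option if repeat runs over the same dates are wanted.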
Jeff Kehler
04/07/2022, 10:30 AM
raise_final_state=True back in again and it's back to the prior behavior. It will only run the first flow and then considers it finished after that, without running any subsequent dates.
[2022-04-07 17:28:36+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'Backfill Mongo->BigQuery'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Creating flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]' for flow 'Mongo->BigQuery'...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Created flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': <http://localhost:8080/default/flow-run/ada29a87-91f0-4b59-9161-4977b24fe36e>
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Scheduled>: Flow run scheduled.
[2022-04-07 17:28:44+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Submitted>: Submitted for execution
[2022-04-07 17:28:46+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Running>: Running flow.
[2022-04-07 17:29:35+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Success>: All reference tasks succeeded.
[2022-04-07 17:29:37+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Finished task run for task with final state: 'Success'
[2022-04-07 17:29:37+0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
For this example I set start_date=2022-04-01 and end_date=2022-04-03. So this should generate 3 flow runs, one for each date from the 1st to the 3rd of April. It only ran the 1st and then exited.
Anna Geller
04/07/2022, 10:36 AM
Jeff Kehler
04/07/2022, 10:41 AM
with Flow("Mongo->BigQuery") as flow:
    min_time = DateTimeParameter(
        "min_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=0, second=0, microsecond=0
        ),
    )
    max_time = DateTimeParameter(
        "max_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=59, second=59, microsecond=999999
        ),
    )
    mongo_server_tag = Parameter("mongo_server_tag", required=True)
    mongo_url = Parameter("mongo_url", required=True)
    mongo_database = Parameter("mongo_database", required=True)
    mongo_database_subtask = Parameter("mongo_database_subtask", required=True)

    # Create a mapping of tasks for all collections in main Mongo database
    params = MONGO_COLLECTIONS["main"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", None) for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", None) for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

    # Create a mapping of tasks for all collections in SubTask Mongo database
    params = MONGO_COLLECTIONS["subtask"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", "created") for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", "updated") for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database_subtask),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

flow.executor = LocalDaskExecutor()
MongoCollectionToBigQuery is a class-based Task that is subclassed from prefect.Task
Anna Geller
04/07/2022, 10:59 AM
mongo_server_tag = Parameter("mongo_server_tag", required=True)
I would recommend mapping with a functional API task rather than with a class. Mapping with the imperative API is confusing because mapping requires stateless functions and classes are stateful, and you may even need to add the mapped=True argument. It's your choice, though.