Jeff Kehler
04/04/2022, 6:18 AM
[...] create_flow_run function. I would like to use a Parameter to configure the top-level flow, but it seems parameters are only really meant to be passed into tasks. Is it possible to use the value from a Parameter within a Flow?
Anna Geller
import platform

import prefect
from prefect import Flow, Parameter, task

CHILD_FLOW_NAME = "child_flow_example"


@task(log_stdout=True)
def hello_world_child(x: str):
    print(f"Hello {x} from {CHILD_FLOW_NAME}!")
    print(
        f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    )


with Flow(CHILD_FLOW_NAME) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world_child(user_input)
Parent flow overriding "Marvin" with "world" via a Parameter task in the parent flow:
import os
import subprocess

from prefect import Flow, task, Parameter
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
CHILD_FLOW_NAME = "child_flow_example"
PREFECT_PROJECT_NAME = "community"


@task(log_stdout=True)
def hello_world_parent():
    print(f"Hello from {PARENT_FLOW_NAME}!")
    return PARENT_FLOW_NAME


with Flow(PARENT_FLOW_NAME) as parent_flow:
    normal_non_subflow_task = hello_world_parent()
    param = Parameter("user_name", default="world")
    first_child_flow_run_id = create_flow_run(
        flow_name=CHILD_FLOW_NAME,
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input=param),
        task_args=dict(name="First subflow"),
        upstream_tasks=[normal_non_subflow_task],
    )
    first_child_flowrunview = wait_for_flow_run(
        first_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the first subflow"),
    )

if __name__ == "__main__":
    dir_path = os.path.dirname(os.path.realpath(__file__))
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p {dir_path}", shell=True,
    )
    subprocess.run(
        f"prefect run --name {PARENT_FLOW_NAME} --project {PREFECT_PROJECT_NAME}",
        shell=True,
    )
    subprocess.run(
        "prefect agent local start", shell=True,
    )
Kevin Kho
[...] create_flow_run? Or are you trying to add conditional logic?
Jeff Kehler
04/05/2022, 1:07 AM
[...] create_flow_run function.
Kevin Kho
Anna Geller
Jeff Kehler
04/07/2022, 2:12 AM
[...] .map on the create_flow_run function. However, in my situation I would like to backfill the dates sequentially, and I believe .map doesn't support that?
Kevin Kho
Jeff Kehler
04/07/2022, 9:59 AM
[...] start_date to end_date. That part works fine. However, only the first flow from the create_flow_run function executes, and the rest appear to be completely ignored (the console shows them being created, but with the same flow id, it appears).
Jeff Kehler
04/07/2022, 10:17 AM
[...] idempotency_key for each and also had to remove the raise_final_state=True part. Now it's running the flows sequentially as I wanted them to.
Anna Geller
[...] idempotency_key, this makes sense, but you shouldn't have to remove raise_final_state.
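As an aside to the thread (this is not code posted by any participant), here is a toy model in plain Python of the two effects being discussed. It does not import Prefect. ToyBackend, ToySuccessSignal, and backfill are hypothetical names. The model rests on two assumptions that should be checked against the installed Prefect version: that creating a flow run with an idempotency key that was already used returns the existing run rather than a new one, and that waiting with raise_final_state=True reports the final state by raising a signal even when the child run succeeded, which would end a loop running inside a single task.

```python
from datetime import date, timedelta


class ToySuccessSignal(Exception):
    """Hypothetical stand-in for a signal raised when a child run succeeds."""


class ToyBackend:
    """Hypothetical in-memory stand-in for the flow-run backend."""

    def __init__(self):
        self.runs = {}  # idempotency key -> run id

    def create_run(self, idempotency_key):
        # Assumption: a key that was seen before returns the existing run.
        if idempotency_key not in self.runs:
            self.runs[idempotency_key] = f"run-{len(self.runs) + 1}"
        return self.runs[idempotency_key]

    def wait(self, run_id, raise_final_state=False):
        # Assumption: the final state is reported by raising, even on success.
        if raise_final_state:
            raise ToySuccessSignal(run_id)
        return run_id


def backfill(backend, start, end, unique_keys, raise_final_state):
    """Create one child run per day, in order; return the run ids created."""
    created = []
    day = start
    try:
        while day <= end:
            key = f"backfill-{day.isoformat()}" if unique_keys else "backfill"
            run_id = backend.create_run(key)
            created.append(run_id)
            backend.wait(run_id, raise_final_state)
            day += timedelta(days=1)
    except ToySuccessSignal:
        # In this model the signal ends the enclosing task, and the loop with it.
        pass
    return created


start, end = date(2022, 4, 1), date(2022, 4, 3)
shared = backfill(ToyBackend(), start, end, unique_keys=False, raise_final_state=False)
stopped = backfill(ToyBackend(), start, end, unique_keys=True, raise_final_state=True)
sequential = backfill(ToyBackend(), start, end, unique_keys=True, raise_final_state=False)
print(shared)      # ['run-1', 'run-1', 'run-1']
print(stopped)     # ['run-1']
print(sequential)  # ['run-1', 'run-2', 'run-3']
```

In this model, a shared key yields the same run id three times, a unique key per date combined with raising on the final state stops after the first date, and a unique key per date without raising runs all three dates in order. Whether the real tasks behave this way depends on the two assumptions stated above.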
Jeff Kehler
04/07/2022, 10:30 AM
[...] raise_final_state=True back in again and it's back to the prior behavior. It will only run the first flow and then considers it finished, without running any subsequent dates.
Jeff Kehler
04/07/2022, 10:32 AM
[2022-04-07 17:28:36+0700] INFO - prefect.FlowRunner | Beginning Flow run for 'Backfill Mongo->BigQuery'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'end_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'server_tag': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_port': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'mongo_database_subtask': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'start_date': Finished task run for task with final state: 'Success'
[2022-04-07 17:28:36+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Starting task run...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Creating flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]' for flow 'Mongo->BigQuery'...
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Created flow run 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': <http://localhost:8080/default/flow-run/ada29a87-91f0-4b59-9161-4977b24fe36e>
[2022-04-07 17:28:36+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Scheduled>: Flow run scheduled.
[2022-04-07 17:28:44+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Submitted>: Submitted for execution
[2022-04-07 17:28:46+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Running>: Running flow.
[2022-04-07 17:29:35+0700] INFO - prefect.backfill_data | Flow 'US (Mongo->BigQuery)[2022-04-01T00:00:00+00:00 - 2022-04-01T23:59:59.999999+00:00]': Entered state <Success>: All reference tasks succeeded.
[2022-04-07 17:29:37+0700] INFO - prefect.TaskRunner | Task 'backfill_data': Finished task run for task with final state: 'Success'
[2022-04-07 17:29:37+0700] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
For this example I set start_date=2022-04-01 and end_date=2022-04-03, so this should generate 3 flow runs, one for each date from the 1st to the 3rd of April. It only ran the 1st and then exited.
Anna Geller
Jeff Kehler
04/07/2022, 10:41 AM
with Flow("Mongo->BigQuery") as flow:
    min_time = DateTimeParameter(
        "min_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=0, second=0, microsecond=0
        ),
    )
    max_time = DateTimeParameter(
        "max_time",
        required=False,
        default=(datetime.utcnow() - timedelta(hours=1)).replace(
            minute=59, second=59, microsecond=999999
        ),
    )
    mongo_server_tag = Parameter("mongo_server_tag", required=True)
    mongo_url = Parameter("mongo_url", required=True)
    mongo_database = Parameter("mongo_database", required=True)
    mongo_database_subtask = Parameter("mongo_database_subtask", required=True)

    # Create a mapping of tasks for all collections in main Mongo database
    params = MONGO_COLLECTIONS["main"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", None) for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", None) for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

    # Create a mapping of tasks for all collections in SubTask Mongo database
    params = MONGO_COLLECTIONS["subtask"]
    MongoCollectionToBigQuery(
        task_run_name="{mongo_collection}",
        bq_project_id=prefect.context.get("BQ_PROJECT_ID"),
        bq_dataset_id=prefect.context.get("BQ_DATASET_ID"),
        bq_location=prefect.context.get("BQ_LOCATION"),
    ).map(
        mongo_collection=[i["mongo_collection"] for i in params],
        mongo_projection=[i.get("mongo_projection", None) for i in params],
        mongo_created_key=[i.get("mongo_created_key", "created") for i in params],
        mongo_updated_key=[i.get("mongo_updated_key", "updated") for i in params],
        bq_collection_name=[i.get("bq_collection_name") for i in params],
        bq_stream_chunk_size=[i.get("bq_stream_chunk_size", 3000) for i in params],
        min_time=unmapped(min_time),
        max_time=unmapped(max_time),
        mongo_server_tag=unmapped(mongo_server_tag),
        mongo_url=unmapped(mongo_url),
        mongo_database=unmapped(mongo_database_subtask),
        task_args=dict(
            max_retries=3,
            retry_delay=timedelta(minutes=1)
        )
    )

flow.executor = LocalDaskExecutor()
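A side note on the backfill discussed in this thread (not code posted by any participant): the child flow above takes min_time and max_time, and the run names in the log earlier each cover one UTC calendar day. The following is a minimal sketch, in plain Python, of how such per-day windows could be computed from a start and end date. The helper names day_window and backfill_windows are hypothetical, and treating the dates as UTC is an assumption based on the +00:00 offsets in the logged run names.

```python
from datetime import date, datetime, time, timedelta, timezone


def day_window(day):
    """Return (min_time, max_time) covering one UTC calendar day."""
    min_time = datetime.combine(day, time.min, tzinfo=timezone.utc)
    max_time = datetime.combine(day, time.max, tzinfo=timezone.utc)
    return min_time, max_time


def backfill_windows(start_date, end_date):
    """Return one window per day from start_date to end_date, inclusive."""
    days = (end_date - start_date).days + 1
    return [day_window(start_date + timedelta(days=i)) for i in range(days)]


windows = backfill_windows(date(2022, 4, 1), date(2022, 4, 3))
for min_time, max_time in windows:
    print(f"[{min_time.isoformat()} - {max_time.isoformat()}]")
```

Each pair could then be passed as the min_time and max_time parameters of one child flow run, and its text form could serve as a per-run idempotency key.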
Jeff Kehler
04/07/2022, 10:41 AM
MongoCollectionToBigQuery is a class-based Task that is subclassed from prefect.Task
Anna Geller
Anna Geller
mongo_server_tag = Parameter("mongo_server_tag", required=True)
I would recommend mapping with a functional API task rather than with a class. Mapping with the imperative API is confusing because mapping requires stateless functions, while classes are stateful, and you may even need to add the mapped=True argument. It's your choice, though.