José Duarte
08/19/2022, 10:00 AMOscar Björhn
08/19/2022, 10:08 AMJosé Duarte
08/19/2022, 10:12 AMgraph.gpickle
from dbt and I am trying to run each model as a subflow yes, which then uses the prefect-dbt
util
To sort them, I take advantage of networkx
topological sort utilitiesOscar Björhn
08/19/2022, 10:21 AMJosé Duarte
08/19/2022, 10:26 AMOscar Björhn
08/19/2022, 10:28 AMJosé Duarte
08/19/2022, 10:37 AMA -> B, C
B -> D, E
C -> E
Given a random node on that graph, we want to be able to launch the “topological runner” for the remaining nodes
For example:
• Given node C
I want to run, C
and then run E
• Given node B
I want to run B
and then D
and E
I want to be able to run the previous scenarios, at the same timeOscar Björhn
08/19/2022, 10:39 AMJosé Duarte
08/19/2022, 1:26 PMOscar Björhn
08/19/2022, 1:32 PM@task
async def run_deployment(flow_name: str, env: Env, run_name: str, parameters: Dict[str, Any] = None, fire_and_forget: bool = False):
"""
Creates a flow run from a deployment and polls it until it reaches a terminal state.
If fire and forget is true, no polling is performed and the function exits early.
"""
logger = get_run_logger()
async with get_client() as client:
deployments = await client.read_deployments()
for deployment in deployments:
if deployment.name == f"{flow_name} ({env.value})":
deployment_id = deployment.id
break
flow_run = await client.create_flow_run_from_deployment(deployment_id=deployment_id, name=run_name, parameters=parameters)
flow_run_id = flow_run.id
<http://logger.info|logger.info>(f"Created flow run for flow {flow_name} with name: {flow_run.name} and id: {flow_run.id}")
# If fire_and_forget is true, don't poll, just return.
while not fire_and_forget:
flow_run = await client.read_flow_run(flow_run_id=flow_run_id)
<http://logger.info|logger.info>(f"Waiting for child flow run {flow_name} / {flow_run.name} with state {flow_run.state_type}")
if flow_run.state_type in TERMINAL_STATES:
if flow_run.state_type == StateType.COMPLETED:
print(f"Child flow run {flow_name} / {flow_run.name} completed successfully.")
break
elif flow_run.state_type in (StateType.CANCELLED, StateType.FAILED, StateType.CRASHED):
<http://logger.info|logger.info>(f"Child flow run {flow_name} / {flow_run.name} exited non-successfully.")
return Failed()
else:
logger.warn(f"Encountered an unknown terminal state in child flow run {flow_name}: {flow_run.state_type}")
return Failed()
await asyncio.sleep(30)
José Duarte
08/19/2022, 1:43 PMOscar Björhn
08/19/2022, 2:11 PMJosé Duarte
08/19/2022, 2:12 PMEnv
, can you provide a path? I’m not finding it in the docsOscar Björhn
08/19/2022, 2:13 PMJosé Duarte
08/19/2022, 2:13 PMOscar Björhn
08/19/2022, 3:10 PMJosé Duarte
08/19/2022, 3:12 PMOscar Björhn
08/19/2022, 3:16 PM