Mitch
06/10/2024, 8:26 PMMitch
06/10/2024, 10:07 PMon_cancellationasync def cancel_subflows(flow, flow_run, state):
    async with get_client() as client:
        logger = get_logger()
        logger.info(f"Flow run context is {flow_run.id}")
        runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(parent_flow_run_id=dict(any_=[flow_run.id]))
        )
        logger.info("Cancelling runs")
        for run in runs:
            logger.info(f"Cancelling run {run.flow_id}")
            await client.set_flow_run_state(
                flow_run_id=run.id, state=Cancelling(), force=False
            )
        logger.info("Completed cancelling runs")force=Truestate=Cancelled()Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by