https://prefect.io logo
Title
c

Chris Gunderson

09/12/2022, 10:02 PM
Hi Team - I tried to use this flow to flow in Prefect 1.0. https://docs-v1.prefect.io/core/idioms/flow-to-flow.html . I ended up getting this error: [2022-09-12 16:55:40-0500] ERROR - prefect.sod-loader-parent-flow | Unexpected error occured in FlowRunner: ValueError('Cycle found; flows must be acyclic!')
2
# --------------------------------------------------------------------------------------------------
#   run_sod_loaders_create_flows.py
#
#
#   September 2022
# --------------------------------------------------------------------------------------------------
# created by Chris Gunderson
#
# Purpose:
# This script is intended to create start of day loader flows in prefect.
# --------------------------------------------------------------------------------------------------
# Module Imports

import pendulum
import pandas as pd
from prefect import task, Flow, unmapped
from prefect.storage import CodeCommit
from prefect.run_configs import DockerRun, ECSRun
from prefect.schedules import CronSchedule
from prefect.schedules.clocks import CronClock
from prefect.triggers import all_successful
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


# --------------------------------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------
weekday_schedule = CronSchedule(
        "30 9 * * 1-5", start_date = pendulum.now("America/Chicago")
)
# register flow
with Flow('sod-loader-parent-flow',
          schedule = weekday_schedule,
          run_config = DockerRun(
                  image = "spiderrockadvisors/py38-base:2.1.0",
                  labels = ['sradock01']
          ),
          storage = CodeCommit(
                  repo = 'advisorscodebase',
                  path = "flow_schedule/run_sod_loaders_create_flows.py",
                  commit = "SRA-471-SODLoadersRegisterNewName",
                  secrets = ["AWS_CREDENTIALS"],
                  client_options = {"region_name": "us-east-2"}
          ),
          ) as flow:

    flow_ClearPositions = create_flow_run(flow_name = "SOD Loaders Dev - ClearPositionsSodLoader", project_name = "StartOfDay",
                                          parameters = {'process_name': 'ClearPositionsSodLoader', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_ClearPositions = wait_for_flow_run(flow_ClearPositions, raise_final_state = True)

    flow_Fidelity = create_flow_run(flow_name = "SOD Loaders Dev - Fidelity", project_name = "StartOfDay",
                                    parameters = {'process_name': 'Fidelity', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_Fidelity = wait_for_flow_run(flow_Fidelity, raise_final_state = True)

    flow_WilliamBlair = create_flow_run(flow_name = "SOD Loaders Dev - WilliamBlair", project_name = "StartOfDay",
                                        parameters = {'process_name': 'WilliamBlair', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_WilliamBlair = wait_for_flow_run(flow_WilliamBlair, raise_final_state = True)

    flow_PershingPAS = create_flow_run(flow_name = "SOD Loaders Dev - Pershing PAS", project_name = "StartOfDay",
                                       parameters = {'process_name': 'PershingPAS', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_PershingPAS = wait_for_flow_run(flow_PershingPAS, raise_final_state = True)

    flow_TD = create_flow_run(flow_name = "SOD Loaders Dev - TD", project_name = "StartOfDay",
                              parameters = {'process_name': 'TD', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_TD = wait_for_flow_run(flow_TD, raise_final_state = True)

    flow_Schwab = create_flow_run(flow_name = "SOD Loaders Dev - Schwab", project_name = "StartOfDay",
                                  parameters = {'process_name': 'Schwab', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_Schwab = wait_for_flow_run(flow_Schwab, raise_final_state = True)

    flow_UBS = create_flow_run(flow_name = "SOD Loaders Dev - UBS", project_name = "StartOfDay",
                               parameters = {'process_name': 'UBS', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_UBS = wait_for_flow_run(flow_UBS, raise_final_state = True)

    flow_RiverNorth = create_flow_run(flow_name = "SOD Loaders Dev - RiverNorth", project_name = "StartOfDay",
                                      parameters = {'process_name': 'RiverNorth', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_RiverNorth = wait_for_flow_run(flow_RiverNorth, raise_final_state = True)

    flow_USBank = create_flow_run(flow_name = "SOD Loaders Dev - US Bank", project_name = "StartOfDay",
                                  parameters = {'process_name': 'USBank', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_USBank = wait_for_flow_run(flow_USBank, raise_final_state = True)

    flow_MS = create_flow_run(flow_name = "SOD Loaders Dev - MorganStanley", project_name = "StartOfDay",
                              parameters = {'process_name': 'MorganStanley', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_MS = wait_for_flow_run(flow_MS, raise_final_state = True)

    flow_SRALoaderAlerts = create_flow_run(flow_name = "SOD Loaders Dev - SRA Loader Alerts", project_name = "StartOfDay",
                                           parameters = {'process_name': 'SRALoaderAlerts', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
    wait_for_flow_SRALoaderAlerts = wait_for_flow_run(flow_SRALoaderAlerts, raise_final_state = True)

    flow_Fidelity.set_upstream(wait_for_flow_ClearPositions)
    flow_WilliamBlair.set_upstream(wait_for_flow_Fidelity)
    flow_PershingPAS.set_upstream(wait_for_flow_WilliamBlair)
    flow_TD.set_upstream(wait_for_flow_PershingPAS)
    flow_Schwab.set_upstream(wait_for_flow_TD)
    flow_UBS.set_upstream(wait_for_flow_Schwab)
    flow_USBank.set_upstream(wait_for_flow_UBS)
    flow_RiverNorth.set_upstream(wait_for_flow_USBank)
    flow_MS.set_upstream(wait_for_flow_RiverNorth)
    flow_SRALoaderAlerts.set_upstream(wait_for_flow_SRALoaderAlerts)

# --------------------------------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------
# run flow

if __name__ == "__main__":
    # flow.register(project_name='flow_schedule')
    flow.run(run_on_schedule = False)
# SOD Loaders Prefect Scheduler
import requests
import pendulum
import prefect
from prefect import task, Flow, Parameter
from prefect.run_configs import DockerRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.engine import signals
from prefect.storage import CodeCommit
from prefect.tasks.notifications import SlackTask
from datetime import timedelta
from prefect.client import Secret

slackPrefectUrl = ""
# --------------------------------------------------------------------------------------------------
# prefect failure notification and reschedule on failed flow


def alert_on_special_failure(task, old_state, new_state):
    if new_state.is_failed():
        if getattr(new_state.result, "flag", False) is True:
            errMsg = '--- LOADER ERROR ---'
            msg = "{}\nTask: `{}` FAILED.\nThe loader process failed: `{}`".format(
                    errMsg, task.name, new_state.result.value)
            SlackTask().run(message = msg, webhook_secret = slackPrefectUrl)
    return new_state


@task(name = 'send API request', max_retries = 3, retry_delay = timedelta(minutes = 5), state_handlers = [alert_on_special_failure])
def post_request_process(process_name: str, arguments: str, inceptionUrlSecret: str, timeOut: int):
    logger = prefect.context.get('logger')

    url = Secret(inceptionUrlSecret).get()

    <http://logger.info|logger.info>(f'The url is: {url}')
    current_date = pendulum.now("America/Chicago").to_datetime_string()
    if not process_name:
        logger.error(f'Process name is null or empty')
        fail_signal = signals.FAIL(f'Process name is null or empty')
        fail_signal.flag = True
        fail_signal.value = f'Process name is null or empty'
        raise fail_signal
    else:
        <http://logger.info|logger.info>(f'Process: {process_name}')
    if not arguments:
        <http://logger.info|logger.info>(f'The arguments string is null or empty?')
    else:
        <http://logger.info|logger.info>(f'loading arguments:{arguments} for date: {current_date}')

    json_message = {'processName': process_name, 'arguments': arguments}
    <http://logger.info|logger.info>(f'The json message is: {json_message}')
    try:
        response = <http://requests.post|requests.post>(url = url, json = json_message, verify = False, timeout = timeOut)
    except requests.Timeout:
        logger.error(f'Timeout occurred: Exceeded {timeOut} seconds.')
        fail_signal = signals.FAIL(f'Timeout occurred: Exceeded {timeOut} seconds.')
        fail_signal.flag = True
        fail_signal.value = f'Timeout occurred: Exceeded {timeOut} seconds.'
        raise fail_signal
    except requests.ConnectionError:
        logger.error(f'Connection error to the Inception Service')
        fail_signal = signals.FAIL(f'Connection error to the Inception Service')
        fail_signal.flag = True
        fail_signal.value = f'Connection error to the Inception Service'
        raise fail_signal

    <http://logger.info|logger.info>(f'Response')
    if response.status_code != 200:
        logger.error(f'Request failed!')
        logger.error(f'Process: {process_name} {response.text}, {response.status_code}')
        fail_signal = signals.FAIL(f'Process failed: {process_name} {response.text}, {response.status_code}')
        fail_signal.flag = True
        fail_signal.value = f'Process failed: {process_name} {response.text}, {response.status_code}'
        raise fail_signal
    else:
        <http://logger.info|logger.info>(f'Request sent successfully!')
        <http://logger.info|logger.info>(f'Process: {process_name} {response.text},Status Code:  {response.status_code}')
        return


with Flow(name='Dev Start of Day Loaders', run_config = DockerRun(image = "spiderrockadvisors/py38-base:2.1.0", labels = ["sradock01"]),
          storage = CodeCommit(repo = 'advisorscodebase',
                               path = 'flow_schedule/loader.py',
                               commit = 'SRA-471-SODLoadersRegisterNewName',
                               secrets = ["AWS_CREDENTIALS"],
                               client_options = {"region_name": "us-east-2"})) as flowDev:
    slackPrefectUrl = "SLACK_PREFECT_DEV"
    procName = Parameter("process_name", default = 'ClearPositionsSodLoader')
    arguments = Parameter("arguments", default = None)
    inceptionUrlSecret = Parameter("inceptionUrlSecret", default='DEV_INCEPTION_URL')
    timeout = Parameter("timeOut", default = 600)
    post_request_process(process_name=procName, arguments=arguments, inceptionUrlSecret=inceptionUrlSecret, timeOut = timeout, upstream_tasks = [procName, arguments])


if __name__ == "__main__":
    # flowDev.run(parameters = {"process_name": '', "arguments": '', "inceptionUrlSecret": 'DEV_INCEPTION_URL'}, run_on_schedule = False)
    # flow.register(project_name='StartOfDay', set_schedule_active = False)
    flowDev.register(project_name='StartOfDay', set_schedule_active = False)
j

Jean Luciano

09/13/2022, 3:28 AM
It looks like you are setting
flow_SRALoadersAlert
as a cyclic dependency here
flow_SRALoaderAlerts.set_upstream(wait_for_flow_SRALoaderAlerts)
👍 1
c

Chris Gunderson

09/13/2022, 1:11 PM
Thanks Jean. I just made the change and will be testing it shortly
It is working now. I was hoping to be able to name the flows. How can I do that?
I tried doing something like this:
flow_MS = create_flow_run(flow_name = "Dev Start of Day Loaders", project_name = "StartOfDay", run_name = 'Morgan Stanley',
                          parameters = {'process_name': 'MorganStanley', 'inceptionUrlSecret': 'DEV_INCEPTION_URL'})
wait_for_flow_MS = wait_for_flow_run(flow_MS, raise_final_state = True)
^^ Nevermind, the flows were named correctly on the main dashboard.
@Jean Luciano You can close this