Rajan Subramanian

    5 months ago
    Hello all, I had my Prefect Orion Cloud running for the last 4 days with no hiccups. Today I logged in and I see that all the deployments have disappeared from the Cloud. Is there a time limit for these processes? @Anna Geller
    Kevin Kho

    5 months ago
    There is no time limit. Could you share your deployment spec?
    Rajan Subramanian

    5 months ago
    sure,
    import os
    from os.path import join
    
    from exchange_feeds.constants import EXCHANGEPATH, PHOBOSPATH
    from prefect.deployments import DeploymentSpec
    
    path_to_pipeline = join(PHOBOSPATH, "pipelines", "feed_to_redis_pipeline.py")
    path_to_file = os.path.join(EXCHANGEPATH, "feed_ingestor_to_redis.py")
    
    
    DeploymentSpec(
        name="kraken_blotter_btcusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "btcusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "BTC/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_ethusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "ethusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "ETH/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_bnbusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "bnbusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "BNB/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_xrpusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "xrpusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "XRP/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_lunausd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "lunausd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "LUNA/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_solusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "solusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "SOL/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_avaxusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "avaxusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "AVAX/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_dotusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "dotusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "DOT/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_dogeusd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "dogeusd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "DOGE/USD",
            "stream_name": "kraken-blotter",
        },
    )
    
    DeploymentSpec(
        name="kraken_blotter_adausd",
        flow_location=path_to_pipeline,
        tags=["kraken-blotter", "adausd"],
        parameters={
            "shell_task": path_to_file,
            "symbol": "ADA/USD",
            "stream_name": "kraken-blotter",
        },
    )
    That's one such deployment.
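The ten specs above differ only in the symbol, so they could be generated in a loop. A minimal sketch, assuming the same naming convention: `spec_kwargs` is a hypothetical helper, the two file names are placeholders standing in for `path_to_pipeline` and `path_to_file`, and each dict would be passed as `DeploymentSpec(**kwargs)` in the real deployment file.

```python
from typing import Dict, List

SYMBOLS: List[str] = [
    "BTC/USD", "ETH/USD", "BNB/USD", "XRP/USD", "LUNA/USD",
    "SOL/USD", "AVAX/USD", "DOT/USD", "DOGE/USD", "ADA/USD",
]


def spec_kwargs(symbol: str, pipeline_path: str, ingestor_path: str,
                stream_name: str = "kraken-blotter") -> Dict[str, object]:
    """Build the keyword arguments for one deployment (hypothetical helper)."""
    slug = symbol.replace("/", "").lower()  # "BTC/USD" -> "btcusd"
    return {
        "name": f"{stream_name.replace('-', '_')}_{slug}",
        "flow_location": pipeline_path,
        "tags": [stream_name, slug],
        "parameters": {
            "shell_task": ingestor_path,
            "symbol": symbol,
            "stream_name": stream_name,
        },
    }


# Placeholder paths; the real file would use path_to_pipeline and path_to_file.
all_specs = [
    spec_kwargs(s, "feed_to_redis_pipeline.py", "feed_ingestor_to_redis.py")
    for s in SYMBOLS
]
# In the deployment file: for kwargs in all_specs: DeploymentSpec(**kwargs)
```

Deriving the tags from one `stream_name` value also keeps them spelled the same way across all deployments.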
    Here's my pipeline:
    import subprocess
    from typing import List
    
    from prefect import flow, task
    from prefect.task_runners import ConcurrentTaskRunner
    from prefect.tasks import Task
    
    from pipelines.prefecttoolz import build_command
    
    # shell task to run python programs
    
    
    @task(
        name="execute shell command",
        description="runs each of the exchange.py file as a separate shell task",
        retries=3,
        retry_delay_seconds=10,
    )
    def run_streamer_in_shell(command: str):
        subprocess.run(command, shell=True)
    
    
    @flow(name="live_feeds_to_redis_pipeline", task_runner=ConcurrentTaskRunner)
    def run_flow(shell_task: str, symbol: str, stream_name: str):
        cmd = build_command(shell_task, symbol=symbol, stream_name=stream_name)
        run_streamer_in_shell(command=cmd)


    # pipelines/prefecttoolz.py (the module imported above)
    from prefect import task


    @task
    def build_command(path_to_file: str, **kwargs) -> str:
        if "symbol" in kwargs and "stream_name" in kwargs:
            symbol = kwargs["symbol"]
            stream_name = kwargs["stream_name"]
            return f"""python3 {path_to_file} -t '{symbol}' -n '{stream_name}' -s"""
        return f"python3 {path_to_file}"
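One thing worth noting about the shell task above: `subprocess.run` without `check=True` does not raise when the script exits with a non-zero code, so the task's `retries=3` would likely never trigger on a failed script. A minimal sketch using only the standard library follows. `build_args` and `run_checked` are hypothetical names, and passing an argument list (with `sys.executable` in place of `python3`) instead of `shell=True` is a swapped-in choice that avoids manual quoting.

```python
import subprocess
import sys
from typing import List, Optional


def build_args(path_to_file: str, symbol: Optional[str] = None,
               stream_name: Optional[str] = None) -> List[str]:
    """Return the command as an argument list, so no shell quoting is needed."""
    args = [sys.executable, path_to_file]
    if symbol is not None and stream_name is not None:
        args += ["-t", symbol, "-n", stream_name, "-s"]
    return args


def run_checked(args: List[str]) -> int:
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    return subprocess.run(args, check=True).returncode
```

Inside a Prefect task, the raised `CalledProcessError` is what would mark the task run as failed and allow a retry.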
    This is how I stream the data:
    # -- To run the file, go to shell and type:
    # -- python3 feed_ingestor_to_redis.py -t 'ethusdt' -n 'binance-blotter'
    # -- To save the stream, add the -s flag (it takes no value):
    # -- python3 feed_ingestor_to_redis.py -t 'ETH/USDT' -n 'kraken-L1' -s
    
    import argparse
    import asyncio
    from datetime import datetime
    
    import uvloop
    import websockets
    from websockets import connect
    
    from binance import BinanceBlotter, BinanceOrderBook
    from constants import STREAM_NAMES, StreamName
    from ftx import FTXBlotter, FTXOrderBook
    from kraken import KrakenBlotter, KrakenOrderBook
    
    # event policy needs to be set at top of file
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    factory = {}
    factory[StreamName.BINANCE_BLOTTER.value] = BinanceBlotter
    factory[StreamName.BINANCE_L1.value] = BinanceOrderBook
    factory[StreamName.FTX_BLOTTER.value] = FTXBlotter
    factory[StreamName.FTX_ORDERBOOK.value] = FTXOrderBook
    factory[StreamName.KRAKEN_BLOTTER.value] = KrakenBlotter
    factory[StreamName.KRAKEN_L1.value] = KrakenOrderBook
    factory[StreamName.FTXUS_BLOTTER.value] = FTXBlotter
    factory[StreamName.FTXUS_ORDERBOOK.value] = FTXOrderBook
    
    
    async def stream_from_exchange():
        parser = argparse.ArgumentParser(
            prog="feed_ingestor_to_redis",
            usage="%(prog)s --ticker [options] --stream_name [options]",
            description="Streams Raw Feeds from Exchange",
            epilog="sit back and drink coffee - long running program",
        )
        parser.add_argument(
            "--ticker",
            "-t",
            action="store",
            required=True,
            type=str,
            help="ticker symbol from exchange",
        )
        parser.add_argument(
            "--stream_name",
            "-n",
            action="store",
            required=True,
            type=str,
            help="stream name in redis to store data to",
            choices=STREAM_NAMES,
        )
        parser.add_argument(
            "--save_stream",
            "-s",
            action=argparse.BooleanOptionalAction,
            default=False,
            help="to save to redis",
        )
        args = parser.parse_args()
    
        ticker, stream_name, save_stream = args.ticker, args.stream_name, args.save_stream
        if stream_name not in factory:
            raise KeyError("stream name not found")
    
        # BooleanOptionalAction already yields a bool, so no string comparison is needed
        save = bool(save_stream)
    
        exchange = factory[stream_name]
        msg = "entered logger"
        exchg = exchange(ticker, stream_name)
        print(
            f"Connecting to {exchg.exchange}. If an error occurs, please press Ctrl+C to stop this forever process"
        )
        print("errors are logged at exchange_feeds/exchangelogs.log")
        async for socket in connect(exchg.url, ping_interval=None):
            try:
                exchg.logger.info(msg)
                exchg.websocket = socket
                await exchg.stream(save=save)
            except Exception as e:
                print(e)
                exchg.logger.info(e)
                msg = f"""Socket Closed: at {datetime.now()};
                close reason: {exchg.websocket.close_reason},
                    close code: {exchg.websocket.close_code}"""
                exchg.logger.info(msg)
                exchg.logger.info(f"error occurred in main body {stream_name}-{ticker}")
                if exchg.websocket.close_code == 1013:
                    # sleep for one hour (3600 seconds)
                    await asyncio.sleep(3600)
                else:
                    await asyncio.sleep(15)
                await exchg.websocket.close()
                continue
    
    
    if __name__ == "__main__":
        asyncio.run(stream_from_exchange())
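A note on the `-s` flag in the script above: `argparse.BooleanOptionalAction` (Python 3.9+) stores a real boolean and takes no value, so comparing the parsed result with the string `"no"` is always false. A minimal sketch with a stripped-down parser, where only the `--save_stream` argument is copied from the script:

```python
import argparse

parser = argparse.ArgumentParser(prog="feed_ingestor_to_redis")
parser.add_argument(
    "--save_stream",
    "-s",
    action=argparse.BooleanOptionalAction,
    default=False,
    help="to save to redis",
)

flag_given = parser.parse_args(["-s"]).save_stream
flag_absent = parser.parse_args([]).save_stream
flag_negated = parser.parse_args(["--no-save_stream"]).save_stream

# The parsed value is already a bool, so it can be used directly.
print(flag_given, flag_absent, flag_negated)  # prints: True False False
```

Passing a value such as `-s 'yes'` is rejected by this parser as an unrecognized argument.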
    Kevin Kho

    5 months ago
    Will ask some team members what could cause this
    Rajan Subramanian

    5 months ago
    this is my screenshot
    When I go to Flows, I see it there, which is weird given that it disappeared from my main page.
    ps aux | grep python | wc -l
    gives me 118 Python processes running,
    but when I check on the backend, nothing is being streamed at all.
    Oh, my Prefect agent stopped running. wth. Why would the agent just randomly stop running?
    Kevin Kho

    5 months ago
    It shouldn’t, but it could die sometimes. You could use something like supervisor to run it as a long-running process that will start a new one if it fails.
    Anna Geller

    5 months ago
    @Rajan Subramanian I run my agent in a conda environment and with supervisor. You can create a file called supervisord.conf with this content:
    [unix_http_server]
    file=/tmp/supervisor.sock   
    
    [supervisord]
    loglevel=info          
    
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
    
    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
    
    [program:prefect-agent]
    command=/Users/anna/opt/anaconda3/envs/orion39/bin/prefect agent start --api 'https://api-beta.prefect.io/api/accounts/accountId/workspaces/workspaceId' WORK_QUEUE_ID
    and then to start it, run:
    supervisord -c ./supervisord.conf
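Conceptually, supervisor starts the command and starts it again when the process exits unexpectedly. A rough Python sketch of that restart loop, for illustration only and not a replacement for supervisor; `run_with_restart`, `max_restarts`, and `delay_seconds` are hypothetical names.

```python
import subprocess
import sys
import time
from typing import List


def run_with_restart(cmd: List[str], max_restarts: int = 3,
                     delay_seconds: float = 5.0) -> int:
    """Run cmd, restarting it after each non-zero exit; return how many times it was started."""
    starts = 0
    while True:
        starts += 1
        result = subprocess.run(cmd)  # blocks until the process exits
        if result.returncode == 0 or starts > max_restarts:
            return starts
        time.sleep(delay_seconds)  # brief pause before the next attempt


# Example: a command that always fails is started once plus max_restarts times.
failing = [sys.executable, "-c", "raise SystemExit(1)"]
attempts = run_with_restart(failing, max_restarts=2, delay_seconds=0)
```

A process manager adds what this loop lacks, such as running in the background, log capture, and control commands, which is why it is the better fit for a long-running agent.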
    Kevin Kho

    5 months ago
    Your dashboard image filters are only looking for one day back and one day forward. Could you try removing those filters?
    Rajan Subramanian

    5 months ago
    Yep, I did. I redeployed the tasks, and I am going to follow @Anna Geller's and @Kevin Kho's advice and add supervisor.
    Kevin Kho

    5 months ago
    Ah ok so are you good now?
    Rajan Subramanian

    5 months ago
    Yep, will circle back if I have any issues.