Rajan Subramanian
04/04/2022, 2:08 PM
Kevin Kho
Rajan Subramanian
04/04/2022, 2:13 PM
import os
from os.path import join
from exchange_feeds.constants import EXCHANGEPATH, PHOBOSPATH
from prefect.deployments import DeploymentSpec
# Location of the flow definition and of the feed-ingestor script the flow
# receives as its "shell_task" parameter.
path_to_pipeline = join(PHOBOSPATH, "pipelines", "feed_to_redis_pipeline.py")
path_to_file = os.path.join(EXCHANGEPATH, "feed_ingestor_to_redis.py")

# Stream name passed to every deployment below; it doubles as the first tag,
# so all of these deployments share one identically spelled tag.
_KRAKEN_BLOTTER_STREAM = "kraken-blotter"

# One deployment is declared per symbol, in this order.
_KRAKEN_BLOTTER_SYMBOLS = (
    "BTC/USD",
    "ETH/USD",
    "BNB/USD",
    "XRP/USD",
    "LUNA/USD",
    "SOL/USD",
    "AVAX/USD",
    "DOT/USD",
    "DOGE/USD",
    "ADA/USD",
)

for _symbol in _KRAKEN_BLOTTER_SYMBOLS:
    # "BTC/USD" -> "btcusd": used in the deployment name and as the second tag.
    _pair = _symbol.replace("/", "").lower()
    DeploymentSpec(
        name=f"kraken_blotter_{_pair}",
        flow_location=path_to_pipeline,
        tags=[_KRAKEN_BLOTTER_STREAM, _pair],
        parameters={
            "shell_task": path_to_file,
            "symbol": _symbol,
            "stream_name": _KRAKEN_BLOTTER_STREAM,
        },
    )
import subprocess
from typing import List
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.tasks import Task
from pipelines.prefecttoolz import build_command
# shell task to run python programs
@task(
    name="execute shell command",
    description="runs each of the exchange.py file as a separate shell task",
    retries=3,
    retry_delay_seconds=10,
)
def run_streamer_in_shell(command: str):
    """Run ``command`` through the shell and block until it exits.

    Args:
        command: Complete shell command line to execute.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # check=True turns a non-zero exit status into an exception. Without it
    # subprocess.run returns normally whatever the exit status, so a crashed
    # streamer would look like a success and the retries configured on this
    # task would have nothing to react to.
    subprocess.run(command, shell=True, check=True)
@flow(name="live_feeds_to_redis_pipeline", task_runner=ConcurrentTaskRunner)
def run_flow(shell_task: str, symbol: str, stream_name: str):
    """Build the streamer command line and hand it to the shell task.

    Args:
        shell_task: First argument forwarded to ``build_command``.
        symbol: Forwarded to ``build_command`` as ``symbol``.
        stream_name: Forwarded to ``build_command`` as ``stream_name``.
    """
    run_streamer_in_shell(
        command=build_command(shell_task, symbol=symbol, stream_name=stream_name)
    )
@task
def build_command(path_to_file: str, **kwargs) -> str:
    """Return the shell command line that runs ``path_to_file`` with python3.

    When both ``symbol`` and ``stream_name`` are supplied as keyword
    arguments, they are appended as the ``-t`` and ``-n`` options and the
    ``-s`` flag is added; otherwise only ``python3 <path_to_file>`` is
    returned.

    Args:
        path_to_file: Path of the Python script to run.
        **kwargs: Optional ``symbol`` and ``stream_name`` values.

    Returns:
        The command line as a single string, with every interpolated value
        quoted for the shell.
    """
    import shlex

    # shlex.quote keeps paths containing spaces and values containing quote
    # characters intact as single shell words; hand-written '...' quoting
    # breaks as soon as a value itself contains a single quote.
    command = f"python3 {shlex.quote(str(path_to_file))}"
    try:
        symbol = kwargs["symbol"]
        stream_name = kwargs["stream_name"]
    except KeyError:
        return command
    return (
        f"{command} -t {shlex.quote(str(symbol))}"
        f" -n {shlex.quote(str(stream_name))} -s"
    )
# -- To run the file, go to shell and type:
# -- python3 feed_ingestor_to_redis.py -t 'ethusdt' -n 'binance-blotter'
# -- To save the stream, type
# -- python3 feed_ingestor_to_redis.py -t 'ETH/USDT' -n 'kraken-L1' -s
import argparse
import asyncio
from datetime import datetime
import uvloop
import websockets
from websockets import connect
from binance import BinanceBlotter, BinanceOrderBook
from constants import STREAM_NAMES, StreamName
from ftx import FTXBlotter, FTXOrderBook
from kraken import KrakenBlotter, KrakenOrderBook
# event policy needs to be set at top of file
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Lookup table from a stream-name value to the feed class instantiated for it.
# Both FTX and FTX US stream names resolve to the same FTX classes.
factory = {
    StreamName.BINANCE_BLOTTER.value: BinanceBlotter,
    StreamName.BINANCE_L1.value: BinanceOrderBook,
    StreamName.FTX_BLOTTER.value: FTXBlotter,
    StreamName.FTX_ORDERBOOK.value: FTXOrderBook,
    StreamName.KRAKEN_BLOTTER.value: KrakenBlotter,
    StreamName.KRAKEN_L1.value: KrakenOrderBook,
    StreamName.FTXUS_BLOTTER.value: FTXBlotter,
    StreamName.FTXUS_ORDERBOOK.value: FTXOrderBook,
}
async def stream_from_exchange():
    """Parse the command line and stream one exchange feed.

    Reads ``--ticker``, ``--stream_name`` and ``--save_stream`` from the
    command line, instantiates the feed class registered in ``factory`` for
    the stream name, then iterates over the connections yielded by
    ``connect(exchg.url, ...)``. Each connection is attached to the feed
    object and ``exchg.stream`` is awaited on it; when that raises, the
    error and the socket's close reason/code are logged, the coroutine
    sleeps, closes the socket and moves on to the next connection.

    Raises:
        KeyError: If the requested stream name has no entry in ``factory``.
    """
    parser = argparse.ArgumentParser(
        prog="feed_ingestor_to_redis",
        # The option is registered as --ticker below, so the usage line
        # advertises that name.
        usage="%(prog)s --ticker [options] --stream_name [options]",
        description="Streams Raw Feeds from Exchange",
        epilog="sit back and drink coffee - long running program",
    )
    parser.add_argument(
        "--ticker",
        "-t",
        action="store",
        required=True,
        type=str,
        help="ticker symbol from exchange",
    )
    parser.add_argument(
        "--stream_name",
        "-n",
        action="store",
        required=True,
        type=str,
        help="stream name in redis to store data to",
        choices=STREAM_NAMES,
    )
    parser.add_argument(
        "--save_stream",
        "-s",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="to save to redis",
    )
    args = parser.parse_args()
    ticker, stream_name = args.ticker, args.stream_name
    if stream_name not in factory:
        raise KeyError("stream name not found")
    # BooleanOptionalAction already yields a bool. Comparing it with the
    # string "no" is never true, which would turn saving on even when the
    # flag was not given.
    save = bool(args.save_stream)
    exchange = factory[stream_name]
    msg = "entered logger"
    exchg = exchange(ticker, stream_name)
    print(
        f"Connecting to {exchg.exchange}. If an error occurs please press ctrl + c to stop this forever process"
    )
    print("erorrs are logged at exchange_feeds/exchangelogs.log")
    async for socket in connect(exchg.url, ping_interval=None):
        try:
            exchg.logger.info(msg)
            exchg.websocket = socket
            await exchg.stream(save=save)
        except Exception as e:
            print(e)
            exchg.logger.info(e)
            # The continuation lines of this message are part of the logged
            # text, so they are kept exactly as written.
            msg = f"""Socket Closed: at {datetime.now()};
close reason: {exchg.websocket.close_reason},
close code: {exchg.websocket.close_code}"""
            exchg.logger.info(msg)
            exchg.logger.info(f"error occured in main body {stream_name}-{ticker}")
            if exchg.websocket.close_code == 1013:
                # 1013 is the WebSocket "try again later" close code: back
                # off for 3600 s (one hour).
                # NOTE(review): this was previously described as 15 minutes,
                # which would be 900 s - confirm the intended back-off.
                await asyncio.sleep(3600)
            else:
                await asyncio.sleep(15)
            await exchg.websocket.close()
            continue
# Script entry point: drive the streaming coroutine on a fresh event loop.
if __name__ == "__main__":
    streaming_coroutine = stream_from_exchange()
    asyncio.run(streaming_coroutine)
Kevin Kho
Rajan Subramanian
04/04/2022, 2:18 PM
ps aux | grep python | wc -l
gives me 118 python processes running
Kevin Kho
supervisor
to run it as a long-running process that will start a new one if it fails.
Anna Geller
supervisord.conf
with this content:
[unix_http_server]
file=/tmp/supervisor.sock
[supervisord]
loglevel=info
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket
[program:prefect-agent]
command=/Users/anna/opt/anaconda3/envs/orion39/bin/prefect agent start --api 'https://api-beta.prefect.io/api/accounts/accountId/workspaces/workspaceId' WORK_QUEUE_ID
and then to start it, run:
supervisord -c ./supervisord.conf
Kevin Kho
Rajan Subramanian
04/04/2022, 2:40 PM
Kevin Kho
Rajan Subramanian
04/04/2022, 2:49 PM