hello, have a question about prefect orion. is the...
# prefect-community
r
Hello, I have a question about Prefect Orion. Is there a shellTask capability in Orion like the one that was available in Prefect Core?
Another question in terms of implementation: I have an interesting use case. I am working as a cryptocurrency software engineer. I need to develop streaming pipelines from various exchanges that trade cryptocurrencies. For each exchange, I have two Python files that stream different data. So, since I have 4 exchanges at the moment, that's 8 files of streaming data for a particular currency. Initially, I wanted all these files to be running at the same time, where after every 500 records streamed, I push them to a Redis stream. I have a second file that transfers data from the Redis stream to a PostgreSQL database and cleans out the stream, and this is scheduled every 5 minutes. All of this is done asynchronously.
When I first got into Prefect, I decided to create a shellTask and make shell calls using the Dask executor. So, I created a file called exchange_to_redis_pipeline.py in Prefect where I create a shellTask and store the paths to the 8 files that stream from the exchanges. I then loop through the list and execute the shellTask using the Dask executor. This works initially and I see the pipelines executed on Prefect Cloud. However, after an hour or so, some of the tasks lose heartbeat and they just stop working. I am not sure if this is due to the websocket or due to Prefect losing heartbeat on tasks where it's expecting the task to end?
Hence my attempt at using Orion now. There are quite a few changes in this library compared to Prefect. The Flow() constructor is no longer available, and neither is the shellTask() capability. Curious if my method is the standard way of achieving this in Prefect. Thank you
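For readers following the "push every 500 records" step described above, here is a minimal sketch of that buffering pattern. The class name `BatchBuffer` and the `sink` callable are hypothetical and not from the thread; in the setup described, the sink would be whatever function writes a batch to the Redis stream.

```python
from typing import Callable, Dict, List

Record = Dict[str, object]


class BatchBuffer:
    """Collect records and hand them to `sink` in fixed-size batches.

    `sink` is a hypothetical callable standing in for the Redis write.
    """

    def __init__(self, sink: Callable[[List[Record]], None], batch_size: int = 500) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._sink = sink
        self._batch_size = batch_size
        self._records: List[Record] = []

    def add(self, record: Record) -> None:
        """Buffer one record and flush once the batch is full."""
        self._records.append(record)
        if len(self._records) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Send any buffered records to the sink and reset the buffer."""
        if not self._records:
            return
        batch, self._records = self._records, []
        self._sink(batch)
```

Calling `flush()` once more when the stream closes keeps a partial final batch from being lost.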
a
It definitely will be, it's on the roadmap. For now, you could start this task with a subprocess:
from prefect import task
import subprocess

@task
def run_shell_command(cmd: str):
    subprocess.run(cmd, shell=True)
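A possible variant of the snippet above, sketched with the standard library only: it passes the command as an argument list instead of using `shell=True`, raises if the script exits with a non-zero status, and returns the captured output. The function name `run_script` is hypothetical; it could be decorated with `@task` in the same way as the function in the reply.

```python
import subprocess
import sys
from typing import List, Optional


def run_script(args: List[str], timeout: Optional[float] = None) -> str:
    """Run a command and return its stdout.

    Raises subprocess.CalledProcessError on a non-zero exit code, so a
    failing script is not silently treated as a success.
    """
    completed = subprocess.run(
        args,
        capture_output=True,  # collect stdout and stderr instead of inheriting them
        text=True,            # decode bytes to str
        check=True,           # raise on non-zero exit status
        timeout=timeout,
    )
    return completed.stdout


if __name__ == "__main__":
    # sys.executable is the interpreter currently running this file.
    print(run_script([sys.executable, "-c", "print('hello')"]))
```

Note that capturing output suits scripts that finish; for a stream that never ends, the output would accumulate in memory, so inheriting stdout may be the better choice there.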
r
thank you that helps
k
I think for the streaming use cases or anything long running, cloud 1.0 is not the way to go because you can’t submit tasks dynamically so you are right to be looking at Orion for that
a
@Rajan Subramanian for the next time, can you ask a separate question in a separate thread? 😄 right now it feels like I ignored most of your message because originally the question only asked about ShellTask
🙌 1
for the streaming use case, I answered your question in this StackOverflow question but if you want to discuss further here, happy to do so
🙌 1
and regarding real-time streaming for cryptocurrency data analysis, I've actually written several blog posts about it in the past, if you're interested - here is one: https://betterprogramming.pub/deep-dive-into-amazon-timestream-building-a-real-time-dashboard-6a15ed36baf6
basically instead of using two different solutions:
• Redis for hot data (most recent price data)
• Postgres for cold data (long term storage)
and moving data around between those, I was taking advantage of Amazon Timestream which allows you to store both in a single database. Basically, with Timestream, you can quickly write real-time data (hot data) into an in-memory store (similarly to Redis) and AWS automatically moves this data into a magnetic store (cold data) after a specified retention period.
When it comes to orchestration, you can do that with both, using Prefect 1.0 and Prefect 2.0.
• For 2.0 you can use the approach from this blog post,
• and for 1.0 you can have minutely batch jobs with a task writing data in mini-batches (technically near-real time but works better than Kinesis or Spark Streaming in practice):
import time

import prefect
from prefect import task

@task
def write_real_time_data():
    logger = prefect.context.get("logger")  # assumed: Prefect 1.0 task logger
    for iteration in range(1, 7):
        logger.info("iteration nr %s", iteration)
        read_from_redis_and_load_to_postgres()  # your logic here
        if iteration < 6:
            logger.info("Sleeping for 10 seconds...")
            time.sleep(10)
🙌 2
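The mini-batch loop above can be sketched as a plain function without any Prefect dependency, which makes the timing logic easy to check on its own. The name `run_mini_batches` and its parameters are hypothetical; the defaults mirror the six iterations and 10-second pause in the snippet, and `sleep` is injectable only so the loop can be exercised without waiting.

```python
import time
from typing import Callable


def run_mini_batches(
    job: Callable[[], None],
    iterations: int = 6,
    pause_seconds: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `job` `iterations` times, pausing between calls.

    There is no pause after the last call, so a run of six calls with a
    10-second pause spends 50 seconds sleeping plus the job time, leaving
    some headroom within a one-minute schedule.
    """
    for iteration in range(1, iterations + 1):
        job()
        if iteration < iterations:
            sleep(pause_seconds)
    return iterations
```

If each call to `job` takes more than a second or two, the pause would need to shrink for the whole run to finish before the next scheduled start.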
r
Thats a very interesting approach. I think i might use this idea to create a real time crypto feed project
🙌 1
@Kevin Kho I am planning on deploying my prefect code to aws ec2 instance i have on the cloud. Is it possible to execute orion in aws?
k
Yes, you should be able to, as if it were your normal laptop, but it may be a bit inconvenient to update the code there and get deployments unless you develop there. We are still developing the remote storage of flows
r
yea it is a bit inconvenient
but im dealing with something with heavy computational power. streaming tons of data. so aws would be better for load balancing etc
i have a macbook pro. and a running ec2 instance. Plan is to use the eks cluster eventually. But im not too sure how prefect will integrate into all this.
the ec2 instance is in linux
k
For EKS I think the story is a bit more mature right now than ec2 because you can register directly against the cluster. Have you tried the KubernetesFlowRunner yet? Docs here
upvote 1
r
hmm, it seems on my local machine, the prefect agent is executing whenever i type 'prefect agent start uuid' but when i ssh into a linux terminal, the same statement freezes the entire terminal. any idea what's going on?
k
Just to clarify, you are doing prefect agent start uuid on a VM that contains Prefect Orion?
r
@Kevin Kho, yep
k
Will have to try this. I can’t think of any reason at the moment
r
could it be memory issues?
k
Do you have a lot of late flows that try to get started all of a sudden? What is your current memory on the VM?
r
let me check, give me a sec. well, i just deployed one flow
and it froze
but on my local machine, i can deploy any number of flows and there's no issue
yea this works from my local machine so good news
k
Ok will try that later today
r
thing is after i run the prefect agent in the vm, it freezes the ssh terminal, so im forced to stop the instance and reboot from the aws side
i log in via ssh -i timescale.pem 'ubuntu@aws_addresss_.com'
k
Where did you create the deployment from and did you specify a default storage? Asking for when I test later
r
from the cli
default storage i used local storage
k
I mean from remote machine or on the VM?
r
oh i tested the deployment on the remote machine running the ec2 instance
and from my local machine
yea on the remote machine, when i type prefect agent start uuid, i see messages on the screen but then it freezes after a few seconds
k
Ah ok so you created the deployment and then started the agent on local pointing to Cloud, and then in a VM you repeated the same thing? Or are you not on Cloud yet, and the VM is hosting Orion?
r
yep
exactly
i created the deployment and then started the agent on local pointing to Cloud, and then on the vm i repeated the same steps
k
Oh that is pretty confusing that seems like it should behave exactly the same. So you moved your flow file to the VM when you did this right?
r
yep
i have it all on github so i cloned it
k
and these are all pointed to the same cloud workspace right?
r
yep
do u think u can do the same? hello world deployment on local, and hello world deployment in a vm
k
man…that is confusing. i’m not sure i’ll be able to replicate but i’ll give it a shot later
r
local im only using for testing
but technically it should work no?
or am i missing something
k
i dont see a reason why not. seems exactly the same
r
yea same steps followed
however i noticed something: on the linux side, when i install prefect, it uninstalls aioredis and installs aioredis 1.3.1
whereas on my mac, it doesn't do that
maybe some setting is off that keeps prefect from working on the linux side
k
Ah what distro of linux are you using?
r
ubuntu 20.0
i love the new feel of the cloud UI
yea its definitely working from my local machine no issues. will test later on remote
k
That’s really weird because ubuntu 20.0 is supported