# prefect-cloud
l
Hello, I've tried Prefect today and I really love it for pipelining data from TMDB to my Supabase. But is it normal that my Python script is very slow? I mean, I should update around 5000 items per day, but it takes 3 min for 100 items, which is pretty weird, no? Thx for help!
n
hi @Loup - welcome! there are many reasons why things could be moving slowly, especially if your work is IO / network bound. could you show how you're utilizing prefect here and perhaps what your code is doing in general? prefect should not generally add that much latency
l
Thx for your help! Here is all my script:
Because 2.5s per item is pretty slow, but actually it isn't really a problem... Now I'm trying to host this script, because I don't really understand how Prefect Cloud works. Right now I'm running the script on my computer, but I want something in the cloud. So I want to try Prefect Cloud, but I don't know how to do it, especially because my script downloads files (around 500 MB in total), so is that possible? To test, I'm running my .py script, but I'm getting this error:
Flow run infrastructure exited with non-zero status code:
Task flow-run-0b5fa2b4-816f-4e52-a960-6338bac43f22-5775ec5191cd45h4n-task0 failed with message: The container exited with an error. (Error Code: 1)
b
When you want to run your script off of your local machine @Loup, this part of the getting started guide explains how to host it in a GitHub repo: https://docs.prefect.io/latest/getting-started/quickstart/#step-4-choose-a-remote-infrastructure-location
l
The best option for me is to use Prefect Cloud because I don't really want another VPS for this. But I'm getting an error with it. Also, Prefect doesn't publish pricing for the Cloud, and I don't know if there is a limit on storage space (because my script will download 2 files of 250 MB on each execution) or on machine specifications either (CPU, RAM, …). I've tried to set up a VPS running Prefect with Docker. It's working, but I need to keep my VPS terminal open, because if I close the terminal (after starting the pool worker), all my runs get stuck in a waiting state (so I guess my worker isn't reachable)
I can't find a tutorial about setting up a VPS (on Ubuntu, for example) as a new worker pool. Because if the VPS crashes, I want it to be ready for Prefect tasks automatically when it restarts
n
hi @Loup - even if you're using prefect cloud, by default you will need to run a worker process that will listen for work to submit. we are in the process of rolling out managed execution where we do this for you - right now push pools are probably your best bet @Marvin please look up push pools and also find that systemd linux worker article, also eli5 everything
hmm idk what’s wrong with @Marvin but here
m
It seems like there might be an issue with the link formatting that you've posted. To help you more effectively, could you please describe the problem you're encountering? Are you trying to access the Prefect documentation related to "push work pools"? If there's a specific issue or topic you need information on, please clarify, and I can assist you further.
n
@Marvin look up work pools / workers / push pools all separately and give an eli5 on each, explaining how they let me run my stuff without keeping my laptop open - be concise
m
Sure, let's break it down into simpler terms:
Work Pools in Prefect 2.x: Work pools are like designated areas where flow runs wait to be performed. When you have a flow that's ready to run, it's sent to a specific work pool. Think of it as a bus stop where the flow run waits for a bus (the worker) to pick it up and take it to its destination (execution).
Workers in Prefect 2.x: Workers are like the bus drivers for your flows. They check in with the work pool to see if there are any flow runs that need to be done. If they find any, they pick up the flow run and execute it. You don't need to keep your laptop open because these workers run elsewhere, like in the cloud or on a server, doing the work automatically.
Push Pools in Prefect 2.x: Push work pools are a special kind of bus stop that doesn't need a bus driver at all! Instead of waiting for a worker to pick up the flow run, the work pool sends the flow run directly to serverless infrastructure, such as AWS ECS Tasks or GCP Cloud Run Jobs. This means your flows can run on demand in these serverless services without needing a dedicated worker to start the run.
Using work pools and workers, or push work pools, allows your flows to run automatically without your laptop being on, since cloud services or dedicated servers handle the execution for you.
l
@Nate thx, I've added a service to run my worker automatically. Do I really need to use Docker? Or can I just create a local subprocess? Because I don't know how to add pip libraries to the Docker image 😅
n
you don't need to use docker, the absolute easiest way if you just have one script is to use the
.serve
method and maybe put that up as a systemd process that revives itself
but in case you end up wanting docker at some point
I don't know how to add pip libraries to the Docker image 😅
I'll mention that you can just build an image
FROM prefecthq/prefect:2-latest
and that will have everything you need as far as prefect is concerned
l
So I can just create a local subprocess worker?
I've tried to make a Makefile, but I'm new to Docker, so maybe not a good idea ahah
n
if you only have 1 script, then
serve
does the whole job of the Process worker - if you have
Copy code
# my_new_deployment.py
from prefect import flow

@flow(log_prints=True)
def some_flow():
    pass

if __name__ == "__main__":
    some_flow.serve("my_new_deployment")
then you can do
python my_new_deployment.py
and that will start a long-lived process analogous to a process worker. the only nice thing about a worker is that if you wanted to add another script (i.e. deployment) later to run in the same place, you'd just need to create the deployment (
prefect deploy somefile.py:somefunc -n new-deployment -p my-process-pool
) and the process worker would be able to pull it in (without having to change anything else) @Marvin concisely give a systemd config file to run the above script on linux as a service
m
To run your script as a service on a Linux system with
systemd
, you can create a service file (let's call it
my_flow_service.service
) with the following configuration:
Copy code
ini
[Unit]
Description=My Flow Service
After=network.target

[Service]
Type=simple
User=<YourUsername>
WorkingDirectory=<YourScriptDirectory>
ExecStart=/usr/bin/python3 my_new_deployment.py
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
Replace
<YourUsername>
with your Linux username and
<YourScriptDirectory>
with the full path of the directory where your script
my_new_deployment.py
is located. You would then place this file in the
/etc/systemd/system/
directory. Enable and start the service using the following commands:
Copy code
shell
sudo systemctl enable my_flow_service.service
sudo systemctl start my_flow_service.service
This will set up your script as a systemd service that starts automatically, runs continuously in the background, and restarts in case of failure.
n
where if you ended up wanting to go the worker route, you could instead run the worker process here
Copy code
ExecStart=/usr/path/to/my/venv/bin/prefect worker start -p my-process-pool
l
But with
.serve
we can't specify a work pool, no? And actually I want to have multiple scripts, so maybe Docker is better I guess, but I don't know how to add a rule for installing pip libraries for my script
n
But with
.serve
we can't specify a work pool, no?
correct. you'd have to
pip install mylib
into the virtual environment where you'd start the
serve
process. if you end up wanting to go the docker route, you'd just have to: • create a work pool (I'd recommend a push pool as mentioned earlier), but it could be
docker
too, if you wanted to run docker on your VM • swap
.serve
for
.deploy
on your script ◦ pass in the reference to your
Dockerfile
to
.deploy
which does some
RUN pip install mylib
on top of the prefect base image
l
Ohh I didn't know we can pass a reference to a Dockerfile to .deploy
I have this Dockerfile (with
requirements.txt
):
Copy code
# We're using the latest version of Prefect with Python 3.10
FROM prefecthq/prefect:2-python3.10

# Add our requirements.txt file to the image and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir

# Add our flow code to the image
COPY flows /opt/prefect/flows

# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]
And push pools only work with AWS, Google, and Azure, right? I guess it's expensive, no? I'm a student, so I don't wanna pay more than 5€/month for my script
n
push pools only work with AWS, Google, and Azure, right?
yeah gotcha - exactly how much it costs depends on how much runtime you use, but if you've already got a VM then I'd just install docker there and start a docker worker like i mentioned above w/ systemd
and then remove
Copy code
# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]
from the Dockerfile you pass to
.deploy
the docker work pool will use a
command
that we need to invoke the engine / track the execution
l
Yes, I'm gonna do that, but I can't find the docs on how to add a Makefile inside deploy... I'm so sorry for bothering you with that
n
no worries! what do you mean by Makefile here?
l
I think I misunderstood something about how we make deployments. Because I have my script and at the end I have the deploy:
Copy code
# ... python code before
@task
def set_last_update_date(current_date: datetime) -> None:
    supabase.table('recomend_tmdb_changes').insert([{
        'date': current_date.strftime("%Y-%m-%d"),
        'success': True,
    }]).execute()
    
@flow(name="tmdb_update_person", log_prints=True)
def tmdb_update_person():
    current_date = datetime.now()
    last_update_date = get_last_update_date()
    print(f"current_date: {current_date}")
    print(f"last_update_date: {last_update_date}")
    # tmdb_check_changes_person_ids(current_date,last_update_date)
    tmdb_check_export_person_ids(current_date,last_update_date)
    # set_last_update_date(current_date)

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="tmdb_update_person.py:tmdb_update_person",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *"
    )
Because this script is on my local machine (not my VPS). So to deploy it I run
python3 name
. So how can I give it the Dockerfile with the
pip install
command?
n
so take this example
test.py
I can either have extra packages installed at runtime via
EXTRA_PIP_PACKAGES
or go ahead and pass a
DeploymentImage
like this so I can: • run this
python test.py
on my local • this will create the deployment on prefect cloud • create flow runs from my deployment that will be picked up by a worker (k8s work pool for me here, docker work pool for you on your VM)
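A minimal sketch of what that test.py could look like, assuming a Docker-type work pool named my-docker-pool and a Dockerfile sitting next to the script (the pool, image, and deployment names here are illustrative assumptions, not from the thread):
Copy code
# test.py - illustrative sketch only; pool / image / deployment names are assumptions
from prefect import flow
from prefect.deployments import DeploymentImage


@flow(log_prints=True)
def my_flow():
    print("hello from a containerized flow run")


if __name__ == "__main__":
    my_flow.deploy(
        name="my-docker-deployment",
        work_pool_name="my-docker-pool",  # assumed docker work pool
        image=DeploymentImage(
            name="my-flow-image",
            tag="dev",
            dockerfile="Dockerfile",  # your Dockerfile with the RUN pip install step
        ),
        push=False,  # keep the image local if the worker runs on the same VM
    )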
l
Damn, I think it's working
Thx a lot @Nate, so sorry again ahah
I'm gonna try to optimize my script now, because 2.5s to get one item is slow. I'm gonna try to implement multithreading in the script
n
good to hear! i think there are some native prefect features you could leverage before jumping straight to your own multithreading. for example, I think you could rework some of your code to leverage task's
map
/
submit
interface, which lets you use multithreading without implementing it yourself. for example, this will run in about 5 seconds, as each task is executed in its own worker thread
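In case it helps, here is a minimal sketch of that map pattern (not the exact snippet from the thread; the sleep stands in for an IO-bound call, and the actual timing depends on the task runner's thread pool):
Copy code
import time

from prefect import flow, task


@task
def fetch_item(item: int) -> int:
    # stand-in for an IO-bound call, e.g. an HTTP request
    time.sleep(1)
    return item * 2


@flow(log_prints=True)
def demo():
    # map submits one task run per element; runs execute concurrently in worker threads
    futures = fetch_item.map(range(20))
    results = [f.result() for f in futures]
    print(f"processed {len(results)} items")


if __name__ == "__main__":
    demo()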
if you're a visual person, this might be a helpful video
l
My VPS with only 1 vCPU and 2 GB RAM isn't using all its power
I've tried multithreading, but I got a
dict
error, so I'm gonna check ur video, because right now it's gonna take 6h just to update persons. I also need to update movies
Btw, with Docker I can run multiple scripts on the same VPS, right?
n
i am not super familiar with running Prefect on VPS infra. to be clear, the video just briefly covers task's
.submit
/ and
.map
which I think would be directly useful for optimizing your code. in general I'd just recommend refactoring your code so that heavy IO / network bound stuff is in tasks that you can map (run concurrently). feel free to ping #C04DZJC94DC about VPS stuff, he'll probably know more than me 🙂
l
Well, because right now my code makes 2 requests per person to get details, and also 2 requests to upload data to my database, so if I could handle multiple persons at once, like 5 at a time, it would be awesome. I'm gonna check that. Btw Prefect is sick!
n
catjam
lmk if you have any other questions!
l
ahah wait, Marvin is an AI agent? 😭
n
yep! if you're curious here's the code
l
I'm stupid, my current script makes 3 Supabase requests for each item, so during the script my Supabase instance is completely overloaded. So I should store all item details locally, and afterwards make one single Supabase request to update everything
Damn, the video u sent me @Nate for multitasking is awesome, now my script takes around 45 min to run instead of 6 hours ahah
It's way better. I'm gonna try to get it to around 5 minutes 😄
n
catjam
great to hear!
l
Is there a way to run multiple deployments in a chain? For example, I have a main flow in tmdb_update.py, and I wanna call multiple flows (in tmdb_update_person.py for example):
Copy code
from prefect import flow
from prefect.blocks.system import Secret

@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
    print("Starting tmdb_update")
    tmdb_update_person()
    # tmdb_update_person()

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="flows/tmdb_update.py:tmdb_update",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *",
        job_variables={
            "env": {
                "EXTRA_PIP_PACKAGES": "supabase tqdm more_itertools",
            }
        }
    )
is it possible?
n
when you say “multiple deployments in a chain” do you mean you’ve already created some deployments and you want to call them from your tmdb flow and have them show up there? would you want to run them in the same process as your (parent) tmdb flow, or as a separate process? what you’re asking for is definitely possible, just trying to hone in on exactly what you mean
here’s an alternate path where we use a statehook to trigger a run of an existing deployment on_completion of a given flow
and the most flexible / modular way to chain stuff together (in my opinion) is using events
where you could put a trigger on your deployments such that when a specific event occurs (or does not occur) you can run that deployment (for example a prefect.flow-run.Completed event from an upstream deployment)
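For reference, a rough sketch of the state-hook approach mentioned above, assuming a child deployment already exists and is named "tmdb_update_person/tmdb_update_person" (that name, and the hook itself, are assumptions for illustration):
Copy code
# illustrative sketch; the deployment name below is an assumption
from prefect import flow
from prefect.deployments import run_deployment


def kick_off_person_update(flow, flow_run, state):
    # on_completion hook: trigger an existing deployment once this flow finishes
    run_deployment(
        name="tmdb_update_person/tmdb_update_person",  # "<flow name>/<deployment name>"
        timeout=0,  # don't wait for the child run to finish
    )


@flow(name="tmdb_update", log_prints=True, on_completion=[kick_off_person_update])
def tmdb_update():
    print("Starting tmdb_update")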
l
Sorry @Nate for the late reply. Well, I'm remaking all my scripts because they were completely unsafe (during the night my whole database got broken because of the script ahah, thx to the backup). So now I'm merging all my scripts into one, because I need some things to run before others (like updating languages and countries before adding new movies, because the movie table needs relations with the language and country tables)
I was wondering, why can't we call a task inside a task? Because I don't wanna have an infinite number of flows inside Prefect
I just wanna have flows for big processes (for example, my flow
tmdb_update
is a flow but
tmdb_update_language
is a task)
n
hi Loup, it's a busy day for me, i can look more at this later. in general you cannot call tasks in tasks (arbitrary hard rule for now). subflows are infinitely nestable, so it's just a question of what modularization is best for your use case (make tasks the things you want to map or retry). can come back to this later
l
No problem, I'm gonna try myself, I don't wanna force u to help me ahah 🤣 And yeah, the problem is that if map can only be done with tasks, I have to create flows...
n
there's also this pattern which is a way around that
the downside is you have to create a deployment that represents the thing you want to parallelize
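A rough sketch of what that pattern can look like, assuming a child deployment named "tmdb_update_person/per-chunk" exists and accepts an ids parameter (both the name and the parameter are assumptions):
Copy code
# illustrative sketch; deployment name and parameters are assumptions
from prefect import flow
from prefect.deployments import run_deployment


@flow(name="tmdb_update", log_prints=True)
def tmdb_update(person_ids: list[int]):
    chunks = [person_ids[i : i + 500] for i in range(0, len(person_ids), 500)]
    for chunk in chunks:
        # each call creates a flow run of the existing deployment; the runs are
        # picked up by your work pool and can execute in parallel
        run_deployment(
            name="tmdb_update_person/per-chunk",  # "<flow name>/<deployment name>"
            parameters={"ids": chunk},
            timeout=0,  # fire and forget; don't block the parent flow
        )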
l
Hello, my script crashed because I've reached the rate limit: Response: {'detail': 'Orchestration API rate limit reached. To raise your rate limit please contact help@prefect.io'} How can I fix that? I don't understand when the API calls are made, because I don't really need them. I only wanna launch my script automatically
n
api calls are made whenever you're actively running a flow - for example, setting the states of flow or task runs, creating the flow / task runs themselves, etc
are you mapping a lot of tasks or something?
l
Yeah, I guess. I have this main flow in my script:
Copy code
@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
	print("Starting TMDB update")
	tmdb_update_language()
	tmdb_update_country()
	tmdb_update_keyword()
	tmdb_update_collection()
	tmdb_update_company()
	tmdb_update_person()
And each subflow contains tasks. For example, here is
tmdb_update_person()
: As u can see, the
get_tmdb_person_details
is a task because I call it with map inside the flow to make parallel tasks...
Wait, I'm gonna share the file instead of pasting it
Here's the part that updates persons
But I only use tasks to make multiple TMDB requests at the same time...
If I delete the
@task
above a function, I'm getting this: 'function' object has no attribute 'map'
I guess I have to use:
from multiprocessing.pool import Pool
instead of using tasks
n
how often are you running this flow?
l
My script should run every day at 8 am UTC
n
hmm i’d be surprised if running this 1 or a couple times a day caused you to hit rate limits
l
But maybe it's because the task runs around 5,000 times just for one flow?
Yesterday I also got this rate limit alert
But on the web UI
n
yeah 5000 task runs within a short time would cause that on free tier. can you show your code specifically for that part?
l
I can give u all my script:
But it's kinda messy ahah
n
please - the section involving the mapping / submission of 5k tasks would be more helpful :)
l
Yep, but in my script I have 7 flows, and each is gonna make around 2-3k tasks... But for one flow, for example:
Inside the
tmdb_update_person_with_changes_list
for example
line
191
We can't just disable API calls but still use
@task
?
Mhh, I don't know what I can do... Also, during the script I'm making a big SQL request (SELECT id FROM table) with around 4 million items. The script crashes on my VPS but not on my MacBook, so maybe it's a resource thing? My VPS only has 1 vCPU and 2 GB RAM 😅
With 8 GB of RAM and 4 vCPUs on the VPS it works, but I still hit the rate limit...
a
You can slow down task submission with a global concurrency limit to avoid triggering rate limits: https://docs.prefect.io/latest/guides/global-concurrency-limits/#use-cases
l
Ohh, I'm gonna check that, but we can't just disable API calls for a task? I don't really know how the task system makes tasks parallel, but in my case I don't need logs from tasks, only the parallelism
w
If the theory is that logging api calls are putting you over the rate limit and you would be all good if you disabled them, you can see what happens if you run the following command to disable logs.
Copy code
prefect config set PREFECT_LOGGING_TO_API_ENABLED=False
if that doesn't work, I do think my colleague's suggestion above is the most elegant.
l
Yeah, I probably should use the first method. The example shows this:
Copy code
from prefect import flow
from prefect.concurrency.sync import rate_limit

# my_task is assumed to be a @task defined elsewhere
@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)
But in my case I'm using
.map()
so is it the same usage? Here is a part of my code:
Copy code
# INSIDE THE FLOW FUNCTION
...
for chunk in chunks:
    current_collections_to_update = []
    collection_details_futures = get_tmdb_collection_details.map(chunk)
The
get_tmdb_collection_details()
is a task, so should I put the rate_limit function just before the map?
My script hit the rate limit. I've tried multiple things: adding
rate_limit
before the
map()
call, and also inside my task
Okay, I think it's working
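As a reference, here is a minimal sketch of what "rate_limit before the map" can look like, assuming a global concurrency limit named "tmdb-api" has already been created with a slot-decay rate (the limit name, chunking, and task body are illustrative assumptions):
Copy code
# illustrative sketch; the "tmdb-api" limit must be created beforehand (UI, CLI, or API)
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def get_tmdb_collection_details(collection_id: int) -> dict:
    # placeholder for the real TMDB request
    return {"id": collection_id}


@flow(log_prints=True)
def update_collections(chunks: list[list[int]]):
    for chunk in chunks:
        # block until enough slots free up, so mapped task runs are submitted gradually
        rate_limit("tmdb-api", occupy=len(chunk))
        futures = get_tmdb_collection_details.map(chunk)
        results = [f.result() for f in futures]
        print(f"updated {len(results)} collections")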
n
catjam did you get it to work by using
rate_limit
?
l
I have to admit I don't know if it's thanks to the
rate_limit
or this command 🤣:
Copy code
prefect config set PREFECT_LOGGING_TO_API_ENABLED=False
I have the
rate_limit
function before all my
.map()
calls
But my script works now, I can sleep now ahah (I don't know if I can increase the speed, because 1h26m is still too long I guess)
n
great to hear you’ve got it working!
l
Yeah! Last question (after this I'll stop asking u dummy questions ahah): how can I add an access_token for a private GitHub repository? The docs say this:
Copy code
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="<https://github.com/org/repo.git>",
        access_token=Secret.load("github-access-token").get(),
    ),
    entrypoint="flows.py:my_flow",
)

my_flow()
But I have an error:
Copy code
GitRepository.__init__() got an unexpected keyword argument 'access_token'
There is Credentials otherwise
n
hmm I think
GitRepository
accepts
credentials
- not
access_token
can you point me to the docs where you found that?
l
But what am I supposed to do to create an OAuth Prefect app on GitHub?
n
ahh that is a mistake in our docs, apologies.
GitRepository
accepts
credentials
not
access_token
as a kwarg. you should be able to do
Copy code
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="<https://github.com/org/repo.git>",
        credentials=Secret.load("github-access-token"), # note im passing the loaded block itself
    ),
    entrypoint="flows.py:my_flow",
)

my_flow()
i will open a PR to fix that docs example
l
I'm getting an error with:
Copy code
source=GitRepository(
    url="https://github.com/lxup/recomend-prefect.git",
    credentials=Secret.load("github-access-token")
),
`ValueError: Please provide a
token
or
password
in your Credentials block to clone a repo.`
I've tried with .get() also, but got another error: AttributeError: 'str' object has no attribute 'items'
This should work:
Copy code
source=GitRepository(
            url="<https://github.com/lxup/recomend-prefect.git>",
            credentials={"access_token": Secret.load("github-access-token").get()}
        ),
        entrypoint="flows/tmdb_update.py:tmdb_update",
but I'm having this issue: ValueError: Please save your access token as a Secret block before converting this storage object to a pull step.
Okay my bad, this works:
Copy code
flow.from_source(
        source=GitRepository(
            url="<https://github.com/lxup/recomend-prefect.git>",
            credentials={"access_token": Secret.load("github-access-token")}
        ),
        entrypoint="flows/tmdb_update.py:tmdb_update",
    )
n
great! yeah we can definitely make the docs better here 😅 thanks for sharing what worked for you!
l
Nahh, thank you for helping me all this time! 🙏
n
no problem! 🤘