Loup
01/07/2024, 11:33 PM
Nate
01/08/2024, 4:57 AM
Loup
01/08/2024, 10:31 AM
Loup
01/08/2024, 10:35 AM
```
Flow run infrastructure exited with non-zero status code:
Task flow-run-0b5fa2b4-816f-4e52-a960-6338bac43f22-5775ec5191cd45h4n-task0 failed with message: The container exited with an error. (Error Code: 1)
```
Brandon Reid
01/08/2024, 1:14 PM
Loup
01/08/2024, 1:47 PM
Loup
01/08/2024, 2:33 PM
Nate
01/08/2024, 3:21 PM
Nate
01/08/2024, 3:24 PM
Marvin
01/08/2024, 3:24 PM
Nate
01/08/2024, 3:25 PM
Nate
01/08/2024, 3:26 PM
Marvin
01/08/2024, 3:26 PM
Loup
01/08/2024, 4:01 PM
Nate
01/08/2024, 4:02 PM.serve
method and maybe put that up as a systemd process that revives itselfNate
01/08/2024, 4:03 PM
> I dont know how can I add pip library to the docker image 😅
I'll mention that you can just build an image `FROM prefecthq/prefect:2-latest` and that will have everything you need as far as prefect is concerned
Loup
01/08/2024, 4:04 PM
Loup
01/08/2024, 4:05 PM
Nate
01/08/2024, 4:11 PM
`serve` does the whole job of the Process worker - if you have

```python
# my_new_deployment.py
from prefect import flow

@flow(log_prints=True)
def some_flow():
    pass

if __name__ == "__main__":
    some_flow.serve("my_new_deployment")
```

then you can do `python my_new_deployment.py` and that will start a long-lived process analogous to a process worker
the only nice thing about a worker is that if you wanted to add another script (i.e. deployment) later to run in the same place, you'd just need to create the deployment (`prefect deploy somefile.py:somefunc -n new-deployment -p my-process-pool`) and the process worker would be able to pull it in (without having to change anything else)
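(Not from the thread: recent Prefect 2 releases also expose a top-level `serve()` utility that covers the "several scripts, one long-lived process" case without a worker. A minimal sketch, assuming Prefect 2.14+; flow and deployment names are illustrative:)

```python
# Sketch: serving multiple flows from one long-lived process.
# Names here are illustrative, not from the thread.
from prefect import flow, serve

@flow(log_prints=True)
def flow_a():
    print("a")

@flow(log_prints=True)
def flow_b():
    print("b")

if __name__ == "__main__":
    # blocks and polls for scheduled runs of both deployments
    serve(
        flow_a.to_deployment(name="deployment-a"),
        flow_b.to_deployment(name="deployment-b"),
    )
```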
@Marvin concisely give a systemd config file to run the above script on linux as a service
Marvin
01/08/2024, 4:11 PM
To run the script as a service with `systemd`, you can create a service file (let's call it `my_flow_service.service`) with the following configuration:

```ini
[Unit]
Description=My Flow Service
After=network.target
[Service]
Type=simple
User=<YourUsername>
WorkingDirectory=<YourScriptDirectory>
ExecStart=/usr/bin/python3 my_new_deployment.py
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
```

Replace `<YourUsername>` with your Linux username and `<YourScriptDirectory>` with the full path of the directory where your script `my_new_deployment.py` is located.
You would then place this file in the `/etc/systemd/system/` directory. Enable and start the service using the following commands:

```shell
sudo systemctl enable my_flow_service.service
sudo systemctl start my_flow_service.service
```

This will set up your script as a systemd service that starts automatically, runs continuously in the background, and restarts in case of failure.
Nate
01/08/2024, 4:13 PM
`ExecStart=/usr/path/to/my/venv/bin/prefect worker start -p my-process-pool`
Loup
01/08/2024, 4:17 PM
But with `.serve` we can't specify a work pool, no? And no, I wanna have multiple scripts, so maybe docker is better I guess, but I don't know how to add a rule for installing pip libraries for my script
Nate
01/08/2024, 4:20 PM
> But with `.serve` we can't specify worker pool no ?
correct. you'd have to `pip install mylib` into the virtual environment where you'd start the `serve` process
if you end up wanting to go the docker route, you'd just have to
• create a work pool (I'd recommend a push pool as mentioned earlier), but it could be `docker` too if you wanted to run docker on your VM
• swap `.serve` for `.deploy` on your script
◦ pass in the reference to your `Dockerfile` to `.deploy`, which does some `RUN pip install mylib` on top of the prefect base image (see the sketch below)
Loup
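(A minimal sketch of that swap, not from the thread; assumes Prefect 2.14+, an existing docker-type work pool, and illustrative image/pool names:)

```python
# Sketch: swapping .serve for .deploy with a custom Dockerfile.
# "my-docker-pool", "my-flow-image", and the Dockerfile path are
# illustrative assumptions, not names from the thread.
from prefect import flow
from prefect.deployments import DeploymentImage

@flow(log_prints=True)
def some_flow():
    pass

if __name__ == "__main__":
    some_flow.deploy(
        name="my_new_deployment",
        work_pool_name="my-docker-pool",
        image=DeploymentImage(
            name="my-flow-image",
            tag="latest",
            dockerfile="Dockerfile",  # your RUN pip install ... lives in here
        ),
        push=False,  # keep the image local if the worker runs on the same VM
    )
```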
01/08/2024, 4:45 PM
Loup
01/08/2024, 4:46 PM
Here is my Dockerfile (with my `requirements.txt`):

```dockerfile
# We're using the latest version of Prefect with Python 3.10
FROM prefecthq/prefect:2-python3.10

# Add our requirements.txt file to the image and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir

# Add our flow code to the image
COPY flows /opt/prefect/flows

# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]
```
Loup
01/08/2024, 4:49 PM
Nate
01/08/2024, 4:50 PM
> Push pool is only with AWS, Google and Azure right ?
yeah gotcha, how much exactly it costs depends on how much runtime you use but if you've already got a VM then I'd just install docker there and start a docker worker there like i mentioned above w systemd
Nate
01/08/2024, 4:51 PM
you can remove

```dockerfile
# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]
```

from the Dockerfile you pass to `.deploy` - the docker work pool will use a `command` we need to invoke the engine / track the execution
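(So the Dockerfile above would shrink to something like this; a sketch of Nate's suggestion, not a snippet posted in the thread:)

```dockerfile
# Same image as above, minus the CMD: the docker work pool supplies
# the container command that starts the Prefect engine.
FROM prefecthq/prefect:2-python3.10

COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir

COPY flows /opt/prefect/flows
```
Loup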
01/08/2024, 4:52 PM
Nate
01/08/2024, 4:53 PM
Loup
01/08/2024, 4:55 PM
```python
# ...python code before

@task
def set_last_update_date(current_date: datetime) -> None:
    supabase.table('recomend_tmdb_changes').insert([{
        'date': current_date.strftime("%Y-%m-%d"),
        'success': True,
    }]).execute()

@flow(name="tmdb_update_person", log_prints=True)
def tmdb_update_person():
    current_date = datetime.now()
    last_update_date = get_last_update_date()
    print(f"current_date: {current_date}")
    print(f"last_update_date: {last_update_date}")
    # tmdb_check_changes_person_ids(current_date, last_update_date)
    tmdb_check_export_person_ids(current_date, last_update_date)
    # set_last_update_date(current_date)

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="tmdb_update_person.py:tmdb_update_person",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *",
    )
```
Loup
01/08/2024, 4:56 PM
`python3 name`. So how can I give it the Dockerfile with the `pip install` command?
Nate
01/08/2024, 5:44 PM
in this `test.py` I can either install `EXTRA_PIP_PACKAGES` at runtime or go ahead and pass a `DeploymentImage` like this, so I can:
• run this `python test.py` on my local
• this will create the deployment on prefect cloud
• create flow runs from my deployment that will be picked up by a worker (k8s work pool for me here, docker work pool for you on your VM)
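(The attached snippet didn't survive the export. A minimal sketch of the runtime-install variant of such a `test.py`, assuming a docker work pool and an illustrative repo; the `DeploymentImage` variant is sketched after the bullet list further up. `EXTRA_PIP_PACKAGES` is an env var the `prefecthq/prefect` base image pip-installs at container start:)

```python
# Sketch of a test.py along the lines Nate describes; repo URL,
# entrypoint, and pool name are illustrative, not from the thread.
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",  # hypothetical repo
        entrypoint="test.py:test_flow",
    ).deploy(
        name="test-deployment",
        work_pool_name="my-docker-pool",  # hypothetical docker work pool
        # installed by the base image's entrypoint when the container starts
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "supabase tqdm"}},
    )
```
Loup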
01/08/2024, 6:59 PM
Loup
01/08/2024, 7:00 PM
Loup
01/08/2024, 7:00 PM
Nate
01/08/2024, 7:09 PM
tasks have a `map` / `submit` interface, which allows you to leverage multithreading without implementing it yourself
for example, this will run in about 5 seconds, as each task is executed in its own worker thread
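(The example itself was an attachment; a sketch of the pattern being described, with illustrative names: five tasks that each sleep five seconds finish in about five seconds total, because the default task runner executes each mapped task run in its own thread:)

```python
# Sketch: mapped task runs execute concurrently on the default
# (thread-based) task runner, so total runtime is ~5s, not 25s.
import time
from prefect import flow, task

@task
def sleepy(n: int) -> int:
    time.sleep(5)
    return n

@flow(log_prints=True)
def my_flow():
    futures = sleepy.map(range(5))  # submits all five task runs at once
    print([f.result() for f in futures])

if __name__ == "__main__":
    my_flow()
```
Loup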
01/08/2024, 7:27 PM
Loup
01/08/2024, 7:28 PM
I got a `dict` error, so I'm gonna check your video because right now it's gonna take 6h just for updating person. I also need to update movie
Loup
01/08/2024, 7:28 PM
Nate
01/08/2024, 7:32 PM
that video covers `.submit` and `.map`, which I think would be directly useful for optimizing your code
in general I'd just recommend refactoring your code so that heavy IO / network bound stuff is in tasks that you can map (run concurrently)
feel free to ask #C04DZJC94DC about VPS stuff, he'll probably know more than me 🙂
Loup
01/08/2024, 7:38 PM
Nate
01/08/2024, 7:57 PM
Nate
01/08/2024, 7:57 PM
Loup
01/08/2024, 9:37 PM
Nate
01/08/2024, 9:42 PM
Loup
01/08/2024, 11:06 PM
Loup
01/09/2024, 10:34 AM
Loup
01/09/2024, 10:34 AM
Nate
01/09/2024, 1:15 PM
Nate
01/09/2024, 1:15 PM
Loup
01/09/2024, 1:25 PM
```python
from prefect import flow
from prefect.blocks.system import Secret

# (tmdb_update_person is imported from elsewhere in the repo)

@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
    print("Starting tmdb_update")
    tmdb_update_person()
    # tmdb_update_person()

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="flows/tmdb_update.py:tmdb_update",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *",
        job_variables={
            "env": {
                "EXTRA_PIP_PACKAGES": "supabase tqdm more_itertools",
            }
        },
    )
```
is it possible ?
Nate
01/09/2024, 1:29 PM
Nate
01/09/2024, 1:32 PM
Nate
01/09/2024, 1:36 PM
Nate
01/09/2024, 1:47 PM
Loup
01/10/2024, 6:10 PM
Loup
01/10/2024, 6:10 PM
Loup
01/10/2024, 6:10 PM
(`tmdb_update` is a flow but `tmdb_update_language` is a task)
Nate
01/10/2024, 6:13 PM
Loup
01/10/2024, 6:15 PM
Nate
01/10/2024, 6:16 PM
Nate
01/10/2024, 6:16 PM
Loup
01/11/2024, 3:32 PM
Nate
01/11/2024, 3:35 PM
Nate
01/11/2024, 3:35 PM
Loup
01/11/2024, 3:40 PM
```python
@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
    print("Starting TMDB update")
    tmdb_update_language()
    tmdb_update_country()
    tmdb_update_keyword()
    tmdb_update_collection()
    tmdb_update_company()
    tmdb_update_person()
```
And each subflow contains tasks. For example, here is `tmdb_update_person()`:
As you can see, `get_tmdb_person_details` is a task, because I call it with map inside the flow to run tasks in parallel...
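(The `tmdb_update_person()` snippet was attached as an image; a hedged sketch of the shape being described, where the helper names and body are guesses based on the thread, not the real code:)

```python
# Sketch only - reconstructs the described structure, not the actual code.
from prefect import flow, task

@task
def get_tmdb_person_details(person_id: int) -> dict:
    ...  # network call to the TMDB API would go here
    return {"id": person_id}

@flow(name="tmdb_update_person", log_prints=True)
def tmdb_update_person():
    person_ids = [1, 2, 3]  # really fetched from the changes/export lists
    futures = get_tmdb_person_details.map(person_ids)  # concurrent task runs
    details = [f.result() for f in futures]
    print(f"updated {len(details)} people")
```
Loup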
01/11/2024, 3:42 PM
Loup
01/11/2024, 3:44 PM
Loup
01/11/2024, 3:44 PM
Loup
01/11/2024, 3:52 PM
When I put `@task` above a function I'm getting this:
`'function' object has no attribute 'map'`
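(For reference, this error usually means `.map` was called on a plain function that the `@task` decorator was never actually applied to, e.g. calling the undecorated original or stacking another decorator on top. A sketch of the pattern that does work, with illustrative names:)

```python
# Sketch: .map exists on the Task object the decorator returns,
# not on a bare function. Names are illustrative.
from prefect import flow, task

@task
def double(x: int) -> int:
    return x * 2

def plain(x: int) -> int:
    return x * 2

@flow
def demo():
    futures = double.map([1, 2, 3])  # fine: double is a Task
    print([f.result() for f in futures])
    # plain.map([1, 2, 3])  # AttributeError: 'function' object has no attribute 'map'

if __name__ == "__main__":
    demo()
```
Loup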
01/11/2024, 4:06 PM
I'm using `from multiprocessing.pool import Pool` instead of using a task
Nate
01/11/2024, 4:08 PM
Loup
01/11/2024, 4:09 PM
Nate
01/11/2024, 4:12 PM
Loup
01/11/2024, 4:14 PM
Loup
01/11/2024, 4:15 PM
Loup
01/11/2024, 4:15 PM
Nate
01/11/2024, 4:16 PM
Loup
01/11/2024, 4:29 PM
Loup
01/11/2024, 4:29 PM
Nate
01/11/2024, 4:30 PM
Loup
01/11/2024, 4:31 PM
Loup
01/11/2024, 4:31 PM
`tmdb_update_person_with_changes_list` for example
Loup
01/11/2024, 4:32 PM
191
Loup
01/11/2024, 5:05 PM
`@task` ?
Loup
01/11/2024, 6:18 PM
Loup
01/12/2024, 1:35 AM
Andrew Brookins
01/12/2024, 6:40 AM
Loup
01/12/2024, 8:27 PM
Will Raphaelson
01/12/2024, 9:07 PM
`prefect config set PREFECT_LOGGING_TO_API_ENABLED=False`
if that doesn't work, I do think my colleague's suggestion above is the most elegant.
Loup
01/12/2024, 9:15 PM
```python
@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)
```
But in my case im using `.map()`, so is it the same usage?
There is a part of my code:
```python
# INSIDE FUNCTION (FLOW)
...
for chunk in chunks:
    current_collections_to_update = []
    collection_details_futures = get_tmdb_collection_details.map(chunk)
```
The `get_tmdb_collection_details()` is a task, so should I put the rate_limit function just before the map?
Loup
01/13/2024, 10:13 AM
I put `rate_limit` before the `map()` function, and also inside my task
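(A hedged note on that: calling `rate_limit` once before `.map` only delays the submission loop; putting it inside the task body gates each mapped task run. A minimal sketch, assuming Prefect 2.13+ and a global concurrency limit named "tmdb-api" created beforehand in Cloud/the UI; all names are illustrative:)

```python
# Sketch: throttle mapped task runs by occupying a rate-limit slot
# inside the task itself. "tmdb-api" is a hypothetical limit name.
from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def get_details(item_id: int) -> dict:
    rate_limit("tmdb-api", occupy=1)  # each task run waits for a slot
    return {"id": item_id}  # the real task would call the external API here

@flow
def update_collections(ids: list[int]) -> list[dict]:
    futures = get_details.map(ids)
    return [f.result() for f in futures]
```
Loup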
01/13/2024, 4:27 PM
Nate
01/13/2024, 4:28 PM
`rate_limit` ?
Loup
01/13/2024, 4:37 PM
`rate_limit` or this command 🤣 :
`prefect config set PREFECT_LOGGING_TO_API_ENABLED=False`
I have the `rate_limit` function before all my `.map()` functions
Loup
01/13/2024, 4:39 PM
Nate
01/13/2024, 5:10 PM
Loup
01/13/2024, 5:42 PM
```python
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/repo.git",
        access_token=Secret.load("github-access-token").get(),
    ),
    entrypoint="flows.py:my_flow",
)

my_flow()
```
But I have an error:
`GitRepository.__init__() got an unexpected keyword argument 'access_token'`
Loup
01/13/2024, 5:45 PM
Nate
01/13/2024, 5:45 PM
`GitRepository` accepts `credentials` - not `access_token`
Nate
01/13/2024, 5:46 PM
Loup
01/13/2024, 5:46 PM
Loup
01/13/2024, 5:46 PM
Loup
01/13/2024, 5:47 PM
Nate
01/13/2024, 5:49 PM
`GitRepository` accepts `credentials`, not `access_token`, as a kwarg. you should be able to do

```python
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/repo.git",
        credentials=Secret.load("github-access-token"),  # note im passing the loaded block itself
    ),
    entrypoint="flows.py:my_flow",
)

my_flow()
```

i will open a PR to fix that docs example
Loup
01/13/2024, 5:51 PM
```python
source=GitRepository(
    url="https://github.com/lxup/recomend-prefect.git",
    credentials=Secret.load("github-access-token")
),
```
`ValueError: Please provide a token or password in your Credentials block to clone a repo.`
Loup
01/13/2024, 5:52 PM
Loup
01/13/2024, 5:53 PM
```python
source=GitRepository(
    url="https://github.com/lxup/recomend-prefect.git",
    credentials={"access_token": Secret.load("github-access-token").get()}
),
entrypoint="flows/tmdb_update.py:tmdb_update",
```
but having this issue:
`ValueError: Please save your access token as a Secret block before converting this storage object to a pull step.`
Loup
01/13/2024, 5:54 PM
```python
flow.from_source(
    source=GitRepository(
        url="https://github.com/lxup/recomend-prefect.git",
        credentials={"access_token": Secret.load("github-access-token")}
    ),
    entrypoint="flows/tmdb_update.py:tmdb_update",
)
```
Nate
01/13/2024, 5:55 PM
Loup
01/13/2024, 5:59 PM
Nate
01/13/2024, 6:00 PM