Loup
01/07/2024, 11:33 PM

Nate
01/08/2024, 4:57 AM

Loup
01/08/2024, 10:31 AM

Loup
01/08/2024, 10:35 AM
Flow run infrastructure exited with non-zero status code:
Task flow-run-0b5fa2b4-816f-4e52-a960-6338bac43f22-5775ec5191cd45h4n-task0 failed with message: The container exited with an error. (Error Code: 1)

Brandon Reid
01/08/2024, 1:14 PM

Loup
01/08/2024, 1:47 PM

Loup
01/08/2024, 2:33 PM

Nate
01/08/2024, 3:21 PM

Nate
01/08/2024, 3:24 PM

Marvin
01/08/2024, 3:24 PM

Nate
01/08/2024, 3:25 PM

Nate
01/08/2024, 3:26 PM

Marvin
01/08/2024, 3:26 PM

Loup
01/08/2024, 4:01 PM

Nate
01/08/2024, 4:02 PM
.serve method and maybe put that up as a systemd process that revives itself

Nate
01/08/2024, 4:03 PM
> I don't know how I can add a pip library to the docker image 😅
I'll mention that you can just build an image FROM prefecthq/prefect:2-latest and that will have everything you need as far as prefect is concerned

Loup
01/08/2024, 4:04 PM

Loup
01/08/2024, 4:05 PM

Nate
01/08/2024, 4:11 PMserve does the whole job of the Process worker - if you have
# my_new_deployment.py
from prefect import flow
@flow(log_prints=True)
def some_flow():
pass
if __name__ == "__main__":
some_flow.serve("my_new_deployment")
then you can do python my_new_deployment.py and that will start a long-lived process analogous to a process worker
the only nice thing about a worker is that if you wanted to add another script (i.e. deployment) later to run in the same place, you'd just need to create the deployment (prefect deployment somefile.py:somefunc -n new-deployment -p my-process-pool) and the process worker would be able to pull it in (without having to change anything else)
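(As an aside, serve can also host several deployments from one long-lived process; a rough sketch, assuming Prefect 2.13+ where the top-level serve utility and flow.to_deployment exist, with illustrative flow and deployment names:)

# serve_many.py - one process serving two deployments
from prefect import flow, serve

@flow(log_prints=True)
def flow_a():
    print("flow a")

@flow(log_prints=True)
def flow_b():
    print("flow b")

if __name__ == "__main__":
    # serve() blocks and polls for scheduled runs of both deployments
    serve(
        flow_a.to_deployment(name="deployment-a"),
        flow_b.to_deployment(name="deployment-b"),
    )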
@Marvin concisely give a systemd config file to run the above script on linux as a service

Marvin
01/08/2024, 4:11 PM
To run your script as a service with systemd, you can create a service file (let's call it my_flow_service.service) with the following configuration:
[Unit]
Description=My Flow Service
After=network.target
[Service]
Type=simple
User=<YourUsername>
WorkingDirectory=<YourScriptDirectory>
ExecStart=/usr/bin/python3 my_new_deployment.py
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
Replace <YourUsername> with your Linux username and <YourScriptDirectory> with the full path of the directory where your script my_new_deployment.py is located.
You would then place this file in the /etc/systemd/system/ directory. Enable and start the service using the following commands:
sudo systemctl enable my_flow_service.service
sudo systemctl start my_flow_service.service
This will set up your script as a systemd service that starts automatically, runs continuously in the background, and restarts in case of failure.

Nate
01/08/2024, 4:13 PM
ExecStart=/usr/path/to/my/venv/prefect worker start -p my-process-pool

Loup
01/08/2024, 4:17 PM
But with .serve we can't specify a worker pool, no? And no, I want to have multiple scripts, so maybe docker is better I guess, but I don't know how to add a rule for adding a pip library for my script

Nate
01/08/2024, 4:20 PM
> But with .serve we can't specify a worker pool, no?
correct. you'd have to pip install mylib into the virtual environment where you'd start the serve process
if you end up wanting to go the docker route, you'd just have to
• create a work pool (I'd recommend push pool as mentioned earlier), but it could be docker too if you wanted to run docker on your VM
• swap .serve for .deploy on your script
  ◦ pass in the reference to your Dockerfile to .deploy, which does some RUN pip install mylib on top of the prefect base image (see the sketch after this list)
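(A rough sketch of that .deploy call with a custom Dockerfile, assuming Prefect 2.14+ where .deploy accepts a DeploymentImage; the image, pool, and file names are illustrative:)

# deploy_with_image.py - build a custom image on top of the prefect base image
from prefect import flow
from prefect.deployments import DeploymentImage

@flow(log_prints=True)
def some_flow():
    print("hello from docker")

if __name__ == "__main__":
    some_flow.deploy(
        name="my-docker-deployment",
        work_pool_name="my-docker-pool",
        # a Dockerfile doing e.g. RUN pip install mylib on top of prefecthq/prefect:2-latest
        image=DeploymentImage(
            name="my-registry/my-image:latest",
            dockerfile="Dockerfile",
        ),
    )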
Loup
01/08/2024, 4:45 PM

Loup
01/08/2024, 4:46 PM
requirements.txt):
# We're using the latest version of Prefect with Python 3.10
FROM prefecthq/prefect:2-python3.10
# Add our requirements.txt file to the image and install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
# Add our flow code to the image
COPY flows /opt/prefect/flows
# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]

Loup
01/08/2024, 4:49 PMNate
01/08/2024, 4:50 PMPush pool is only with AWS, Google and Azure right ?yeah gotcha, how much exactly it costs depends on how much runtime you use but if you've already got a VM then I'd just install docker there and start a docker worker there like i mentioned above w systemd
Nate
01/08/2024, 4:51 PM
you'll want to remove
# Run our flow script when the container starts
CMD ["python", "flows/tmdb_update_person.py"]
from the Dockerfile you pass to .deploy
the docker work pool will use a command we need to invoke the engine / track the execution

Loup
01/08/2024, 4:52 PM

Nate
01/08/2024, 4:53 PM

Loup
01/08/2024, 4:55 PM
...python code before
@task
def set_last_update_date(current_date: datetime) -> None:
    supabase.table('recomend_tmdb_changes').insert([{
        'date': current_date.strftime("%Y-%m-%d"),
        'success': True,
    }]).execute()

@flow(name="tmdb_update_person", log_prints=True)
def tmdb_update_person():
    current_date = datetime.now()
    last_update_date = get_last_update_date()
    print(f"current_date: {current_date}")
    print(f"last_update_date: {last_update_date}")
    # tmdb_check_changes_person_ids(current_date, last_update_date)
    tmdb_check_export_person_ids(current_date, last_update_date)
    # set_last_update_date(current_date)

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="tmdb_update_person.py:tmdb_update_person",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *"
    )

Loup
01/08/2024, 4:56 PM
python3 name . So how can I give it the Dockerfile with the pip install command?

Nate
01/08/2024, 5:44 PM
test.py
I can either install extra packages at runtime via EXTRA_PIP_PACKAGES (sketched after this list) or go ahead and pass a DeploymentImage like this
so I can:
• run this python test.py on my local
• this will create the deployment on prefect cloud
• create flow runs from my deployment that will be picked up by a worker (k8s work pool for me here, docker work pool for you on your VM)
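(A sketch of the runtime-install option, assuming the EXTRA_PIP_PACKAGES env var supported by the prefecthq/prefect images; the repo, deployment, and pool names are illustrative:)

# test.py - pull code from git and install extra packages at container start
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",  # illustrative repo
        entrypoint="flows.py:my_flow",
    ).deploy(
        name="runtime-deps-deployment",
        work_pool_name="my-docker-pool",
        # the prefecthq/prefect base image pip-installs these when the container starts
        job_variables={"env": {"EXTRA_PIP_PACKAGES": "supabase tqdm"}},
    )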
Loup
01/08/2024, 6:59 PM

Loup
01/08/2024, 7:00 PM

Loup
01/08/2024, 7:00 PM

Nate
01/08/2024, 7:09 PM
map / submit interface, which allows you to leverage multithreading without implementing it yourself
for example, this will run in about 5 seconds, as each task is executed in its own worker thread
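(A minimal sketch of the kind of example meant here, assuming Prefect 2's default ConcurrentTaskRunner; the names and timings are illustrative:)

# map_demo.py - ten one-second tasks finish in far less than ten seconds
import time
from prefect import flow, task

@task
def sleep_a_bit(n: int) -> int:
    time.sleep(1)  # stand-in for IO / network-bound work
    return n

@flow(log_prints=True)
def mapped_flow():
    # .map submits one task run per element; each runs in its own worker thread
    futures = sleep_a_bit.map(list(range(10)))
    print([f.result() for f in futures])

if __name__ == "__main__":
    mapped_flow()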
Loup
01/08/2024, 7:27 PM

Loup
01/08/2024, 7:28 PM
dict error, so I'm gonna check your video, because right now it's gonna take 6h just for updating person. I also need to update movie

Loup
01/08/2024, 7:28 PM

Nate
01/08/2024, 7:32 PM
.submit and .map which I think would be directly useful for optimizing your code
in general I'd just recommend refactoring your code so that heavy IO / network bound stuff is in tasks that you can map (run concurrently)
feel free to ask #C04DZJC94DC about VPS stuff, he'll probably know more than me 🙂

Loup
01/08/2024, 7:38 PM

Nate
01/08/2024, 7:57 PM

Nate
01/08/2024, 7:57 PM

Loup
01/08/2024, 9:37 PM

Nate
01/08/2024, 9:42 PM

Loup
01/08/2024, 11:06 PM

Loup
01/09/2024, 10:34 AM

Loup
01/09/2024, 10:34 AM

Nate
01/09/2024, 1:15 PM

Nate
01/09/2024, 1:15 PM

Loup
01/09/2024, 1:25 PM
from prefect import flow
from prefect.blocks.system import Secret

@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
    print("Starting tmdb_update")
    tmdb_update_person()
    # tmdb_update_person()

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/lxup/recomend-prefect.git",
        entrypoint="flows/tmdb_update.py:tmdb_update",
    ).deploy(
        name="tmdb_update_person",
        work_pool_name="VPS_Worker_1",
        cron="0 6 * * *",
        job_variables={
            "env": {
                "EXTRA_PIP_PACKAGES": "supabase tqdm more_itertools",
            }
        }
    )

is it possible ?

Nate
01/09/2024, 1:29 PM

Nate
01/09/2024, 1:32 PM

Nate
01/09/2024, 1:36 PM

Nate
01/09/2024, 1:47 PM

Loup
01/10/2024, 6:10 PM

Loup
01/10/2024, 6:10 PM

Loup
01/10/2024, 6:10 PM
tmdb_update is a flow but tmdb_update_language is a task)

Nate
01/10/2024, 6:13 PM

Loup
01/10/2024, 6:15 PM

Nate
01/10/2024, 6:16 PM

Nate
01/10/2024, 6:16 PM

Loup
01/11/2024, 3:32 PM

Nate
01/11/2024, 3:35 PM

Nate
01/11/2024, 3:35 PM

Loup
01/11/2024, 3:40 PM
@flow(name="tmdb_update", log_prints=True)
def tmdb_update():
    print("Starting TMDB update")
    tmdb_update_language()
    tmdb_update_country()
    tmdb_update_keyword()
    tmdb_update_collection()
    tmdb_update_company()
    tmdb_update_person()

And each subflow contains tasks. For example, here is tmdb_update_person():
As you can see, get_tmdb_person_details is a task because I call it with map inside the flow to run tasks in parallel...

Loup
01/11/2024, 3:42 PM

Loup
01/11/2024, 3:44 PM

Loup
01/11/2024, 3:44 PM

Loup
01/11/2024, 3:52 PM
@task above a function I'm getting this:
'function' object has no attribute 'map'
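(For reference, a minimal sketch of the pattern that avoids that error, assuming Prefect 2: .map only exists on the Task object the @task decorator returns, so the decorated name must be the one you call it on; the names here are illustrative:)

from prefect import flow, task

@task
def get_person_details(person_id: int) -> dict:
    # stand-in for a network-bound TMDB call
    return {"id": person_id}

@flow(log_prints=True)
def update_people():
    # get_person_details is a Task here, so .map is available; calling .map on
    # a plain undecorated function raises "'function' object has no attribute 'map'"
    futures = get_person_details.map([1, 2, 3])
    print([f.result() for f in futures])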
Loup
01/11/2024, 4:06 PM
from multiprocessing.pool import Pool instead of using task

Nate
01/11/2024, 4:08 PM

Loup
01/11/2024, 4:09 PM

Nate
01/11/2024, 4:12 PM

Loup
01/11/2024, 4:14 PM

Loup
01/11/2024, 4:15 PM

Loup
01/11/2024, 4:15 PM

Nate
01/11/2024, 4:16 PM

Loup
01/11/2024, 4:29 PM

Loup
01/11/2024, 4:29 PM

Nate
01/11/2024, 4:30 PM

Loup
01/11/2024, 4:31 PM

Loup
01/11/2024, 4:31 PM
tmdb_update_person_with_changes_list for example

Loup
01/11/2024, 4:32 PM
191

Loup
01/11/2024, 5:05 PM
@task ?

Loup
01/11/2024, 6:18 PM

Loup
01/12/2024, 1:35 AM

Andrew Brookins
01/12/2024, 6:40 AM

Loup
01/12/2024, 8:27 PM

Will Raphaelson
01/12/2024, 9:07 PM
prefect config set PREFECT_LOGGING_TO_API_ENABLED=False
if that doesn't work, I do think my colleague's suggestion above is the most elegant.

Loup
01/12/2024, 9:15 PM
@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)

But in my case I'm using .map(), so is it the same usage?
Here is a part of my code:

# INSIDE FUNCTION (FLOW)
...
for chunk in chunks:
    current_collections_to_update = []
    collection_details_futures = get_tmdb_collection_details.map(chunk)

The get_tmdb_collection_details() is a task, so should I put the rate_limit function just before the map?
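(A sketch of one way that could look, assuming Prefect 2's rate_limit from prefect.concurrency.sync and a global concurrency limit named "tmdb-api" already created with a slot decay, which rate limits require; the limit name and task body are illustrative:)

from prefect import flow, task
from prefect.concurrency.sync import rate_limit

@task
def get_tmdb_collection_details(collection_id: int) -> dict:
    return {"id": collection_id}  # stand-in for the TMDB request

@flow(log_prints=True)
def update_collections(chunks: list[list[int]]):
    for chunk in chunks:
        # occupy one slot per item about to be mapped, so each chunk
        # is submitted no faster than the "tmdb-api" limit allows
        rate_limit("tmdb-api", occupy=len(chunk))
        futures = get_tmdb_collection_details.map(chunk)
        print([f.result() for f in futures])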
Loup
01/13/2024, 10:13 AM
rate_limit before the map() function, and also inside my task

Loup
01/13/2024, 4:27 PM

Nate
01/13/2024, 4:28 PM
rate_limit ?

Loup
01/13/2024, 4:37 PM
rate_limit or this command 🤣 :
prefect config set PREFECT_LOGGING_TO_API_ENABLED=False
I have the rate_limit function before all my .map() calls

Loup
01/13/2024, 4:39 PM

Nate
01/13/2024, 5:10 PM

Loup
01/13/2024, 5:42 PM
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/repo.git",
        access_token=Secret.load("github-access-token").get(),
    ),
    entrypoint="flows.py:my_flow",
)
my_flow()

But I have an error:
GitRepository.__init__() got an unexpected keyword argument 'access_token'

Loup
01/13/2024, 5:45 PM

Nate
01/13/2024, 5:45 PM
GitRepository accepts credentials - not access_token

Nate
01/13/2024, 5:46 PM

Loup
01/13/2024, 5:46 PM

Loup
01/13/2024, 5:46 PM

Loup
01/13/2024, 5:47 PM

Nate
01/13/2024, 5:49 PM
GitRepository accepts credentials, not access_token, as a kwarg. you should be able to do

from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/repo.git",
        credentials=Secret.load("github-access-token"),  # note im passing the loaded block itself
    ),
    entrypoint="flows.py:my_flow",
)
my_flow()

i will open a PR to fix that docs example

Loup
01/13/2024, 5:51 PM
source=GitRepository(
    url="https://github.com/lxup/recomend-prefect.git",
    credentials=Secret.load("github-access-token")
),

ValueError: Please provide a token or password in your Credentials block to clone a repo.

Loup
01/13/2024, 5:52 PM

Loup
01/13/2024, 5:53 PM
source=GitRepository(
    url="https://github.com/lxup/recomend-prefect.git",
    credentials={"access_token": Secret.load("github-access-token").get()}
),
entrypoint="flows/tmdb_update.py:tmdb_update",

but having this issue:
ValueError: Please save your access token as a Secret block before converting this storage object to a pull step.

Loup
01/13/2024, 5:54 PM
flow.from_source(
    source=GitRepository(
        url="https://github.com/lxup/recomend-prefect.git",
        credentials={"access_token": Secret.load("github-access-token")}
    ),
    entrypoint="flows/tmdb_update.py:tmdb_update",
)

Nate
01/13/2024, 5:55 PM

Loup
01/13/2024, 5:59 PM

Nate
01/13/2024, 6:00 PM