flapili
02/23/2023, 2:32 PM
"data": {
"type": "reference",
"serializer_type": "compressed/json",
"storage_block_id": "e3311389-34e1-491c-aaef-cbcf6e6ea977",
"storage_key": "e4940b1d2ada42a7a1ac77710cae4d18"
},
Sam Garvis
02/23/2023, 3:02 PM
Steph Clacksman
02/23/2023, 3:31 PM
Marcel
02/23/2023, 3:46 PM
Kyle Austin
02/23/2023, 3:46 PM
10:44:28.065 | WARNING | Task run 'email_send_message-357' - Task run '52fa4b9c-deb0-407c-a992-ccde4685dfcd' received abort during orchestration: The enclosing flow must be running to begin task execution. Task run is in PENDING state.
I am almost always getting the following error message:
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I am setting
persist_result=True
in all my task decorators too. Plus I have set concurrency-limit tags, so none of these tasks has more than 30 running at once. But all 5k tasks are still being submitted and created all at once!
Here is roughly what the code looks like in the flow now:
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for email in emails:
email_send_message.submit(
subject=email.subject,
msg=email.rendered_html_template,
email_server_credentials=humana_smtp,
email_from=email.email_from,
email_to=email.to,
email_to_cc=email.cc,
email_to_bcc=email.bcc,
attachments=email.attachments,
images=email.images,
dry_run=dry_run,
)
I have done something like this to prevent it from submitting all 5k at once and throttle it down to 50 at a time:
email_chunks_for_sending = chunkify(emails, 50)
humana_smtp = EmailServerCredentials.load("some-smtp-server")
for chunk in email_chunks_for_sending:
wait_for_complete_object = []
for email in chunk:
sent = email_send_message.submit(
subject=email.subject,
msg=email.rendered_html_template,
email_server_credentials=humana_smtp,
email_from=email.email_from,
email_to=email.to,
email_to_cc=email.cc,
email_to_bcc=email.bcc,
attachments=email.attachments,
images=email.images,
dry_run=dry_run,
)
wait_for_complete_object.append(sent)
[future.result() for future in wait_for_complete_object]
The chunkify helper, which I borrowed from another Slack post, looks like this:
def chunkify(xs, size):
return (xs[pos : pos + size] for pos in range(0, len(xs), size))
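For reference, here is a self-contained sketch of the chunk-and-wait throttling pattern described above. The Prefect `submit`/`result` calls are replaced by a stand-in callable so the snippet runs anywhere; `process_in_chunks` and the `submit` parameter are illustrative names, not Prefect APIs.

```python
def chunkify(xs, size):
    """Split a list into successive chunks of at most `size` items."""
    return (xs[pos : pos + size] for pos in range(0, len(xs), size))


def process_in_chunks(items, size, submit):
    """Submit `items` in batches, blocking on each batch before starting
    the next -- the same barrier as the `[future.result() for ...]` line
    in the flow above."""
    results = []
    for chunk in chunkify(items, size):
        # "Futures" here are zero-argument callables returned by `submit`.
        futures = [submit(item) for item in chunk]
        # Wait for every future in this chunk before submitting more.
        results.extend(f() for f in futures)
    return results


# Demo with a stand-in "submit" that returns a zero-argument resolver.
sent = process_in_chunks(list(range(5)), 2, lambda i: (lambda: i * 10))
print(sent)  # [0, 10, 20, 30, 40]
```

With a real task runner, `submit` would be the task's `.submit` method and the resolver its future's `.result()`; the structure of the loop is unchanged.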
Is there a way to set a limit on the number of tasks that are submitted to the task runner at a given time? Task concurrency didn't do the trick for me -- it only limited the number of tasks running at a given time.
Jean-Michel Provencher
02/23/2023, 4:21 PM
Leon Kozlowski
02/23/2023, 5:42 PM
Jacob Bedard
02/23/2023, 6:09 PM
Jacob Bedard
02/23/2023, 6:15 PM
Aric Huang
02/23/2023, 7:15 PM
Ghislain Picard
02/23/2023, 7:42 PM
Bianca Hoch
02/23/2023, 8:16 PM
Bianca Hoch
02/23/2023, 9:19 PM
Billy McMonagle
02/23/2023, 9:59 PM
KubernetesJob infrastructure.
Tomás Emilio Silva Ebensperger
02/23/2023, 11:00 PM
Prefect 2.0
Is it possible to run a deployment from one server and have that flow picked up by an agent running on a different server?
Tony Alfonse
02/23/2023, 11:32 PM
#!/bin/bash
eval "$(conda shell.bash hook)"
conda activate ETL
prefect server start
And I created this systemd service:
[Unit]
Description=Prefect
[Service]
Type=simple
ExecStart=/home/zeephremia/files_required/initate.sh
User=root
Group=root
[Install]
WantedBy=multi-user.target
Whenever I run the shell script locally it works, but whenever the systemd service triggers it, it throws this error:
prefect_service.service - Prefect
Loaded: loaded (/etc/systemd/system/prefect_service.service; enabled; vendor preset: enabled)
Active: inactive (dead) since Thu 2023-02-23 23:29:43 UTC; 3s ago
Process: 2233 ExecStart=/home/zeephremia/files_required/initate.sh (code=exited, status=0/SUCCESS)
Main PID: 2233 (code=exited, status=0/SUCCESS)
Feb 23 23:29:43 linux-server systemd[1]: Started Prefect.
Feb 23 23:29:43 linux-server initate.sh[2233]: Starting the script...
Feb 23 23:29:43 linux-server initate.sh[2234]: /home/zeephremia/files_required/initate.sh: line 5: conda: command not found
Feb 23 23:29:43 linux-server initate.sh[2235]: /home/zeephremia/files_required/initate.sh: line 6: conda: command not found
Feb 23 23:29:43 linux-server initate.sh[2237]: /home/zeephremia/files_required/initate.sh: line 7: prefect: command not found
Feb 23 23:29:43 linux-server initate.sh[2233]: Script completed.
Feb 23 23:29:43 linux-server systemd[1]: prefect_service.service: Succeeded.
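The `conda: command not found` lines point to a common systemd pitfall: services run with a minimal environment and do not inherit the interactive shell's PATH, so `conda` and `prefect` only resolve via absolute paths. A hedged sketch of a fixed script, assuming a Miniconda install at `/home/zeephremia/miniconda3` (the install location is an assumption; adjust to wherever `which conda` points in an interactive shell):

```bash
#!/bin/bash
# systemd starts services with a minimal PATH, so commands available in an
# interactive shell (conda, prefect) are not found. Reference conda by an
# absolute path; CONDA_BASE below is an assumed location -- adjust as needed.
CONDA_BASE=/home/zeephremia/miniconda3

eval "$("$CONDA_BASE/bin/conda" shell.bash hook)"
conda activate ETL

# exec replaces the shell with the server process, so with Type=simple the
# unit stays active while Prefect runs instead of exiting immediately.
exec prefect server start
```

This also explains why the journal shows `inactive (dead)` with `status=0/SUCCESS`: the script ran to completion despite the errors, so systemd considered the service finished as soon as it returned.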
Would really appreciate any help, thank you! 🙂
Muhammad Tariq
02/24/2023, 12:05 AM
n successive flow failures of a deployment within a specified time frame or without any time frame. Don't need alerts for every flow failure.
I went through the documentation here and it's not super clear.
https://docs.prefect.io/ui/automations/#triggers
Has anyone else tried doing something like this before and could offer some advice or tips? Thanks!
Samuel Hinton
02/24/2023, 1:54 AM
Samuel Hinton
02/24/2023, 2:41 AM
Samuel Hinton
02/24/2023, 3:23 AM
/Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/tasks.py:270: UserWarning: A task named 'some_task' and defined at 'flows/data/endpoints/example.py:3' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/Users/sh/Projects/flows/.venv/lib/python3.9/site-packages/prefect/flows.py:214: UserWarning: A flow named 'get-some-data' and defined at 'flows/data/endpoints/example.py:9' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
13:20:46.203 | INFO | Flow run 'important-jaguarundi' - Created task run 'some_task-0' for task 'some_task'
Apparently both my task and my flow conflict with another…
This is… not possible? I'm playing around with prefect and have a single example.py that contains a single flow and task. Is there a way I can figure out why prefect thinks there's a conflict, or just silence the spurious warnings? (Ideally I'd like to figure out what's going on rather than ignoring the warnings.)
Farid
02/24/2023, 4:57 AM
With .map, the whole flow fails immediately if one of the child tasks fails. I have tried passing return_state=True, but it doesn't seem to be effective in mapped mode.
Tanishq Hooda
02/24/2023, 5:22 AM
Deployment.build_from_flow(
flow=my_flow,
name="my_flow",
parameters={},
infra_overrides={
"image": SHARED_CONFIG.value["prefect_image_name"],
"service_account_name": "my-sa",
"image_pull_policy": "Always",
"namespace": "mynamespace",
},
infrastructure={"block_type_slug": "kubernetes-job"},
work_queue_name="k8s",
work_pool_name="my-worker-pool", # doesn't work
storage=storage,
.....
.....
I can update the work pool from the UI, but I want to do it from code. Prefect version: 2.7.7
Jamie Blakeman
02/24/2023, 10:11 AM
Christianto
02/24/2023, 10:39 AM
prefect build ...
it gives me an error saying ModuleNotFoundError: No module named 'gcsfs'. Meanwhile, when I did conda list, the gcsfs module is already installed, but somehow the prefect CLI can't find it. Is there any workaround for this? Thanks!
Stefan
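A common cause of this kind of mismatch is that the `prefect` entry point runs under a different Python interpreter than the environment `conda list` inspects. A minimal, hedged way to check which interpreter is in use and whether a module is importable from it (the snippet uses a stdlib module as a stand-in so it runs anywhere; substitute `"gcsfs"` for the real check):

```python
import importlib.util
import sys

# The interpreter actually executing this code; compare it with the
# shebang of the prefect script (e.g. `head -1 "$(which prefect)"`).
print(sys.executable)

# Check importability without importing. "json" is a stdlib stand-in;
# replace it with "gcsfs" to test the real case.
module_name = "json"
found = importlib.util.find_spec(module_name) is not None
print(found)  # True when the module is importable from this interpreter
```

If the interpreter shown here differs from the conda env's Python, installing the package into the interpreter that `prefect` uses (or reinstalling prefect inside the activated env) would be the usual fix.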
02/24/2023, 11:06 AM
Andrei Nikonov
02/24/2023, 12:00 PM
Cannot return null for non-nullable field flow_run.flow
when querying GraphQL in the Interactive API tab. I want to see the flow name alongside the flow_run information, but I can't get the flow name. I see flow_id and could fetch all names and map them programmatically, but I hope there's a more convenient solution.
Aaron Gonzalez
02/24/2023, 12:43 PM
bigquery_query here. Not that big of a deal, but I would just prefer to see something a little more descriptive.
https://prefecthq.github.io/prefect-gcp/bigquery/#prefect_gcp.bigquery.bigquery_query
if await bigquery_query(
exists_query,
creds
):
print(f"Deleting data from the {sync_date} partition.")
await bigquery_query(
del_query,
creds
)
await bigquery_query(
load_query,
creds
)
Wellington Braga
02/24/2023, 2:31 PM
prefect deployment ls --flow-name <flow-name>
Vadym Dytyniak
02/24/2023, 3:05 PM
httpx.HTTPStatusError: Client error '429 Too Many Requests'
Where can I read about the limits and best practices?
John Mizerany
02/24/2023, 4:51 PM