Kyle Niosco
09/23/2023, 7:01 AM
Sarika
09/23/2023, 4:20 PM
massumo
09/24/2023, 9:35 AM
Eric
09/24/2023, 11:10 PM
prefect.yaml file, is this the right way to supply my tasks with the necessary env vars?
definitions:
  tags: &common_tags
    - "feedback-insight"
    - "demo"
  work_pool: &common_work_pool
    name: "demo-work-pool"
    job_variables:
      image: $BACKEND_IMAGE_URL
      ENV_VAR_1: $ENV_VAR_1
      ENV_VAR_2: $ENV_VAR_2
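For context on this question: in Prefect 2 work pools, `image` is itself a job variable, so seeing it under `job_variables` is expected. Environment variables for the flow-run container are normally passed through a nested `env` mapping rather than as sibling keys of `image`, assuming the pool's base job template exposes an `env` variable (the default Docker, Kubernetes and ECS templates do). The sketch below builds that shape as a plain Python dict; `build_job_variables` is a hypothetical helper, not a Prefect API, and it expands the `$VAR` references itself with `os.path.expandvars` instead of relying on how `prefect.yaml` templates them.

```python
import os


def build_job_variables(env_names, image_ref="$BACKEND_IMAGE_URL"):
    """Hypothetical helper: build a work-pool `job_variables` mapping.

    Assumes the pool's base job template exposes `image` and `env`
    variables; `env` holds the variables injected into the container.
    """
    return {
        # The image is a job variable in its own right.
        "image": os.path.expandvars(image_ref),
        # Env vars are nested under `env` instead of sitting beside `image`.
        "env": {name: os.environ.get(name, "") for name in env_names},
    }


if __name__ == "__main__":
    os.environ.setdefault("BACKEND_IMAGE_URL", "example.registry/backend:latest")
    os.environ.setdefault("ENV_VAR_1", "value-1")
    os.environ.setdefault("ENV_VAR_2", "value-2")
    print(build_job_variables(["ENV_VAR_1", "ENV_VAR_2"]))
```

In `prefect.yaml` terms, this corresponds to moving `ENV_VAR_1` and `ENV_VAR_2` one level down, under an `env:` key inside `job_variables`.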
Seems strange that the Docker image would be grouped with the env vars supplied to the pod / execution env.
Sarhan
09/25/2023, 4:25 AM
prefecthq/prefect:2.13.2-python3.10). Since Saturday (23rd Sept) at 1pm (UTC+8), all our flow runs have been crashing. All prior flow runs were successful. We have verified internally that no settings changes were made to our org's AWS. There were also no changes to any of our flows immediately before the crashes started.
The ECS logs for a single flow run are the following:
03:40:03.187 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Registering task definition...
03:40:03.600 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Creating task run...
03:40:04.104 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Waiting for task run to start...
03:40:04.139 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is PROVISIONING.
03:40:14.196 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is PENDING.
03:40:39.345 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is RUNNING.
03:40:44.349 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Running command 'python -m prefect.engine' in container 'prefect' (prefecthq/prefect:2.13.2-python3.10)...
03:40:44.632 | INFO | prefect.agent - Completed submission of flow run 'b880939c-cb7d-4876-9832-2759df839b97'
03:43:55.558 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is DEPROVISIONING.
03:44:10.742 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is STOPPED.
03:44:10.777 | WARNING | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Container 'prefect' exited with non-zero exit code 1.
03:44:11.070 | INFO | prefect.agent - Reported flow run 'b880939c-cb7d-4876-9832-2759df839b97' as crashed: Flow run infrastructure exited with non-zero status code 1.
No logs are recorded in the Prefect console. No other stack trace info is given in the ECS logs.
What could be the problem?
David Salgado
09/25/2023, 6:56 AM
import asyncio
import prefect.client.schemas.filters as filters
from prefect.client.orchestration import get_client
from prefect.server.schemas.states import StateType

run_ids = [ ... a list of the flow run IDs I'm interested in ... ]
client = get_client()
tasks = asyncio.run(
    client.read_task_runs(
        flow_run_filter=filters.FlowRunFilter(id=filters.FlowRunFilterId(any_=run_ids)),  # <-- This works by itself
        task_run_filter=filters.TaskRunFilterState(type=filters.TaskRunFilterStateType(any_=[StateType.FAILED])),  # <-- this doesn't work
    )
)
This fails with:
prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity' for url 'https://api.prefect.cloud/api/accounts/8d8b3f47-6199-40be-98a9-1bc1b3056ee4/workspaces/71539e7c-8525-4c61-842e-90aba3248e79/task_runs/filter'
Response: {'exception_message': 'Invalid request received.', 'exception_detail': [{'loc': ['body', 'task_runs', 'type'], 'msg': 'extra fields not permitted', 'type': 'value_error.extra'}], 'request_body': {'flows': None, 'flow_runs': {'id': {'any_': ['7924bcd8-8dea-46a0-a6f4-f47fc84002dd', 'ad2d8cd7-8302-4bf2-b434-5f923ded1ec3', '239fc509-f4b1-4b20-a37d-4a66c4d6ef37', '4e5fa333-ecf6-4a9b-bc7c-1572b8c1c4b9', '2d995267-d507-48c1-a2f4-d1b4ef60bbec']}}, 'task_runs': {'type': {'any_': ['FAILED']}, 'name': None}, 'deployments': None, 'sort': None, 'limit': None, 'offset': 0}}
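The 422 body shows what went wrong: the server received `'task_runs': {'type': ..., 'name': None}`, i.e. a `TaskRunFilterState` was passed directly as `task_run_filter`, so its `type` field landed at the top level of the task-run filter, where it is not a permitted field. In Prefect 2's client schemas the state filter sits one level deeper, under `state`: `task_run_filter=filters.TaskRunFilter(state=filters.TaskRunFilterState(type=filters.TaskRunFilterStateType(any_=[StateType.FAILED])))`. The stdlib sketch below only contrasts the two request-body shapes as simplified dicts (the real bodies carry more `None` fields); `task_run_body` is a hypothetical helper.

```python
import json


def task_run_body(flow_run_ids, state_types, nested=True):
    """Hypothetical helper: build a simplified body for POST .../task_runs/filter.

    nested=True mirrors TaskRunFilter(state=TaskRunFilterState(type=...)).
    nested=False mirrors passing TaskRunFilterState directly, which is the
    shape the server rejected with 'extra fields not permitted'.
    """
    state_filter = {"type": {"any_": list(state_types)}, "name": None}
    task_runs = {"state": state_filter} if nested else state_filter
    return {
        "flow_runs": {"id": {"any_": list(flow_run_ids)}},
        "task_runs": task_runs,
    }


ids = ["7924bcd8-8dea-46a0-a6f4-f47fc84002dd"]
good = task_run_body(ids, ["FAILED"])
bad = task_run_body(ids, ["FAILED"], nested=False)
print(json.dumps(good["task_runs"]))  # {"state": {"type": {"any_": ["FAILED"]}, "name": null}}
print(json.dumps(bad["task_runs"]))   # {"type": {"any_": ["FAILED"]}, "name": null}
```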
I've tried a few variants for the task_run_filter, but I always get more or less the same error.
Derek
09/25/2023, 8:00 AM
sbrabez
09/25/2023, 8:35 AM
Encountered exception during execution:
Traceback (most recent call last):
...
File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 98, in _get_state_result
result = await state.data.get()
File "/usr/local/lib/python3.10/site-packages/prefect/results.py", line 394, in get
raise MissingResult("The result was not persisted and is no longer available.")
prefect.exceptions.MissingResult: The result was not persisted and is no longer available.
Finished in state Failed('Flow run encountered an exception. MissingResult: The result was not persisted and is no longer available.\n')
We persist results on some tasks using @task(persist_result=True), but when we clicked Retry to retry the flow, we got the error above and couldn't get any further with the flow. Is this a known issue? I saw a couple of open issues on GitHub related to persistence:
• https://github.com/PrefectHQ/prefect/issues?q=is%3Aissue+is%3Aopen+MissingResult+
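A plausible reading of the traceback, offered as a hypothesis rather than a confirmed diagnosis: when the retried run reaches a task that already completed in the earlier attempt, Prefect reuses that task's result instead of re-running it, which only works if the result was persisted. A result that lived only in the earlier process's memory is gone, hence `MissingResult`. If that is the cause, persisting results for every task the flow depends on (e.g. `persist_result=True` on each task, or the `PREFECT_RESULTS_PERSIST_BY_DEFAULT` setting) should avoid it on future runs. The toy below is plain Python, not Prefect's implementation: the hypothetical `ResultStore` writes persisted results to disk and keeps the rest in memory, and a fresh store stands in for the retry.

```python
import json
import tempfile
from pathlib import Path


class MissingResult(Exception):
    """Toy stand-in for prefect.exceptions.MissingResult."""


class ResultStore:
    """Toy result store: persisted results go to disk, others stay in memory."""

    def __init__(self, directory):
        self.directory = Path(directory)
        self.memory = {}

    def save(self, key, value, persist):
        if persist:
            (self.directory / f"{key}.json").write_text(json.dumps(value))
        else:
            self.memory[key] = value

    def load(self, key):
        if key in self.memory:
            return self.memory[key]
        path = self.directory / f"{key}.json"
        if path.exists():
            return json.loads(path.read_text())
        raise MissingResult(f"The result for {key!r} was not persisted and is no longer available.")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        first_run = ResultStore(tmp)
        first_run.save("extract", [1, 2, 3], persist=True)     # like persist_result=True
        first_run.save("transform", [2, 4, 6], persist=False)  # memory only

        # The retry starts with a fresh store: only what reached disk survives.
        retry = ResultStore(tmp)
        print(retry.load("extract"))  # [1, 2, 3]
        try:
            retry.load("transform")
        except MissingResult as exc:
            print(exc)
```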
At this stage, we are not sure what to do to unblock the flows and continue. Any help unblocking the flow would be appreciated, thanks!
Faheem Khan
09/25/2023, 9:00 AM
Michael Urrutia
09/25/2023, 1:48 PM
prefect-hightouch package and am seeing the package's subflows, such as trigger-sync-run-and-wait-for-completion, shown in the UI as standalone flows. Is this expected behavior? If so, can we hide them?
Andreas Nord
09/25/2023, 3:09 PM
Eric
09/25/2023, 6:02 PM
prefect.yaml file, is this the right way to supply my tasks with the necessary env vars?
definitions:
  tags: &common_tags
    - "feedback-insight"
    - "demo"
  work_pool: &common_work_pool
    name: "demo-work-pool"
    job_variables:
      image: $BACKEND_IMAGE_URL
      ENV_VAR_1: $ENV_VAR_1
      ENV_VAR_2: $ENV_VAR_2
Seems strange that the Docker image would be grouped with the env vars supplied to the pod / execution env.
Eric
09/25/2023, 6:18 PM
FuETL
09/25/2023, 7:29 PM
ValidationError. Why does this happen? All tasks show as "created". The only pattern I have noticed is that it happens when I have a lot of flows to run. Is this an issue that is now fixed in v2?
Eric
09/25/2023, 7:48 PM
Constantino Schillebeeckx
09/25/2023, 8:49 PM
Yaron Levi
09/25/2023, 9:05 PM
Casey M
09/25/2023, 9:25 PM
from prefect_aws import AwsCredentials, but I like to use the standard boto3 client when uploading files.
Here is the function:
import boto3

def save_to_s3(directory, file_name, local_file_path):
    # No explicit credentials: boto3 falls back to its default credential chain
    s3 = boto3.resource('s3')
    key = f"{directory}/{file_name}"
    s3.meta.client.upload_file(local_file_path, 'test-updates', key)
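The question is cut off, so this sketch only assumes the goal is to keep plain boto3 for the upload while controlling where credentials come from. It takes the S3 client as a parameter, so the same function can use a default `boto3.client("s3")`, a client built from prefect_aws credentials (for instance `AwsCredentials.load("my-aws-creds").get_boto3_session().client("s3")`, where `my-aws-creds` is a hypothetical saved block name), or a fake in tests. The `s3_client` parameter and the slash-safe key joining are illustrative additions; `upload_file(Filename, Bucket, Key)` is the standard boto3 S3 client method.

```python
import posixpath


def save_to_s3(directory, file_name, local_file_path, s3_client=None, bucket="test-updates"):
    """Upload a local file to s3://<bucket>/<directory>/<file_name> and return the key.

    s3_client is any object with boto3's S3 client `upload_file(Filename, Bucket, Key)`
    method; when omitted, a default boto3 client is created (credentials come from
    boto3's usual lookup chain).
    """
    if s3_client is None:
        import boto3  # imported lazily so callers that inject a client don't need it

        s3_client = boto3.client("s3")
    # Join without doubling slashes, e.g. "reports/" + "a.csv" -> "reports/a.csv".
    key = posixpath.join(directory.strip("/"), file_name)
    s3_client.upload_file(local_file_path, bucket, key)
    return key
```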
Brian Newman
09/25/2023, 9:31 PM
async with httpx.AsyncClient(follow_redirects=True) as client:
    auth_result = await get_auth(client=client)
    project_status_inactive, project_data = await get_project_by_name(
        client=client,
        auth=auth_result,
        project_name="test_project",
    )
    logger.info(f"PROJECT STATUS: {project_status_inactive}")
The issue is that after the first function (task) runs, the client is being closed. How can I pass the client to many tasks?
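A likely explanation, not confirmed in the thread: an `httpx.AsyncClient` is tied to the event loop it was first used in, so if each task ends up running its coroutine on a different loop, the shared client's connections belong to a loop that has already closed, which matches `RuntimeError: Event loop is closed`. One workaround is to keep the client's whole lifetime inside a single coroutine: open it once and await the helpers as plain coroutines on that same loop. The stdlib sketch below shows only that structure; `FakeAsyncClient` is a hypothetical stand-in for `httpx.AsyncClient`, and `get_auth` / `get_project_by_name` are stubs of the helpers in the question.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for httpx.AsyncClient: remembers the loop it was opened on."""

    def __init__(self):
        self.loop = None
        self.closed = False

    async def __aenter__(self):
        self.loop = asyncio.get_running_loop()
        return self

    async def __aexit__(self, *exc_info):
        self.closed = True

    async def get(self, path):
        # A real client fails when used after closing or from a different loop.
        if self.closed or asyncio.get_running_loop() is not self.loop:
            raise RuntimeError("client used outside the loop it was opened on")
        await asyncio.sleep(0)
        return {"path": path}


async def get_auth(client):
    return await client.get("/auth")


async def get_project_by_name(client, auth, project_name):
    data = await client.get(f"/projects/{project_name}")
    return False, {"auth": auth, **data}  # (project_status_inactive, project_data)


async def main():
    # Open the client once; every call that uses it runs on this same loop.
    async with FakeAsyncClient() as client:
        auth_result = await get_auth(client=client)
        return await get_project_by_name(client=client, auth=auth_result, project_name="test_project")


if __name__ == "__main__":
    print(asyncio.run(main()))
```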
10:07:58.489 | ERROR | Flow run 'interesting-echidna' - Finished in state Failed('Flow run encountered an exception. RuntimeError: Event loop is closed')
Sivanandha Rajadurai
09/25/2023, 11:49 PM
Sarhan
09/26/2023, 12:28 AM
prefecthq/prefect:2.13.2-python3.10). Since Saturday (23rd Sept) at 5am (UTC), though, all our flow runs have been crashing. All prior flow runs were successful. We have verified internally that no settings changes were made to our org's AWS. There were also no changes to any of our flows immediately before the crashes started.
The ECS logs for a single flow run are here:
03:40:03.187 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Registering task definition...
03:40:03.600 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Creating task run...
03:40:04.104 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Waiting for task run to start...
03:40:04.139 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is PROVISIONING.
03:40:14.196 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is PENDING.
03:40:39.345 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is RUNNING.
03:40:44.349 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Running command 'python -m prefect.engine' in container 'prefect' (prefecthq/prefect:2.13.2-python3.10)...
03:40:44.632 | INFO | prefect.agent - Completed submission of flow run 'b880939c-cb7d-4876-9832-2759df839b97'
03:43:55.558 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is DEPROVISIONING.
03:44:10.742 | INFO | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Status is STOPPED.
03:44:10.777 | WARNING | prefect.infrastructure.ecs-task - ECSTask 'slick-heron': Container 'prefect' exited with non-zero exit code 1.
03:44:11.070 | INFO | prefect.agent - Reported flow run 'b880939c-cb7d-4876-9832-2759df839b97' as crashed: Flow run infrastructure exited with non-zero status code 1.
No logs are recorded in the Prefect console. No other stack trace info is given in the ECS logs.
What could be the problem?
Rob Fowler
09/26/2023, 2:16 AM
Name                Type           ID                                    Concurrency Limit
default-agent-pool  process        6fa5a366-8cb6-4e4a-9056-cfec8e58dc52  10
eks-agent-pool      prefect-agent  afa2839e-0e51-44f8-928e-853383846208  None
But creating an agent pool is not valid:
create -t prefect-agent agent-test-pool
...Unknown work pool type 'prefect-agent'. Please choose from process.
Version: 2.13.2
Lior Barak
09/26/2023, 2:34 PM
Nicholas Torba
09/26/2023, 3:34 PM
from prefect import flow, task
import random
from time import sleep, time

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    ran_num = random.randint(1, 10)
    if ran_num < 5:
        raise Exception("less than 5")
    print(f"sleeping for **{ran_num}** seconds")
    sleep(ran_num)
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    start1 = time()
    squared_nums = square_num.map(nums)
    # squared_nums = [square_num.submit(i) for i in nums]
    start2 = time()
    print(start2 - start1, " seconds for map to start all tasks")
    print_nums(squared_nums)
    end = time()
    print(end - start2, " seconds for map to finish all tasks")

map_flow([1, 2, 3, 5, 8, 13])
Without the exception, Prefect handles this flow well and does the async handling of each of those PrefectFuture results for me.
Is there any arg I could pass that would allow me to continue my flow with the results that are successfully received from the square_num task, while just logging a warning for the ones that failed?
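The manual approach can be sketched with the standard library as an analog of `square_num.map(nums)`: submit everything, then inspect each future individually, keep the successes and log a warning for each failure. This is not Prefect's API; `map_keep_successes` is a hypothetical helper, and the random failure is replaced by a deterministic one (even numbers fail) so the result is reproducible. In Prefect 2 the per-future counterpart would be something like `future.result(raise_on_failure=False)` on each `PrefectFuture`, which returns the exception instead of raising it, assuming that signature in the installed version.

```python
import logging
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger("map_flow")


def square_num(num):
    # Deterministic stand-in for the random failure in the question.
    if num % 2 == 0:
        raise ValueError(f"{num} is even")
    return num ** 2


def map_keep_successes(fn, items, max_workers=4):
    """Run fn over items concurrently; return successful results in order, log failures."""
    successes = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [(item, pool.submit(fn, item)) for item in items]
        for item, future in futures:
            exc = future.exception()  # blocks until this future is done
            if exc is None:
                successes.append(future.result())
            else:
                logger.warning("%s(%r) failed: %s", fn.__name__, item, exc)
    return successes


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(map_keep_successes(square_num, [1, 2, 3, 5, 8, 13]))  # [1, 9, 25, 169]
```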
I know I can handle this situation myself by getting the results from each PrefectFuture manually, but this seems like a case you would have a built-in solution for.
Shane Breeze
09/26/2023, 4:02 PM
concurrency(tag) context, and whenever I cancel a flow, the tasks don't release the concurrency slot because they halt inside the context. I'm looking for something like:
try:
    with concurrency(tag):
        ...
except CancellationRequest as e:
    raise e
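Whether the slot is released depends on how the cancellation arrives. If it surfaces inside the process as an exception (for example `asyncio.CancelledError`), a context manager's exit code, or an explicit `try`/`finally`, still runs; if the process is killed outright, no Python-level cleanup runs at all, and catching a specific exception will not help. The stdlib sketch below uses a hypothetical `SlotPool` in place of Prefect's concurrency slots to show the first case: an `asyncio` task is cancelled while holding a slot, and the `finally` block gives the slot back. It does not model how Prefect actually delivers cancellation to a flow run.

```python
import asyncio
from contextlib import asynccontextmanager


class SlotPool:
    """Hypothetical stand-in for a tag-based concurrency limit."""

    def __init__(self, limit):
        self.limit = limit
        self.in_use = 0

    @asynccontextmanager
    async def occupy(self):
        if self.in_use >= self.limit:
            raise RuntimeError("no free slot")
        self.in_use += 1
        try:
            yield
        finally:
            # Runs on normal exit, on errors, and on asyncio.CancelledError.
            self.in_use -= 1


async def work(pool, started):
    async with pool.occupy():
        started.set()
        await asyncio.sleep(3600)  # long-running body that gets cancelled


async def main():
    pool = SlotPool(limit=1)
    started = asyncio.Event()
    task = asyncio.create_task(work(pool, started))
    await started.wait()
    print("in use while running:", pool.in_use)  # 1
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    print("in use after cancel:", pool.in_use)  # 0
    return pool.in_use


if __name__ == "__main__":
    asyncio.run(main())
```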
John Mizerany
09/26/2023, 4:53 PM
Internal Server Error as the response. Was not sure if this was a known issue.
Bob Colner
09/26/2023, 6:21 PM
slack_notifier from prefect.utilities.notifications. I've been using this with great success for years on data-pipeline, but I'm getting a JWT error when trying to set it up on a new Prefect 1.4 flow: Missing Authorization header in JWT authentication mode. I have my Slack URL set up as a config.toml secret, just like in my working application. Any ideas about why I'm seeing this error?
Eric
09/26/2023, 7:53 PM
Vang Xiong
09/26/2023, 10:17 PM
Marvin AI in our Prefect account to test it out. However, I am unable to disable or turn it off. I see a setting that appears to be disabled already. I did try toggling it on and off, but we are still seeing the Marvin AI logs. There is no documentation on this anywhere. Does anyone here have any information to help us turn off this feature? Thanks!
Henri
09/26/2023, 11:00 PM
from prefect.filesystems import S3
import os

def s3_storage() -> None:
    # Save an S3 storage block named "ecs-s3-qa" (save() returns the block's ID)
    S3(
        bucket_path="prefect-portal-us-east-1-qa",
        aws_access_key_id=os.getenv("QA_AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("QA_AWS_SECRET_ACCESS_KEY"),
    ).save("ecs-s3-qa", overwrite=True)

if __name__ == "__main__":
    s3_storage()
An example:
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask
from prefect import flow, task
from prefect.deployments.deployments import Deployment
from prefect.filesystems import S3
[...]
@flow(log_prints=True)
def cool_numbers(nums=[1, 2, 3, 5, 8, 13]):  # essentially map_flow
    print_nums(nums)
    squared_nums = square_num.map(nums)
    print_nums(squared_nums)

if __name__ == "__main__":
    aws_credentials_block = AwsCredentials.load("aws-credentials-qa")  # loads my aws cred
    ecs_task_block = ECSTask.load("ecs-cluster-agent-qa")  # refers to the aws ecs cluster using
    s3_block = S3.load("ecs-s3-qa")  # doesn't really work; note it is not passed as storage= below
    deployment = Deployment.build_from_flow(
        flow=cool_numbers,
        name="ecs-cool-numbers",
        work_queue_name="default",
        infrastructure=ecs_task_block,
        path="/",
        apply=True,
        tags=["ecs-testing"],
    )
The tl;dr is that Prefect can't seem to upload to S3, and I can't execute the flow because it's looking in S3.