itay livni
03/12/2021, 8:02 PM
A task went "rogue" during a run (Prefect Cloud). Is there a way to delete that specific run? It is contorting the Run History visualization.
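One possible route is the GraphQL API via the Python client; a minimal sketch below, with the caveat that the delete_flow_run mutation name and shape are my assumption, and the flow-run ID is a placeholder:

# Hedged sketch: delete a single flow run through Prefect Cloud's GraphQL API.
# Assumption: a delete_flow_run mutation with this input shape exists.
from prefect import Client

client = Client()
client.graphql(
    """
    mutation($id: UUID!) {
      delete_flow_run(input: {flow_run_id: $id}) {
        success
      }
    }
    """,
    variables={"id": "<flow-run-id>"},  # placeholder: the rogue run's ID
)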
Brian Mesick
03/12/2021, 8:41 PM
Alex Papanicolaou
03/12/2021, 8:45 PM
Has anyone tried using tqdm inside a task? I know there would be issues around stdout and how the Prefect logger works, but maybe someone has thought about this.
Charles Liu
03/12/2021, 9:15 PM
Rob Fowler
03/12/2021, 10:52 PM
Jay Sundaram
03/13/2021, 3:53 AM
Frederick Thomas
03/13/2021, 4:42 PM
import requests
import os

def vent(flow: str, state: str):
    # Build the failure message and post it to the Teams webhook.
    msg = "Flow {0} finished in fail state '{1}'".format(flow, state)
    g = os.environ.get("TEAMS_NOTIFICATION")  # webhook link to Outlook for Teams
    requests.post(g, json={"text": msg})

check = lambda flow, state: vent(flow.name, state.message)
.
.
.
with Flow(flow_name, schedule=None, on_failure=check) as flow:
This is probably not optimal, but I just found the API and don't have time to dig in at the moment. Any help is greatly appreciated. Thanks.
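For completeness, a minimal end-to-end sketch of the handler wired into a runnable flow, assuming Prefect 0.x and the vent/check definitions above (the task and flow names are made up):

# Minimal sketch: a flow whose failure invokes the check handler above.
from prefect import Flow, task

@task
def boom():
    raise ValueError("simulated failure")

with Flow("teams-notify-demo", on_failure=check) as demo_flow:
    boom()

demo_flow.run()  # the Failed final state triggers check(flow, state)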
CA Lee
03/14/2021, 4:42 AM
Prefect version: 0.14.12, Python version: 3.8.8, Docker base image: prefecthq/prefect:0.14.12-python3.8
My flows are registered using CI (GitHub Actions) with the following shell command in a loop over my flows, to avoid re-registering every flow on each push (adapted from this guide):
prefect register flow --skip-if-flow-metadata-unchanged
I develop my tasks separately from my flows, as a Python package, which then gets built as a Docker container and pushed to ECR.
_Expected behavior_:
---------------------
Flow pulls the latest flow code from S3 (from the most recent push), which then runs the latest task code from ECR
_Observed behavior_:
----------------------
On each flow registration, a new file is uploaded to S3.
However, on inspecting flow logs, the flow does not pull the latest version of the flow code stored in S3. This was verified by looking at the slugified-current-timestamp of the S3 flow - an earlier version of the flow is being pulled instead.
I have also pulled image:latest from ECR on my machine, dropped into a shell using a local run, and confirmed that my task code in the Docker container was updated; however, the flow code stored on S3 is not picking up the updated task code from ECR.
Storage:
STORAGE = S3(bucket="aws-ecs-flows")
Run config:
RUN_CONFIG = ECSRun(
image="repository-url/image:latest",
labels=['ecs']
)
Re-registering the flows without specifying --skip-if-flow-metadata-unchanged results in the expected behavior, but also leads to unwanted flow version bumps in the Prefect Cloud UI.
The strange thing is that even though the tasks were developed (as a Docker container) separately from the flows, the flows don't pull the latest task code from the Docker image, even when the image tag latest is specified (the task code seems to stop at the version at which the flow metadata was deemed unchanged, i.e. idempotency_key = flow.serialized_hash(), even if the task code was modified).
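For context, my understanding of what the skip flag amounts to, sketched below (the project name is a placeholder): registration is skipped when the hash of the serialized flow metadata is unchanged, and task code baked into the Docker image never enters that hash.

# Sketch: what --skip-if-flow-metadata-unchanged does, as I understand it.
# Task code inside the image does not change this hash, so re-registration
# is skipped even when the image contents change.
flow.register(
    project_name="my-project",  # placeholder
    idempotency_key=flow.serialized_hash(),
)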
I'd appreciate any insight on how to make this work for ECS! Kubernetes has a flag, imagePullPolicy: Always, which may solve this issue, but I'm not aware of anything similar for ECS.
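One workaround I can sketch, on the assumption that a tag unique per build (for example the commit SHA that GitHub Actions exposes as GITHUB_SHA) stops ECS from reusing a stale latest image:

# Hypothetical sketch: tag the image per commit instead of using :latest.
import os

from prefect.run_configs import ECSRun

sha = os.environ.get("GITHUB_SHA", "dev")[:7]  # short commit SHA from CI
RUN_CONFIG = ECSRun(
    image="repository-url/image:{0}".format(sha),  # unique tag per build
    labels=["ecs"],
)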
Warsame Bashir
03/14/2021, 4:20 PM
flow.run_config = LocalRun()
and
with Flow("ETL", run_config=LocalRun(labels=["dev", "ml"])) as flow:
Varun Joshi
03/15/2021, 4:53 AM
Thanneermalai Lakshmanan
03/15/2021, 8:28 AM
Milly gupta
03/15/2021, 10:46 AM
Vipul
03/15/2021, 11:38 AM
Will Milner
03/15/2021, 2:26 PM
If I run a shell command like foo | bar, it will only check the exit status of bar. Any ideas on how to make Prefect raise an error if either task fails?
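Assuming this is ShellTask running under bash, one sketch that might help - pipefail makes a pipeline fail when any stage fails, not just the last:

# Hedged sketch: propagate failures from every stage of a shell pipeline.
from prefect import Flow
from prefect.tasks.shell import ShellTask

shell = ShellTask(
    shell="bash",
    helper_script="set -eo pipefail",  # runs before the command itself
)

with Flow("pipe-demo") as flow:
    shell(command="foo | bar")  # foo and bar stand in for the real commands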
Ross
03/15/2021, 2:39 PM
Tomás Emilio Silva Ebensperger
03/15/2021, 3:01 PM
itay livni
03/15/2021, 3:11 PM
Matthew Blau
03/15/2021, 3:47 PM
Samuel Hinton
03/15/2021, 4:18 PM
Andrew Hannigan
03/15/2021, 5:51 PM
git was designed as a fully distributed version control system, and that doesn't stop the majority of users from using it in a centralized workflow. For git, solving the harder version control problem turned out to yield a solution that also worked better for the easier version control problem. Maybe that would also be the case for Prefect and the workflow orchestration problem too.
Robert Bastian
03/15/2021, 5:52 PM
[2021-03-15 17:48:45,887] ERROR - rai-fargate | Error while deploying flow
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/agent/agent.py", line 414, in deploy_and_update_flow_run
    deployment_info = self.deploy_flow(flow_run)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/prefect/agent/ecs/agent.py", line 322, in deploy_flow
    resp = self.ecs_client.run_task(taskDefinition=taskdef_arn, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/botocore/client.py", line 676, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
I checked the registered task definition and I can see:
"compatibilities": [
"EC2"
],
When I do the same with 0.14.6 I see:
"compatibilities": [
"EC2",
"FARGATE"
],
Thx!
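For comparison, the computed compatibilities field reflects requiresCompatibilities in the task definition; below is a sketch of pinning a Fargate-capable definition explicitly, with made-up sizes and on the assumption that ECSRun accepts an inline task_definition:

# Hypothetical sketch: supply a Fargate-compatible task definition directly.
from prefect.run_configs import ECSRun

run_config = ECSRun(
    task_definition={
        "requiresCompatibilities": ["FARGATE"],  # what "compatibilities" is derived from
        "networkMode": "awsvpc",  # required for Fargate
        "cpu": "512",             # made-up sizes
        "memory": "1024",
        "containerDefinitions": [{"name": "flow"}],  # container name Prefect expects, as I understand it
    },
)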
Marwan Sarieddine
03/15/2021, 6:10 PM
Charles Liu
03/15/2021, 6:38 PM
# You'd set a secret in Prefect cloud containing your credentials as a JSON dictionary of the following form:
{"ACCESS_KEY": "your aws_access_key_id here",
"SECRET_ACCESS_KEY": "your aws_secret_access_key here"}
# For now the secret name is hardcoded, so you'd need to name the secret "AWS_CREDENTIALS"
# you'd then attach the secret to your `S3` storage as follows:
flow.storage = S3(..., secrets=["AWS_CREDENTIALS"])
Assuming AWS_CREDENTIALS has been declared on the Prefect Cloud side, is this how we pass secrets into both storage=storage_type(secrets=['']) and KubernetesRun(image_pull_secrets=['']) for now?
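A combined sketch of what I'm asking, with placeholder names for the bucket and the Kubernetes pull secret:

# Sketch: attach a Prefect Cloud secret to S3 storage and a registry pull
# secret to the Kubernetes run config (names are placeholders).
from prefect.run_configs import KubernetesRun
from prefect.storage import S3

flow.storage = S3(bucket="my-bucket", secrets=["AWS_CREDENTIALS"])
flow.run_config = KubernetesRun(image_pull_secrets=["regcred"])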
Pedro Machado
03/15/2021, 7:08 PM
Justin Chavez
03/15/2021, 7:50 PM
When using StartFlowRun, is it possible to see the logs for each called flow? I am locally running a parent flow that calls multiple child flows using StartFlowRun, but I can only see the success or failed status, not any of the stdout that some of the child flow tasks print.
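For reference, the pattern in question, sketched with placeholder project and flow names:

# Sketch of the parent/child pattern described above (names are placeholders).
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# wait=True blocks until the child run finishes and adopts its final state,
# but (as observed) the child's stdout is not streamed back to the parent.
start_child = StartFlowRun(project_name="my-project", wait=True)

with Flow("parent") as parent_flow:
    start_child(flow_name="child-a")
    start_child(flow_name="child-b")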
Julio Venegas
03/15/2021, 8:00 PM
The custom module is adls_utils. I'm using a logging.conf file that gets read in the main module dataproject.py. The flow is adls_create_day_dirs.py. I start the local agent with local_agent_start.txt to set environment variables, i.e. that's where I set PREFECT__LOGGING__EXTRA_LOGGER. @Jim Crist-Harif saw you've helped other people with this topic before, hence the mention, but feel free to leave it for the community to answer :)
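For anyone comparing setups, the variable I mean, sketched in Python; note I believe the setting Prefect actually reads is the plural PREFECT__LOGGING__EXTRA_LOGGERS, which takes a list:

# Sketch: expose a custom module's logger to Prefect's logging.
# Assumption: the plural EXTRA_LOGGERS name with a list-style value.
import os

os.environ["PREFECT__LOGGING__EXTRA_LOGGERS"] = "['adls_utils']"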
daniel
03/15/2021, 9:19 PM
asm
03/15/2021, 10:22 PM
asm
03/15/2021, 10:22 PM
Charles Liu
03/15/2021, 11:40 PM