Mukamisha jocelyne
03/12/2022, 11:22 AMJames Harr
03/12/2022, 2:20 PMprefect deployment run test-flow/test-deployment
, the UI shows that the flow run is Pending. When I look at the logs in Kubernetes with kubectl logs -l job-name=voracious-antelope2b89g
, I see a FileNotFound stack trace
Thank you for your help!Tomer Cagan
03/13/2022, 12:21 PMDekel R
03/13/2022, 4:50 PMCREDENTIALS_SECRET_NAME = 'PREFECT_SERVICE_ACCOUNT_CREDENTIALS'
with Flow('comparable-products-data-extraction-development',
storage=Docker(registry_url="us-central1-docker.pkg.dev/xxx/",
dockerfile="./Dockerfile"), executor=LocalDaskExecutor(scheduler="processes")) as flow: # , schedule=daily_schedule
raw_credentials = PrefectSecret(CREDENTIALS_SECRET_NAME)
# raw_credentials = get_local_credentials()
google_credentials = parse_credentials(credentials=raw_credentials)
And here is the parse_credentials task -
from prefect import task
from google.oauth2 import service_account
@task()
def parse_credentials(credentials):
return service_account.Credentials.from_service_account_info(credentials)
This works fine in multiple flows when not using Dask executor - and it also runs without any problem locally when using Dask….
When running this in Prefect cloud - (PREFECT_SERVICE_ACCOUNT_CREDENTIALS is saved as a json secret, and it works in other flows) I get this Dask error - (see at the my first comment)
Any solution for this? what am I missing?
ThanksRaymond Yu
03/14/2022, 2:23 AMAyah Safeen
03/14/2022, 6:42 AMrun prefect server start
I get the following error
Pulling postgres ... error
Pulling hasura ... error
Pulling graphql ... error
Pulling apollo ... error
Pulling towel ... error
Pulling ui ...
richard-social 812
03/14/2022, 8:35 AM@task(result=LocalResult(dir=f'{os.getcwd()}/storage/pre_delinquency_models',
serializer=H2OModelSerializer()),
checkpoint=True, target='{date:%Y}-Week {date:%U}', log_stdout=True)
def train_new_model(data: pd.DataFrame):
I have it running on prefect cloud with a run_config that looks like below:
run_config = LocalRun(env={'PREFECT__FLOWS__CHECKPOINTING':os.environ.get('PREFECT__FLOWS__CHECKPOINTING', 'true')})
However, looking at the results folder I see that the task result file is created a new with each daily run . What am I missing?Dekel R
03/14/2022, 9:21 AMflow.run_config = VertexRun(scheduling={'timeout': '3600s'},
machine_type='n2-highcpu-80', labels=["ml"],
service_account=PREFECT_SERVICRE_ACCOUNT)
It works without the scheduling parameter - I added it and used this documentation -
https://docs.prefect.io/orchestration/flow_config/run_configs.html#vertexrun
Now when registering to Prefect cloud and running I get this error -
Parameter to MergeFrom() must be instance of same class: expected google.protobuf.Duration got str.
Am I missing something?
ThanksEmma Rizzi
03/14/2022, 9:54 AMPatrick.H
03/14/2022, 1:08 PMHugo Polloli
03/14/2022, 1:39 PMreturn True
and pass it, unused, to the next task. Is that ok or did I miss a way to do that more "beautifully" in the docs ? (I don't find it particularly bad, + with the fact that I suffix it with "_done" I think it's explicit, but was still wondering)
t1_done = task1()
t2_done = task2(t1_done)
....
Chris Reuter
03/14/2022, 2:12 PMNico Neumann
03/14/2022, 2:23 PMFlow(..., state_handlers=[…])
but the callback is called on the system where the flow is executed. My idea is to use StartFlowRun
e.g. on my local computer which starts the flow in AWS cloud. And every time the state changes (submitted
, running
, canceled
, etc.) a local callback function is called where I see the state
and flow_run_id
.Tyndyll
03/14/2022, 2:29 PMChris Reuter
03/14/2022, 4:59 PMAnna Geller
03/14/2022, 5:19 PMChris Reuter
03/14/2022, 8:20 PMetem citil
03/14/2022, 8:38 PMApoorva Desai
03/14/2022, 9:40 PMfrom prefect.utilities.notifications import slack_notifier
and my tasks now look like this :
task(log_stdout=True, state_handlers=[slack_notifier])
install_snowflake_task = ShellTask(helper_script="pip install boto3 \
snowflake-connector-python[pandas] \
snowflake-ingest && pip install PyJWT==1.7.1", shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
install_dbt_task = ShellTask(helper_script="pip install dbt==0.18.0", \
shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
My flow looks like
with Flow("name-of-flow", state_handlers=[slack_notifier]) as flow:
The flow runs successfully but I see no notifications on the slack channel that I have authorized for this. What am I doing wrong?Wieger Opmeer
03/14/2022, 10:54 PMStevon Shakey Eugene Crowder, Jr.
03/15/2022, 4:01 AMKevin Otte
03/15/2022, 4:03 AMScarlett King
03/15/2022, 12:55 PMMuddassir Shaikh
03/15/2022, 12:56 PMJacob Wilson
03/15/2022, 4:31 PMSubmitted for execution:
phase. I am using Docker
storage (The image is hosted in ECR and I’ve confirmed my execution role has access to the repo) and ECS Run
. The flow runs locally but not in ECS.
Dockerfile:
FROM prefecthq/prefect:latest-python3.8
WORKDIR /opt/prefect
COPY requirements.txt .
RUN pip install --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
Flow code:
import os
from prefect import Flow, task
from prefect.run_configs import ECSRun
from prefect.storage import Docker
@task(log_stdout=True)
def extract():
x = [4, 5, 6]
print("Starting: {}".format(x))
return x
@task
def transform(y):
return [i * 10 for i in y]
@task(log_stdout=True)
def load(z):
print("Received: {}".format(z))
with Flow("Test Flow", storage=Docker(dockerfile="Dockerfile", registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME"))) as flow:
e = extract()
t = transform(e)
l = load(t)
flow.run_config = ECSRun(
task_role_arn=os.getenv("TASK_ROLE_ARN"),
execution_role_arn=os.getenv("EXECUTION_ROLE_ARN")
)
Bradley Hurley
03/15/2022, 4:55 PMRenameFlowRun
. Would it be expected that in a downstream task if I call prefect.context.get("flow_run_name")
the new/updated name is returned?Xavier Babu
03/15/2022, 5:30 PMShaoyi Zhang
03/15/2022, 5:33 PMAdam Roderick
03/15/2022, 5:36 PMSarah Floris
03/15/2022, 5:53 PMSarah Floris
03/15/2022, 5:53 PMKevin Kho
03/15/2022, 5:54 PMfrom prefect import Client
client = Client()
client.set_secret(name="MYSECRET", value="MY SECRET VALUE")
Sarah Floris
03/15/2022, 8:20 PMKevin Kho
03/15/2022, 8:22 PM