Dekel R
03/14/2022, 9:21 AM
flow.run_config = VertexRun(scheduling={'timeout': '3600s'},
                            machine_type='n2-highcpu-80', labels=["ml"],
                            service_account=PREFECT_SERVICRE_ACCOUNT)
It works without the scheduling parameter - I added it and used this documentation -
https://docs.prefect.io/orchestration/flow_config/run_configs.html#vertexrun
Now when registering to Prefect cloud and running I get this error -
Parameter to MergeFrom() must be instance of same class: expected google.protobuf.Duration got str.
Am I missing something?
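The error text says a google.protobuf Duration message was expected where the string '3600s' was supplied. A minimal sketch of turning such a string into a Duration follows. The helper names parse_duration and to_protobuf_duration are made up for illustration, and whether VertexRun accepts a Duration object inside scheduling is an assumption that has not been verified here.

```python
import re
from datetime import timedelta

# Matches protobuf-JSON style durations such as "3600s" or "1.5s".
_DURATION_RE = re.compile(r"^(\d+(?:\.\d+)?)s$")


def parse_duration(text: str) -> timedelta:
    """Parse a duration string like '3600s' into a timedelta (stdlib only)."""
    match = _DURATION_RE.match(text.strip())
    if match is None:
        raise ValueError(f"not a duration string: {text!r}")
    return timedelta(seconds=float(match.group(1)))


def to_protobuf_duration(text: str):
    """Build a google.protobuf Duration message from a duration string.

    Requires the protobuf package; it is imported lazily so that
    parse_duration stays usable without it.
    """
    from google.protobuf import duration_pb2

    duration = duration_pb2.Duration()
    duration.FromTimedelta(parse_duration(text))
    return duration
```

If the assumption holds, the run config would take scheduling={'timeout': to_protobuf_duration('3600s')} instead of the plain string.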
Thanks
Emma Rizzi
03/14/2022, 9:54 AM
Patrick.H
03/14/2022, 1:08 PM
Hugo Polloli
03/14/2022, 1:39 PM
return True and pass it, unused, to the next task. Is that ok or did I miss a way to do that more "beautifully" in the docs? (I don't find it particularly bad, + with the fact that I suffix it with "_done" I think it's explicit, but was still wondering)
t1_done = task1()
t2_done = task2(t1_done)
....
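For reference, Prefect 1.x lets a task call inside a Flow context take an upstream_tasks=[...] keyword, which declares ordering without passing a value. The sketch below is a library-free illustration of that idea and not Prefect code: run_in_order and the task names are hypothetical, and it only shows ordering declared separately from data.

```python
from graphlib import TopologicalSorter  # Python 3.9+


def run_in_order(tasks, depends_on):
    """Run zero-argument callables so that every dependency finishes first.

    tasks:      dict mapping a task name to a callable
    depends_on: dict mapping a task name to the set of names that must run before it
    """
    graph = {name: depends_on.get(name, set()) for name in tasks}
    order = list(TopologicalSorter(graph).static_order())
    for name in order:
        tasks[name]()
    return order


log = []
order = run_in_order(
    tasks={
        "task1": lambda: log.append("task1"),
        "task2": lambda: log.append("task2"),
    },
    # task2 waits for task1 but receives nothing from it.
    depends_on={"task2": {"task1"}},
)
```

Compared with returning True and threading a "_done" value through, the dependency here lives in the graph definition, so task signatures stay free of unused arguments.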
Chris Reuter
03/14/2022, 2:12 PM
Nico Neumann
03/14/2022, 2:23 PM
Flow(..., state_handlers=[…]) but the callback is called on the system where the flow is executed. My idea is to use StartFlowRun e.g. on my local computer which starts the flow in AWS cloud. And every time the state changes (submitted, running, canceled, etc.) a local callback function is called where I see the state and flow_run_id
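One way to get a callback on the local machine is to poll the run's state from there and fire the callback whenever it changes. This is a rough sketch under stated assumptions: watch_state, get_state and on_change are hypothetical names, get_state stands in for whatever backend query returns the current state of a flow run, and the final state names are placeholders.

```python
import time
from typing import Callable, Iterable, Optional


def watch_state(
    flow_run_id: str,
    get_state: Callable[[str], str],
    on_change: Callable[[str, str], None],
    final_states: Iterable[str] = ("Success", "Failed", "Cancelled"),
    poll_seconds: float = 5.0,
    max_polls: Optional[int] = None,
) -> Optional[str]:
    """Poll a flow run and call on_change(flow_run_id, state) on every change."""
    finished = set(final_states)
    last = None
    polls = 0
    while max_polls is None or polls < max_polls:
        state = get_state(flow_run_id)  # hypothetical backend lookup
        polls += 1
        if state != last:
            on_change(flow_run_id, state)  # runs locally, not where the flow runs
            last = state
        if state in finished:
            break
        time.sleep(poll_seconds)
    return last
```

Polling trades latency for simplicity: a state that appears and disappears between two polls is never reported, so poll_seconds should be shorter than the shortest state that matters.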
Tyndyll
03/14/2022, 2:29 PM
Chris Reuter
03/14/2022, 4:59 PM
Anna Geller
Chris Reuter
03/14/2022, 8:20 PM
etem citil
03/14/2022, 8:38 PM
Apoorva Desai
03/14/2022, 9:40 PM
from prefect.utilities.notifications import slack_notifier
and my tasks now look like this:
task(log_stdout=True, state_handlers=[slack_notifier])
install_snowflake_task = ShellTask(helper_script="pip install boto3 \
snowflake-connector-python[pandas] \
snowflake-ingest && pip install PyJWT==1.7.1", shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
install_dbt_task = ShellTask(helper_script="pip install dbt==0.18.0", \
shell="bash", stream_output=True, return_all=True, state_handlers=[slack_notifier])
My flow looks like
with Flow("name-of-flow", state_handlers=[slack_notifier]) as flow:
The flow runs successfully but I see no notifications on the Slack channel that I have authorized for this. What am I doing wrong?
Wieger Opmeer
03/14/2022, 10:54 PM
Stevon Shakey Eugene Crowder, Jr.
03/15/2022, 4:01 AM
Kevin Otte
03/15/2022, 4:03 AM
Scarlett King
03/15/2022, 12:55 PM
Muddassir Shaikh
03/15/2022, 12:56 PM
Jacob Wilson
03/15/2022, 4:31 PM
Submitted for execution: phase. I am using Docker storage (The image is hosted in ECR and I’ve confirmed my execution role has access to the repo) and ECS Run. The flow runs locally but not in ECS.
Dockerfile:
FROM prefecthq/prefect:latest-python3.8
WORKDIR /opt/prefect
COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt
Flow code:
import os
from prefect import Flow, task
from prefect.run_configs import ECSRun
from prefect.storage import Docker

@task(log_stdout=True)
def extract():
    x = [4, 5, 6]
    print("Starting: {}".format(x))
    return x

@task
def transform(y):
    return [i * 10 for i in y]

@task(log_stdout=True)
def load(z):
    print("Received: {}".format(z))

with Flow("Test Flow", storage=Docker(dockerfile="Dockerfile", registry_url=os.getenv("REGISTRY_URL"), image_name=os.getenv("IMAGE_NAME"))) as flow:
    e = extract()
    t = transform(e)
    l = load(t)

flow.run_config = ECSRun(
    task_role_arn=os.getenv("TASK_ROLE_ARN"),
    execution_role_arn=os.getenv("EXECUTION_ROLE_ARN")
)
Bradley Hurley
03/15/2022, 4:55 PM
RenameFlowRun. Would it be expected that in a downstream task if I call prefect.context.get("flow_run_name") the new/updated name is returned?
Xavier Babu
03/15/2022, 5:30 PM
Shaoyi Zhang
03/15/2022, 5:33 PM
Adam Roderick
03/15/2022, 5:36 PM
Sarah Floris
03/15/2022, 5:53 PM
Rafael Sá
03/15/2022, 6:03 PM
Sarah Floris
03/15/2022, 6:27 PM
Rafael Sá
03/15/2022, 8:16 PM
Sarah Floris
03/15/2022, 8:38 PM
System Version check: OK
/opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
flows = cloudpickle_deserialization_check(flow_file_paths)
Traceback (most recent call last):
File "/opt/prefect/healthcheck.py", line 130, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named "Module"
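The healthcheck failure above is consistent with how pickling handles functions from importable modules: only a reference (module name plus attribute name) is stored, so the module has to be importable wherever the bytes are loaded. The sketch below reproduces the same class of error with the standard pickle module. The module name my_helpers is hypothetical, and using plain pickle in place of cloudpickle is a simplification for the demo.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path


def demo_missing_module() -> str:
    """Pickle a function from a throwaway module, then unpickle without it."""
    with tempfile.TemporaryDirectory() as tmp:
        # "my_helpers" is a hypothetical module that exists only for this demo.
        Path(tmp, "my_helpers.py").write_text("def double(x):\n    return 2 * x\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            module = importlib.import_module("my_helpers")
            # Stores the reference "my_helpers.double", not the function body.
            payload = pickle.dumps(module.double)
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("my_helpers", None)
    importlib.invalidate_caches()
    # The module is now gone, which mimics an image that lacks the package.
    try:
        pickle.loads(payload)
    except ModuleNotFoundError as exc:
        return str(exc)
    return "unpickled fine"
```

The usual remedy is to make the custom module importable inside the image, for example by copying it in and putting its directory on the Python path, so the stored reference resolves at load time.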
Brad
03/15/2022, 9:04 PM
Set default storage to 'local'.
Traceback (most recent call last):
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 58, in wrapper
return fn(*args, **kwargs)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
return asynclib.run(func, *args, **backend_options)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
return native_run(wrapper(), debug=debug)
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
return await func(*args)
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/storage.py", line 136, in create
exit_with_success(f"Set default storage to {name!r}.")
File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 193, in exit_with_success
raise typer.Exit(0)
click.exceptions.Exit: 0
An exception occurred.
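In the output above the command succeeds ("Set default storage to 'local'."), yet the Exit raised with code 0 is then reported as an exception. That pattern can happen when an error-reporting wrapper catches every exception, including the one used to signal a clean exit. The log does not confirm that this is the cause, so the following is only a sketch of the pattern: Exit here is a stand-in class (click.exceptions.Exit also subclasses RuntimeError) and with_error_report is a hypothetical decorator.

```python
import functools


class Exit(RuntimeError):
    """Stand-in for an exit-signalling exception carrying an exit code."""

    def __init__(self, code: int = 0):
        super().__init__(code)
        self.exit_code = code


def with_error_report(fn):
    """Report unexpected errors, but let deliberate exits pass through."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exit:
            raise  # a deliberate exit, not a failure: do not report it
        except Exception as exc:
            print(f"An exception occurred: {exc!r}")
            raise Exit(1) from exc

    return wrapper
```

Listing the exit exception before the generic handler matters, because a RuntimeError subclass would otherwise be caught by except Exception.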
Darshan
03/15/2022, 9:33 PM
Brad
03/15/2022, 10:11 PM