Gabe Grand
05/18/2021, 4:23 PM
We're using LocalDaskExecutor in combination with RunNamespacedJob to launch a bunch of trivially parallelizable child processes on Kubernetes. However, it looks like only 2 child jobs are getting run at a time. Is there a way to increase the parallelism // are we doing this right?
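If the limit is coming from the executor rather than Kubernetes itself, one thing worth trying is raising the local Dask worker count explicitly. A minimal sketch, assuming the flow object is in scope (with the default settings the worker count tends to follow the CPU count of the pod running the flow, which could explain seeing only 2 at a time):
from prefect.executors import LocalDaskExecutor

# Ask the local Dask scheduler for more concurrent slots than the default.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)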
Marko Mušnjak
05/18/2021, 6:49 PM
Charles Liu
05/18/2021, 8:12 PM
Enda Peng
05/18/2021, 9:10 PM
Is there a way to switch between prefect backend server and prefect backend cloud from within Python? Right now I am doing os.system("prefect backend cloud"), not sure whether there is a more elegant way.
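I'm not aware of a dedicated Python API for this, but a slightly more robust variant of the same shell-out approach is to go through subprocess, which surfaces failures that os.system silently swallows. A small sketch:
import subprocess

# check=True raises CalledProcessError if the CLI call fails,
# which os.system would silently ignore.
subprocess.run(["prefect", "backend", "cloud"], check=True)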
Chris McClellan
05/19/2021, 12:48 AM
Ranu Goldan
05/19/2021, 2:00 AM
My run config was:
KubernetesRun(
    labels=["agent:k8s-dev"],
    cpu_limit=8,
    memory_limit="12Gi",
    cpu_request=0.5,
    memory_request="512Mi",
)
And my executor was:
DaskExecutor(adapt_kwargs={"maximum": 16, "minimum": 1})
When I start a flow run with this config, the Prefect agent creates a new pod with this config: (image attached)
The problem is:
The Dask worker is always killed before the GKE node pool scales up, and the Prefect logs show a KilledWorker error.
Any idea how to properly balance the DaskExecutor settings against the KubernetesRun limits?
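One thing that can produce KilledWorker in this setup is the adaptive cluster spawning more workers than fit inside the single KubernetesRun pod (16 workers sharing 12Gi leaves very little memory each). A sketch of a more conservative configuration, assuming the default in-pod LocalCluster and dropping adapt_kwargs in favour of a fixed worker count, so that workers times memory_limit stays within the pod's limits:
from prefect.executors import DaskExecutor

# Keep total worker resources inside the KubernetesRun limits:
# 4 workers * 2GiB = 8GiB, which fits under the 12Gi memory_limit.
executor = DaskExecutor(
    cluster_kwargs={
        "n_workers": 4,
        "threads_per_worker": 2,
        "memory_limit": "2GiB",
    }
)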
Zach Schumacher
05/19/2021, 2:24 AM
Peter Roelants
05/19/2021, 6:10 AM
I'm seeing this warning:
/venv/lib/python3.9/site-packages/prefect/engine/task_runner.py:865: UserWarning: This task is running in a daemonic subprocess; consequently Prefect can only enforce a soft timeout limit, i.e., if your Task reaches its timeout limit it will enter a TimedOut state but continue running in the background.
I'm running a scheduled flow via a Docker Agent with a DaskExecutor (multiple processes, due to the CPU-intensive nature of the tasks). Some of the tasks in the flow can hang, so I've added a timeout on the task.
This warning concerns me because, if I interpret it correctly, it means that certain processes might hang indefinitely.
What is the best way to avoid this while still benefiting from running multiple CPU-intensive tasks in parallel with Prefect?
(I'm running Prefect 0.14.19 and Dask 2021.5.0.)
Yohann
05/19/2021, 7:07 AM
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from uuid import uuid4

@task
def run_subflow(item):
    StartFlowRun(flow_name="my_sub_flow", project_name="test", wait=True, parameters={"item": item}).run(idempotency_key=str(uuid4()))

with Flow("my_flow", schedule=Schedule(clocks=[CronClock("0 0 * * *")])) as flow:
    run_subflow.map(item=["a", "b", "c"])

flow.run()
David
05/19/2021, 10:53 AM
Is there a way to pass task_definition_arn when using Docker storage?
I have a flow that I want to register with Docker storage (ECR), but I don't want to generate a new task definition on every deploy (because, apart from the Docker image itself, nothing changed).
Why can't I pass a task definition ARN when using Docker storage?
Thanks
Michael Hadorn
05/19/2021, 1:13 PM
After upgrading the base Docker image, our flows fail with:
Failed to load and execute Flow's environment: GitlabGetError('404 Project Not Found')
We use it with our own self-hosted GitLab instance. Apart from the new base Docker image, we changed nothing:
prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.17-python3.8
prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.19-python3.8
Up to 0.14.16 it works:
prefecthq/prefect:0.14.12-python3.8 -> prefecthq/prefect:0.14.16-python3.8
The new Git storage is not able to handle our company's certificate (this was my first try):
Git storage: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1131)
Are there any known problems?
We use it like this in the flow configuration:
storage=GitLab(
    host="https://gitlab.xxx.ch",
    repo="group/repo",
    path="test_flow.py",
    ref="feature",
    secrets=["GITLAB_ACCESS_TOKEN"],
)
Peter Roelants
05/19/2021, 2:04 PM
It seems the on_failure handler is called even when there is retry logic added to the Task and the task still hasn't exhausted its max_retries. (I think I just had a Task on which the on_failure handler was called, even though it succeeded on a retry.)
Is there a way to add a failure handler that is called only after the task has exhausted all its retries and has truly failed?
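One possible workaround is a custom state handler that only alerts once no retries remain, by comparing the run count in context against max_retries. An untested sketch, assuming task_run_count behaves as documented; send_final_failure_alert is a placeholder for whatever notification you use:
from datetime import timedelta

import prefect
from prefect import task
from prefect.engine.state import Failed

def alert_on_final_failure(task_obj, old_state, new_state):
    # task_run_count starts at 1 and increments on each retry, so a Failed
    # state with a run count beyond max_retries means retries are exhausted.
    if isinstance(new_state, Failed):
        if prefect.context.get("task_run_count", 1) > task_obj.max_retries:
            send_final_failure_alert(task_obj.name)  # placeholder notifier
    return new_state

@task(max_retries=3, retry_delay=timedelta(minutes=1), state_handlers=[alert_on_final_failure])
def flaky_task():
    ...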
05/19/2021, 3:43 PMflows
folder (my folder is about 200GB of size. from just 2 days worth of flows)?
Ideally I would like to delete the flows data once a flow is completed with 100% success.Brian Mesick
05/19/2021, 4:31 PM
We've been setting PREFECT__USER_CONFIG_PATH=prod.toml, but prefect.config no longer seems to have the values from the toml file. Any idea how we can work around this?
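One thing worth checking: prefect.config is built when prefect is first imported, so the variable has to be set before that import and has to point at a path that resolves from the process's working directory (and it has to be present in the flow-run environment, not just where you register). A minimal sanity check, assuming a hypothetical setting defined in prod.toml:
import os

# Must be set before prefect is imported; an absolute path avoids
# surprises from a different working directory at runtime.
os.environ["PREFECT__USER_CONFIG_PATH"] = "/absolute/path/to/prod.toml"

import prefect

print(prefect.config.get("my_setting"))  # hypothetical key from prod.toml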
Walee
05/19/2021, 5:42 PM
Braun Reyes
05/19/2021, 6:10 PM
kevin
05/19/2021, 7:43 PM
Jeff Williams
05/19/2021, 7:57 PM
To store:
prefect.context[label] = time.time()
To retrieve:
time_val = prefect.context.get(label)
Sean Talia
05/19/2021, 8:18 PM
Sébastien Arnaud
05/20/2021, 2:17 AM
client.register_worker_plugin allows)
Gee Xun Chen
05/20/2021, 3:48 AM
case.
Joël Luijmes
05/20/2021, 7:26 AM
Jocelyn Boullier
05/20/2021, 7:54 AM
Joël Luijmes
05/20/2021, 1:35 PM
I'm running dbt test, which generates loads of logging output, and it seems something starts to break then:
RuntimeError: reentrant call inside <_io.BufferedWriter name='logs/dbt.log'>
Newskooler
05/20/2021, 2:35 PM
localhost:8080. And I can't go back to my local Prefect… why is that and how can I fix this?
May
05/20/2021, 2:58 PM
Adam Shamlian
05/20/2021, 3:18 PM
@task
def generate_inputs_from_params(args):
    # ....

@task
def create_db_conn(args):
    # ....

@task
def do_db_work(args):
    # ....

@task
def do_some_other_work(args):
    # ...

with Flow("example") as f:
    # Parameter tasks
    conn_inputs, db_work_inputs, other_work_inputs = generate_inputs_from_params(args)  # from param tasks
    conn_map = create_db_conn.map(conn_inputs)
    res_map = do_db_work.map(conn_map, db_work_inputs)
    res2_map = do_some_other_work(res_map, other_work_inputs)
    # some reduce func if necessary
I have two questions about this:
1. Is that flow constructed properly - I'm ultimately after something like:
inputs = generate_inputs_from_params(args)  # from param tasks
for (conn_input, db_work_input, other_work_input) in inputs:
    conn = create_db_conn(conn_input)
    res = do_db_work(conn, db_work_input)
    res2 = do_some_other_work(res, other_work_input)
2. When mapping over credentials dynamically, would I inject `Secret`s into the conn_inputs, or would I resolve the proper Secret "within the for loop" (i.e. extending the map chain to include an additional layer that resolves `Secret`s)? My understanding of the docs is that if I do the former, the secret data would be exposed in conn_inputs, which in a distributed environment means that plaintext credentials could be making network hops, or in any environment would be persisted as part of Result instances. I'd like to make sure I'm understanding this correctly.
Jocelyn Boullier
05/20/2021, 3:43 PM
Joe Schmid
05/20/2021, 5:23 PM
Zach Schumacher
05/20/2021, 7:08 PM