Felix Horvat
08/11/2022, 1:03 PM

Darren
08/11/2022, 7:19 PM

Edmondo Porcu
08/11/2022, 11:36 PM

matt_innerspace.io
08/16/2022, 5:08 AM

Jack Prominski
08/16/2022, 1:38 PM

Caio Rogério Silva dos Santos
08/16/2022, 7:23 PM
We currently use `.map`, mapping each task in the pipeline to one of the failed timestamps. This causes some issues: all the data for every timestamp stays in memory for the whole flow run, blowing past our resource limits, and running each task sequentially takes forever.
Talking with a coworker, we arrived at quite an elegant solution: create a capture flow run for each timestamp, distributing the work across multiple pods and cleaning up the code nicely. The problem is that the `create_flow_run` task does not work with `.map`; it gives me the error:
TypeError: Cannot map over unsubscriptable object of type <class 'pipelines.utils.custom.CustomFlow'>
So I'm here to ask: can someone point me in the direction of the best practice for implementing this idea? Before this, I dug fairly deep into garbage collection to try to reclaim the memory used by each piece of data, to no avail; we need code with as low a maintenance cost as possible and want robustness wherever we can get it.
Thanks!
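A minimal sketch of the parent/child ("orchestrator") pattern described above, assuming Prefect 1.x and a registered child flow; the flow name, project name, parameter key, and timestamp values are placeholders:

```python
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("capture-orchestrator") as flow:
    # one child flow run per failed timestamp, so each run lands on its own pod
    failed_timestamps = ["2022-08-01T00:00:00", "2022-08-02T00:00:00"]  # placeholders
    run_ids = create_flow_run.map(
        parameters=[{"timestamp": ts} for ts in failed_timestamps],
        flow_name=unmapped("capture-flow"),    # placeholder: registered child flow name
        project_name=unmapped("my-project"),   # placeholder project
    )
    # optionally block the parent until every child run reaches a final state
    wait_for_flow_run.map(run_ids)
```

Mapping over `parameters` while wrapping the constant arguments in `unmapped` avoids mapping over the flow object itself, which is what produces the "Cannot map over unsubscriptable object" error.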
Alex Wilcoxson
08/18/2022, 7:30 PM
Is there a difference between `.map` vs `asyncio.gather` on a list of started async tasks? Thanks!
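For context, a minimal sketch of the `asyncio.gather` form being asked about, assuming Prefect 2 with an async flow; the task and values are made up. `.map` instead schedules each element as its own mapped task run.

```python
import asyncio
from prefect import flow, task

@task
async def double(x: int) -> int:
    await asyncio.sleep(1)   # placeholder async work
    return x * 2

@flow
async def gather_flow(items: list):
    # calling an async task inside an async flow returns a coroutine;
    # asyncio.gather runs them concurrently and preserves input order
    return await asyncio.gather(*[double(x) for x in items])

if __name__ == "__main__":
    asyncio.run(gather_flow([1, 2, 3]))
```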
Jimmy Le
08/18/2022, 10:38 PM
You can drop a config file into the `/etc/supervisor/conf.d/` folder. I've named mine `prefect.conf`:
[program:prefect]
command=/home/usr/venv/bin/python /home/usr/venv/bin/prefect agent start --work-queue "queue-name"
autorestart=true
autostart=true
stderr_logfile=/var/log/prefect.err.log
stdout_logfile=/var/log/prefect.out.log
environment=PATH="/home/usr/venv/bin",PREFECT_API_URL="",PREFECT_API_KEY=""
eddy davies
08/19/2022, 1:51 PM

Sander
08/24/2022, 6:00 PM

Jiri Klein
08/26/2022, 3:31 PM
I'm using the `LocalDaskExecutor` in V1 - I have two sets of mapped tasks, each set executing in parallel. Is there a way of telling Prefect to wait for one set to fully finish? I tried task triggers with the `all_successful` flag, but to no avail: the second set of mapped tasks fails with `TriggerFailed`.
A Python pseudocode sample would be e.g.:
from typing import Any

from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_successful

@task(name="Task A")
def task_a(input: Any):
    return embarrassingly_parallel_func_1()

@task(name="Task B", trigger=all_successful)
def task_b(input: Any):
    return embarrassingly_parallel_func_2(input)

with Flow("my-flow") as flow:
    _coll = [1, 2, 3]
    _a = task_a.map(input=_coll)
    _b = task_b.map(input=_a, upstream_tasks=[task_a])

flow.executor = LocalDaskExecutor()
Unfortunately, for me some child tasks from `task_b.map` either begin executing BEFORE all `task_a.map` children are finished, OR they fail with `TriggerFailed`.
Does anyone have any experience with this?
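Not a definitive answer, but one pattern that forces a full barrier in Prefect 1 is to insert a reduce step between the two mapped stages, so every `task_b` child depends on the complete result of `task_a.map`. A sketch reusing the `task_a`/`task_b` definitions above; the flow and task names are placeholders:

```python
from typing import Any, List
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task(name="Collect A")
def collect(results: List[Any]) -> List[Any]:
    # a non-mapped task that receives the whole mapped result acts as a barrier:
    # it cannot start until every task_a child has finished
    return results

with Flow("barrier-example") as flow:
    _coll = [1, 2, 3]
    _a = task_a.map(input=_coll)
    _all_a = collect(_a)           # reduce step / barrier
    _b = task_b.map(input=_all_a)  # task_b children start only after the barrier

flow.executor = LocalDaskExecutor()
```

With the barrier in place, a failed `task_a` child fails `collect` (default `all_successful` trigger) and everything downstream of it, so the explicit trigger on `task_b` becomes largely redundant.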
Luca Vehbiu
08/26/2022, 6:02 PM

John Kang
08/29/2022, 4:58 PM

Daniel Lomartra
08/29/2022, 11:16 PM
If I pass dbt environment variables like this:

{
    'dbt_env_var1': 'value1',
    'dbt_env_var2': ['value2', 'value3', 'value4'],
    'dbt_env_var3': {
        'another_key': 'value5'
    }
}
Will Prefect accept this? Will dbt_env_var2 and dbt_env_var3 be coercible back to their original data types in jinja, or just passed as strings?
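Independent of what Prefect does with the dict, OS environment variables are always plain strings, so a defensive sketch is to JSON-encode the non-string values before handing them to the dbt task and decode them again on the dbt side (e.g. with dbt's `fromjson` jinja helper, if your dbt version provides it):

```python
import json

raw = {
    "dbt_env_var1": "value1",
    "dbt_env_var2": ["value2", "value3", "value4"],
    "dbt_env_var3": {"another_key": "value5"},
}

# environment variables can only carry strings, so serialize anything else
dbt_env = {k: v if isinstance(v, str) else json.dumps(v) for k, v in raw.items()}

# dbt_env can then be passed wherever the dbt task takes its environment mapping;
# inside dbt the structure would be recovered with something like:
#   {{ fromjson(env_var("dbt_env_var2")) }}
```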
Ghislain Picard
08/30/2022, 7:14 AM

Jordan Charlier
08/30/2022, 2:26 PM

Slackbot
08/30/2022, 8:19 PM

Stéphan Taljaard
08/31/2022, 9:30 AM
Registering with `flow.register(..., idempotency_key=flow.serialized_hash(), ...)` only bumps the actual flow version if the hash changes. As expected.
Bundling this with script-based storage (instead of pickle-based) should work well in CI/CD processes: the flow version only needs to be bumped when metadata changes, allowing behaviour changes inside tasks without having to re-register the flow.
Let's say you do change the innards of a task: the flow metadata does not change, so the serialized hash does not change. If you then run `flow.register(..., idempotency_key=flow.serialized_hash(), ...)`:
1. A new flow script file will be built and pushed to storage (e.g. my GCS bucket), because build=True by default.
2. The flow version does not get bumped, as expected.
However, when creating a flow run, it still uses the previously uploaded flow script from GCS, not the new one just created (because the unchanged idempotency key prevents pointing to the new file in storage).
Thus, you either have to:
• force a re-registration anyway (e.g. by not specifying an idempotency key), or
• "manually" upload a new flow script file into storage, with the same name as the previous one, replacing the old one.
This does not feel as automated as the docs make it sound.
Am I missing something?
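One workaround sketch (not an officially documented approach): fold the script's own contents into the idempotency key, so that edits inside tasks also change the key, trigger a fresh registration, and therefore point at the newly uploaded file in storage. The project name is a placeholder:

```python
import hashlib
from pathlib import Path

def script_hash(path: str) -> str:
    # hash of the flow script file itself, so code changes inside tasks
    # (which do not affect flow.serialized_hash()) still change the key
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()

flow.register(
    project_name="my-project",  # placeholder
    idempotency_key=flow.serialized_hash() + script_hash(__file__),
)
```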
Mike Kovetsky
08/31/2022, 2:16 PM
RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use `task.fn()`.
When I try `tune.Tuner(prefect_task.fn)`, the flow runs successfully, but the task registration is skipped, so I see no progress in the Prefect dashboard.
This point is really crucial, because I cannot run parallel tasks with RayTaskRunner, as there are no Prefect tasks registered.
3. I wonder what the best practice is for managing max concurrency. Should I specify a concurrency limit of 2 on the Prefect level? Or maybe on the Ray level with tune.TuneConfig(max_concurrent_trials=2)?
Thank you in advance!
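For point 3, a sketch of the Prefect-side option only, assuming Prefect 2 tag-based task concurrency limits and the prefect-ray collection; the task name, tag, and objective are placeholders, and Ray's tune.TuneConfig(max_concurrent_trials=...) remains an independent knob on the Ray side:

```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

# create the limit once from a shell, e.g.:
#   prefect concurrency-limit create tune-trials 2

@task(tags=["tune-trials"])          # the tag is what the concurrency limit keys on
def run_trial(params: dict) -> float:
    ...                              # placeholder objective function

@flow(task_runner=RayTaskRunner())
def tune_flow(param_grid: list):
    # at most 2 of these task runs execute at any moment, regardless of Ray capacity
    futures = [run_trial.submit(p) for p in param_grid]
    return [f.result() for f in futures]
```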
Abdullah Khan
08/31/2022, 6:29 PM

John Kang
08/31/2022, 8:55 PM

Young Ho Shin
09/05/2022, 11:47 AM

Naila Chennit
09/05/2022, 6:27 PM
I'm on version 2.1.1, and when deploying my flows using kubernetes-job as infrastructure, I'm getting the error below about the `manifest_path` argument:

manifest_path
  none is not an allowed value (type=type_error.none.not_allowed)

Is it mandatory to set the `manifest_path`?
Here is my deployment definition:
deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="tes-deployment",
    version="1",
    work_queue_name="kubernetes",
    tags=["dev"],
    ignorefile=".prefectignore",
    storage=storage,
    infrastructure=infra,
)
Patryk Kalinowski
09/06/2022, 1:14 PM

link89
09/07/2022, 6:43 AM

Zi Yuan
09/07/2022, 11:26 AM

Angel Acosta
09/07/2022, 7:06 PM

Stefan
09/07/2022, 7:19 PM

Fady Khallaf
09/07/2022, 11:20 PM

gertjan
09/08/2022, 9:49 AM
Is there a way to set the `description` field? As you cannot set this via the CLI?
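A sketch of setting it from Python instead, assuming Prefect 2's `Deployment.build_from_flow`; the flow name, deployment name, and description text are placeholders, and when no description is given the flow's docstring is typically used:

```python
from prefect import flow
from prefect.deployments import Deployment

@flow
def my_flow():
    """Used as the default description when none is passed explicitly."""

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-deployment",                    # placeholder
    description="Loads yesterday's data.",   # explicit description field
)
deployment.apply()
```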