Constantino Schillebeeckx
08/03/2021, 5:22 PM
.py file? We've split out commonly shared functionality between flows into a separate file (e.g. utils.py) that is referenced in the flow. Given all the storage documentation, this design doesn't seem to fit the intended use of storage.
David Elliott
08/03/2021, 6:32 PM
prefect-job-xxxxx would create 4 ephemeral dask workers (named something like dask-root-xxxx)
• Now the behaviour I'm seeing is:
◦ K8s agent creates the prefect-job-xxx
◦ In the prefect-job logs, it gives me _prefect.DaskExecutor | Creating a new Dask cluster with __main__.make_cluster. Creating scheduler pod on cluster. This may take some time._
◦ There are then 5x dask-root-xxx pods created, where 1 of them is a dask scheduler, i.e. the scheduler no longer sits within the prefect-job-xx? Just wanted to check if this was expected/intended behaviour, as I couldn't see any reference to it in the Prefect release notes.
• In addition (and this is more a side note, as I think the Prefect k8s RBAC docs need updating), I've had to add 2x more rulesets to my k8s RBAC to make it work; see these docs for what's now required. Here is specifically what's changed vs the Prefect docs
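If the goal is to keep the Dask scheduler inside the prefect-job pod, the cluster factory passed to `DaskExecutor` is the place to control it. A minimal sketch, assuming the classic `dask_kubernetes.KubeCluster`, whose `deploy_mode` argument takes `"local"` (scheduler runs in the process that creates the cluster) or `"remote"` (a dedicated scheduler pod, which would match the fifth dask-root pod). Whether the default changed between the dask-kubernetes versions in use is an assumption worth checking in its changelog; `kube_cluster_options` is an illustrative helper name.

```python
from typing import Any, Dict


def kube_cluster_options(n_workers: int = 4, scheduler_in_job_pod: bool = True) -> Dict[str, Any]:
    """Keyword arguments for KubeCluster.

    deploy_mode="local" keeps the Dask scheduler in the process that builds the
    cluster (the prefect-job pod); "remote" asks dask-kubernetes for a scheduler pod.
    """
    return {
        "n_workers": n_workers,
        "deploy_mode": "local" if scheduler_in_job_pod else "remote",
    }


def make_cluster(image: str, n_workers: int = 4, scheduler_in_job_pod: bool = True):
    """Cluster factory intended for DaskExecutor(cluster_class=make_cluster, ...)."""
    # Imported lazily so this module loads even where dask-kubernetes is not installed.
    from dask_kubernetes import KubeCluster, make_pod_spec

    pod_spec = make_pod_spec(image=image)
    return KubeCluster(pod_spec, **kube_cluster_options(n_workers, scheduler_in_job_pod))
```

With Prefect 1.x this could be wired up as `DaskExecutor(cluster_class=make_cluster, cluster_kwargs={"image": "my-registry/flow-image:latest", "n_workers": 4})` (hypothetical image name); `DaskExecutor` calls the factory with `cluster_kwargs` when it starts the cluster for a flow run.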
Thanks!
Billy McMonagle
08/03/2021, 8:24 PM
prefect create project "My Project"
... is it safe to run this command multiple times?
Jeff Baatz
08/03/2021, 9:00 PM
Nadav
08/03/2021, 9:22 PM
Aiden Price
08/04/2021, 2:32 AM
kwargs for the next task downstream?
Something like:
# returns {"path": "some/restapi/path", "start_time": datetime...}
tasks = generate_tasks(first_set)
# Needs a path and start_time argument
histories = fetch_history.map(**tasks)
But this gives me the error: expression after ** must be a mapping with a "str" key
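At flow-build time `generate_tasks(first_set)` returns a Prefect Task object rather than the dict itself, so there is nothing for `**` to unpack; mapped keyword arguments have to be passed one by one. A minimal pure-Python sketch, assuming `generate_tasks` can be changed to return one list per argument instead of a list of dicts (`to_columns` and the `fetch_history` body are hypothetical stand-ins):

```python
from datetime import datetime
from typing import Any, Dict, List


def to_columns(records: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """[{"path": p1, "start_time": t1}, ...] -> {"path": [p1, ...], "start_time": [t1, ...]}"""
    if not records:
        return {}
    return {key: [record[key] for record in records] for key in records[0]}


def fetch_history(path: str, start_time: datetime) -> str:
    # Hypothetical stand-in for the real task body.
    return f"{path}@{start_time.isoformat()}"


records = [
    {"path": "some/restapi/path", "start_time": datetime(2021, 8, 1)},
    {"path": "other/restapi/path", "start_time": datetime(2021, 8, 2)},
]
columns = to_columns(records)

# Roughly what .map() does with mapped keyword arguments: one call per position,
# pairing the i-th element of every mapped list.
results = [
    fetch_history(**dict(zip(columns.keys(), values)))
    for values in zip(*columns.values())
]
```

Inside a Prefect 1.x flow, if `generate_tasks` returned the `to_columns` shape, the mapped call could read `cols = generate_tasks(first_set)` followed by `fetch_history.map(path=cols["path"], start_time=cols["start_time"])`; indexing a task result inside the `Flow` context creates a `GetItem` task that is resolved at run time.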
st dabu
08/04/2021, 3:37 AM
with Flow("NLP") as nlp_flow:
    aws s3 cp blah blah
    sometool /data/a.csv
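Plain CLI steps can be run from a Prefect 1.x flow with `prefect.tasks.shell.ShellTask`, which executes a command string in a shell (bash by default). A minimal sketch, assuming bash is available where the flow runs; the S3 path stands in for the elided `aws s3 cp` arguments and is hypothetical, as are the helper names `as_bash_script` and `build_nlp_flow`:

```python
from typing import List

# Hypothetical stand-ins for the steps in the question (the real arguments are elided).
NLP_STEPS: List[str] = [
    "aws s3 cp s3://example-bucket/a.csv /data/a.csv",
    "sometool /data/a.csv",
]


def as_bash_script(steps: List[str]) -> str:
    """Join CLI steps into one bash script that stops at the first failing command."""
    return "\n".join(["set -euo pipefail", *steps])


def build_nlp_flow(steps: List[str] = NLP_STEPS):
    """Build a Prefect 1.x flow that runs all steps in a single ShellTask."""
    # Lazy imports keep as_bash_script usable without Prefect installed.
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    run_steps = ShellTask(name="nlp-cli-steps", shell="bash", return_all=True)
    with Flow("NLP") as nlp_flow:
        run_steps(command=as_bash_script(steps))
    return nlp_flow
```

Bundling the steps keeps the flow to one task, but a retry then re-runs every step; calling the `ShellTask` once per command (with `upstream_tasks` to keep the order) would instead give per-step retries and one box per step in the schematic.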
Is it possible to add the steps as CLI commands instead?
Ranu Goldan
08/04/2021, 4:01 AM
haven
08/04/2021, 6:16 AM
class MyFlowRunner(prefect.engine.flow_runner.FlowRunner) (or subclassing another component; open to suggestions!) inside a custom package my-prefect to send some HTTP requests before a Flow starts and after it ends. My questions are:
• Where would be the best place to implement such functionality?
Along with these considerations:
• keep the Prefect API as pure as possible
• allow the implementation of MyFlowRunner (or some other custom component) to be decoupled, i.e. if a project upgrades my-prefect, the flow configuration doesn't have to change at all
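One option that avoids subclassing FlowRunner is a flow-level state handler: in Prefect 1.x, `Flow(..., state_handlers=[...])` calls each handler as `handler(flow, old_state, new_state)` on every state change and uses the state it returns. A minimal sketch, assuming the handler ships in my-prefect and posts JSON to a hypothetical webhook URL; `make_flow_notifier` and `post_json` are illustrative names, and the demo at the bottom uses stand-in objects instead of real Prefect states:

```python
import json
import logging
import urllib.request
from types import SimpleNamespace
from typing import Any, Callable, Dict, List

Sender = Callable[[str, Dict[str, Any]], None]
logger = logging.getLogger(__name__)


def post_json(url: str, payload: Dict[str, Any]) -> None:
    """POST a JSON payload to `url` (raises on HTTP or network errors)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10):
        pass


def make_flow_notifier(url: str, send: Sender = post_json) -> Callable[[Any, Any, Any], Any]:
    """Return a Prefect 1.x-style flow state handler: (flow, old_state, new_state) -> state."""

    def notify(payload: Dict[str, Any]) -> None:
        try:
            send(url, payload)
        except Exception:  # a failing webhook should not change the flow's state
            logger.exception("notification to %s failed", url)

    def handler(flow: Any, old_state: Any, new_state: Any) -> Any:
        if new_state.is_running() and not old_state.is_running():
            notify({"flow": flow.name, "event": "started"})
        elif new_state.is_finished():
            notify({"flow": flow.name, "event": "finished", "state": type(new_state).__name__})
        return new_state

    return handler


# Offline demo: stand-in flow/state objects and a sender that only records payloads.
class Success(SimpleNamespace):
    """Stand-in for prefect.engine.state.Success, exposing only what the handler calls."""


sent: List[Dict[str, Any]] = []
handler = make_flow_notifier("https://example.invalid/hook", send=lambda url, payload: sent.append(payload))
demo_flow = SimpleNamespace(name="my-flow")
pending = SimpleNamespace(is_running=lambda: False, is_finished=lambda: False)
running = SimpleNamespace(is_running=lambda: True, is_finished=lambda: False)
done = Success(is_running=lambda: False, is_finished=lambda: True)
handler(demo_flow, pending, running)
final_state = handler(demo_flow, running, done)
```

A flow would then reference only the factory, e.g. `Flow("my-flow", state_handlers=[make_flow_notifier("https://example.com/hook")])` with a hypothetical URL, so changing the notification logic becomes a my-prefect upgrade rather than a flow change. Since it hooks into flow state changes rather than the runner class, it should not depend on which FlowRunner the agent or backend selects.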
Would appreciate comments/suggestions/discussion from anyone who is interested or has experience doing so!
Ben Muller
08/04/2021, 6:50 AM
Samuel Tober
08/04/2021, 8:05 AM
Michael Fichtinger
08/04/2021, 8:36 AM
Joseph Ellis
08/04/2021, 10:05 AM
Jai Deo
08/04/2021, 10:41 AM
Sumit Kumar Rai
08/04/2021, 11:47 AM
dbt deps
dbt snapshot
dbt run
For now I'm running them by reusing the instance of DbtShellTask:
dbt_task = DbtShellTask(name="Running dbt commands", ...)
dbt_deps_task_invoke = dbt_task("dbt deps")
dbt_snapshot_task_invoke = dbt_task("dbt snapshot")
dbt_run_task_invoke = dbt_task("dbt run")
With this setup, my schematic has three boxes (tasks) all named Running dbt commands, which makes it difficult to tell which is which.
To fix that, do I have to instantiate a separate task for each command, like below?
dbt_deps_task = DbtShellTask(name="Running dbt deps command", ...)
dbt_snapshot_task = DbtShellTask(name="Running dbt snapshot command", ...)
dbt_run_task = DbtShellTask(name="Running dbt run command", ...)
dbt_deps_task_invoke = dbt_deps_task("dbt deps")
dbt_snapshot_task_invoke = dbt_snapshot_task("dbt snapshot")
dbt_run_task_invoke = dbt_run_task("dbt run")
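A single DbtShellTask instance can probably be reused: in Prefect 1.x, calling a task inside a flow adds a copy of it to the flow, and the `task_args` keyword of that call overrides attributes such as `name` on the copy. A minimal sketch, assuming the three commands should run in the order listed (enforced here with `upstream_tasks`); `add_dbt_steps` and `task_label` are illustrative helpers, and the DbtShellTask constructor arguments stay as in the question:

```python
from typing import Any, List, Optional, Sequence

DBT_COMMANDS: List[str] = ["dbt deps", "dbt snapshot", "dbt run"]


def task_label(command: str) -> str:
    """Schematic name for one command, e.g. "dbt deps" -> "Running dbt deps command"."""
    return f"Running {command} command"


def add_dbt_steps(flow: Any, dbt_task: Any, commands: Sequence[str] = DBT_COMMANDS) -> List[Any]:
    """Call one task instance once per command, naming each copy and chaining them in order."""
    results: List[Any] = []
    previous: Optional[Any] = None
    with flow:  # enter the Prefect 1.x Flow context so each call is added to this flow
        for command in commands:
            previous = dbt_task(
                command=command,
                # task_args is applied to the copy created by this call, not to dbt_task itself
                task_args={"name": task_label(command)},
                upstream_tasks=[previous] if previous is not None else None,
            )
            results.append(previous)
    return results
```

For example, `flow = Flow("dbt")` followed by `add_dbt_steps(flow, DbtShellTask(...))` with the original constructor arguments; the schematic should then show one box per command, named by `task_label`.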
Italo Barros
08/04/2021, 1:15 PM
Gustavo de Paula
08/04/2021, 2:31 PM
Harry Baker
08/04/2021, 4:00 PM
@task(task_run_name="{name_val}") and then passing in a name_val variable would do it, but in the dashboard everything still shows up with the name of the function definition.
Nacho Rodriguez
08/04/2021, 4:04 PM
Dan Zhao
08/04/2021, 4:43 PM
Harry Baker
08/04/2021, 5:58 PM
David Elliott
08/04/2021, 6:36 PM
object.task_run_name, it prints the format string used to generate the mapped task name, not the actual task name. I'm using the pattern here where my task_run_name = "{table_name}", and I get {table_name} output in the state handler, because that's what it's set to at the Task level rather than being computed at the TaskRun level.
Michael Law
08/04/2021, 6:51 PM
Samuel Kohlleffel
08/04/2021, 6:55 PM
idempotency_key used when calling flow.register() would take into account both flow and task changes.
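If the key should also change when task code changes, one option is to compute it yourself and pass it to `flow.register()` as `idempotency_key`. A minimal sketch, assuming that hashing the source text of each task's `run` method is an acceptable proxy for "the task changed"; it will not notice edits to helper functions those methods call, or dependency upgrades. `fingerprint_sources` and `code_aware_key` are illustrative names:

```python
import hashlib
import inspect
from typing import Any, Iterable, List


def fingerprint_sources(sources: Iterable[str], extra: str = "") -> str:
    """SHA-256 of `extra` plus the sources, sorted so task order does not matter."""
    digest = hashlib.sha256(extra.encode("utf-8"))
    for source in sorted(sources):
        digest.update(b"\0")  # separator: ["ab"] and ["a", "b"] must hash differently
        digest.update(source.encode("utf-8"))
    return digest.hexdigest()


def code_aware_key(flow: Any) -> str:
    """Combine flow.serialized_hash() with the source of every task's run method."""
    sources: List[str] = []
    for task in flow.tasks:
        try:
            sources.append(inspect.getsource(task.run))
        except (OSError, TypeError):
            # Source not available (e.g. built-in or dynamically created): use the class name.
            sources.append(type(task).__qualname__)
    return fingerprint_sources(sources, extra=flow.serialized_hash())
```

Registering with `idempotency_key=code_aware_key(flow)` would then bump the version when either the serialized flow or a task's source text changes. Sorting matters because `flow.tasks` is a set, whose iteration order is not stable across processes.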
From what I can tell, flow.serialized_hash() only changes if changes are made to the flow's metadata. It does not reflect changes to a task within the flow.
Harish
08/04/2021, 7:17 PM
Ben Muller
08/04/2021, 11:03 PM
Hui Zheng
08/04/2021, 11:35 PM
No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2.
In most situations, we do want to retry the run. But we have a particular flow where we don't want any retrying when the run loses its heartbeat; we just want it to fail. How could we do that?
Ben Muller
08/05/2021, 1:23 AM
project and the flow name and get a list of currently active flows and their state (running, complete, etc.). Is this possible?
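Prefect 1.x exposes run information through its GraphQL API, which `prefect.Client().graphql(...)` can query from Python. A sketch, assuming the Hasura-style `flow_run` schema of the Prefect 1.x backend (nested `flow` and `project` filters, `_eq`/`_in` operators); the field names and the set of "active" state names are assumptions worth checking in the UI's Interactive API page, and the helper names are illustrative:

```python
from typing import Any, Dict, Iterable, List

# Assumption: Hasura-style schema of the Prefect 1.x backend (flow_run -> flow -> project).
ACTIVE_RUNS_QUERY = """
query ActiveRuns($project: String!, $flow_name: String!, $states: [String!]) {
  flow_run(
    where: {
      flow: {name: {_eq: $flow_name}, project: {name: {_eq: $project}}}
      state: {_in: $states}
    }
  ) {
    id
    name
    state
    start_time
  }
}
"""

# Assumption: which state names count as "active" for this use case.
ACTIVE_STATES = ("Scheduled", "Submitted", "Running")


def active_run_variables(project: str, flow_name: str, states: Iterable[str] = ACTIVE_STATES) -> Dict[str, Any]:
    """GraphQL variables for ACTIVE_RUNS_QUERY."""
    return {"project": project, "flow_name": flow_name, "states": list(states)}


def fetch_active_runs(project: str, flow_name: str, states: Iterable[str] = ACTIVE_STATES) -> List[Any]:
    """Run the query against the configured Prefect backend and return the flow_run rows."""
    from prefect import Client  # lazy import: only needed when actually querying

    result = Client().graphql(ACTIVE_RUNS_QUERY, variables=active_run_variables(project, flow_name, states))
    return result["data"]["flow_run"]
```

Passing values as GraphQL variables, rather than formatting them into the query string, avoids quoting problems with project or flow names that contain spaces or quotes. Including finished states such as `"Success"` in `states` would also return completed runs.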
Can't seem to find anything for this use case...
Dotan Asselmann
08/05/2021, 8:05 AM
Samuel Hinton
08/05/2021, 8:49 AM
Samuel Hinton
08/05/2021, 8:49 AM
davzucky
08/05/2021, 10:47 AM
Samuel Hinton
08/05/2021, 10:50 AM