Nadav
08/03/2021, 9:22 PM
Aiden Price
08/04/2021, 2:32 AM
kwargs for the next task downstream?
Something like:
# returns {"path": "some/restapi/path", "start_time": datetime...}
tasks = generate_tasks(first_set)
# Needs a path and start_time argument
histories = fetch_history.map(**tasks)
But this gives me the error: expression after ** must be a mapping with a "str" key
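A sketch for the `**tasks` question above. Inside a Prefect 1.x `with Flow(...)` block, `generate_tasks(first_set)` returns a Task placeholder rather than a dict, so `**tasks` cannot be unpacked while the flow is being built. Mapping instead takes one iterable per keyword argument and pairs them element-wise, so one option is to have the upstream task produce one list per argument, e.g. `fetch_history.map(path=paths, start_time=start_times)` with `paths` and `start_times` as hypothetical upstream results. The pure-Python code below shows only that reshaping; `split_records` and `map_kwargs` are hypothetical helpers, and `map_kwargs` imitates element-wise mapping rather than reproducing Prefect's implementation.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List


def split_records(records: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn [{"path": p1, "start_time": t1}, ...] into {"path": [p1, ...], "start_time": [t1, ...]}.

    Assumes every record has the same keys; otherwise the lists fall out of step.
    """
    columns: Dict[str, List[Any]] = {}
    for record in records:
        for key, value in record.items():
            columns.setdefault(key, []).append(value)
    return columns


def map_kwargs(fn: Callable[..., Any], **iterables: List[Any]) -> List[Any]:
    """Call fn once per position, pairing the i-th element of every keyword list."""
    keys = list(iterables)
    return [fn(**dict(zip(keys, values))) for values in zip(*iterables.values())]


def fetch_history(path: str, start_time: datetime) -> str:
    # Hypothetical stand-in for the real REST call.
    return f"{path}@{start_time:%Y-%m-%d}"


records = [
    {"path": "some/restapi/path", "start_time": datetime(2021, 8, 1)},
    {"path": "other/path", "start_time": datetime(2021, 8, 2)},
]
columns = split_records(records)
histories = map_kwargs(fetch_history, **columns)
```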
st dabu
08/04/2021, 3:37 AM
with Flow("NLP") as nlp_flow:
aws s3 cp blah blah
sometool /data/a.csv
Is it possible to add the steps as CLI commands instead?
Ranu Goldan
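For the question above about CLI steps: Prefect 1.x ships a `ShellTask` in `prefect.tasks.shell` for running shell commands as flow steps. The standard-library sketch below only illustrates the underlying idea (each step is a command, run in order, stopping at the first non-zero exit). `run_steps` is a hypothetical helper, and the `aws`/`sometool` commands from the question are replaced by portable Python one-liners so the example runs anywhere.

```python
import subprocess
import sys
from typing import List, Sequence


def run_steps(steps: Sequence[Sequence[str]]) -> List[str]:
    """Run each command in order and return its stdout; raise on the first failure."""
    outputs = []
    for argv in steps:
        # check=True raises CalledProcessError on a non-zero exit,
        # so later steps never run after a failed one.
        result = subprocess.run(argv, capture_output=True, text=True, check=True)
        outputs.append(result.stdout.strip())
    return outputs


steps = [
    [sys.executable, "-c", "print('download done')"],    # stand-in for: aws s3 cp ...
    [sys.executable, "-c", "print('processing done')"],  # stand-in for: sometool /data/a.csv
]
outputs = run_steps(steps)
```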
08/04/2021, 4:01 AM
haven
08/04/2021, 6:16 AM
class MyFlowRunner(prefect.engine.flow_runner.FlowRunner) (or subclassing another component - open to suggestions!) inside a custom package my-prefect to send some HTTP requests before a Flow starts and after it ends. My questions are:
• what would be the best way for me to implement such functionality?
with the following considerations:
• keep the Prefect API as pure as possible
• allow the implementation of MyFlowRunner (or some other custom component) to be decoupled, i.e. if a project upgrades my-prefect, the flow configuration doesn't have to change at all
Would appreciate comments, suggestions, or discussion from anyone who is interested or has had experience doing this!
Ben Muller
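For the MyFlowRunner question above, one alternative to subclassing `FlowRunner` is a flow-level state handler: in Prefect 1.x, `Flow(..., state_handlers=[...])` calls each handler as `handler(flow, old_state, new_state)` on every state change. If `my-prefect` exported a handler factory, flows would only reference the factory and the HTTP details could change with package upgrades. The sketch assumes that signature and the `is_running()`/`is_finished()` methods of Prefect 1.x states; `make_lifecycle_handler` and the `notify` callable (standing in for the HTTP request) are hypothetical, and fake state objects replace Prefect's so it runs without Prefect installed.

```python
from typing import Any, Callable


def make_lifecycle_handler(notify: Callable[[str, str], None]) -> Callable[[Any, Any, Any], Any]:
    """Build a Prefect-1.x-style flow state handler that calls notify(event, flow_name)."""

    def handler(flow: Any, old_state: Any, new_state: Any) -> Any:
        # Entering Running from a non-running state: the flow is starting.
        if new_state.is_running() and not old_state.is_running():
            notify("started", flow.name)
        # Entering a finished state (Success, Failed, ...): the flow has ended.
        elif new_state.is_finished() and not old_state.is_finished():
            notify("finished", flow.name)
        # Returning new_state leaves the transition unchanged.
        return new_state

    return handler


class FakeState:
    """Minimal stand-in for a Prefect 1.x State, for demonstration only."""

    def __init__(self, running: bool = False, finished: bool = False) -> None:
        self._running, self._finished = running, finished

    def is_running(self) -> bool:
        return self._running

    def is_finished(self) -> bool:
        return self._finished


class FakeFlow:
    name = "NLP"


events = []
handler = make_lifecycle_handler(lambda event, name: events.append((event, name)))
scheduled, running, done = FakeState(), FakeState(running=True), FakeState(finished=True)
handler(FakeFlow(), scheduled, running)
handler(FakeFlow(), running, done)
# events == [("started", "NLP"), ("finished", "NLP")]
```

With Prefect installed, a flow would then only need something like `Flow("...", state_handlers=[make_lifecycle_handler(send_http)])`, where `send_http` is whatever the package provides.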
08/04/2021, 6:50 AM
Samuel Tober
08/04/2021, 8:05 AM
Michael Fichtinger
08/04/2021, 8:36 AM
Joseph Ellis
08/04/2021, 10:05 AM
Jai Deo
08/04/2021, 10:41 AM
Sumit Kumar Rai
08/04/2021, 11:47 AM
dbt deps
dbt snapshot
dbt run
For now I'm running them by reusing the instance of DbtShellTask:
dbt_task = DbtShellTask(name="Running dbt commands", ...)
dbt_deps_task_invoke = dbt_task("dbt deps")
dbt_snapshot_task_invoke = dbt_task("dbt snapshot")
dbt_run_task_invoke = dbt_task("dbt run")
With this setup, my schematic shows three boxes (tasks) all named "Running dbt commands", which makes it difficult to tell which is which.
To fix that, do I have to instantiate a separate task for each command, like below?
dbt_deps_task = DbtShellTask(name="Running dbt deps command", ...)
dbt_snapshot_task = DbtShellTask(name="Running dbt snapshot command", ...)
dbt_run_task = DbtShellTask(name="Running dbt run command", ...)
dbt_deps_task_invoke = dbt_deps_task("dbt deps")
dbt_snapshot_task_invoke = dbt_snapshot_task("dbt snapshot")
dbt_run_task_invoke = dbt_run_task("dbt run")
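Creating one instance per command works, but Prefect 1.x also lets a single instance be called several times with different display names: calling a task creates a copy, and the `task_args` argument of `Task.__call__` sets attributes such as `name` on that copy. The hypothetical helper `dbt_call_specs` below pairs each command with the `task_args` for its copy; the flow usage appears only as comments, since it needs Prefect and the elided `DbtShellTask(...)` arguments.

```python
from typing import Dict, List, Tuple


def dbt_call_specs(commands: List[str]) -> List[Tuple[str, Dict[str, str]]]:
    """Pair each dbt command with task_args that give its task copy a distinct name."""
    return [(command, {"name": f"Running {command} command"}) for command in commands]


specs = dbt_call_specs(["dbt deps", "dbt snapshot", "dbt run"])

# Assumed Prefect 1.x usage inside `with Flow(...)` (not executed here).
# upstream_tasks chains the calls so the commands run in order:
# previous = None
# for command, task_args in specs:
#     previous = dbt_task(
#         command=command,
#         task_args=task_args,
#         upstream_tasks=[previous] if previous is not None else None,
#     )
```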
Italo Barros
08/04/2021, 1:15 PM
Gustavo de Paula
08/04/2021, 2:31 PM
Harry Baker
08/04/2021, 4:00 PM
@task(task_run_name="{name_val}")
and then passing in a name_val variable would do it, but in the dashboard everything still shows up under the name of the function definition.
Nacho Rodriguez
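For the `task_run_name` question above: in Prefect 1.x, `task_run_name` is a template, either a `str.format` string or a callable, that is filled from the task's inputs when a task run executes, so the Task object itself keeps the raw template. The sketch shows that rendering step in plain Python. `render_run_name` is a hypothetical helper, and the exact set of values Prefect passes in for formatting (inputs, parameters, context) is an assumption to check against the Prefect 1.x docs.

```python
from typing import Any, Callable, Dict, Union


def render_run_name(template: Union[str, Callable[..., str]], inputs: Dict[str, Any]) -> str:
    """Render a task_run_name template from the task's inputs (assumed semantics)."""
    if callable(template):
        # Callable form: receives the inputs as keyword arguments.
        return template(**inputs)
    # String form: plain str.format; unused inputs are simply ignored.
    return template.format(**inputs)


name_from_string = render_run_name("{name_val}", {"name_val": "load-users", "other": 1})
name_from_callable = render_run_name(
    lambda **kw: f"table-{kw['table_name']}", {"table_name": "orders"}
)
```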
08/04/2021, 4:04 PM
Dan Zhao
08/04/2021, 4:43 PM
Harry Baker
08/04/2021, 5:58 PM
David Elliott
08/04/2021, 6:36 PM
object.task_run_name
it prints the format string used to generate the mapped task name, not the actual task name. I'm using the pattern here where my task_run_name = "{table_name}", and I get {table_name} as the output in the state handler, because that's what it's set to at the Task level, rather than being computed at the TaskRun level.
Michael Law
08/04/2021, 6:51 PM
Samuel Kohlleffel
08/04/2021, 6:55 PM
idempotency_key used when calling flow.register() would take into account both flow and task changes.
From what I can tell, flow.serialized_hash() only changes when the flow's metadata changes; it does not account for changes to a task within the flow.
Harish
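For the idempotency question above, one option is a key that combines `flow.serialized_hash()` with a hash of each task function's source, so editing task code also produces a new key. `task_source_hash`, `combined_key`, and `task_functions` are hypothetical; `inspect.getsource` assumes the tasks are defined in importable files, and hashing source text means whitespace or comment edits also change the key.

```python
import hashlib
import inspect
from typing import Callable, Iterable


def task_source_hash(fn: Callable) -> str:
    """SHA-256 of a function's source text (requires the source file to be available)."""
    return hashlib.sha256(inspect.getsource(fn).encode("utf-8")).hexdigest()


def combined_key(flow_hash: str, task_hashes: Iterable[str]) -> str:
    """Fold the flow-level hash and the per-task hashes into a single key.

    Task hashes are sorted so the key does not depend on iteration order.
    """
    digest = hashlib.sha256(flow_hash.encode("utf-8"))
    for task_hash in sorted(task_hashes):
        digest.update(task_hash.encode("utf-8"))
    return digest.hexdigest()


# Assumed Prefect 1.x usage (not executed here); task_functions is a
# hypothetical list of the undecorated task functions:
# key = combined_key(flow.serialized_hash(), [task_source_hash(f) for f in task_functions])
# flow.register(project_name="...", idempotency_key=key)
```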
08/04/2021, 7:17 PM
Ben Muller
08/04/2021, 11:03 PM
Hui Zheng
08/04/2021, 11:35 PM
No heartbeat detected from the remote task; retrying the run.This will be retry 1 of 2.
In most situations we do want to retry the run, but we have a particular flow where we don't want any retries when the run loses its heartbeat; we just want it to fail. How could we do that?
Ben Muller
08/05/2021, 1:23 AM
project and the flow name and get a list of currently active flows and their state (running, complete, etc.).
Is this possible?
I can't seem to find anything for this use case...
Dotan Asselmann
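For the question above about listing active runs by project and flow name: Prefect 1.x exposes a GraphQL API that the Python `Client` can query with `Client().graphql(query, variables=...)`. The sketch builds such a query, assuming the Hasura-style `flow_run` schema used by Prefect 1.x Server/Cloud (`where` filters with `_eq`/`_in`, and a `flow` relation carrying `name` and `project`); the field names and default state list are assumptions to verify in the UI's Interactive API tab. `active_runs_query` is a hypothetical helper.

```python
from typing import Any, Dict, Tuple

ACTIVE_RUNS_QUERY = """
query ActiveRuns($project: String!, $flow: String!, $states: [String!]) {
  flow_run(
    where: {
      flow: { name: { _eq: $flow }, project: { name: { _eq: $project } } }
      state: { _in: $states }
    }
  ) {
    id
    name
    state
  }
}
"""


def active_runs_query(
    project: str, flow: str, states: Tuple[str, ...] = ("Scheduled", "Submitted", "Running")
) -> Tuple[str, Dict[str, Any]]:
    """Return the query text and its variables for flow runs in the given states."""
    variables: Dict[str, Any] = {"project": project, "flow": flow, "states": list(states)}
    return ACTIVE_RUNS_QUERY, variables


query, variables = active_runs_query("my-project", "my-flow")
# Assumed usage with Prefect 1.x installed (not executed here):
# from prefect import Client
# runs = Client().graphql(query, variables=variables)
```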
08/05/2021, 8:05 AM
Samuel Hinton
08/05/2021, 8:49 AM
Dan Zhao
08/05/2021, 9:27 AM
Chhaya Vankhede
08/05/2021, 11:05 AM
flow_1 initializes the object and flow_2 uses that object, but I'm not able to achieve this. This is example code, not exact but similar to what I'm trying to achieve. Adding the code to the thread.
Pierre Monico
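For the flow_1/flow_2 question above: separate flow runs generally do not share Python memory, so an object built in one flow usually has to be persisted somewhere the other can read it (a file, object storage, or a database). Prefect 1.x's `Result` classes (for example `LocalResult`) are built around the same write/read idea. The minimal sketch assumes a picklable object and a path both flows can reach; `save_object` and `load_object` are hypothetical helpers, and pickle files should only be loaded from trusted locations.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any


def save_object(obj: Any, path: Path) -> None:
    """Persist obj so another flow run can load it later (flow_1 side)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as fh:
        pickle.dump(obj, fh)


def load_object(path: Path) -> Any:
    """Load an object written by save_object (flow_2 side). Only use with trusted files."""
    with path.open("rb") as fh:
        return pickle.load(fh)


# Demonstration with a temporary directory standing in for shared storage.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "shared" / "obj.pkl"
    save_object({"model": [1, 2, 3]}, target)
    restored = load_object(target)
```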
08/05/2021, 12:42 PM
Tim Enders
08/05/2021, 1:18 PM
KeyError: 110
is? I think it is a Prefect error, though it may be Dask. The stack trace has it coming out of the executor.py code. (trace to follow in thread)