Arnaud Legendre
11/14/2019, 1:29 PM

itay livni
11/14/2019, 8:57 PM
I have a function `main` and it runs a flow. If I add a @task decorator to `main`, is that an acceptable way to run a flow from a flow, or do you see any issues?
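A minimal sketch of the pattern being asked about, assuming `main` wraps the child flow's run() call (flow and task names here are invented for illustration):

```python
from prefect import task, Flow

@task
def say_hello():
    print("hello from the child flow")

with Flow("child") as child_flow:
    say_hello()

# The pattern in question: a task whose body runs another flow
@task
def main():
    state = child_flow.run()
    if not state.is_successful():
        raise RuntimeError("child flow failed")

with Flow("parent") as parent_flow:
    main()

parent_flow.run()
```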
Phoebe Bright
11/14/2019, 9:23 PM

Kyle Foreman (Convoy)
11/14/2019, 11:27 PM
Is there a way to share a common A -> B -> C pipeline, so that user 1 can create A -> B -> C -> X1 -> Y1 and user 2 can create A -> B -> C -> Q2, by simply importing and building off an existing A -> B -> C flow?
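One way this looks doable (a sketch under the assumption that a Flow's context can be re-entered to add tasks; every name below is invented):

```python
from prefect import task, Flow

@task
def a():
    return 1

@task
def b(x):
    return x + 1

@task
def c(x):
    return x * 2

def build_base_flow():
    # Shared A -> B -> C pipeline; returns the flow plus its tail task
    with Flow("base") as flow:
        tail = c(b(a()))
    return flow, tail

@task
def x1(x):
    return x

@task
def y1(x):
    return x

# User 1 imports build_base_flow() and extends the result with X1 -> Y1
flow1, tail = build_base_flow()
with flow1:
    y1(x1(tail))
```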
Walter Gillett
11/15/2019, 4:59 AM

Tobias Schmidt
11/18/2019, 9:47 AM
I have a flow A -> B -> C. I want task A to be skippable via a command line argument to the Python script that implements that flow. What's the best way to do this? Can I mark A as skipped, set skip_on_upstream_skip on B and C (or just B?) to False, and then run the flow as usual? Or is there a more idiomatic way of doing this?
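A sketch of that exact approach, assuming flow.run() forwards a task_states mapping to the runner (the --skip-a flag is invented). Since skip_on_upstream_skip defaults to True, only B appears to need it set to False; C then runs normally because its upstream B is not skipped:

```python
import sys
from prefect import task, Flow
from prefect.engine.state import Skipped

@task
def a():
    print("running A")

@task(skip_on_upstream_skip=False)  # run B even when A is skipped
def b():
    print("running B")

@task
def c():
    print("running C")

with Flow("abc") as flow:
    b.set_upstream(a)
    c.set_upstream(b)

# Hypothetical command line flag that pre-marks A as skipped
states = {a: Skipped("skipped from the command line")} if "--skip-a" in sys.argv else None
flow.run(task_states=states)
```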
Chris O'Brien
11/18/2019, 11:12 PM
I have flows like X1 -> X2 -> A -> B -> C and Y1 -> A -> B -> C. The idea is that A -> B -> C are the methods for loading data into the database, so they take the transformed data from upstream and only run if the upstream tasks were successful. Does this make any sense, or is there a more Prefect way to attack this?
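For what it's worth, a sketch of one way to share that load stage (all names invented): define A, B, C once and wire them into each flow; Prefect's default all_successful trigger already means they only run when everything upstream succeeded.

```python
from prefect import task, Flow

# Shared load stage; by default a task only runs if its upstreams succeeded
@task
def load_a(data):
    return data

@task
def load_b(data):
    return data

@task
def load_c(data):
    return data

@task
def x1():
    return "x"

@task
def x2(data):
    return data

@task
def y1():
    return "y"

with Flow("x-pipeline") as flow_x:
    load_c(load_b(load_a(x2(x1()))))

with Flow("y-pipeline") as flow_y:
    load_c(load_b(load_a(y1())))
```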
Adam Roderick
11/19/2019, 11:41 PM

Thomas Adams
11/20/2019, 6:06 AM

Aliza Rayman
11/20/2019, 8:54 AM
I have A -> B -> C to pull and preprocess data, then in-between processing which varies by flow, then G -> H -> I to aggregate and handle the result. It seems like update isn't the ideal use case for this. Right now I just have tasks in separate modules and import and run them all (so it's a bit messy).
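A sketch of one way to tidy that up (every name is a placeholder): a small factory that wraps the shared pre/post stages around a flow-specific middle task.

```python
from prefect import task, Flow

@task
def a():
    return "raw"

@task
def b(x):
    return x

@task
def c(x):
    return x

@task
def g(x):
    return x

@task
def h(x):
    return x

@task
def i(x):
    return x

def make_flow(name, middle_task):
    # Shared pull/preprocess (A -> B -> C) and aggregate (G -> H -> I)
    # stages around whatever varies per flow
    with Flow(name) as flow:
        pre = c(b(a()))
        i(h(g(middle_task(pre))))
    return flow

@task
def my_custom_processing(x):
    return x

flow = make_flow("my-flow", my_custom_processing)
```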
Ron Van Buskirk
11/20/2019, 3:43 PM
* having separate tasks for building each table (450+ tasks!):
  Check_table1 -> ifelse -> Build_table1 |-> Check_table2 -> ifelse -> Build_table2 ...
                                         |-> Check_table3 -> ifelse -> Build_table3 ...
                                         |-> Check_table4 -> ifelse -> Build_table4 ...
* subclassing the Postgres execute task to create a check-and-build task:
  Check_and_build_table1 |-> Check_and_build_table2 ...
                         |-> Check_and_build_table3 ...
                         |-> Check_and_build_table4 ...
* having a small number of tasks (check timestamp and existence, conditional, build) and using the map function to iterate the building of the required tables (sketched below):
  Check_table1 -> ifelse -> Build_table1 |-> Check_table.map(x) -> ifelse -> Build_table(x) ...
Still really new to Prefect... are any of these any good? Are there any other best practices I should consider?
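Not authoritative, but a sketch of the third option: a single mapped check task that raises SKIP for up-to-date tables (the check itself is stubbed out), so no per-table ifelse is needed:

```python
from prefect import task, Flow
from prefect.engine import signals

TABLES = ["table1", "table2", "table3"]  # ...and the rest of the 450+

@task
def check_table(table):
    # Stub: the real version would check timestamp/existence in Postgres
    needs_build = True
    if not needs_build:
        raise signals.SKIP(f"{table} is up to date")
    return table

@task
def build_table(table):
    print(f"building {table}")  # the real version would run the build SQL

with Flow("build-tables") as flow:
    # A skipped check propagates: the matching build is skipped too
    build_table.map(check_table.map(TABLES))

flow.run()
```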
Agostino Calamia
11/20/2019, 4:32 PM

Chris O'Brien
11/21/2019, 2:09 AM
```python
from prefect import task, Flow
from prefect.tasks.control_flow.conditional import switch

@task
def condition(previous_result):
    # branch on the result of the upstream task
    if previous_result.Signal == 'FAIL':
        return False
    else:
        return True

# task_1, yay_flow and boo_flow are tasks defined elsewhere
with Flow('test') as test:
    output = task_1()  # fails with an Exception
    switch(condition(output), {True: yay_flow, False: boo_flow})
```
Or am I approaching this wrong?
Steve Vandervalk
11/21/2019, 2:25 AM

Aliza Rayman
11/21/2019, 8:13 AMwith Flow('aggregate alerts') as flow:
ids = Parameter("ids")
response1 = task1.map(ids) # May return {"error": "error message"} or other data
response2 = task2.map(response1) # May return {"error": "error message"} or other data
...
result_file = taskX(responseX-1) # Aggreagtes all of the error messages with corresponding ids into 1 file
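A sketch of what that final aggregation task might look like, assuming each mapped result is either normal data or an {"error": ...} dict (the task and file names are invented):

```python
import json
from prefect import task

@task
def aggregate_errors(ids, responses):
    # Pair each id with its response and keep only the error payloads
    errors = {
        id_: resp["error"]
        for id_, resp in zip(ids, responses)
        if isinstance(resp, dict) and "error" in resp
    }
    with open("errors.json", "w") as f:
        json.dump(errors, f)
    return "errors.json"
```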
Thomas Adams
11/21/2019, 5:15 PM

Chris O'Brien
11/21/2019, 11:58 PM
```python
from prefect import task, Flow
from prefect.triggers import any_failed, some_failed
from prefect.tasks.control_flow.conditional import ifelse, merge, switch

@task
def three_outcomes():
    return "dead_branch"

@task
def fail_branch():
    print("i fail")

@task
def pass_branch():
    print("i pass")

@task
def dead_branch():
    print("im dead")

@task
def do_final_thing():
    print("final")

with Flow("example") as flow:
    switch(three_outcomes, dict(dead_branch=dead_branch, pass_branch=pass_branch, fail_branch=fail_branch))
    do_final_thing.set_upstream(merge(pass_branch, fail_branch))

flow_state = flow.run()
flow.visualize(flow_state=flow_state)
```
Thomas Adams
11/22/2019, 1:25 AM

Gaurav Goel
11/22/2019, 10:30 AM

agonen
11/24/2019, 7:33 PM

Kamil Okáč
11/25/2019, 9:14 PM
```python
from prefect import Flow, task
from prefect.engine.executors.dask import LocalDaskExecutor

@task
def add_ten(x):
    return x + 10

if __name__ == '__main__':
    with Flow('simple map') as flow:
        mapped_result = add_ten.map([1, 2, 3])
    executor = LocalDaskExecutor(scheduler='processes', num_workers=3)
    flow.run(executor=executor)
```
The error is AssertionError: daemonic processes are not allowed to have children
Is there something wrong with this code or am I missing something else?
Kamil Okáč
11/26/2019, 9:49 AM
```python
from prefect import Flow, task

@task
def arrtest(arr):
    return ''.join(arr)

with Flow('ArrayTest') as flow:
    r = arrtest(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])

print(flow.run().result[r].result)
```
Expected result: abcdefghijk
Actual result: ajkbcdefghi
simon thelin
11/26/2019, 6:13 PM
I have a question about the agent. If I want to spin it up locally just to try, I tried:
pipenv run prefect agent start
But I get that no token is provided. According to the docs it seems like prefect.config.cloud.auth_token should provide this to me automatically? I don't really find the documentation on how to spin it up with the UI very intuitive, and if somebody can explain how to set the TOKEN locally I would appreciate it.
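For reference (an assumption based on Prefect's config conventions, not something confirmed in this thread): prefect.config.cloud.auth_token is read from ~/.prefect/config.toml, and any config key can also be set via an environment variable of the form PREFECT__SECTION__KEY. So either of the following should make the token visible to the agent:

```
# ~/.prefect/config.toml
[cloud]
auth_token = "<YOUR_TOKEN>"   # placeholder; substitute a real token
```
or, equivalently, export PREFECT__CLOUD__AUTH_TOKEN="<YOUR_TOKEN>" in the shell before running prefect agent start.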
Jon Wolski
11/27/2019, 12:17 AM

Matias
11/27/2019, 11:57 AM

Attila
11/27/2019, 1:51 PM

laura
11/28/2019, 3:37 PMfrom prefect import task, Task, Flow, Parameter
class ParentExecutor(Task):
def run(self, parent_id):
# Do query on a database to return child id's
return ['child1', 'child2']
class ChildExecutor(Task):
def run(self, parent_id):
# Do query on a database to return child id's
return ['grandchild1', 'grandchild2']
class GrandchildExecutor(Task):
def run(self, parent_id):
# grab grandchild from db and do processing with it...
pass
with Flow("Test branching") as test_flow:
parent_id = Parameter('parent_id')
parent_exec = ParentExecutor()
child_ids = parent_exec(parent_id)
child_exec = ChildExecutor()
grandchild_ids = child_exec.map(child_ids)
grandchild_exec = GrandchildExecutor()
grandchild_exec.map(grandchild_ids)
state = test_flow.run(parent_id='parent_id123')
alvin goh
11/29/2019, 12:50 AM

Christian Eik
11/29/2019, 11:49 AMpip install prefect[snowflake]
it tells me everything is already installed.Christian Eik
Christian Eik
11/29/2019, 11:49 AM
```python
from prefect import Flow, task
from prefect.tasks.docker import CreateContainer
from prefect.tasks.snowflake import SnowflakeQuery
```