Arlo Bryer
10/29/2019, 3:58 PM
George Coyne
10/30/2019, 5:35 PM
all_finished trigger but for whatever reason the task is running first. Thanks in advance.
Aliza Rayman
10/31/2019, 2:20 PM
Wai Kiat Tan
11/01/2019, 1:54 AM
itay livni
11/03/2019, 7:25 PM
ValueError: Flow.run received the following unexpected parameters: unused_param1, unused_param5
Perhaps there can be an error message for unused params?
itay livni
11/03/2019, 7:26 PM
Argemiro Neto
11/04/2019, 11:38 PM
Tsang Yong
11/05/2019, 5:45 PM
Tsang Yong
11/05/2019, 5:46 PM
Tsang Yong
11/05/2019, 5:48 PMimport IPython
from prefect import task, Flow, Parameter
my_list = [1, 3, 4]
@task
def map1_fn(item):
new_item = [item+1, item+1]
print("map1_fn: {}".format(item))
return new_item
@task
def map2_fn(item):
# expecting item = 2 but got [2, 2] etc
print("map2_fn: {}".format(item))
return item
with Flow("workflow") as flow:
my_other_list = map1_fn.map(item=my_list)
my_another_list = map2_fn.map(item=my_other_list)
flow.run()
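For anyone hitting the same surprise: the sketch below (plain Python, no Prefect; the flatten step is an ordinary list comprehension, not a Prefect feature) models why map2_fn receives [2, 2] instead of 2, and how an explicit flatten step between the two maps restores per-element mapping.

```python
# Plain-Python model of the mapping semantics above.
my_list = [1, 3, 4]

def map1_fn(item):
    # Each mapped call returns a *list*, so the mapped result is a list of lists.
    return [item + 1, item + 1]

def map2_fn(item):
    return item

# Equivalent of map1_fn.map(item=my_list):
my_other_list = [map1_fn(i) for i in my_list]      # [[2, 2], [4, 4], [5, 5]]

# Mapping again iterates over the *outer* list, so each call receives a list:
unflattened = [map2_fn(i) for i in my_other_list]  # [[2, 2], [4, 4], [5, 5]]

# An explicit flatten step in between restores per-element mapping:
flat = [x for sub in my_other_list for x in sub]   # [2, 2, 4, 4, 5, 5]
per_element = [map2_fn(i) for i in flat]           # [2, 2, 4, 4, 5, 5]
```

In Prefect terms that would mean adding an intermediate reduce/flatten task between the two mapped tasks.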
RyanB
11/06/2019, 4:51 PM
dhume
11/06/2019, 7:56 PM
add_flow
any flow that relies on those shared functions. Is there a simple workaround without turning the shared functionality into its own library and adding it to python_dependencies?
Tsang Yong
11/07/2019, 12:10 AM
Maikel Penz
11/07/2019, 3:13 AM
A question about task retries. Shouldn't the code below retry only every 30 seconds? When I run it, the output goes from 1 to 40 in less than a second and finishes. I expected it would take 30 (seconds) x 40 runs to finish (as per my if loop_count == 40: check to stop).
from prefect import Flow, task, context
from prefect.engine.signals import LOOP
import datetime

@task(max_retries=50, retry_delay=datetime.timedelta(seconds=30))
def test_retry():
    # task_loop_count starts at 1 on the first run of a looped task
    loop_count = context.get("task_loop_count", 1)
    print(loop_count)
    if loop_count == 40:
        return "finished"
    raise LOOP(message="Next run")

with Flow("test-pipeline") as flow:
    test_retry()

flow.run()
Tsang Yong
11/07/2019, 5:25 PM
Tsang Yong
11/07/2019, 7:57 PM
Melanie Kwon
11/07/2019, 8:52 PM
Mitchell Bregman
11/08/2019, 9:48 PM
Executors as well as the DaskExecutor object, which seems to be the proper choice. Now, when I start exploring this idea of mapping and connecting these task dependencies together, I get a little flustered without a more complex Prefect pipeline example... If it were possible, would you be able to point me to a larger-scale example on GH or elsewhere; something that has multiple modules + a nicely defined project structure?
Brad
11/09/2019, 12:19 AM
click requirement from dask
Braun Reyes
11/11/2019, 4:43 AM
Braun Reyes
11/11/2019, 4:44 AM
Brett Naul
11/11/2019, 4:55 AM
from prefect import Flow, Parameter, task

def add(x, y):
    return x + y

def add_to_x(y):
    x = Parameter('x')
    total = task(add)(x, y)
    return total

with Flow('flow') as f:
    add_to_x(1)
    add_to_x(2)
In this case you could just pass in x to the "factory" function instead, but in practice I have lots of parameters, so it feels a bit clumsy. I'm sure there was a good reason for enforcing that parameters be unique; but doesn't the fact that we're able to assert that it's unique mean that we could also just grab a reference to the already-existing task? 🤔
Phoebe Bright
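One possible workaround sketch for the duplicate-parameter issue above (get_param and Param are hypothetical helpers, not Prefect APIs): cache parameters by name so every factory call grabs the same object instead of constructing a duplicate.

```python
# Hypothetical parameter registry: construct each named parameter once,
# then hand back the same object on every subsequent request.

_param_cache = {}

class Param:
    """Stand-in for a Parameter-like object identified by a unique name."""
    def __init__(self, name):
        self.name = name

def get_param(name):
    # Return the existing parameter for this name, creating it only once.
    if name not in _param_cache:
        _param_cache[name] = Param(name)
    return _param_cache[name]

def add_to_x(y):
    x = get_param('x')   # same object on every call -> no duplicate
    return (x, y)

a = add_to_x(1)
b = add_to_x(2)
print(a[0] is b[0])  # True: both factory calls share one parameter object
```

This is essentially the "grab a reference to the already-existing task" behavior the message asks about, done in user code rather than by the framework.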
11/11/2019, 1:58 PM
Jason
11/11/2019, 4:19 PM
Jason
11/11/2019, 4:20 PM
Mitchell Bregman
11/11/2019, 9:28 PM
get_module_metadata
I am returning a class object; is this the reason for the doubly directed dependency arrow? It seems as though all other tasks, where I am returning standard Python data types, do not have a doubly...
Tsang Yong
11/12/2019, 12:26 AM
dask-worker[65048]: distributed.core - INFO - Event loop was unresponsive in Worker for 3.21s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
itay livni
11/12/2019, 8:49 PM
Is it possible to run a flow from a flow? Is it advisable? If so, should I call main() or the flow directly? main() being a function in the module with the other flow.
RyanB
11/13/2019, 12:16 AM
Brett Naul
11/13/2019, 5:05 PM