John Ramirez02/17/2020, 5:12 PM
Stephane Boisson02/17/2020, 10:13 PM
Stephane Boisson02/17/2020, 10:48 PM
```
my_task.map([1, 2, 3], ["A", "B"])
```
will execute 2 tasks, for
```
(1, "A"), (2, "B")
```
Is there an easy way to map over the 3x2 combos and have 6 tasks, for
```
(1, "A"), (2, "A"), (3, "A"), (1, "B"), (2, "B"), (3, "B")
```
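Prefect's `.map` pairs its iterable arguments element-wise rather than taking their cross product, which is why the call above yields only two tasks. One workaround (a sketch, not a built-in Prefect feature) is to expand the combinations yourself, e.g. in an upstream task, and map the downstream task over the resulting pairs:

```python
def build_combos(xs, ys):
    # Expand two lists into their full cross product: 3 x 2 -> 6 pairs.
    # (itertools.product(xs, ys) works too, with a different ordering.)
    # In a flow, this could live in an upstream @task whose result is
    # passed to my_task.map(...).
    return [(x, y) for y in ys for x in xs]

combos = build_combos([1, 2, 3], ["A", "B"])
print(combos)
# [(1, 'A'), (2, 'A'), (3, 'A'), (1, 'B'), (2, 'B'), (3, 'B')]
```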
Greg Johnson02/18/2020, 2:06 AM
Greg Johnson02/18/2020, 2:09 AM
Arsenii02/18/2020, 6:22 AM
Say I map a function A over a list L, and then want another function B to continue working on elements of L in a parallel manner after A has finished. Is my understanding correct that, in order to implement this, function A should basically return the same element it got as input and pass it to B? I.e., in Python/pseudocode:
```
@task
def func_A(list_element):
    do_stuff(list_element)
    return list_element

@task
def func_B(list_element):
    list_element += 2

with Flow(...):
    L = Parameter("list", default=[1, 2, 3])
    first_step = func_A.map(L)
    second_step = func_B.map(first_step)
```
This seems to work, but I wonder whether this is the most efficient way to do it. I also noticed that if I have several consecutive `map`s set up in the above manner (A -> B -> C -> D -> ...), the flow will not begin the next step until the previous step has finished executing for all mapped elements. Is there a way around this? Is it because I'm using LocalExecutor and not something like DaskExecutor? Thanks!
Chris O'Brien02/19/2020, 1:52 AM
is a blocking call?
Nathan Molby02/19/2020, 3:40 PM
It should create x first, then alter x, and then get z with the altered x. Instead, it tries to alter x before x has been created. I could add x as an upstream task to alterX, but I thought that should happen automatically because it is a data dependency.
```
from prefect import Flow, task

@task
def createX():
    return

@task
def alterX(x):
    return x.append(5)

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()
    alterX(x)
    z = getZ(x, upstream_tasks=[alterX])

state = flow.run()
```
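Part of the problem here is plain Python, independent of Prefect: `x.append(5)` mutates the list in place and returns `None`, so the result of `alterX` is not the altered list. A sketch of the fix (assuming an empty-list return for `createX`, since the original's return value is blank): have `alterX` return the list itself and let `getZ` consume `alterX`'s result, which also gives Prefect the ordering automatically, with no `upstream_tasks` needed.

```python
def create_x():
    # Stand-in for createX; an empty list is assumed here for
    # illustration, since the original's return value was blank
    return []

def alter_x(x):
    # list.append mutates in place and returns None, so return
    # a new list instead of the result of x.append(5)
    return x + [5]

def get_z(x):
    return len(x)

# Chaining the results expresses create -> alter -> get as a data
# dependency; inside a Flow this would be getZ(alterX(createX()))
z = get_z(alter_x(create_x()))
print(z)  # 1
```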
Mark Koob02/19/2020, 3:54 PM
I eventually realized that this was because the model.fit() operation mutates the model object, and later Prefect tries to serialize the mutated object
```
@task
def get_fit_model(model, x, y):
    model.fit(x, y)
    return model.fit_model
```
even though only a part of it was used downstream. I was able to get around this by making a deepcopy() of the untrained model and performing the training on the copy. I imagine this is due to the "greedy serialization" change Chris White mentioned a month or so back. I suppose the lesson here is that all operands must be serializable at all times. I wonder whether I would get better results if I were using result handlers. I'm also curious whether this would have been easier to debug if I were running my flow in Prefect Cloud.
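The mutation issue itself is plain Python: `fit`-style methods that modify their receiver in place will surprise anything that caches or serializes the original object. A minimal sketch of the `deepcopy` workaround described above, using a toy model class rather than a real estimator:

```python
from copy import deepcopy

class ToyModel:
    # Toy stand-in for an estimator whose fit() mutates the instance
    def __init__(self):
        self.coef_ = None

    def fit(self, x, y):
        self.coef_ = sum(y) / max(len(y), 1)
        return self

def get_fit_model(model, x, y):
    # Train a private copy so the upstream "template" object is
    # never mutated (and never serialized in its mutated form)
    model = deepcopy(model)
    model.fit(x, y)
    return model

template = ToyModel()
fitted = get_fit_model(template, [1, 2, 3], [2, 4, 6])
print(template.coef_, fitted.coef_)  # None 4.0
```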
Amit Singh02/19/2020, 7:35 PM
Failed: DID NOT RAISE <class 'prefect.engine.signals.SUCCESS'>
Amit Singh02/19/2020, 7:37 PM
```
with raises(signals.SUCCESS) as excinfo:
    result = some_lib.some_function()
    print(result)
assert excinfo.type == signals.SUCCESS
assert excinfo.typename == 'SUCCESS'
```
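The "DID NOT RAISE" failure means `pytest.raises` ran the block but the code under test never actually raised the signal: the context manager only passes when the named exception is raised inside it. A sketch of the pattern that does pass, with a local `SUCCESS` class standing in for `prefect.engine.signals.SUCCESS` (which behaves like an exception raised inside a task):

```python
import pytest

class SUCCESS(Exception):
    # Stand-in for prefect.engine.signals.SUCCESS, used here so the
    # example runs without Prefect installed
    pass

def some_function():
    # The function under test must itself raise the signal;
    # otherwise pytest.raises fails with "DID NOT RAISE"
    raise SUCCESS("forced success")

def test_raises_success():
    with pytest.raises(SUCCESS) as excinfo:
        some_function()
    assert excinfo.type is SUCCESS
    assert excinfo.typename == 'SUCCESS'

test_raises_success()
```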
Cab Maddux02/20/2020, 1:15 PM
```
# Where the JSON credential for a GCP service account has been
# copied to a Secret named 'MY_SECRET' via the cloud UI
my_secret = Secret('MY_SECRET')
upload_task = GCSUpload(project='foo', bucket='bar', ...)
upload_task(data='This is my data', blob='path/to/blob.txt', credentials=my_secret)
```
Cab Maddux02/20/2020, 1:16 PM
trapped02/20/2020, 3:12 PM
RyanB02/20/2020, 7:28 PM
Luke Orland02/21/2020, 4:24 PM
1. Can it just run an existing ECS task definition, or does it always register a new task definition based on the specified family/taskDefinition?
2. If it can just run an existing ECS task definition, what would the minimal necessary arguments be, assuming AWS credentials are accessible by boto3 via the environment?
3. If it always generates/registers a new task definition, what would the minimal set of required arguments be? I could probably figure that out by trial and error until I get the desired task definition running, and then translate that into the corresponding arguments. Curious if someone has gone through this process already.
Pete Fein02/22/2020, 3:42 PM
Nate Joselson02/24/2020, 4:29 PM
itay livni02/24/2020, 7:07 PM
that takes a
. How do I set the
to run on
upstream tasks? https://docs.prefect.io/core/concepts/tasks.html#collections
I tried a couple of approaches, including
```
def_list = List(trigger=any_successful)
```
and then calling
```
munged_defs_df = definitions.munge_dfs(def_list([df1, df2, df3]))
```
Justin Shimkovitz02/24/2020, 7:27 PM
responses. When I send a list of elements to a task and one of them finishes and returns a response, is it possible to move that response on to the next task before the rest of the original elements finish in the first task? Thanks in advance!
Jeff Brainerd02/24/2020, 10:21 PM
(a dependency of
) on Catalina (OSX 10.15). This thread helped us get past it: https://github.com/giampaolo/psutil/issues/1632
Jeremiah02/25/2020, 5:46 PM
Luke Orland02/25/2020, 9:56 PM
. "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do?
Andrew Schechtman-Rook02/26/2020, 2:30 AM
John Ramirez02/26/2020, 6:40 PM
Nico Aiello02/26/2020, 9:14 PM
Nico Aiello02/26/2020, 9:17 PM
John Ramirez02/27/2020, 3:04 PM
Mark Williams02/27/2020, 5:13 PM
Andor Tóth02/28/2020, 2:20 PM
Andor Tóth02/28/2020, 2:20 PM
itay livni02/28/2020, 2:29 PM
```
from prefect import task

@task
def add_something(a):
    n = a + 1
    return n

n = add_something.run(a)
```
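Calling `.run()` directly like this is handy for unit-testing task logic without building a flow. A sketch of why it works, using a toy stand-in for the decorator (not Prefect's actual implementation) that keeps the wrapped function reachable as `.run`:

```python
class Task:
    # Toy stand-in for a Prefect Task wrapper: the original function
    # stays reachable as .run, mirroring how Task.run can be called
    # directly outside a flow
    def __init__(self, fn):
        self.run = fn

def task(fn):
    return Task(fn)

@task
def add_something(a):
    n = a + 1
    return n

# Outside a flow, invoke the wrapped logic directly:
print(add_something.run(1))  # 2
```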
Andor Tóth02/28/2020, 2:31 PM
Jeremiah02/28/2020, 2:51 PM