Stephane Boisson
02/17/2020, 10:48 PM
my_task.map([1, 2, 3], ["A", "B"])
will execute 2 tasks, for (1, "A") and (2, "B").
Is there an easy way to map over the 3x2 combos and have 6 tasks, for (1, "A"), (2, "A"), (3, "A"), (1, "B"), (2, "B"), (3, "B")?
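One way to get the full cross product (a sketch, assuming Prefect 0.x task mapping; make_combos and my_task are illustrative names) is to build the combinations in an upstream task and then map a single-argument task over the pairs:
from itertools import product
from prefect import Flow, task

@task
def make_combos(xs, ys):
    # fan out to len(xs) * len(ys) downstream mapped tasks
    return list(product(xs, ys))

@task
def my_task(pair):
    x, y = pair
    print(f"running for ({x}, {y})")

with Flow("cross-product-map") as flow:
    combos = make_combos([1, 2, 3], ["A", "B"])
    my_task.map(combos)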
Arsenii
02/18/2020, 6:22 AM
Suppose I map a function A over a list L, and then want another function B to continue working on elements of L in a parallel manner after A has finished.
Is my understanding correct that, in order to implement this, function A should basically return the same element it got as input and pass it to B? I.e., in Python/pseudocode:
from prefect import Flow, Parameter, task

@task
def func_A(list_element):
    do_stuff(list_element)
    return list_element

@task
def func_B(list_element):
    return list_element + 2

with Flow(...) as flow:
    L = Parameter("list", default=[1, 2, 3])
    first_step = func_A.map(L)
    second_step = func_B.map(first_step)
This seems to work, but I wonder whether this is the most efficient way to do it. I also noticed that if I have several consecutive `map`s set up in the above manner (A -> B -> C -> D -> ...), it will not begin the next step until the previous step has finished executing for all mapped elements. Is there a way around this? Is it because I'm using LocalExecutor and not something like DaskExecutor?
Thanks!
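On the executor point: LocalExecutor runs everything sequentially in a single process, so a Dask-based executor is what lets the children of a mapped task run in parallel within each stage. A minimal sketch, assuming the flow object from the snippet above:
from prefect.engine.executors import DaskExecutor

# run the same flow on a Dask executor so mapped tasks within a stage execute in parallel
state = flow.run(executor=DaskExecutor())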
Chris O'Brien
02/19/2020, 1:52 AM
Flow.run() is a blocking call?
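For reference, a tiny sketch of the behavior in question (assuming an already-defined flow): flow.run() returns only once the run has finished, handing back a State object.
state = flow.run()            # returns after all tasks have finished
print(state.is_successful())  # inspect the final state of the run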
Nathan Molby
02/19/2020, 3:40 PM
from prefect import Flow, task
@task
def createX():
    return []

@task
def alterX(x):
    return x.append(5)

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()
    alterX(x)
    z = getZ(x, upstream_tasks=[alterX])

state = flow.run()
It should create x first, then alter x, and then get z with the altered x. Instead, it tries to alter x before x has been created. I could add the upstream task x to alterX, but I thought it should do that automatically because it is a data dependency.
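One adjustment worth trying (a sketch, not a confirmed diagnosis): keep a reference to the bound call alterX(x) and pass that to upstream_tasks rather than the alterX task object itself, so the ordering follows the call that actually lives in the flow. Note also that list.append returns None, so alterX needs to return the list explicitly for its result to be usable downstream.
from prefect import Flow, task

@task
def createX():
    return []

@task
def alterX(x):
    x.append(5)
    return x  # list.append returns None, so return the list explicitly

@task
def getZ(x):
    return len(x)

with Flow("Main Flow") as flow:
    x = createX()
    altered = alterX(x)                    # keep a reference to the bound call
    z = getZ(x, upstream_tasks=[altered])  # depend on the call result, not the task object

state = flow.run()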
Mark Koob
02/19/2020, 3:54 PM
@task
def get_fit_model(model, x, y):
    model.fit(x, y)
    return model.fit_model
I eventually realized that this was because the model.fit() operation mutates the model object, and later Prefect tries to serialize the mutated model object, even though only a part of it was used downstream. I was able to get around this by making a deepcopy() of the untrained model, on which the training was performed. I imagine this is due to the "greedy serialization" change Chris White mentioned a month or so back. I suppose the lesson here is that all operands must be serializable at all times.
I'm concerned that perhaps I would get better results if I were using result handlers. I'm also curious whether this would have been easier to debug if I were running my flow in Prefect Cloud.
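A sketch of the deepcopy workaround described above, assuming a scikit-learn-style estimator with a fit() method:
from copy import deepcopy
from prefect import task

@task
def get_fit_model(model, x, y):
    # train a copy so the upstream "model" result is never mutated in place
    fitted = deepcopy(model)
    fitted.fit(x, y)
    return fitted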
Amit Singh
02/19/2020, 7:35 PM
Failed: DID NOT RAISE <class 'prefect.engine.signals.SUCCESS'>
Amit Singh
02/19/2020, 7:37 PM
with raises(signals.SUCCESS) as excinfo:
    result = some_lib.some_function()
    print(result)
assert excinfo.type == signals.SUCCESS
assert excinfo.typename == 'SUCCESS'
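For comparison, a minimal sketch of the pattern that assertion expects, assuming the code under test explicitly raises the signal (function_that_signals is a hypothetical stand-in for some_lib.some_function):
import pytest
from prefect.engine import signals

def function_that_signals():
    # Prefect signals are ordinary exceptions and must actually be raised
    raise signals.SUCCESS("finished early")

def test_success_signal_is_raised():
    with pytest.raises(signals.SUCCESS) as excinfo:
        function_that_signals()
    assert excinfo.typename == "SUCCESS"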
Cab Maddux
02/20/2020, 1:15 PM
my_secret = Secret('MY_SECRET')  # Where the JSON credential for a GCP service account has been copied to a Secret named 'MY_SECRET' via the Cloud UI
upload_task = GCSUpload(project='foo', bucket='bar', ...)
upload_task(data='This is my data', blob='path/to/blob.txt', credentials=my_secret)
Luke Orland
02/21/2020, 4:24 PM
I have a few questions about FargateTaskEnvironment:
1. Can it just run an existing ECS task definition, or does it always register a new task definition based on the specified family/taskDefinition?
2. If it can just run an existing ECS task definition, what would the minimal necessary arguments be, something like FargateTaskEnvironment(taskDefinition='my-ecr-task-definition'), assuming AWS credentials are accessible by boto3 via the environment?
3. If it always generates/registers a new task definition, what would the minimal set of required arguments?
I could probably go through the process of figuring that out by running boto3.client('ecs').run_task(*args) until I get the desired task definition running, and then translate that into FargateTaskEnvironment arguments. Curious if someone has gone through this process already.
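For that trial-and-error route, a minimal boto3 sketch for running an existing Fargate task definition; every name and ID below is a placeholder, and credentials/region are assumed to come from the environment:
import boto3

ecs = boto3.client("ecs")
response = ecs.run_task(
    cluster="my-cluster",                     # placeholder cluster name
    taskDefinition="my-ecs-task-definition",  # existing family or family:revision
    launchType="FARGATE",
    count=1,
    networkConfiguration={
        "awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],     # placeholder subnet ID
            "securityGroups": ["sg-0123456789abcdef0"],  # placeholder security group ID
            "assignPublicIp": "ENABLED",
        }
    },
)
print(response["tasks"][0]["taskArn"])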
itay livni
02/24/2020, 7:07 PM
I have a task in a Flow that takes a list. How do I set the List task's trigger to run on any_successful upstream tasks?
https://docs.prefect.io/core/concepts/tasks.html#collections
I tried a couple of approaches, including def_list = List(trigger=any_successful) and then calling
munged_defs_df = definitions.munge_dfs(def_list([df1, df2, df3]))
Justin Shimkovitz
02/24/2020, 7:27 PM
I have a question about map responses. When I send a list of elements to a task and one element finishes and returns a response, is it possible to move that response to the next task before the rest of the original elements finish in the first task? Thanks in advance!
Jeff Brainerd
02/24/2020, 10:21 PM
We ran into an issue with psutil (a dependency of distributed) on Catalina (OSX 10.15). This thread helped us get past it: https://github.com/giampaolo/psutil/issues/1632
Luke Orland
02/25/2020, 9:56 PM
The upstream_tasks arg on Task.map is documented as "a list of upstream dependencies to map over". What would mapping over the upstream dependencies do?
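A small sketch of how that argument behaves, under the assumption that upstream_tasks passed to .map are themselves mapped over unless wrapped in unmapped() (setup and process are illustrative task names):
from prefect import Flow, task, unmapped

@task
def setup():
    return "ready"

@task
def process(x):
    return x * 2

with Flow("upstream-example") as flow:
    prep = setup()
    # unmapped(prep) keeps prep as a single upstream dependency for every mapped child
    results = process.map([1, 2, 3], upstream_tasks=[unmapped(prep)])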