
Mark Koob

02/19/2020, 3:54 PM
Hi folks! We use Prefect Core, and I recently updated Prefect from 0.7.1 to 0.9.2. This caused some issues with a task that uses a model type that can't be serialized after training:
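from prefect import task

@task
def get_fit_model(model, x, y):
    # fit() mutates `model` in place; only its fitted part is returned downstream
    model.fit(x, y)
    return model.fit_model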
I eventually realized that this was because the `model.fit()` operation mutates the `model` object, and Prefect later tries to serialize the mutated object even though only part of it was used downstream. I was able to get around this by making a `deepcopy()` of the untrained model and performing the training on that copy. I imagine this is due to the "greedy serialization" change Chris White mentioned a month or so back. I suppose the lesson here is that all operands must be serializable at all times. I'm concerned that I might get better results if I were using result handlers. I'm also curious whether this would have been easier to debug if I were running my flow in Prefect Cloud.

Chris White

02/19/2020, 5:28 PM
Hi @Mark Koob! Unless you’re providing a result handler to your Flow and setting a checkpointing environment variable, I wouldn’t expect Prefect to attempt to serialize your output here. Could you share the traceback you’re seeing?

Mark Koob

02/19/2020, 5:29 PM
I think I can come up with an MRP. I'll be back at my desk this afternoon and see if I can come up with something. I think the only traceback I have is crazy since it was running on a distributed cluster.

Chris White

02/19/2020, 5:30 PM
ahhh ok so that’s dask that’s doing the serialization then, not Prefect; dask has to `cloudpickle` your outputs in order to send them to other machines, and it seems that whatever you’re returning can’t be serialized
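A quick way to see which part of a returned object blocks serialization, before running on a cluster, is to try pickling each of its attributes. This sketch uses the standard-library `pickle` module as a stand-in for `cloudpickle`. Because cloudpickle can handle some things stdlib pickle cannot (lambdas and locally defined functions, for example), a failure here is a strong hint rather than proof. `unpicklable_attributes` and `Example` are hypothetical names.

```python
import pickle
import threading


def unpicklable_attributes(obj):
    """Return names of instance attributes that stdlib pickle rejects.

    Only inspects attributes stored in obj.__dict__ (via vars()).
    """
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(name)
    return bad


class Example:
    def __init__(self):
        self.weights = [0.1, 0.2]
        # Hypothetical: a lock stands in for an unpicklable trained-model handle.
        self.lock = threading.Lock()
```

Calling `unpicklable_attributes(Example())` points at `lock` as the culprit, which narrows the search far more than a traceback from a distributed run.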

Mark Koob

02/20/2020, 4:36 PM
Ok finally got the MRP sorted. It was more complicated than I remembered. Before I share that, though, it involves this class:
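from prefect import Flow

class SubFlow(object):
    def __init__(self, mutated):
        with Flow("subflow") as f:
            # `mutated` is referenced from outside the flow context rather than
            # passed in as a Parameter; `mutate` is a task defined elsewhere in the full example
            success = mutate(mutated)
        self.flow = f
        self.output = success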
I think this is obviously wrong, because we're referring to an object inside the flow context from outside without parameterizing it, and for some reason that is causing dask to want to serialize it after the flow has finished running. I think the correct question is: how do I get the object from outside the flow into the flow? A Parameter seems like the obvious choice, but the guy stitching the subflows together into a higher-level flow won't know the name to give it when he calls `composite_flow.run()`. Is there an obvious answer for this case?
Of course if you're interested in the entire example I'm happy to share that as well!

Chris White

02/20/2020, 5:29 PM
Hmm, a Parameter would be the correct way; in general Prefect seeks to avoid the trap of implicit dependencies by representing everything as a task, especially runtime inputs. Perhaps your downstream user could inspect the available parameter names by calling `flow.parameters()`?
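The discovery step Chris suggests can be illustrated without Prefect installed. `FakeParameter`, `FakeFlow`, and `missing_parameters` are hypothetical stand-ins, not Prefect classes. The only idea borrowed from the thread is that a flow can report its declared Parameters, so whoever runs the composite flow can look up the names they still need to supply.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeParameter:
    """Hypothetical stand-in for a named runtime input (hashable, so it fits in a set)."""
    name: str


class FakeFlow:
    """Hypothetical stand-in for a flow that records its declared inputs."""

    def __init__(self, name):
        self.name = name
        self._params = set()

    def add_parameter(self, name):
        param = FakeParameter(name)
        self._params.add(param)
        return param

    def parameters(self):
        # Mirrors the idea behind flow.parameters(): report every declared input.
        return set(self._params)


def missing_parameters(flow, supplied):
    """Sorted names of declared inputs that the caller has not yet provided."""
    return sorted(p.name for p in flow.parameters() if p.name not in supplied)
```

For a composite flow declaring `mutated` and `x`, `missing_parameters(composite, {"x": 1})` reports `["mutated"]`, so the caller learns the name without reading the subflow's source.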

Mark Koob

02/20/2020, 6:40 PM
Hmmm, I already have some scaffolding for stitching flows together by connecting inputs and outputs. I think I can just make the object a task by constructing it inside the flow context and then connecting it to the subflow the same way I have with the other inputs. Thanks for your insight, Chris.