# ask-community
a
Suppose that in a flow of flows, flow A has a task that generates a "complex" object, e.g. a dataclass, that is then consumed by flow B as a parameter:
```python
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')
    # option 3 below: b_param.serializer = MyClassJSONSerializer() or similar
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result})
    # also wait for a_id, set A upstream of B, wait for b_id
```
Because Parameters are (or return?) `PrefectResult`s, they must be JSON-serializable (indeed, if I just try the above I get a `TypeError: MyClass is not JSON serializable` or similar). Is the expectation that I should be messing with:
1. the serialization of `LocalResult` from `a_do_something`,
2. somehow injecting a serializer into `Parameter` (my attempts here have thus far failed; is this even possible?), or
3. modifying the `b_param` result (see the commented line in the `child_B` flow)?

Note: based on my reading of several docs pages regarding Results and Serializers, option 2 strikes me as the most intuitive place to look. That is, it seems Prefect wants you to solve this there.
k
Hey @Adam Shamlian, what a tough and well-researched question! I think if this were me, I would make `MyClass` JSON-serializable by adding a `to_json` method, and then make it reconstructable with a `from_json` method. That way, you can `return MyClass.to_json()` and in flow B reconstruct it with `MyClass.from_json()`. If you want Prefect to call these methods for you, I think this is where serialization would take place: if you create a serializer and attach it to the result of `a_result`, `get_task_run_result` will use the serializer attached to that result. Does that make sense?
But for passing to a Parameter, I think you really need to call that method yourself, because the content really needs to be stored in our database. Maybe you can pickle it?
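For concreteness, here is a minimal sketch of that suggestion, assuming the Prefect 1.x `Serializer` interface from `prefect.engine.serializers` (which serializes to and from bytes). `MyClass`, `MyClassJSONSerializer`, and the result directory are illustrative placeholders, and attaching the serializer this way is exactly the part reported below as not taking effect, so treat it as a sketch rather than a verified fix:

```python
# Sketch: JSON-serializable MyClass plus a custom serializer attached to the
# task's LocalResult. Assumes Prefect 1.x APIs; names are illustrative.
import json
from dataclasses import dataclass, asdict

from prefect import task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import Serializer


@dataclass
class MyClass:
    x: int
    y: int
    z: int

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "MyClass":
        return cls(**json.loads(payload))


class MyClassJSONSerializer(Serializer):
    """Store MyClass results as JSON bytes instead of pickles."""

    def serialize(self, value: MyClass) -> bytes:
        return value.to_json().encode("utf-8")

    def deserialize(self, value: bytes) -> MyClass:
        return MyClass.from_json(value.decode("utf-8"))


@task(result=LocalResult(dir="/tmp/results", serializer=MyClassJSONSerializer()))
def a_do_something() -> MyClass:
    return MyClass(1, 2, 3)
```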
a
To make sure I understand you correctly:
```python
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()
    a_result.serializer = MyClassJSONSerializer() # <------- new line

with Flow('B') as child_B:
    b_param = Parameter('a_result_value') # because of the new line above,
    # this should be handled "transparently" (at least as far as this B flow is concerned)
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result})
    # also wait for a_id, set A upstream of B, wait for b_id
```
This is my interpretation of your second approach.
Essentially, this was option 1 above. I have tried this (via the `@task` decorator, rather than the assignment after the fact) and for some odd reason it seems like `LocalResult`'s serializer isn't being overridden by my custom JSON serializer.
The other option you mentioned is:
```python
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something') # explicit json call ----- ↓
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result.value.to_json()})
    # also wait for a_id, set A upstream of B, wait for b_id
```
correct?
k
Actually, thinking about it more, you're gonna need the `to_json` with whatever approach you take, just to get it into the Parameter. You could also serialize it as a binary blob and pass that as the `Parameter`, then unpickle it in `child_B`. I'm not sure the serializer comes into play much here? Have you tried `@task(result=LocalResult(serializer=xxx))` instead of setting it inside the Flow?
Am I right in thinking the serializer and the parameter are two separate issues, as opposed to one? Reading and writing the result between flows is one problem; passing it as a parameter in JSON-serializable form is another. Also, `to_json` and `from_json` might not be the exact methods, because the result will convert things to JSON anyway. It might be `to_dict` and `from_dict`. It just has to have a JSON-serializable representation.
a
> Have you tried `@task(result=LocalResult(serializer=xxx))` instead of setting it inside the Flow?
Yeah, it still seemed to be a `PickleSerializer` instead of my custom one, despite trying this. As for serialization and parameter being one or two issues, I suppose that's semantics 😛 Ultimately, the question is "how can I pass a 'complex' object as a Parameter?", so maybe the serialization element is in a sense an XY problem. To me, it seems really odd that I can't do the following in a flow:
```python
with Flow('test') as f:
    p = Parameter('my_complex_param', serializer=MyComplexParamJSONSerializer)
```
While I could break down the incoming result into a ton of primitive params, the primitives are always consumed together, so it makes sense to pass them as a single param. I'm guessing the real expectation is "if you have a complex param, massage it into a dict." Am I right to think that?
k
The `@task` syntax should work. Sorry, I'm not trying to be pedantic or anything 😂, just trying to think through the problem, but yes, I think you're right that they are linked. I was thinking for a moment that they are separate because solving serialization doesn't solve passing to the Parameter, but never mind. The thing with `p = Parameter('my_complex_param', serializer=MyComplexParamJSONSerializer)` is that it assumes some complex object was sent to the Flow as a payload to the API request, and that's where I think the difficulty lies, because the payloads are in JSON. Does that make sense? I do understand the thought, though. Massage to a dict or list, yes; I think that's what you have to do.
a
ok, that makes sense. I think I'm getting somewhere with all of the above. For the sake of closure, I will report back with a summary.
Following up here: passing via `to_dict` and `from_dict` does work; however, attempting to serialize the result from A with a custom serializer does not. Looking at the source for `Parameter`, I definitely see the possibility of injecting a custom serializer (one that must be JSON-based), although in all of this I've come to better understand the philosophy here (granted, I think it could be made more explicit in the docs). Parameters are meant to be passed around as:
• Primitive types
• "Complex" objects via a massage-to/from-dict (dicts being primitive themselves)
• "Complex" objects via out-of-band storage, passing the URI as a parameter "proxy"