Adam Shamlian
08/18/2021, 7:35 PM
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')
    # option 3 below: b_param.serializer = MyClassJSONSerializer() or similar
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result})
    # also wait for a_id, set A upstream of B, wait for b_id
Because Parameters are/return? PrefectResults, they must be JSON serializable (indeed, if I just try the above I get a TypeError: MyClass is not JSON serializable, or similar). Is the expectation that I should be messing with:
1. the serialization of LocalResult from a_do_something,
2. somehow injecting a serializer into Parameter (my attempts here have thus far failed - is this even possible?), or
3. modifying the b_param result (see the commented line in the child_B flow)?
Note: based on my reading of several docs pages regarding Results and Serializers, option 2 strikes me as the most intuitive place to look. That is, it seems Prefect wants you to solve this there.
Kevin Kho
[...] to_json method, and then make it reconstructable with a from_json method. That way, you can return MyClass.to_json() and in Flow B reconstruct it with MyClass.from_json().
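A minimal sketch of the to_json / from_json round trip described above, using only the standard library. The fields a, b, c and the JSON layout are assumptions for illustration; the thread does not show what MyClass actually holds.

```python
import json


class MyClass:
    """Hypothetical stand-in for the MyClass in the thread."""

    def __init__(self, a, b, c):
        self.a, self.b, self.c = a, b, c

    def to_json(self) -> str:
        # Emit only plain, JSON-safe fields.
        return json.dumps({"a": self.a, "b": self.b, "c": self.c})

    @classmethod
    def from_json(cls, payload: str) -> "MyClass":
        data = json.loads(payload)
        return cls(data["a"], data["b"], data["c"])


# Flow A side: return a JSON string instead of the object itself.
payload = MyClass(1, 2, 3).to_json()

# Flow B side: rebuild the object from the Parameter value.
restored = MyClass.from_json(payload)
```

The string in payload is what would travel as the Parameter value; the object itself never has to cross the flow boundary.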
If you want Prefect to call these methods for you, I think this is where serialization would take place. If you create a serializer and attach it to the result of a_result, get_task_run_result will use the serializer attached to that result.
Does that make sense?
Kevin Kho
Adam Shamlian
08/18/2021, 7:52 PM
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()
    a_result.serializer = MyClassJSONSerializer()  # <------- new line

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')  # because of the new line above,
    # this should be handled "transparently" (at least as far as this B flow is concerned)
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result})
    # also wait for a_id, set A upstream of B, wait for b_id
This is my interpretation of your second approach.
Adam Shamlian
08/18/2021, 7:53 PM
Adam Shamlian
08/18/2021, 7:55 PM
@task(result=LocalResult(...))
def a_do_something():
    return MyClass(1, 2, 3)

with Flow('A') as child_A:
    a_result = a_do_something()

with Flow('B') as child_B:
    b_param = Parameter('a_result_value')
    b_result = b_do_something(b_param)

with Flow('parent') as parent:
    a_id = create_flow_run(name=child_A.name)
    a_result = get_task_run_result(a_id, 'a_do_something')  # explicit json call ----- ↓
    create_flow_run(name=child_B.name, parameters={'a_result_value': a_result.value.to_json()})
    # also wait for a_id, set A upstream of B, wait for b_id
correct?
Kevin Kho
[...] to_json with whatever approach you take just to get it in the Parameter. You can also serialize it as a binary blob though and pass that as the Parameter, then unpickle in child_B.
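A rough sketch of the binary-blob route, assuming the blob is base64-encoded so that it still fits in a JSON payload (raw bytes would not). The helper names to_blob and from_blob and the sample value are hypothetical; a dict holding a datetime and a set stands in for an object that JSON cannot encode directly.

```python
import base64
import json
import pickle
from datetime import datetime


def to_blob(obj) -> str:
    # pickle yields bytes; base64 turns them into ASCII text that JSON accepts.
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def from_blob(blob: str):
    # Only unpickle data that comes from a source you trust.
    return pickle.loads(base64.b64decode(blob))


# Stand-in for a "complex" result that json.dumps would reject as-is.
value = {"when": datetime(2021, 8, 18, 19, 35), "tags": {"x", "y"}}

# Parent flow side: the blob is a plain string, so the payload is valid JSON.
blob = to_blob(value)
payload = json.dumps({"a_result_value": blob})

# child_B side: decode the Parameter value back into the original object.
restored = from_blob(json.loads(payload)["a_result_value"])
```

The trade-off is that the Parameter becomes opaque: it cannot be read or edited by hand, and both sides need compatible class definitions to unpickle it.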
Have you tried @task(result=LocalResult(serializer=xxx)) instead of setting it inside the Flow?
Kevin Kho
[...] to_json and from_json might not see the exact methods because the result will convert them to json. Like it might be to_dict and from_dict. Just has to have a JSON serializable representation.
Adam Shamlian
08/18/2021, 8:17 PM
"Have you tried @task(result=LocalResult(serializer=xxx)) instead of setting it inside the Flow?"
yeah. still seemed to be a PickleSerializer instead of my custom one, despite trying this. as for serialization and parameter being one or two issues, I suppose that's semantics 😛 ultimately, the question is "how can I pass a 'complex' object as a Parameter?" so maybe the serialization element is in a sense an XY problem. to me, it seems really odd that I can't do the following in a flow:
with Flow('test') as f:
    p = Parameter('my_complex_param', serializer=MyComplexParamJSONSerializer)
while I could break down the incoming result into a ton of primitive params, the primitives are always consumed together, so it makes sense to pass them as a single param. I'm guessing the real expectation is "if you have a complex param, massage it into a dict." am I right to think that?
Kevin Kho
[...] @task syntax should work. Sorry I’m not trying to be pedantic or anything 😂. Just trying to think through the problem, but yes I think you’re right they are linked. I was thinking for a moment they are separate because solving serialization doesn’t solve passing to the Parameter but never mind.
The thing with p = Parameter('my_complex_param', serializer=MyComplexParamJSONSerializer) is that this assumes some complex object was sent to the Flow as a payload to the API request, and that’s where I think the difficulty is because the payloads are in JSON. Does that make sense? I do understand the thought though.
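To make the JSON-payload constraint concrete, here is a small standard-library sketch. It does not call Prefect; it only shows that a bare object is rejected by json.dumps while its dict form survives the round trip. MyClass, its fields, and the to_dict / from_dict names are assumptions for illustration.

```python
import json


class MyClass:
    """Hypothetical stand-in for the MyClass in the thread."""

    def __init__(self, a, b, c):
        self.a, self.b, self.c = a, b, c

    def to_dict(self) -> dict:
        return {"a": self.a, "b": self.b, "c": self.c}

    @classmethod
    def from_dict(cls, data: dict) -> "MyClass":
        return cls(**data)


obj = MyClass(1, 2, 3)

# A bare object cannot be placed in a JSON payload.
try:
    json.dumps({"a_result_value": obj})
    raw_object_ok = True
except TypeError:
    raw_object_ok = False

# The dict form encodes cleanly and can be rebuilt on the receiving side.
payload = json.dumps({"a_result_value": obj.to_dict()})
restored = MyClass.from_dict(json.loads(payload)["a_result_value"])
```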
Massage to a dict or list yes, I think that’s what you have to do.
Adam Shamlian
08/18/2021, 8:27 PM
Adam Shamlian
08/19/2021, 2:04 AM
[...] to_dict and from_dict does work; however, attempting to serialize the result from A with a custom serializer does not. Looking at the source for Parameter, I definitely see the possibility for injecting a custom serializer (one that must be JSON-based), although in all of this, I've come to better understand what the philosophy is here (granted, I think it could be made more explicit in the docs).
Parameters are meant to be passed around as:
• Primitive types
• "Complex" objects via a massage-to/from-dict (dicts being primitive, themselves)
• "Complex" objects via out-of-band storage and passing the URI as a parameter "proxy"
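The third option in the list above could look roughly like this. It is a sketch under stated assumptions: a local temporary directory stands in for the out-of-band store, a file path stands in for the URI, and the store / load helpers are hypothetical names. Flows running on different machines would need shared storage instead.

```python
import pickle
import tempfile
from datetime import datetime
from pathlib import Path


def store(obj, directory: Path) -> str:
    # Write the object out of band and hand back only its location.
    path = directory / "a_result.pkl"
    path.write_bytes(pickle.dumps(obj))
    return str(path)


def load(location: str):
    # Only unpickle data from a location you trust.
    return pickle.loads(Path(location).read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for a "complex" result that is not JSON serializable.
    value = {"when": datetime(2021, 8, 18, 19, 35), "tags": {"x", "y"}}

    # Producer side: the location is a short string, safe to pass as a Parameter.
    location = store(value, Path(tmp))

    # Consumer side: fetch the real object using the Parameter value.
    restored = load(location)
```

Only the location string crosses the Parameter boundary, so the JSON restriction applies to a plain string rather than to the object.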