Felipe Saldana

01/28/2021, 9:57 PM
If anyone could provide any guidance that would be great. I am trying to pass an instance of a custom task to a “sub flow”. Just wanted to know if that is even possible. My main flow internally calls StartFlowRun for another flow which can take parameters/results from the main flow. The dependencies and DAG look fine in the UI. If I pass a string for "author_info" my subflow/subtask gets that string. If I try to pass author_instance I get an exception:
Unexpected error: TypeError("Object of type 'TestTask' is not JSON serializable",)
TestTask is a custom task type I created and the run method returns a custom return type I created as well.
I am trying to accomplish a few things:
• Have the author_instance complete successfully before more_tranformations starts (this looks to be fine)
• Pass data from a main flow to a “sub flow” (this looks to be fine as a string)
• Access my custom return type from the sub flow (this is where I get the exception, as I can’t pass author_instance)
If I need to go about this another way, please let me know.
I saw this change in another thread and this got me a step closer. Now I am getting an exception stating that my custom class is not serializable ... this makes more sense: Unexpected error: TypeError("Object of type 'CustomRetVal' is not JSON serializable",)
business_logic_layer = StartFlowRun(
    flow_name='more_tranformations',
    project_name="Felipe First",
    wait=True,
)(parameters={"author_info": author_instance})
Does my class need to inherit from a base prefect class?
class CustomRetVal():
    def __init__(self, val):
        self._val = val

    def get_val(self) -> str:
        return self._val
def run(self, name, path_to_json, jd) -> CustomRetVal: .......
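The TypeError quoted above can be reproduced without Prefect, which may help narrow down the cause. The sketch below assumes that flow-run parameters are encoded as JSON before being sent to the sub flow (the error message suggests this, but it is not confirmed here). `try_serialize` is a hypothetical helper used only for illustration.

```python
import json


class CustomRetVal:
    """Same plain class as in the message above."""

    def __init__(self, val):
        self._val = val

    def get_val(self) -> str:
        return self._val


def try_serialize(value):
    """Return the JSON text, or the error message if encoding fails."""
    try:
        return json.dumps({"author_info": value})
    except TypeError as exc:
        return f"TypeError: {exc}"


# A plain string encodes without trouble.
string_result = try_serialize("some author")

# A plain Python object has no default JSON representation, so this fails.
object_result = try_serialize(CustomRetVal("some author"))

print(string_result)
print(object_result)
```

If that assumption holds, inheriting from a Prefect base class would not change the outcome, since the JSON encoder still would not know how to represent the object.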

Fina Silva-Santisteban

01/29/2021, 10:08 PM
@Felipe Saldana we’re using Task Classes as well! That’s right, you’ll need to inherit from the prefect task class and you’ll need to override the run method. Example:
from prefect import Task


class TaskDoSomething(Task):
    def run(self, args1, args2):
        # Do something (placeholder body so the example is valid Python)
        pass
Alternatively, you can just create functions and use the @task decorator. I hope this helps!

Felipe Saldana

01/29/2021, 10:56 PM
Hey @Fina Silva-Santisteban thanks for the feedback. I actually do have my custom task class inheriting from the Prefect Task ... that part is working great. The run() then returns a custom/plain class as well. This is the part that is causing an exception. Custom class from the run() works fine within the same flow but throws the exception when attempting to go into a subflow. A primitive type, such as str, works both within the same flow and going into a subflow.
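One possible workaround, given that a str crosses the flow boundary but a plain class does not, is to convert the custom return value into JSON-friendly data before it is passed as a parameter and rebuild it inside the sub flow. This is only a sketch under the assumption that parameters travel as JSON; `to_dict` and `from_dict` are hypothetical helpers added for illustration and are not part of Prefect.

```python
import json


class CustomRetVal:
    def __init__(self, val):
        self._val = val

    def get_val(self) -> str:
        return self._val

    def to_dict(self) -> dict:
        # Hypothetical helper: reduce the object to JSON-friendly data.
        return {"val": self._val}

    @classmethod
    def from_dict(cls, data: dict) -> "CustomRetVal":
        # Hypothetical helper: rebuild the object on the receiving side.
        return cls(data["val"])


# Main flow side: convert the task result before using it as a parameter.
result = CustomRetVal("Jane Doe")
payload = json.dumps({"author_info": result.to_dict()})

# Sub flow side: the parameter arrives as a dict and is turned back into an object.
received = json.loads(payload)["author_info"]
rebuilt = CustomRetVal.from_dict(received)

print(rebuilt.get_val())
```

In the main flow, this would likely mean adding a small task between author_instance and StartFlowRun that returns the dictionary form, and having the first task of the sub flow call `from_dict` on the "author_info" parameter.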