jaehoon

05/20/2021, 1:30 PM
I want to pass the result of a task to a flow as a parameter, but this error occurred:
    flow_run_id = client.create_flow_run(
  File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 1108, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 302, in graphql
    params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
  File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type TaskMetaclass is not JSON serializable
Please help me!

Kevin Kho

05/20/2021, 2:02 PM
Hey @jaehoon, do you have some code you can share?

jaehoon

05/21/2021, 4:53 AM
Hi Kevin! 😃 Here is the Prefect code!
from prefect import Flow, task, Parameter, Task
from prefect.tasks.prefect import StartFlowRun

from PrefectLogger import logger

flow_run = StartFlowRun(flow_name='inner-test-flow', project_name='pipeline')


class Dummy(Task):
    def run(self):
        return dict(x='1')

dummy = Dummy(name = 'dummy')

def registeration():
    with Flow("admin-flow") as flow:
        flow_run.set_upstream(dummy, key='parameters')
    flow.register(project_name='pipeline')


@task
def logging(x):
    logger.info(x)


def register_test2():
    with Flow('inner-test-flow') as flow:
        x = Parameter('x', '0')
        logging(x)
    flow.register(project_name='pipeline')



if __name__ == '__main__':
    registeration()
    register_test2()
    # run_tasks()

Kevin Kho

05/21/2021, 12:23 PM
I think the issue here is flow_run.set_upstream(dummy, key='parameters'). dummy is still a task, but it hasn't been called inside the flow yet. Can you try something like this?
class Dummy(Task):
    def run(self):
        return dict(x='1')

dummy = Dummy(name = 'dummy')

def registeration():
    with Flow("admin-flow") as flow:
        a = dummy()
        b = flow_run()
        b.set_upstream(a)
    flow.register(project_name='pipeline')
This is how to use the task in the flow
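
To illustrate the error message from the traceback, here is a minimal sketch that uses only the standard library. The classes below are hypothetical stand-ins (they are not Prefect's real Task or TaskMetaclass); they only mimic the shape "a class whose metaclass is named TaskMetaclass". The helper try_serialize is also made up for this example. It shows that json.dumps rejects a task class or task object, while the plain dict produced by actually running the task serializes fine.

```python
import json


# Stand-in metaclass; NOT Prefect's real class, only named alike so the
# error message matches the one in the traceback.
class TaskMetaclass(type):
    pass


# Stand-in base task built on that metaclass (hypothetical).
class Task(metaclass=TaskMetaclass):
    def run(self):
        raise NotImplementedError


class Dummy(Task):
    def run(self):
        return dict(x='1')


def try_serialize(obj):
    """Return the JSON string, or the TypeError text if obj is not serializable."""
    try:
        return json.dumps(obj)
    except TypeError as exc:
        return f"TypeError: {exc}"


# The class itself is an instance of the metaclass, so json reports that name.
print(try_serialize(Dummy))
# The value returned by running the task is a plain dict and serializes cleanly.
print(try_serialize(Dummy().run()))
```

In the flow from this thread, the analogous step is calling dummy() inside the with Flow(...) block so that its result, not the task itself, is what reaches downstream. Assuming the Prefect version used here lets StartFlowRun accept parameters at call time, something like flow_run(parameters=a) would forward that dict to the inner flow, but that is not confirmed in this thread.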

jaehoon

05/24/2021, 2:38 AM
Oh, it works! Now I get how to use a task in a flow. My morning is a happy one thanks to you 😍

Kevin Kho

05/24/2021, 4:04 AM
Happy to hear @jaehoon! 🙂