Thread
#prefect-server
    jaehoon

    1 year ago
    I want to pass the result of a task to a flow as a parameter, but an error occurred like this:
        flow_run_id = client.create_flow_run(
      File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 1108, in create_flow_run
        res = self.graphql(create_mutation, variables=dict(input=inputs))
      File "c:\users\krims\appdata\local\pypoetry\cache\virtualenvs\data-pipeline-i-bq0ir6-py3.9\lib\site-packages\prefect\client\client.py", line 302, in graphql
        params=dict(query=parse_graphql(query), variables=json.dumps(variables)),
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\__init__.py", line 231, in dumps
        return _default_encoder.encode(obj)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 199, in encode
        chunks = self.iterencode(o, _one_shot=True)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 257, in iterencode
        return _iterencode(o, 0)
      File "C:\Users\krims\AppData\Local\Programs\Python\Python39\lib\json\encoder.py", line 179, in default
        raise TypeError(f'Object of type {o.__class__.__name__} '
    TypeError: Object of type TaskMetaclass is not JSON serializable
    Help me!
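    The last frames of the traceback are in the standard `json` module, so the failure can be reproduced without Prefect. The sketch below is only an illustration: `TaskMetaclass` and `Task` here are hypothetical stand-ins, not Prefect's real classes, and `try_dumps` is a helper invented for the example. It assumes the object that reached `json.dumps` was a class rather than a computed result.

```python
import json


class TaskMetaclass(type):
    """Hypothetical stand-in for a task metaclass (not Prefect's real one)."""


class Task(metaclass=TaskMetaclass):
    """Hypothetical stand-in for a task class."""

    def run(self):
        return {"x": "1"}


def try_dumps(obj):
    """Return the JSON string, or the error text if obj is not serializable."""
    try:
        return json.dumps(obj)
    except TypeError as exc:
        return f"TypeError: {exc}"


# A class object inside the payload cannot be encoded; the error names
# the class's own type, which is the metaclass.
print(try_dumps({"parameters": Task}))

# The plain dict returned by run() serializes without trouble.
print(try_dumps({"parameters": Task().run()}))
```

    The point of the sketch is that only plain data (dicts, strings, numbers) can be sent as flow-run parameters; a task class or an unevaluated task object cannot.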
    Kevin Kho

    1 year ago
    Hey @jaehoon, do you have some code you can share?
    jaehoon

    1 year ago
    Hi Kevin! 😃 Here is the Prefect code!
    from prefect import Flow, task, Parameter, Task
    from prefect.tasks.prefect import StartFlowRun
    
    from PrefectLogger import logger
    
    flow_run = StartFlowRun(flow_name='inner-test-flow', project_name='pipeline')
    
    
    class Dummy(Task):
        def run(self):
            return dict(x='1')
    
    dummy = Dummy(name = 'dummy')
    
    def registeration():
        with Flow("admin-flow") as flow:
            flow_run.set_upstream(dummy, key='parameters')
        flow.register(project_name='pipeline')
    
    
    @task
    def logging(x):
        logger.info(x)
    
    
    def register_test2():
        with Flow('inner-test-flow') as flow:
            x = Parameter('x', '0')
            logging(x)
        flow.register(project_name='pipeline')
    
    
    
    if __name__ == '__main__':
        registeration()
        register_test2()
        # run_tasks()
    Kevin Kho

    1 year ago
    I think the issue here is flow_run.set_upstream(dummy, key='parameters'). dummy is a task object, but it hasn't been called inside the flow yet. Can you try something like this?
    class Dummy(Task):
        def run(self):
            return dict(x='1')
    
    dummy = Dummy(name = 'dummy')
    
    def registeration():
        with Flow("admin-flow") as flow:
            a = dummy()
            b = flow_run()
            b.set_upstream(a)
        flow.register(project_name='pipeline')
    This is how to use the task in the flow
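    Since the original goal was to pass the task's result to the child flow as parameters, here is a hedged sketch of one way that might look, assuming Prefect 1.x. `set_upstream(a)` without a `key` only orders the two tasks; passing the upstream task as the `parameters` keyword argument should also forward its return value. The wrapper function `build_admin_flow` is invented for this example, and the imports sit inside it so the snippet loads even where Prefect 1.x is not installed.

```python
def build_admin_flow():
    """Build (but do not register) a parent flow that forwards a task result.

    Assumes Prefect 1.x; the flow and project names are taken from the thread.
    """
    # Local imports: only needed when the function is actually called.
    from prefect import Flow, Task
    from prefect.tasks.prefect import StartFlowRun

    class Dummy(Task):
        def run(self):
            return dict(x='1')

    dummy = Dummy(name='dummy')
    flow_run = StartFlowRun(flow_name='inner-test-flow', project_name='pipeline')

    with Flow('admin-flow') as flow:
        a = dummy()
        # Passing `a` as a keyword argument creates a data dependency, so the
        # dict returned by Dummy.run() is used as the child flow's parameters.
        flow_run(parameters=a)

    return flow
```

    Calling build_admin_flow().register(project_name='pipeline') would then register the parent flow the same way as in the snippets above.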
    jaehoon

    1 year ago
    Oh, it works! Now I get how to use a task in a flow. My morning is a happy one thanks to you 😍
    Kevin Kho

    1 year ago
    Happy to hear @jaehoon! 🙂