https://prefect.io logo
Title
d

Divya

07/22/2022, 5:33 PM
Hello, I am trying to schedule metadata ingestion flows for Amundsen using Prefect. The Amundsen code that I am trying to schedule has run() function. So on trying to run the code as part of task is failing. Do you have any suggestions to schedule such type of flows? We are currently using Perfect1.0 Thank you, Divya
1
k

Kevin Kho

07/22/2022, 5:38 PM
Can I see the task definition?
d

Divya

07/22/2022, 7:10 PM
Hi @Kevin Kho, Thanks for getting back. Please find the code below:
@task()
def postgres_task():
    loading_job = run_postgres_job()
    loading_job.launch()

@task()
def es_launch():
    job_es_table = create_es_publisher_sample_job(
        elasticsearch_index_alias='table_search_index',
        elasticsearch_doc_type_key='table',
        model_name='databuilder.models.table_elasticsearch_document.TableESDocument')
    job_es_table.launch()

def perfect_flow():
    with Flow(name = 'sample_postgres_loader') as flow:
        postgres_task()
        es_launch()
    return flow

if __name__ == "__main__":
    flow = perfect_flow()
    flow.run()
    flow.register(project_name="default")
``````
Also the Amundsen code is from the git repo attached. https://github.com/amundsen-io/amundsen
a

Anna Geller

07/23/2022, 1:37 PM
to schedule this flow, you would need to attach a schedule to the flow and register the flow but you may also consider doing this in Prefect 2.0 instead if you're getting started with Prefect now - docs: https://orion-docs.prefect.io/getting-started/overview/