Artem Vysotsky
04/22/2022, 9:35 PM
I have a jobs table like the following:
id | name                 | query                                | template
---|----------------------|--------------------------------------|--------------------------
1  | user_count_query     | select count(*) as cnt from users    | user count is {{cnt}}
2  | projects_count_query | select count(*) as cnt from projects | project count is {{cnt}}
I would like to:
1. Use a fetch_jobs = PostgresFetch task to get a list of all jobs in the table above.
2. Fan out the results of fetch_jobs to a new task, say run_job_query, which will grab the value of the query column for each row and execute another PostgresFetch(query=query).
In a pseudo flow it would look something like:
fetch = PostgresFetch()

@task
def send_to_slack(query_result):
    pass

with prefect.Flow() as flow:
    jobs = fetch(query="select query from jobs", fetch="many")
    # does not work: jobs is a Task, not a list, while the flow is being built
    for job in jobs:
        query_result = fetch(query=job["query"])
        send_to_slack(query_result)
The above code obviously does not work, because tasks are not iterable. Mapping won't work either, since I cannot call another task from within a task.
Any help appreciated.
Kevin Mullins
04/22/2022, 9:47 PM
fetch = PostgresFetch()

@task
def get_query_results(query):
    # get_cool_results is a placeholder for whatever runs the query
    query_results = get_cool_results(query)
    return query_results

@task
def send_to_slack(query_results):
    send_cool_results(query_results)

with prefect.Flow() as flow:
    jobs = fetch(query="select query from jobs", fetch="many")
    query_results = get_query_results.map(jobs)
    send_to_slack.map(query_results)
Artem Vysotsky
04/22/2022, 9:49 PM
get_cool_results is a task too?
Kevin Mullins
04/22/2022, 9:50 PM
Artem Vysotsky
04/22/2022, 9:50 PM
Kevin Mullins
04/22/2022, 9:55 PM
Artem Vysotsky
04/22/2022, 9:58 PM
get_cool_results to be a task
Kevin Mullins
04/22/2022, 10:03 PM
import copy
import random
from typing import Any, Dict, List

import prefect
from prefect import task, Flow

@task
def fetch_jobs() -> List[Dict[str, Any]]:
    # hard-coded stand-in for the rows of the jobs table
    return [
        {"id": 1, "name": "user_count_query", "query": "select count(*) as cnt from users", "template": "user count is {{cnt}}"},
        {"id": 2, "name": "projects_count_query", "query": "select count(*) as cnt from projects", "template": "project count is {{cnt}}"},
    ]

@task
def get_query_results(job: Dict[str, Any]) -> Dict[str, Any]:
    # a random number stands in for the real query result
    count = random.randint(1, 100)
    job_results = copy.deepcopy(job)
    job_results["email_template"] = job["template"].replace("{{cnt}}", str(count))
    return job_results

@task
def send_query_results(job: Dict[str, Any]) -> None:
    prefect.context.logger.info(f"Sending email for job {job['name']} with template: {job['email_template']}")

with Flow("email-fan-out") as flow:
    jobs = fetch_jobs()
    # one mapped task run per job row
    query_results = get_query_results.map(jobs)
    send_query_results.map(query_results)
Results:
└── 17:03:08 | INFO | Beginning Flow run for 'email-fan-out'
└── 17:03:08 | INFO | Task 'fetch_jobs': Starting task run...
└── 17:03:09 | INFO | Task 'fetch_jobs': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO | Task 'get_query_results': Starting task run...
└── 17:03:09 | INFO | Task 'get_query_results': Finished task run for task with final state: 'Mapped'
└── 17:03:09 | INFO | Task 'get_query_results[0]': Starting task run...
└── 17:03:09 | INFO | Task 'get_query_results[0]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO | Task 'get_query_results[1]': Starting task run...
└── 17:03:09 | INFO | Task 'get_query_results[1]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO | Task 'send_query_results': Starting task run...
└── 17:03:09 | INFO | Task 'send_query_results': Finished task run for task with final state: 'Mapped'
└── 17:03:09 | INFO | Task 'send_query_results[0]': Starting task run...
└── 17:03:09 | INFO | Sending email for job user_count_query with template: user count is 29
└── 17:03:09 | INFO | Task 'send_query_results[0]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO | Task 'send_query_results[1]': Starting task run...
└── 17:03:09 | INFO | Sending email for job projects_count_query with template: project count is 41
└── 17:03:09 | INFO | Task 'send_query_results[1]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO | Flow run SUCCESS: all reference tasks succeeded
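The flow above substitutes a random number for the query result. As a rough sketch of what the per-job step would do against a real database, here is the same logic in plain Python, outside Prefect. It assumes the standard-library sqlite3 module as a stand-in for Postgres, made-up row counts (3 users, 2 projects), and hypothetical helper names (run_job_query, demo) that are not part of Prefect or of the thread.

```python
import sqlite3
from typing import Any, Dict, List


def fetch_jobs(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """Return every row of the jobs table as a dict keyed by column name."""
    cur = conn.execute("select id, name, query, template from jobs order by id")
    cols = [c[0] for c in cur.description]
    return [dict(zip(cols, row)) for row in cur.fetchall()]


def run_job_query(conn: sqlite3.Connection, job: Dict[str, Any]) -> str:
    """Run one job's query and fill its {{column}} placeholders from the first row."""
    cur = conn.execute(job["query"])
    cols = [c[0] for c in cur.description]
    row = cur.fetchone()
    message = job["template"]
    for col, value in zip(cols, row):
        message = message.replace("{{" + col + "}}", str(value))
    return message


def demo() -> List[str]:
    # In-memory database with illustrative data (assumption, not from the thread).
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        create table users (id integer);
        create table projects (id integer);
        create table jobs (id integer, name text, query text, template text);
        insert into users values (1), (2), (3);
        insert into projects values (1), (2);
        insert into jobs values
            (1, 'user_count_query', 'select count(*) as cnt from users', 'user count is {{cnt}}'),
            (2, 'projects_count_query', 'select count(*) as cnt from projects', 'project count is {{cnt}}');
        """
    )
    # Fan out: one query execution per job row.
    return [run_job_query(conn, job) for job in fetch_jobs(conn)]


if __name__ == "__main__":
    for line in demo():
        print(line)
```

In a Prefect flow, the list comprehension in demo() is the part that .map would replace, with one mapped task run per job.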
Kyle McChesney
04/22/2022, 10:10 PM
You can define a @task and, within that, call some_task.run(...)
Kevin Mullins
04/22/2022, 10:17 PM
@task
def build_fetch_query(results) -> str:
    return results["query"]  # not sure how to get column from PostgresFetch

@task
def send_query_results(results) -> None:
    prefect.context.logger.info(f"Sending email for job for results: {results}")

fetch = PostgresFetch(fetch="many")

with Flow("email-fan-out") as flow:
    jobs = fetch(query="select query from jobs")
    job_queries = build_fetch_query.map(jobs)
    job_query_results = fetch.map(query=job_queries)
    send_query_results.map(job_query_results)
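On the open question in the build_fetch_query comment: a possible answer, assuming PostgresFetch hands back each row as a plain tuple (the default for a psycopg2 cursor) rather than a dict, is to index by position instead of by column name. This is an untested sketch, and first_column is a hypothetical helper name; since the query selects a single column, the value of interest would sit at index 0.

```python
from typing import Any, List, Sequence


def first_column(row: Sequence[Any]) -> Any:
    """Return the first value of a result row given as a tuple or list."""
    return row[0]


# Rows shaped the way a tuple-returning cursor would yield them
# for "select query from jobs" (illustrative data).
rows = [
    ("select count(*) as cnt from users",),
    ("select count(*) as cnt from projects",),
]
job_queries: List[str] = [first_column(row) for row in rows]
```

If the rows turn out to be dicts instead, the original results["query"] lookup would be the one to keep.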
Artem Vysotsky
04/22/2022, 10:30 PM