# prefect-community
a
Hi all, here is what I’m trying to do with the 1.0 version. I have a database table `jobs` like the following:
```
id | name                 | query                                | template
-----------------------------------------------------------------------------------------
1  | user_count_query     | select count(*) as cnt from users    | user count is {{cnt}}
2  | projects_count_query | select count(*) as cnt from projects | project count is {{cnt}}
```
I would like to:
1. Use a `fetch_jobs = PostgresFetch` task to get a list of all jobs in the table above.
2. Fan out the results of `fetch_jobs` to a new task, say `run_job_query`, which will, for each row, grab the value of the `query` column and execute another `PostgresFetch(query=query)`.
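Setting Prefect aside for a moment, the two steps boil down to "read every row of `jobs`, run each row's `query`, and fill its `template`". Here is a minimal plain-Python sketch of that data flow. It assumes the standard-library `sqlite3` module as a stand-in for Postgres and uses made-up sample rows in `users` and `projects`; the helper name `run_jobs` is hypothetical.

```python
import sqlite3
from typing import List


def run_jobs(conn: sqlite3.Connection) -> List[str]:
    """Step 1: fetch all jobs; step 2: run each job's query and fill its template."""
    jobs = conn.execute("select query, template from jobs order by id").fetchall()
    messages = []
    for query, template in jobs:
        # Each job query returns a single row with one column, `cnt`
        (cnt,) = conn.execute(query).fetchone()
        messages.append(template.replace("{{cnt}}", str(cnt)))
    return messages


# In-memory stand-in database with hypothetical sample data
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table jobs (id integer, name text, query text, template text);
    create table users (id integer);
    create table projects (id integer);
    insert into users values (1), (2), (3);
    insert into projects values (1), (2);
    insert into jobs values
        (1, 'user_count_query', 'select count(*) as cnt from users', 'user count is {{cnt}}'),
        (2, 'projects_count_query', 'select count(*) as cnt from projects', 'project count is {{cnt}}');
    """
)

print(run_jobs(conn))  # ['user count is 3', 'project count is 2']
```

In a Prefect 1.0 flow, the body of that `for` loop is the part that has to become mapped tasks, because the job rows do not exist yet while the flow is being built.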
In a pseudo flow it would look something like:
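```python
from prefect import Flow, task
from prefect.tasks.postgres import PostgresFetch

fetch = PostgresFetch()

@task
def send_to_slack(query_result):
    pass

with Flow("jobs-to-slack") as flow:
    jobs = fetch(query="select query from jobs", fetch="many")
    # This loop is the part that fails: `jobs` is a Task here, not a list of rows
    for job in jobs:
        query_result = fetch(query=job["query"])
        send_to_slack(query_result)
```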
The above code obviously does not work, because Tasks are not iterable. Mapping won’t work either, since I cannot call another task inside a task. Any help appreciated.
k
You can chain together mapped tasks to accomplish fan-out/fan-in. I’m actively using this pattern. This is a quick sketch, but it looks something like this:
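```python
from prefect import Flow, task
from prefect.tasks.postgres import PostgresFetch

fetch = PostgresFetch()

def get_cool_results(query):
    # Placeholder: plain Python (not a task) that runs `query` and returns its rows
    ...

def send_cool_results(query_results):
    # Placeholder: plain Python that posts the results to Slack
    ...

@task
def get_query_results(query):
    query_results = get_cool_results(query)
    return query_results

@task
def send_to_slack(query_results):
    send_cool_results(query_results)

with Flow("slack-fan-out") as flow:
    jobs = fetch(query="select query from jobs", fetch="many")
    query_results = get_query_results.map(jobs)  # fan out: one run per job row
    send_to_slack.map(query_results)  # one run per query result
```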
a
Yes, this makes sense, but what if `get_cool_results` is a task too?
k
Nah, in this example it’s just a plain Python function.
a
That’s my point.
k
I think this will get you what you want. I’m whipping up a little example/demo to show you what it will do; hopefully it will make things clear.
a
Sorry, I realize I wasn’t clear. WHAT IF I need `get_cool_results` to be a task?
k
Catching up, here is a working example of a fan-out:
```python
import copy
import random
from typing import Any, Dict, List

import prefect
from prefect import task, Flow

@task
def fetch_jobs() -> List[Dict[str, Any]]:
    # Hard-coded stand-in for the rows of the `jobs` table
    return [
        {"id": 1, "name": "user_count_query", "query": "select count(*) as cnt from users", "template": "user count is {{cnt}}"},
        {"id": 2, "name": "projects_count_query", "query": "select count(*) as cnt from projects", "template": "project count is {{cnt}}"},
    ]

@task
def get_query_results(job: Dict[str, Any]) -> Dict[str, Any]:
    # Fakes the query result with a random count instead of hitting the database
    count = random.randint(1, 100)
    job_results = copy.deepcopy(job)
    job_results["email_template"] = job["template"].replace("{{cnt}}", str(count))
    return job_results

@task
def send_query_results(job: Dict[str, Any]) -> None:
    prefect.context.logger.info(f"Sending email for job {job['name']} with template: {job['email_template']}")

with Flow("email-fan-out") as flow:
    jobs = fetch_jobs()
    query_results = get_query_results.map(jobs)  # one child run per job
    send_query_results.map(query_results)  # one child run per query result

flow.run()
```
Results:
```
└── 17:03:08 | INFO    | Beginning Flow run for 'email-fan-out'
└── 17:03:08 | INFO    | Task 'fetch_jobs': Starting task run...
└── 17:03:09 | INFO    | Task 'fetch_jobs': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO    | Task 'get_query_results': Starting task run...
└── 17:03:09 | INFO    | Task 'get_query_results': Finished task run for task with final state: 'Mapped'
└── 17:03:09 | INFO    | Task 'get_query_results[0]': Starting task run...
└── 17:03:09 | INFO    | Task 'get_query_results[0]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO    | Task 'get_query_results[1]': Starting task run...
└── 17:03:09 | INFO    | Task 'get_query_results[1]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO    | Task 'send_query_results': Starting task run...
└── 17:03:09 | INFO    | Task 'send_query_results': Finished task run for task with final state: 'Mapped'
└── 17:03:09 | INFO    | Task 'send_query_results[0]': Starting task run...
└── 17:03:09 | INFO    | Sending email for job user_count_query with template: user count is 29
└── 17:03:09 | INFO    | Task 'send_query_results[0]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO    | Task 'send_query_results[1]': Starting task run...
└── 17:03:09 | INFO    | Sending email for job projects_count_query with template: project count is 41
└── 17:03:09 | INFO    | Task 'send_query_results[1]': Finished task run for task with final state: 'Success'
└── 17:03:09 | INFO    | Flow run SUCCESS: all reference tasks succeeded
```
As long as the number of items stays the same from the input to the output of the mapped tasks (in this case, the number of jobs), you can map over the results of mapped tasks to chain them together in a functional style.
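One way to picture that rule: each `.map` call behaves roughly like a list comprehension that launches one child run per element, so the i-th child of the second map receives the i-th output of the first. Below is a plain-Python analogy with no Prefect involved. `mapped`, `get_query_results`, and `send_query_results` are hypothetical stand-ins, and the counts are fixed per job instead of being queried.

```python
from typing import Any, Callable, Dict, List

Job = Dict[str, Any]


def mapped(fn: Callable[[Any], Any], items: List[Any]) -> List[Any]:
    """Analogy for Task.map: one call per element, results kept in input order."""
    return [fn(item) for item in items]


def get_query_results(job: Job) -> Job:
    # Pretend query: a fixed count per job name instead of a database call
    counts = {"user_count_query": 29, "projects_count_query": 41}
    rendered = job["template"].replace("{{cnt}}", str(counts[job["name"]]))
    return {**job, "email_template": rendered}


def send_query_results(job: Job) -> str:
    return f"Sending email for job {job['name']} with template: {job['email_template']}"


jobs = [
    {"name": "user_count_query", "template": "user count is {{cnt}}"},
    {"name": "projects_count_query", "template": "project count is {{cnt}}"},
]
query_results = mapped(get_query_results, jobs)  # same length as jobs
messages = mapped(send_query_results, query_results)  # same length again
for line in messages:
    print(line)
```

Because every step returns exactly one output per input, position i still refers to the same job at the end of the chain.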
For your question there, I think I have what you’re looking for as well (not tested, but I’ll type it up).
k
Also, just as an aside: you can define a Python function with `@task` and, within it, call `some_task.run(...)`. You do lose the vast majority of the “nice” stuff around tasks (being part of the DAG, retries, etc.), though.
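With the `.run()` approach, every job query executes inside one parent task, so Prefect can only retry or report that parent as a whole, and an exception from any single query fails it unless you catch it yourself. Below is a plain-Python sketch of what such a parent body might look like. It uses `sqlite3` as a stand-in for `PostgresFetch(...).run(...)`; the function `run_all_jobs` and the deliberately broken sample job are hypothetical.

```python
import sqlite3
from typing import Dict, List, Tuple


def run_all_jobs(conn: sqlite3.Connection, jobs: List[Tuple[str, str]]) -> Dict[str, str]:
    """Run every (name, query) pair in one call, recording failures per job."""
    results = {}
    for name, query in jobs:
        try:
            (cnt,) = conn.execute(query).fetchone()
            results[name] = str(cnt)
        except sqlite3.Error as exc:
            # No per-query retries here, so record the failure and keep going
            results[name] = f"failed: {exc}"
    return results


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table users (id integer);
    insert into users values (1), (2), (3);
    """
)
jobs = [
    ("user_count_query", "select count(*) as cnt from users"),
    ("projects_count_query", "select count(*) as cnt from projects"),  # table missing on purpose
]
print(run_all_jobs(conn, jobs))
```

Whether to swallow errors like this or let the parent fail is a design choice; the mapped version keeps each query as its own task run instead.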
k
Something like this; you might want more state for your emailing and such, but this is the gist, I think:
```python
import prefect
from prefect import Flow, task
from prefect.tasks.postgres import PostgresFetch

@task
def build_fetch_query(results) -> str:
    # Rows come back as tuples, so the single `query` column selected
    # below is at index 0 (untested)
    return results[0]

@task
def send_query_results(results) -> None:
    prefect.context.logger.info(f"Sending email for results: {results}")

# Connection settings (db_name, user, host, password, ...) omitted
fetch = PostgresFetch(fetch="many")
with Flow("email-fan-out") as flow:
    jobs = fetch(query="select query from jobs")
    job_queries = build_fetch_query.map(jobs)  # one query string per job row
    job_query_results = fetch.map(query=job_queries)  # one fetch per query
    send_query_results.map(job_query_results)
```
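On how to pull the `query` column out of a `PostgresFetch` row: the task is built on psycopg2, whose default cursor returns each row as a plain tuple, so columns are picked by position. The standard-library `sqlite3` module behaves the same way by default, which makes for a quick runnable illustration (a stand-in, not Postgres). `row_to_dict` is a hypothetical helper that uses the DB-API `cursor.description` to recover column names when you want them.

```python
import sqlite3
from typing import Any, Dict, Sequence, Tuple


def row_to_dict(description: Sequence[Tuple[Any, ...]], row: Tuple[Any, ...]) -> Dict[str, Any]:
    """Pair a tuple row with the column names from cursor.description."""
    return {column[0]: value for column, value in zip(description, row)}


conn = sqlite3.connect(":memory:")
conn.execute("create table jobs (id integer, query text)")
conn.execute("insert into jobs values (1, 'select count(*) as cnt from users')")

cursor = conn.execute("select query from jobs")
row = cursor.fetchone()
print(row)  # ('select count(*) as cnt from users',)
print(row[0])  # select count(*) as cnt from users
print(row_to_dict(cursor.description, row))  # {'query': 'select count(*) as cnt from users'}
```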
a
thanks!!!