j

Jason Boorn

10/25/2021, 6:46 PM
In a flow, I'm looking to collect the result of several different tasks, and then pump that final array of task results into another task (which basically sends the results out via email). I'm having some struggles with this - are we able to add task results to an array within a flow?
k

Kevin Kho

10/25/2021, 6:49 PM
Hey @Jason Boorn, if the array already exists, you need some kind of intermediate task to do this. Do you mean something like this?
with Flow(...) as flow:
     a_list = [1,2,3]
     a = task_a()
     b = task_b()
     collection = (a_list, a, b)  # a tuple holding the list and the two task results
     email_task(collection)
j

Jason Boorn

10/25/2021, 6:57 PM
ah ok - lemme try that
can I iterate on that collection like a normal python object in my email_task?
also - I'm getting intermittent errors like this one:
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
k

Kevin Kho

10/25/2021, 7:00 PM
Is this Server or Cloud? You can iterate on a list inside a task yep.
Is your API container lacking resources maybe?
Do you get the error with a smaller amount of mapped tasks?
j

Jason Boorn

10/25/2021, 7:06 PM
I dont have a ton of tasks right now (just 1)
it happens sometimes
when I do it again it seems to work
k

Kevin Kho

10/25/2021, 7:10 PM
Did you use the Docker Compose setup with prefect server start? Maybe you need to bump up Docker memory?
j

Jason Boorn

10/25/2021, 7:15 PM
I'm using cloud
k

Kevin Kho

10/25/2021, 7:18 PM
Ah ok. Does it still happen and is your Flow simple enough to share? Seems like something may be off in the definition?
j

Jason Boorn

10/25/2021, 7:19 PM
here is what I'm trying to do:
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.notifications.email_task import EmailTask
import mssql
import json

@task
def generate_mail(results):
    logger = prefect.context.get("logger")
    logger.info(results)
    html = """
    <div>Hi</div>
    <table>
    """
    for result in results:
        json_result = json.loads(result);
        html += "<tr><td>" + json_result['name'] + "</td><td>" + json_result['description'] + "</td><td>" + json_result['result'] + "</td></tr>"
    html += "</table>"
    return html

@task
def send_mail(html):
    logger = prefect.context.get("logger")
    logger.info(html)
    email = EmailTask(subject="Alerts for TCA", email_to="jpb@lpcap.com", email_from="notifications@lpcap.com", smtp_server="mail.lpcap.com", smtp_port=25, smtp_type="INSECURE", msg_plain=None, email_to_cc=None, email_to_bcc=None, attachments=None)
    email.run(msg=html)

with Flow('alpha_hurdle', executor=LocalDaskExecutor(scheduler="threads")) as flow:
    r = []
    name = "firstcheck"
    description = "this is the first check"
    firstresult = mssql.dataval_exact(name, description, "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    results = (r, firstresult)
    html = generate_mail(results)
    send_mail(html)
I've been working on this for a couple hours now and unable to get it to work
there seems to be something wrong with how the json that gets spat out by dataval_exact is being treated downstream
my use case is super simple - a task spits out a JSON structure which I collect into a list, then I construct an email with the structure
I can't figure out if what comes in to generate_mail is a string or a json structure?
either way seems to fail
k

Kevin Kho

10/25/2021, 7:25 PM
Does mssql.dataval_exact just return JSON? Are there some connections involved there? Or does that method create a connection and close it?
j

Jason Boorn

10/25/2021, 7:25 PM
@task
def dataval_exact(name, description, db, query, checkvalue):
    logger = prefect.context.get("logger")
    fetchval = SqlServerFetch(db_name=db, user=user, host="gr-sqlutil01").run(query=query, password=password)
    if (fetchval[0][0] == checkvalue):
        logger.info("checkvalue is true")
        return {"name": name, "description": description, "result": True }
    else:
        logger.info("checkvalue is false")
        return {"name": name, "description": description, "result": False }
just returns a json structure
or rather a python dict I guess
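Editor's note: since dataval_exact returns a Python dict rather than a JSON string, calling json.loads on it in generate_mail would raise a TypeError, and so would concatenating the boolean "result" value onto a string. A minimal plain-Python sketch of building the table rows from dicts follows. The name build_rows is hypothetical, the sample data is made up from the values in the thread, and nothing here touches Prefect.

```python
import html


def build_rows(results):
    """Render one <tr> per result dict (hypothetical helper, not from the thread)."""
    rows = []
    for result in results:
        # Each result is already a dict, so no json.loads() call is needed.
        cells = (result["name"], result["description"], result["result"])
        # str() handles the boolean "result" value; html.escape() protects the markup.
        tds = "".join(f"<td>{html.escape(str(cell))}</td>" for cell in cells)
        rows.append(f"<tr>{tds}</tr>")
    return "<table>" + "".join(rows) + "</table>"


sample = [{"name": "firstcheck", "description": "this is the first check", "result": False}]
print(build_rows(sample))
```

In the flow, the same loop body would presumably sit inside the generate_mail task; that wiring is an assumption and is not tested here.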
k

Kevin Kho

10/25/2021, 7:31 PM
Seems fine. Can you try changing
results = (r, firstresult)
to
results = task(lambda x, y: (x,y))(r, firstresult)
to defer the execution, because firstresult is a task but r is not?
j

Jason Boorn

10/25/2021, 7:32 PM
can I just do:
results = (firstresult)
will that make a collection?
now I'm getting Task 'generate_mail': Finished task run for task with final state: 'TriggerFailed'
when actually, the earlier task completes fine
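Editor's note on the results = (firstresult) question above: in Python, parentheses alone only group an expression, so that line does not create a collection. A short sketch of the difference, using a stand-in dict for firstresult (inside a flow it would be a task object, but the same language rule applies):

```python
firstresult = {"name": "firstcheck", "result": False}  # stand-in value for illustration

not_a_tuple = (firstresult)   # parentheses only group; this is the very same object
one_tuple = (firstresult,)    # the trailing comma is what makes a 1-element tuple
one_list = [firstresult]      # a 1-element list

print(type(not_a_tuple).__name__, type(one_tuple).__name__, type(one_list).__name__)
```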
k

Kevin Kho

10/25/2021, 7:35 PM
If that is the case, you can just pass it directly, right?
    html = generate_mail(firstresult)
    send_mail(html)
Ah it completes fine? I thought you get the intermittent error? Is that during registration or runtime?
j

Jason Boorn

10/25/2021, 7:36 PM
so the intermittent error is on registering the task
happens maybe 50% of the time
but I'm less concerned about that one - what I really need to figure out is why I can't just collect task results as json and put them in an email
you know what - I actually think that the registration is a problem. I just changed the code to run mssql twice, and its not doing that
like its not registering new code?
k

Kevin Kho

10/25/2021, 7:44 PM
Sorry, I’m pretty confused. mssql.dataval_exact returns something like {"name": name, "description": description, "result": True }, and then this gets passed on to generate_mail? Or is it a list of these dicts? What are you aiming to do with results = (firstresult)?
I think the intermittent error you mentioned is an error preventing registration
j

Jason Boorn

10/25/2021, 7:45 PM
what I want to do is run mssql a few times, each time generating a json string
then I want to collect all those strings and put them in an email basically
k

Kevin Kho

10/25/2021, 7:45 PM
Ah ok I fully understand I think. Let me think a bit
👍 1
j

Jason Boorn

10/25/2021, 7:50 PM
Ok - the registration seems to be the issue
When I force register, I consistently get the error:
C:\prefect\Prefect>prefect register --project "Alpha Hurdle" --path C:\prefect\Prefect\alpha_hurdle.py -f
Collecting flows...
Processing 'C:\\prefect\\Prefect\\alpha_hurdle.py':
  Building Local storage...
  Registering 'alpha_hurdle'... Error
    Traceback (most recent call last):
      File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 481, in build_and_register
        flow_id, flow_version, is_new = register_serialized_flow(
      File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 405, in register_serialized_flow
        res = client.graphql(
      File "C:\Python39\lib\site-packages\prefect\client\client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
wonder if its my token
k

Kevin Kho

10/25/2021, 7:55 PM
I don’t think so, because the path there is create_flow_from_compressed_string, so something is going wrong as it serializes the flow.
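Editor's note: the error payload in the traceback is a list of dicts, and its path field is what points at create_flow_from_compressed_string. A small plain-Python sketch of pulling out those fields follows. The payload is copied from the traceback above, summarize is a hypothetical helper name, and no Prefect call is made. Because the traceback shows raise ClientError(result["errors"]), the list would presumably be reachable as exc.args[0] on a caught exception, but that is an assumption not verified here.

```python
# Error payload copied verbatim from the traceback in the thread.
errors = [
    {
        "path": ["create_flow_from_compressed_string"],
        "message": "Unable to complete operation. An internal API error occurred.",
        "extensions": {"code": "API_ERROR"},
    }
]


def summarize(errors):
    """Return (path, code, message) tuples for each error entry (hypothetical helper)."""
    summary = []
    for err in errors:
        path = ".".join(err.get("path", []))           # which API operation failed
        code = err.get("extensions", {}).get("code")   # error code, if present
        summary.append((path, code, err.get("message")))
    return summary


for path, code, message in summarize(errors):
    print(f"{path} [{code}]: {message}")
```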
j

Jason Boorn

10/25/2021, 7:58 PM
this consistent API error - could it be related to us using a self-signed certificate within our environment?
I've seen us have lots of errors related to that
k

Kevin Kho

10/25/2021, 7:59 PM
Ah even for simpler flows you see this? I asked the team and I’ll tell you when I hear back
j

Jason Boorn

10/25/2021, 8:00 PM
I haven't seen it on this product, but I have seen it when trying to do other things - our original agent is on an EC2 instance and that works great. Now I'm deploying another agent to our internal network and having issues
When I try to connect to other products from inside, I often get an error about CNAME SSL stuff
I can normally suppress it with some switch, but it might be causing some issues - this is the first flow I'm trying to run from inside the network and its not working
Ya the more I think about this the more I think it could be some SSL thing - if there's a way to peek into the error that's happening that would be great
ill ping my guys over here to see if theres some weird config
I realized this is not an SSL issue - there's something wrong with how we're putting together the results then passing that to the other task
when I dont do that everything registers fine
this thread is getting too big - I think we're back where we started. I'm going to open another one
k

Kevin Kho

10/26/2021, 2:32 PM
I did check with the team yesterday, and this error is normally seen when registering really large flows, which is not the case in your flow. I think your flow will lend itself well to a mapping structure but without mapping, I think you need an intermediate task to construct that list like this:
from prefect import task, Flow
import prefect

@task
def r_append(r, result):
    r.append(result)
    return r

@task
def log_task(x):
    prefect.context.get("logger").info(x)

with Flow('alpha_hurdle') as flow:
    r = []
    result = task(lambda: {"a":1})()
    result2 = (result)
    results = r_append(r, result)
    results = r_append(results, result2)
    log_task.map(results)
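Editor's note: the r_append helper above appends in place, so it changes the list object that was passed in. Whether that matters for this flow is not established in the thread. As a plain-Python sketch only (no Prefect involved, and r_append_copy is a hypothetical name), a variant that returns a new list instead could look like this:

```python
def r_append(r, result):
    r.append(result)       # mutates the list that was passed in
    return r


def r_append_copy(r, result):
    return r + [result]    # builds a new list and leaves the input untouched


base = []
out = r_append_copy(base, {"a": 1})
out2 = r_append_copy(out, {"a": 2})
print(base, out, out2)

shared = []
same = r_append(shared, {"a": 1})
print(same is shared)      # the in-place version hands back the same object
```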
j

Jason Boorn

10/26/2021, 2:36 PM
ah ok - thanks I'll give that a try
so you know what? I dont think our issue is with the list
I changed the code to not use lists and it still fails:
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.notifications.email_task import EmailTask
import json

# TODO: make secrets
user = "xxxx"
host = "xxxx"
password = "xxxx"

@task
def dataval_exact(name, description, db, query, checkvalue):
    logger = prefect.context.get("logger")
    fetchval = SqlServerFetch(db_name=db, user=user, host=host).run(query=query, password=password)
    if (fetchval[0][0] == checkvalue):
        logger.info("checkvalue is true")
        return {"name": name, "description": description, "result": True }
    else:
        logger.info("checkvalue is false")
        return {"name": name, "description": description, "result": False }

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
if I comment out the last line it will register
I think it's having a problem with how dataval_exact is constructed or what its returning?
k

Kevin Kho

10/26/2021, 2:52 PM
Thanks for the minimal example. I did check the task definition and it does seem to close the connection. Can we check if it is related to SqlServerFetch by commenting that out and just returning a dict?
@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }
Removing SqlServerFetch works for me so I am wondering what’s up there but I honestly don’t see anything because the connection is closed.
Actually I was able to register that code snippet you gave me also assuming SqlServerFetch is just imported from our task library
j

Jason Boorn

10/26/2021, 4:01 PM
ya I missed the import
but you could register that code?
I can't
k

Kevin Kho

10/26/2021, 4:02 PM
Yeah I was able to. What Prefect version are you on?
j

Jason Boorn

10/26/2021, 4:07 PM
how do I get that?
I got it: 0.15.7
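Editor's note: one generic way to answer the version question, sketched with the standard library only. The helper name installed_version is hypothetical. It reads the installed package metadata rather than importing Prefect, so it returns None when the package is not installed in the current environment.

```python
import sys
from importlib import metadata


def installed_version(package):
    """Return the installed version string for a package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# The Python version matters here too, since the thread later compares 3.8 and 3.9.
print("python", ".".join(str(part) for part in sys.version_info[:3]))
print("prefect", installed_version("prefect"))
```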
I slimmed down my job to a very simple one - still not registering
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.sql_server.sql_server import SqlServerFetch
from prefect.tasks.notifications.email_task import EmailTask


@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }


@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html


with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")    
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
is it possible that my install is broken?
im running python 3.9
k

Kevin Kho

10/26/2021, 4:13 PM
Python 3.9 normally works, though not officially supported. Maybe you can try Python 3.8 and a fresh install?
g

George Coyne

10/26/2021, 4:36 PM
@Kevin Kho were you able to register this? I’m having a rough time getting pyodbc installed on my m1
k

Kevin Kho

10/26/2021, 4:38 PM
conda install pyodbc and yes I was able to
j

Jason Boorn

10/26/2021, 4:41 PM
honestly, you could take out all the dependent tasks
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor

@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    #html = generate_mail(firstresult, secondresult)
so you dont need to install pyodbc
this is definitely the line that bonks it for me:
html = generate_mail(firstresult, secondresult)
k

Kevin Kho

10/26/2021, 4:46 PM
Yeah this exact code works for me. Very confused why. Maybe you can try with a new environment?
j

Jason Boorn

10/26/2021, 4:47 PM
ya I'll give that a shot
g

George Coyne

10/26/2021, 4:47 PM
Did we roll back to 3.8?
j

Jason Boorn

10/26/2021, 4:48 PM
I'm going to try that
I need my SA to do that so it might take some time
if it doesnt work I'll ping you guys
thanks for helping me on this 🙂
👍 1
no dice
I downgraded to python 3.8 and I'm getting the same error
k

Kevin Kho

10/26/2021, 7:04 PM
Ok I’ll ask the team again. I personally don’t have any ideas.
j

Jason Boorn

10/26/2021, 7:04 PM
big picture - we're using prefect for our alternative data ingestion and I was hoping to roll it out to the company for other things
if we can't get this then I'll have to use something else 😞
k

Kevin Kho

10/26/2021, 7:05 PM
Definitely understand.