# prefect-server
j
In a flow, I'm looking to collect the result of several different tasks, and then pump that final array of task results into another task (which basically sends the results out via email). I'm having some struggles with this - are we able to add task results to an array within a flow?
k
Hey @Jason Boorn, if the array already exists, you need some kind of intermediate task to do this. Do you mean something like this?
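from prefect import task, Flow

task_a = task(lambda: "a")  # hypothetical stand-ins for the real tasks
task_b = task(lambda: "b")
email_task = task(lambda collection: print(collection))

with Flow("collect-results") as flow:
    a_list = [1, 2, 3]
    a = task_a()
    b = task_b()
    collection = (a_list, a, b)  # becomes a Tuple task; at run time a tuple, not a list
    email_task(collection)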
j
ah ok - lemme try that
can I iterate on that collection like a normal python object in my email_task?
also - I'm getting intermittent errors like this one:
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
k
Is this Server or Cloud? Yep, you can iterate over a list inside a task.
Is your API container lacking resources maybe?
Do you get the error with a smaller amount of mapped tasks?
j
I don't have a ton of tasks right now (just 1)
it happens sometimes
when I do it again it seems to work
k
Did you use the Docker Compose setup from `prefect server start`? Maybe you need to bump up Docker memory?
j
I'm using cloud
k
Ah ok. Does it still happen and is your Flow simple enough to share? Seems like something may be off in the definition?
j
here is what I'm trying to do:
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.notifications.email_task import EmailTask
import mssql
import json

@task
def generate_mail(results):
    logger = prefect.context.get("logger")
    logger.info(results)
    html = """
    <div>Hi</div>
    <table>
    """
    for result in results:
        json_result = json.loads(result);
        html += "<tr><td>" + json_result['name'] + "</td><td>" + json_result['description'] + "</td><td>" + json_result['result'] + "</td></tr>"
    html += "</table>"
    return html

@task
def send_mail(html):
    logger = prefect.context.get("logger")
    logger.info(html)
    email = EmailTask(subject="Alerts for TCA", email_to="jpb@lpcap.com", email_from="notifications@lpcap.com", smtp_server="mail.lpcap.com", smtp_port=25, smtp_type="INSECURE", msg_plain=None, email_to_cc=None, email_to_bcc=None, attachments=None)
    email.run(msg=html)

with Flow('alpha_hurdle', executor=LocalDaskExecutor(scheduler="threads")) as flow:
    r = []
    name = "firstcheck"
    description = "this is the first check"
    firstresult = mssql.dataval_exact(name, description, "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    results = (r, firstresult)
    html = generate_mail(results)
    send_mail(html)
I've been working on this for a couple hours now and unable to get it to work
there seems to be something wrong with how the json that gets spat out by dataval_exact is being treated downstream
my use case is super simple - a task spits out a JSON structure which I collect into a list, then I construct an email with the structure
I can't figure out if what comes in to generate_mail is a string or a json structure?
either way seems to fail
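Based on the dataval_exact task posted later in the thread, the upstream task returns a Python dict, not a JSON string. That means two lines in generate_mail as posted would raise TypeError: json.loads() does not accept a dict, and concatenating a str with the boolean "result" value fails. The tuple (r, firstresult) also feeds the empty list r into the loop first. The following is a minimal plain-Python sketch of the table-building step. It assumes each result is a dict with "name", "description" and "result" keys, and build_rows_html is a hypothetical helper name:

```python
from html import escape


def build_rows_html(results):
    """Render a list of result dicts as an HTML table (hypothetical helper)."""
    html = "<div>Hi</div>\n<table>\n"
    for result in results:
        # Each result is already a dict, so no json.loads() is needed.
        # str() turns the boolean "result" value into "True"/"False" before escaping.
        cells = [escape(str(result[key])) for key in ("name", "description", "result")]
        html += "<tr><td>" + "</td><td>".join(cells) + "</td></tr>\n"
    html += "</table>"
    return html


rows = [
    {"name": "firstcheck", "description": "this is the first check", "result": True},
    {"name": "secondcheck", "description": "this is the second check", "result": False},
]
print(build_rows_html(rows))
```

Inside a flow, the same body would sit under @task and receive a list of these dicts, with no leading empty list.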
k
Does mssql.dataval_exact just return JSON? Are there some connections involved there? Or does that method create a connection and close it?
j
@task
def dataval_exact(name, description, db, query, checkvalue):
    logger = prefect.context.get("logger")
    fetchval = SqlServerFetch(db_name=db, user=user, host="gr-sqlutil01").run(query=query, password=password)
    if (fetchval[0][0] == checkvalue):
        logger.info("checkvalue is true")
        return {"name": name, "description": description, "result": True }
    else:
        logger.info("checkvalue is false")
        return {"name": name, "description": description, "result": False }
just returns a json structure
or rather a python dict I guess
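That distinction matters for generate_mail. A dict can be indexed directly, but json.loads() only parses text (str, bytes or bytearray). This small stdlib-only check shows both sides, using a sample dict shaped like the one dataval_exact returns:

```python
import json

result = {"name": "firstcheck", "description": "this is the first check", "result": True}

# json.loads() only accepts str, bytes or bytearray; passing a dict raises TypeError.
try:
    json.loads(result)
    loads_failed = False
except TypeError:
    loads_failed = True

# If a JSON string really is needed (for example for logging), serialize explicitly...
as_text = json.dumps(result)
# ...and json.loads() turns that string back into an equal dict.
round_trip = json.loads(as_text)

print(loads_failed, as_text)
```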
k
Seems fine. Can you try changing
results = (r, firstresult)
to
results = task(lambda x, y: (x, y))(r, firstresult)
to defer the execution, because `firstresult` is a task but `r` is not?
j
can I just do:
results = (firstresult)
will that make a collection?
now I'm getting Task 'generate_mail': Finished task run for task with final state: 'TriggerFailed'
when actually, the earlier task completes fine
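On the question of whether results = (firstresult) makes a collection: in Python, parentheses around a single expression only group it, so the result is the same object as firstresult, which inside a flow is the task itself. It is the trailing comma that builds a one-element tuple. Here is a stdlib-only illustration, with a plain dict standing in for the task result:

```python
first = {"name": "firstcheck", "result": True}

not_a_tuple = (first)      # parentheses only group; this is the dict itself
one_tuple = (first,)       # the trailing comma is what makes a tuple
one_list = [first]         # a one-element list works the same way for iteration

print(type(not_a_tuple).__name__, type(one_tuple).__name__, len(one_list))
```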
k
If that is the case, you can just pass it directly, right?
html = generate_mail(firstresult)
send_mail(html)
Ah, it completes fine? I thought you were getting the intermittent error. Is that during registration or at runtime?
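There is one caveat with passing the single result straight in. The generate_mail posted earlier loops over its argument, and looping over a dict yields its keys, not the dict. Wrapping the single result in a list keeps that loop working. In the following plain-Python illustration, names_from is a hypothetical helper that mimics the loop:

```python
result = {"name": "firstcheck", "description": "this is the first check", "result": True}


def names_from(results):
    """Mimic the loop in generate_mail: read the "name" field of each item."""
    return [item["name"] for item in results]


# Iterating a bare dict yields its keys ("name", "description", "result"),
# so item["name"] indexes a string with a string and raises TypeError.
try:
    names_from(result)
    bare_dict_failed = False
except TypeError:
    bare_dict_failed = True

# Wrapping the single result in a list gives the loop one dict per iteration.
wrapped_names = names_from([result])

print(list(result), bare_dict_failed, wrapped_names)
```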
j
so the intermittent error is on registering the task
happens maybe 50% of the time
but I'm less concerned about that one - what I really need to figure out is why I can't just collect task results as json and put them in an email
you know what - I actually think that the registration is a problem. I just changed the code to run mssql twice, and it's not doing that
like it's not registering new code?
k
Sorry, I'm pretty confused. `mssql.dataval_exact` returns something like
{"name": name, "description": description, "result": True }
and then this gets passed on to `generate_mail`? Or is it a list of these dicts? What are you aiming to do with `results = (firstresult)`?
I think the intermittent error you mentioned is an error preventing registration
j
what I want to do is run mssql a few times, each time generating a json string
then I want to collect all those strings and put them in an email basically
k
Ah ok I fully understand I think. Let me think a bit
j
Ok - the registration seems to be the issue
When I force register, I consistently get the error:
C:\prefect\Prefect>prefect register --project "Alpha Hurdle" --path C:\prefect\Prefect\alpha_hurdle.py -f
Collecting flows...
Processing 'C:\\prefect\\Prefect\\alpha_hurdle.py':
  Building `Local` storage...
  Registering 'alpha_hurdle'... Error
Traceback (most recent call last):
  File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 481, in build_and_register
    flow_id, flow_version, is_new = register_serialized_flow(
  File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 405, in register_serialized_flow
    res = client.graphql(
  File "C:\Python39\lib\site-packages\prefect\client\client.py", line 569, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
wonder if it's my token
k
I don't think so, because the path in the error is `create_flow_from_compressed_string`, so something is going wrong as the flow is serialized.
j
this consistent API error - could it be related to us using a self-signed certificate within our environment?
I've seen us have lots of errors related to that
k
Ah even for simpler flows you see this? I asked the team and I’ll tell you when I hear back
j
I haven't seen it on this product, but I have seen it when trying to do other things - our original agent is on an EC2 instance and that works great. Now I'm deploying another agent to our internal network and having issues
When I try to connect to other products from inside, I often get an error about CNAME SSL stuff
I can normally suppress it with some switch, but it might be causing some issues - this is the first flow I'm trying to run from inside the network and it's not working
Ya the more I think about this the more I think it could be some SSL thing - if there's a way to peek into the error that's happening that would be great
I'll ping my guys over here to see if there's some weird config
I realized this is not an SSL issue - there's something wrong with how we're putting together the results then passing that to the other task
when I don't do that everything registers fine
this thread is getting too big - I think we're back where we started. I'm going to open another one
k
I did check with the team yesterday, and this error is normally seen when registering really large flows, which is not the case in your flow. I think your flow will lend itself well to a mapping structure but without mapping, I think you need an intermediate task to construct that list like this:
from prefect import task, Flow
import prefect

@task
def r_append(r, result):
    r.append(result)
    return r

@task
def log_task(x):
    prefect.context.get("logger").info(x)

with Flow('alpha_hurdle') as flow:
    r = []
    result = task(lambda: {"a":1})()
    result2 = (result)
    results = r_append(r, result)
    results = r_append(results, result2)
    log_task.map(results)
j
ah ok - thanks I'll give that a try
so you know what? I don't think our issue is with the list
I changed the code to not use lists and it still fails:
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.notifications.email_task import EmailTask
import json

# TODO: make secrets
user = "xxxx"
host = "xxxx"
password = "xxxx"

@task
def dataval_exact(name, description, db, query, checkvalue):
    logger = prefect.context.get("logger")
    fetchval = SqlServerFetch(db_name=db, user=user, host=host).run(query=query, password=password)
    if (fetchval[0][0] == checkvalue):
        logger.info("checkvalue is true")
        return {"name": name, "description": description, "result": True }
    else:
        logger.info("checkvalue is false")
        return {"name": name, "description": description, "result": False }

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
if I comment out the last line it will register
I think it's having a problem with how dataval_exact is constructed or what its returning?
k
Thanks for the minimal example. I did check the task definition and it does seem to close the connection. Can we check if it is related to `SqlServerFetch` by commenting that out and just returning a dict?
@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }
Removing SqlServerFetch works for me, so I'm wondering what's up there, but I honestly don't see anything wrong, because the connection is closed.
Actually, I was also able to register the code snippet you gave me, assuming SqlServerFetch is just imported from our task library.
j
ya I missed the import
but you could register that code?
I can't
k
Yeah I was able to. What Prefect version are you on?
j
how do I get that?
I got it: 0.15.7
I slimmed down my job to a very simple one - still not registering
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.sql_server.sql_server import SqlServerFetch
from prefect.tasks.notifications.email_task import EmailTask


@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }


@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html


with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")    
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
is it possible that my install is broken?
I'm running Python 3.9
k
Python 3.9 normally works, though it isn't officially supported. Maybe you can try Python 3.8 and a fresh install?
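When comparing a working environment with a failing one, a quick report of the interpreter and installed Prefect versions from each machine can help narrow things down. The following is a stdlib-only sketch that also works when Prefect is missing. The function name environment_versions is hypothetical, and inside an environment that has Prefect, prefect.__version__ reports the same Prefect number:

```python
import sys
from importlib import metadata


def environment_versions() -> dict:
    """Return the interpreter version and the installed Prefect version (or None)."""
    try:
        prefect_version = metadata.version("prefect")
    except metadata.PackageNotFoundError:
        prefect_version = None
    python_version = ".".join(str(part) for part in sys.version_info[:3])
    return {"python": python_version, "prefect": prefect_version}


print(environment_versions())
```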
g
@Kevin Kho were you able to register this? I'm having a rough time getting pyodbc installed on my M1
k
Try `conda install pyodbc`. And yes, I was able to.
j
honestly, you could take out all the dependent tasks
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor

@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False }

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    #html = generate_mail(firstresult, secondresult)
so you don't need to install pyodbc
this is definitely the line that bonks it for me:
html = generate_mail(firstresult, secondresult)
k
Yeah this exact code works for me. Very confused why. Maybe you can try with a new environment?
j
ya I'll give that a shot
g
Did we roll back to 3.8?
j
I'm going to try that
I need my SA to do that so it might take some time
if it doesn't work I'll ping you guys
thanks for helping me on this 🙂
no dice
I downgraded to python 3.8 and I'm getting the same error
k
Ok I’ll ask the team again. I personally don’t have any ideas.
j
big picture - we're using prefect for our alternative data ingestion and I was hoping to roll it out to the company for other things
if we can't get this then I'll have to use something else 😞
k
Definitely understand.