Jason Boorn
10/25/2021, 6:46 PM
Kevin Kho
10/25/2021, 6:49 PM
with Flow(...) as flow:
    a_list = [1, 2, 3]
    a = task_a()
    b = task_b()
    collection = (a_list, a, b)  # a tuple mixing a plain list with task outputs
    email_task(collection)
Jason Boorn
10/25/2021, 6:57 PM
Kevin Kho
10/25/2021, 7:00 PM
Jason Boorn
10/25/2021, 7:06 PM
Kevin Kho
10/25/2021, 7:10 PM
prefect server start? Maybe you need to bump up Docker memory?
Jason Boorn
10/25/2021, 7:15 PM
Kevin Kho
10/25/2021, 7:18 PM
Jason Boorn
10/25/2021, 7:19 PM
Kevin Kho
10/25/2021, 7:25 PM
Jason Boorn
10/25/2021, 7:25 PM
Kevin Kho
10/25/2021, 7:31 PM
results = (r, firstresult) to results = task(lambda x, y: (x, y))(r, firstresult) to defer the execution because firstresult is a task but r is not?
Jason Boorn
10/25/2021, 7:32 PM
Kevin Kho
10/25/2021, 7:35 PM
html = generate_mail(first_result)
send_mail(html)
Jason Boorn
10/25/2021, 7:36 PM
Kevin Kho
10/25/2021, 7:44 PM
mssql.dataval_exact returns something like
{"name": name, "description": description, "result": True}
And then this gets passed on to generate_mail? Or is it a list of these dicts? What are you aiming to do with results = (firstresult)?
Jason Boorn
10/25/2021, 7:45 PM
Kevin Kho
10/25/2021, 7:45 PM
Jason Boorn
10/25/2021, 7:50 PM
Local storage...
Registering 'alpha_hurdle'... Error
Traceback (most recent call last):
  File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 481, in build_and_register
    flow_id, flow_version, is_new = register_serialized_flow(
  File "C:\Python39\lib\site-packages\prefect\cli\build_register.py", line 405, in register_serialized_flow
    res = client.graphql(
  File "C:\Python39\lib\site-packages\prefect\client\client.py", line 569, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['create_flow_from_compressed_string'], 'message': 'Unable to complete operation. An internal API error occurred.', 'extensions': {'code': 'API_ERROR'}}]
Kevin Kho
10/25/2021, 7:55 PM
create_flow_from_compressed_string so there is something wrong as it serializes the flow.
Jason Boorn
10/25/2021, 7:58 PM
Kevin Kho
10/25/2021, 7:59 PM
Jason Boorn
10/25/2021, 8:00 PM
Kevin Kho
10/26/2021, 2:32 PM
from prefect import task, Flow
import prefect

@task
def r_append(r, result):
    r.append(result)
    return r

@task
def log_task(x):
    prefect.context.get("logger").info(x)

with Flow('alpha_hurdle') as flow:
    r = []
    result = task(lambda: {"a": 1})()
    result2 = (result)
    results = r_append(r, result)
    results = r_append(results, result2)
    log_task.map(results)
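The r_append task above changes the list it receives and returns that same object. A minimal sketch of a non-mutating variant follows, written as plain functions without the @task decorator so it runs without Prefect. The name append_copy is hypothetical and does not appear in the thread.

```python
def r_append(r, result):
    """Same logic as the thread's r_append: mutates r in place and returns it."""
    r.append(result)
    return r


def append_copy(r, result):
    """Hypothetical alternative: build a new list and leave the input untouched."""
    return r + [result]


start = []
mutated = r_append(start, {"a": 1})
# start and mutated are the same object, so start now holds the result too.

fresh_start = []
copied = append_copy(fresh_start, {"a": 1})
# fresh_start is still empty; only the returned list holds the result.
```

Returning a new list avoids relying on in-place changes to an input; whether that matters in a given flow depends on how the executor shares inputs between tasks.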
Jason Boorn
10/26/2021, 2:36 PM
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.sql_server.sql_server import SqlServerFetch
from prefect.tasks.notifications.email_task import EmailTask
import json

# TODO: make secrets
user = "xxxx"
host = "xxxx"
password = "xxxx"

@task
def dataval_exact(name, description, db, query, checkvalue):
    logger = prefect.context.get("logger")
    fetchval = SqlServerFetch(db_name=db, user=user, host=host).run(query=query, password=password)
    # Compare the first column of the first row with the expected value
    if fetchval[0][0] == checkvalue:
        logger.info("checkvalue is true")
        return {"name": name, "description": description, "result": True}
    else:
        logger.info("checkvalue is false")
        return {"name": name, "description": description, "result": False}

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
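generate_mail above is a placeholder that returns "asdf". As a rough sketch of what it might eventually render, assuming each check is a dict with the "name", "description", and "result" keys used in this thread, the body could be built like this. The function name render_results_html and the table layout are assumptions, and the @task decorator is left off so the sketch runs without Prefect.

```python
from html import escape


def render_results_html(results):
    """Render a list of check dicts as a small HTML table.

    Each dict is assumed to have the keys used in the thread:
    "name", "description", and "result" (a bool).
    """
    rows = []
    for check in results:
        status = "PASS" if check["result"] else "FAIL"
        rows.append(
            "<tr><td>{}</td><td>{}</td><td>{}</td></tr>".format(
                escape(check["name"]), escape(check["description"]), status
            )
        )
    header = "<tr><th>Check</th><th>Description</th><th>Status</th></tr>"
    return "<table>" + header + "".join(rows) + "</table>"


body = render_results_html([
    {"name": "firstcheck", "description": "this is the first check", "result": False},
    {"name": "secondcheck", "description": "this is the second check", "result": True},
])
```

Taking a single list rather than two positional arguments keeps the renderer independent of how many checks the flow runs.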
Kevin Kho
10/26/2021, 2:52 PM
SqlServerFetch by commenting that out and just returning a dict?
@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False}
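Another way to take the database out of the picture, without hard-coding the result, is to pass the fetch step in as an argument. This is only a sketch: check_exact and fake_fetch are hypothetical names, the fetch callable stands in for the SqlServerFetch call, and it is assumed to return rows indexed as rows[0][0], as in the thread's code. The @task decorator is omitted so it runs without Prefect.

```python
def check_exact(name, description, fetch, checkvalue):
    """Compare the first column of the first row from fetch() with checkvalue.

    fetch is a hypothetical zero-argument callable standing in for the
    SqlServerFetch call; it is assumed to return rows like [("2021-10-22",)].
    """
    rows = fetch()
    passed = rows[0][0] == checkvalue
    return {"name": name, "description": description, "result": passed}


def fake_fetch():
    # Canned rows in place of a real database query.
    return [("2021-10-22",)]


first = check_exact("firstcheck", "this is the first check", fake_fetch, "2021-01-01")
second = check_exact("secondcheck", "this is the second check", fake_fetch, "2021-10-22")
```

With canned rows the comparison logic still runs, so both the True and False branches can be exercised while the SQL Server task is left out.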
Jason Boorn
10/26/2021, 4:01 PM
Kevin Kho
10/26/2021, 4:02 PM
Jason Boorn
10/26/2021, 4:07 PM
import prefect
from prefect import task, Flow, case
from prefect.executors import LocalDaskExecutor
from prefect.tasks.sql_server.sql_server import SqlServerFetch
from prefect.tasks.notifications.email_task import EmailTask

@task
def dataval_exact(name, description, db, query, checkvalue):
    return {"name": name, "description": description, "result": False}

@task
def generate_mail(firstresult, secondresult):
    html = "asdf"
    return html

with Flow('alpha_hurdle', executor=LocalDaskExecutor()) as flow:
    firstresult = dataval_exact("firstcheck", "this is the first check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-01-01")
    secondresult = dataval_exact("secondcheck", "this is the second check", "TradeDesk", "select convert(varchar, max(end_date_time), 23) as dbresult from tca_by_orderrefid_final", "2021-10-22")
    html = generate_mail(firstresult, secondresult)
Kevin Kho
10/26/2021, 4:13 PM
George Coyne
10/26/2021, 4:36 PM
Kevin Kho
10/26/2021, 4:38 PM
Jason Boorn
10/26/2021, 4:41 PM
Kevin Kho
10/26/2021, 4:46 PM
Jason Boorn
10/26/2021, 4:47 PM
George Coyne
10/26/2021, 4:47 PM
Jason Boorn
10/26/2021, 4:48 PM
Kevin Kho
10/26/2021, 7:04 PM
Jason Boorn
10/26/2021, 7:04 PM
Kevin Kho
10/26/2021, 7:05 PM