Andreas Jung
12/14/2020, 11:04 AMimport time
from base64 import b64decode
from prefect import Flow, task
import reportparser
@task
def fetch_message():
ts = time.time()
report_data = b64decode(
b"ew4AAAAAAADmAgAABgAAAQAAH+EH3QAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
b"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIBPwgQAA4AAAAA"
)
print("fetch ", time.time())
print("fetch ", time.time() - ts)
return report_data
@task
def classify_message(msg):
msg_type = 0
ts = time.time()
print("classify", time.time())
print("classify", time.time() - ts)
return msg_type
@task
def parse_message(msg, msg_type):
ts = time.time()
result = reportparser.parse(msg)
print("parse ", time.time())
print("parse ", time.time() - ts)
return result
@task
def deliver_message(result):
ts = time.time()
print("deliver ", time.time())
print("deliver ", time.time() - ts)
def main():
with Flow("reportparser") as flow:
msg = fetch_message()
msg_type = classify_message(msg)
result = parse_message(msg, msg_type)
deliver_message(result)
for i in range(20):
print()
ts = time.time()
flow.run()
print("total", time.time() - ts)
if __name__ == "__main__":
main()
Execution time (absolute time, relative time per task):
fetch 1607943883.0552974
fetch 0.0001933574676513672
classify 1607943883.3690984
classify 0.00015401840209960938
parse 1607943883.6593442
parse 0.0009167194366455078
deliver 1607943883.9757109
deliver 0.0001609325408935547
total 1.3962466716766357
Marwan Sarieddine
12/14/2020, 6:16 PMmerge_dicts
) to be taking up considerable time (at least 1s on my machine as far as I recall)
see the issue I opened in case you want more details https://github.com/PrefectHQ/prefect/issues/2909