John Ramirez
05/28/2021, 1:17 PM
Kevin Kho
`time.sleep()`, but maybe you should set an upstream dependency for the tasks?
John Ramirez
05/28/2021, 3:08 PM
Kevin Kho
John Ramirez
05/28/2021, 5:00 PM
Kevin Kho
John Ramirez
05/28/2021, 5:02 PM
Kevin Kho
John Ramirez
05/28/2021, 5:07 PM
Kevin Kho
John Ramirez
05/28/2021, 5:09 PM
@task(name='Send Transactions', log_stdout=True)
def run_api_call(api_params: dict) -> list:
    """Render the delivery-transaction template and POST it to the service.

    Args:
        api_params: Context passed to the ``delivery-transaction.j2``
            template. It must also contain the ``runType`` and
            ``searchTerm`` keys, which are echoed back in the result.

    Returns:
        ``[runType, trace_id, searchTerm, sent_at]``, where ``trace_id`` is a
        random 6-digit string and ``sent_at`` is a naive UTC ``datetime``
        taken immediately before the request is sent.
    """
    # NOTE(review): with this literal, str.format() only collapses the doubled
    # braces to '{ URL }' and never inserts trace_id. Presumably the real URL
    # was redacted from the snippet -- confirm it contains a '{}' placeholder.
    url = '{{ URL }}'

    template_file_path = jinja2.FileSystemLoader('./resources')
    jinja_env = jinja2.Environment(loader=template_file_path)
    j_template = jinja_env.get_template('delivery-transaction.j2')

    # Values exposed to the template in addition to api_params.
    j_template.globals['start_date'] = datetime.now(timezone.utc).astimezone().isoformat()
    j_template.globals['end_date'] = datetime.now(timezone.utc).astimezone().isoformat()
    j_template.globals['trx_number'] = ''.join(random.choices(string.digits, k=4))
    j_template.globals['invoice_number'] = ''.join(random.choices(string.digits, k=19))

    trace_id = ''.join(random.choices(string.digits, k=6))
    payload = j_template.render(api_params)
    headers = {'Content-Type': 'application/json'}

    # Take the timestamp BEFORE sending: it is later used as the lower bound
    # of the log search window, so it must not be later than anything the
    # service logs while it handles this request.
    sent_at = datetime.utcnow()

    requests.request(
        method='POST',
        url=url.format(trace_id),
        headers=headers,
        data=payload,
    )
    return [api_params['runType'], trace_id, api_params['searchTerm'], sent_at]
@task(name='Wait for Logs', log_stdout=True)
def task_delay():
    """Block the calling task for 30 seconds, then return ``None``."""
    pause_seconds = 30
    time.sleep(pause_seconds)
@task(name='Parse Datadog Logs', log_stdout=True)
def search_logs(search_params: list) -> dict:
    """Check Datadog logs for the "transaction sent" line of one API call.

    Args:
        search_params: Four-element sequence ``[run_type, trace_id,
            search_term, start]``. ``start`` must support ``strftime`` and
            addition of a ``timedelta``; the query window is the minute
            beginning at ``start``.

    Returns:
        ``{run_type: 'SUCCESS'}`` when at least one returned log message
        matches the expected text, otherwise ``{run_type: 'FAILURE'}``.
    """
    configuration = Configuration()

    # The trace id and search term are interpolated into the pattern as-is.
    expected_text = 'Transaction with trace ID {0} sent to {1}'.format(
        search_params[1],
        search_params[2],
    )
    pattern = re.compile(expected_text)

    stamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'

    with ApiClient(configuration) as api_client:
        api_instance = logs_api.LogsApi(api_client)

        window_filter = LogsQueryFilter(
            _from=search_params[3].strftime(stamp_format),
            query='service:delivery-transaction-service AND env:dev',
            to=(search_params[3] + timedelta(minutes=1)).strftime(stamp_format),
        )
        query_options = LogsQueryOptions(
            time_offset=4,
            timezone='UTC',
        )
        body = LogsListRequest(
            filter=window_filter,
            options=query_options,
            sort=LogsSort('timestamp'),
        )
        api_response = api_instance.list_logs(body=body)

        # Stops at the first matching message, like an early return would.
        found = any(
            pattern.search(entry['attributes']['message'])
            for entry in api_response.data
        )

    outcome = 'SUCCESS' if found else 'FAILURE'
    return {search_params[0]: outcome}
John Ramirez
05/28/2021, 5:10 PM
with Flow('aap-deliver-transaction') as flow:
# Imported locally because the file's import block is not part of this snippet.
from prefect import unmapped

# One run_api_call child task is created per element of params
# (presumably a list of parameter dicts -- confirm against get_api_params).
params = get_api_params()
trace_data = run_api_call.map(api_params=params)

results = search_logs.map(
    search_params=trace_data,
    upstream_tasks=[
        # upstream_tasks given to .map() are mapped over too. task_delay
        # returns None, which cannot be mapped, so wrap it in unmapped():
        # every search_logs child then simply waits for the single delay
        # task, which itself runs after all run_api_call children finish.
        unmapped(task_delay(upstream_tasks=[trace_data])),
    ],
)
collect_results(result=results)
flow.run()
Kevin Kho
`trace_data = run_api_call.map(api_params=params)`, params is a list of `[api_params['runType'], trace_id, api_params['searchTerm'], datetime.utcnow()]`. Do you want `run_api_call` to receive the full list? Or are you running `run_api_call` for each item in this list?
John Ramirez
05/28/2021, 5:17 PM
Kevin Kho