Hey everyone! Is there an easy way to delay the ex...
# ask-community
j
Hey everyone! Is there an easy way to delay the execution of a task? I'm sending POST calls with a task, but then I need to wait 30 seconds before parsing the logs for data validation. Is there an easy way to do this?
k
Hey @John Ramirez, I think the easiest way is to just use
time.sleep()
, but maybe you should set an upstream dependency for the tasks?
j
The problem I see is with passing parameters downstream.
k
Sorry, I’m not following fully here. Do you think you could give me a small code example? Or could you explain the passing parameters downstream a bit more?
j
So the sequence is: 1. POST API call, 2. Wait 30 seconds, 3. Parse logs (using the results from step 1). What I find is that the results from step 1 are lost.
k
You’re saying the Post call gets the data and loses it within the Prefect flow?
j
yes
k
Can I see your code?
j
which part
k
The part where you do the post API call and pass it to number 3
j
Copy code
@task(name='Send Transactions', log_stdout=True)
def run_api_call(api_params: dict) -> list:
    """Render the delivery-transaction template and POST it as JSON.

    Args:
        api_params: Context handed to the Jinja render. It must also contain
            the ``runType`` and ``searchTerm`` keys, which are echoed back in
            the return value.

    Returns:
        ``[runType, trace_id, searchTerm, sent_at]`` where ``trace_id`` is the
        random 6-digit string generated for this call and ``sent_at`` is a
        naive ``datetime.utcnow()`` taken after the POST has returned.
    """
    def random_digits(length: int) -> str:
        # Random string made of ``length`` decimal digits.
        return ''.join(random.choices(string.digits, k=length))

    # NOTE(review): placeholder as it appears in the snippet. Under
    # ``str.format`` the doubled braces are literal, so the trace id is not
    # interpolated into this value -- confirm against the real endpoint.
    endpoint = '{{ URL }}'

    environment = jinja2.Environment(
        loader=jinja2.FileSystemLoader('./resources'),
    )
    template = environment.get_template('delivery-transaction.j2')

    # Values exposed to the template in addition to ``api_params``.
    shared = template.globals
    shared['start_date'] = datetime.now(timezone.utc).astimezone().isoformat()
    shared['end_date'] = datetime.now(timezone.utc).astimezone().isoformat()
    shared['trx_number'] = random_digits(4)
    shared['invoice_number'] = random_digits(19)

    trace_id = random_digits(6)
    body = template.render(api_params)

    # The response object is intentionally not inspected here, exactly as in
    # the original; success is judged later from the service logs.
    requests.request(
        method='POST',
        url=endpoint.format(trace_id),
        headers={'Content-Type': 'application/json'},
        data=body,
    )

    run_type = api_params['runType']
    search_term = api_params['searchTerm']
    return [run_type, trace_id, search_term, datetime.utcnow()]


@task(name='Wait for Logs', log_stdout=True)
def task_delay(seconds: float = 30) -> None:
    """Pause the flow so that downstream tasks start only after a delay.

    Args:
        seconds: How long to block, in seconds. Defaults to 30, the value
            that was previously hard-coded, so existing calls without
            arguments behave exactly as before.

    Returns:
        None. The task exists only for its blocking side effect.
    """
    time.sleep(seconds)


@task(name='Parse Datadog Logs', log_stdout=True)
def search_logs(search_params: list) -> dict:
    """Look for the confirmation log line of one transaction in Datadog.

    Args:
        search_params: Positional record read by index:
            ``[0]`` key used in the returned dict,
            ``[1]`` trace id and ``[2]`` destination, both interpolated into
            the regular expression that is searched for,
            ``[3]`` ``datetime`` marking the start of the one-minute query
            window.

    Returns:
        ``{search_params[0]: 'SUCCESS'}`` if any returned log message matches
        the expected line, otherwise ``{search_params[0]: 'FAILURE'}``.
    """
    timestamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'

    configuration = Configuration()

    # The ids are inserted into the pattern verbatim (not escaped).
    trace_id, destination = search_params[1], search_params[2]
    expected_line = re.compile(
        'Transaction with trace ID {0} sent to {1}'.format(trace_id, destination)
    )

    with ApiClient(configuration) as api_client:
        logs_client = logs_api.LogsApi(api_client)

        # Query window: from the recorded timestamp to one minute later.
        window_start = search_params[3]
        window_end = window_start + timedelta(minutes=1)
        request_body = LogsListRequest(
            filter=LogsQueryFilter(
                _from=window_start.strftime(timestamp_format),
                query='service:delivery-transaction-service AND env:dev',
                to=window_end.strftime(timestamp_format),
            ),
            options=LogsQueryOptions(
                time_offset=4,
                timezone='UTC',
            ),
            sort=LogsSort('timestamp'),
        )
        response = logs_client.list_logs(body=request_body)

        # Stops at the first matching message, like the original loop.
        found = any(
            expected_line.search(entry['attributes']['message'])
            for entry in response.data
        )
        outcome = 'SUCCESS' if found else 'FAILURE'
        return {search_params[0]: outcome}
Copy code
# Imported here because the snippet's import section is not part of this view.
from prefect import unmapped

with Flow('aap-deliver-transaction') as flow:
    params = get_api_params()

    # One mapped POST per entry in ``params``.
    trace_data = run_api_call.map(api_params=params)

    # A single delay that starts once every mapped POST has finished
    # (a non-mapped task downstream of a mapped one acts as a reduce step).
    wait_for_logs = task_delay(upstream_tasks=[trace_data])

    # ``upstream_tasks`` passed to ``.map`` are mapped over by default. The
    # delay task returns ``None``, which cannot be iterated, so it must be
    # wrapped in ``unmapped`` to act as a plain ordering dependency while
    # ``trace_data`` is still mapped element by element.
    results = search_logs.map(
        search_params=trace_data,
        upstream_tasks=[unmapped(wait_for_logs)],
    )
    collect_results(result=results)

flow.run()
k
When you do this:
trace_data = run_api_call.map(api_params=params)
, params is a list of
[api_params['runType'], trace_id, api_params['searchTerm'], datetime.utcnow()]
. Do you want
run_api_call
to receive the full list? or are you running run_api_call for each item in this list?
j
I found the solution. I reduce the map output from api_call in the task_delay function.
k
Ah I see. Nice job!
214 Views