j

    John Ramirez

    1 year ago
    Hey everyone! Is there an easy way to delay the execution of a task. Im sending POST calls with a task but then I need to wait 30 sec because parsing the logs for data validation. is there an easy way to do this
    Kevin Kho

    Kevin Kho

    1 year ago
    Hey @John Ramirez, I think the easiest way is to just use
    time.sleep()
    , but maybe you should set an upstream dependency for the tasks?
    j

    John Ramirez

    1 year ago
    the problen i see is that passing parameters downstream
    Kevin Kho

    Kevin Kho

    1 year ago
    Sorry, I’m not following fully here. Do you think you could give me a small code example? Or could you explain the passing parameters downstream a bit more?
    j

    John Ramirez

    1 year ago
    so the sequences is1. Post API call 2. Wait 30 secs 3. Parse Logs (using results from step 1) what I find is that the results from step 1 are lost
    Kevin Kho

    Kevin Kho

    1 year ago
    You’re saying the Post call gets the data and loses it within the Prefect flow?
    j

    John Ramirez

    1 year ago
    yes
    Kevin Kho

    Kevin Kho

    1 year ago
    Can I see your code?
    j

    John Ramirez

    1 year ago
    which part
    Kevin Kho

    Kevin Kho

    1 year ago
    The part where you do the post API call and pass it to number 3
    j

    John Ramirez

    1 year ago
    @task(name='Send Transactions', log_stdout=True)
    def run_api_call(api_params: dict) -> list:
        url = '{{ URL }}'
        template_file_path = jinja2.FileSystemLoader('./resources')
        jinja_env = jinja2.Environment(loader=template_file_path)
        j_template = jinja_env.get_template('delivery-transaction.j2')
    
        j_template.globals['start_date'] = datetime.now(timezone.utc).astimezone().isoformat()
        j_template.globals['end_date'] = datetime.now(timezone.utc).astimezone().isoformat()
        j_template.globals['trx_number'] = ''.join(random.choices(string.digits, k=4))
        j_template.globals['invoice_number'] = ''.join(random.choices(string.digits, k=19))
    
        trace_id = ''.join(random.choices(string.digits, k=6))
        payload = j_template.render(api_params)
        headers = {'Content-Type': 'application/json'}
        requests.request(
            method='POST',
            url=url.format(trace_id),
            headers=headers,
            data=payload
        )
        return [api_params['runType'], trace_id, api_params['searchTerm'], datetime.utcnow()]
    
    
    @task(name='Wait for Logs', log_stdout=True)
    def task_delay():
        time.sleep(30)
    
    
    @task(name='Parse Datadog Logs', log_stdout=True)
    def search_logs(search_params: list) -> dict:
        # time.sleep(35)
        configuration = Configuration()
        m = re.compile('Transaction with trace ID {0} sent to {1}'.format(
            search_params[1],
            search_params[2]
        ))
    
        with ApiClient(configuration) as api_client:
            api_instance = logs_api.LogsApi(api_client)
            body = LogsListRequest(
                filter=LogsQueryFilter(
                    _from=search_params[3].strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
                    query='service:delivery-transaction-service AND env:dev',
                    to=(search_params[3] + timedelta(minutes=1)).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
                ),
                options=LogsQueryOptions(
                    time_offset=4,
                    timezone='UTC',
                ),
                sort=LogsSort('timestamp'),
            )
            api_response = api_instance.list_logs(body=body)
    
            for i in api_response.data:
                val = i['attributes']['message']
                if not m.search(val):
                    continue
                else:
                    return {search_params[0]: 'SUCCESS'}
    
            return {search_params[0]: 'FAILURE'}
    with Flow('aap-deliver-transaction') as flow:
        params = get_api_params()
        trace_data = run_api_call.map(api_params=params)
        results = search_logs.map(
            search_params=trace_data,
            upstream_tasks=[
                task_delay(upstream_tasks=[trace_data]),
            ]
        )
        collect_results(result=results)
    
    flow.run()
    Kevin Kho

    Kevin Kho

    1 year ago
    When do you this:
    trace_data = run_api_call.map(_api_params_=params)
    , params is a list of
    [api_params['runType'], trace_id, api_params['searchTerm'], datetime.utcnow()]
    . Do you want
    run_api_call
    to receive the full list? or are you running run_api_call for each item in this list?
    j

    John Ramirez

    1 year ago
    i found the solution. I reduce the map output fro api_call in the task_delay function
    Kevin Kho

    Kevin Kho

    1 year ago
    Ah I see. Nice job!