    Kieran

    1 year ago
    Has anyone had an issue with Prefect not gathering mapped results, despite the docs saying:
    Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task
    I have two mapped tasks, one feeding the other, then a third non-mapped task which expects a list. The logger context doesn't appear to be getting passed so I can't get any insight. The UI is showing the child mapped tasks and I have logged out their content, but their results are not being gathered together as suggested in the docs. Any pointers would be amazing!
    nicholas

    1 year ago
    Hi @Kieran - would you mind providing some minimal code for us to look at?
    Kieran

    1 year ago
    Sure @nicholas, here is an example:
    from prefect import Flow, unmapped
    from prefect.schedules import CronSchedule

    with Flow(
        name=extract_load_name,
        schedule=CronSchedule("0 8 * * *"),
        storage=default_flow_storage,
        run_config=default_flow_run_config,
    ) as flow:
        token = auth()
        graphql_client = graphql_client(token)
        search_query_total = query_total(
            graphql_client,
            customer_search_total,
            customer_search_variables,
            customer_query_header,
        )
        query_offsets = offsets(
            search_query_total,
            default_etl_limit_size,
        )
        # first map: one generated query per offset
        search_queries = generate_graphql_queries.map(
            offset=query_offsets,
            query=unmapped(customer_search_gql),
            chunk_size=unmapped(default_etl_limit_size),
        )
        # second map: run each generated query
        extracted_data = extract.map(
            query=search_queries,
            graphql_client=unmapped(graphql_client),
            query_variables=unmapped(customer_search_variables),
            query_header=unmapped(customer_query_header),
        )
        # non-mapped task: should receive the gathered list of mapped results
        deduped_data = dedupe(extracted_data)
    nicholas

    1 year ago
    Thank you @Kieran - could you try importing the logger from context in your task instead of in the flow?
    Kieran

    1 year ago
    So, in this example I have extracted_data logging its return result out and that is as I expect. Then the dedupe() task should receive a nested list of results from the child mapped tasks (if I understand the docs correctly).
    @nicholas so I get the same behaviour after moving my logger. The UI logs show:
    17:32:22 DEBUG CloudTaskRunner Task 'extract[3]': Handling state change from Running to Success
    17:32:22 INFO  CloudTaskRunner Task 'extract[3]': Finished task run for task with final state: 'Success'
    17:32:22 INFO  CloudTaskRunner Task 'extract[4]': Starting task run...
    17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Handling state change from Pending to Running
    17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Calling task.run() method...
    Then nothing. The extract and dedupe tasks are being marked as successful, but do the logs suggest that there is a silent failure somewhere? (I would expect them to continue logging or show an error.)
    (extract[4] has a Success state, fwiw)
    nicholas

    1 year ago
    Hm, could you show the contents of the extract and dedupe tasks?
    Kieran

    1 year ago
    Sure:
    import prefect
    import pandas as pd
    from gql import gql, Client
    from prefect import task

    @task
    def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
        logger = prefect.context.get("logger")
        result = graphql_client.execute(
            gql(query.replace('\n', '')),
            variable_values=query_variables,
        )
        result_items = dict(result)[query_header]['edges']
        listed_items = list(result_items)
        logger.info(listed_items)  # <-- logging out exactly what I would expect in each of the child tasks
        return listed_items

    @task
    def dedupe(data: list) -> pd.DataFrame:
        logger = prefect.context.get("logger")
        logger.info(data)  # <-- not showing anything
        flat_list = [item for sublist in data for item in sublist]
        result = [value['node'] for value in flat_list]
        # drop_duplicates(..., inplace=True) returns None, so drop the flag to return the frame
        deduped = pd.DataFrame(data=result).drop_duplicates(subset=['id'])
        return deduped
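    If something in the client call is failing silently, wrapping it would force the failure into the logs. A sketch of extract with that guard (BaseException is deliberate here, to also catch SystemExit in case the underlying library exits rather than raising):
    @task
    def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
        logger = prefect.context.get("logger")
        try:
            result = graphql_client.execute(
                gql(query.replace('\n', '')),
                variable_values=query_variables,
            )
        except BaseException as exc:
            # also catches SystemExit/KeyboardInterrupt, which a plain
            # except Exception would miss
            logger.error("extract failed: %r", exc)
            raise
        listed_items = list(dict(result)[query_header]['edges'])
        logger.info(listed_items)
        return listed_items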
    @nicholas
    nicholas

    1 year ago
    Thanks @Kieran - let me look into this a bit, nothing stands out to me immediately
    Kieran

    1 year ago
    Thanks @nicholas. If I can't get it working then I will look at squashing my two maps together...
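    Squashing them would mean one mapped task that both builds and runs each paged query. A rough sketch (hypothetical: it assumes the query template takes offset/limit placeholders via str.format):
    @task
    def generate_and_extract(offset: int, query: str, chunk_size: int, graphql_client: Client, query_variables: dict, query_header: str) -> list:
        logger = prefect.context.get("logger")
        # build the paged query inline instead of in a separate mapped task
        paged_query = query.format(offset=offset, limit=chunk_size)
        result = graphql_client.execute(
            gql(paged_query.replace('\n', '')),
            variable_values=query_variables,
        )
        listed_items = list(dict(result)[query_header]['edges'])
        logger.info(listed_items)
        return listed_items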
    nicholas

    1 year ago
    Hi @Kieran - I haven't figured out what's happening in your flow yet, and I can't tell if your logs are going to the API. If you haven't already, make sure to configure your logger as mentioned in these docs (in particular, if you're using Server/Cloud you'll want to set
    log_to_cloud=true
    ). Otherwise, I've put together a small example of what this mapping pattern should look like:
    import prefect
    from prefect import task, Flow
    
    
    @task
    def create_list():
        logger = prefect.context.get("logger")
    
        arr = [*range(0, 5, 1)]
        logger.info(arr)
        return arr
    
    
    @task
    def log_and_transform_value(value):
        logger = prefect.context.get("logger")
    
        transformed_value = value * 2
        logger.info(transformed_value)
        return transformed_value
    
    
    @task
    def log_values(values):
        logger = prefect.context.get("logger")
        logger.info(values)
        return values
    
    
    with Flow("Mapping Test") as flow:
        data = create_list()
    
        collect = log_and_transform_value.map(data)
    
        log_values(collect)
    
    
    flow.run()
    which would output something like this:
    [2021-01-29 19:00:00-0500] INFO - prefect.create_list | [0, 1, 2, 3, 4]
    ...
    [2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[0] | 0
    [2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[1] | 2
    [2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[2] | 4
    [2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[3] | 6
    [2021-01-29 19:00:01-0500] INFO - prefect.log_and_transform_value[4] | 8
    ...
    [2021-01-29 19:00:01-0500] INFO - prefect.log_values | [0, 2, 4, 6, 8]
    which I think is what you're looking for
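    For reference, that logging setting lives in Prefect's user config; a sketch of ~/.prefect/config.toml, assuming the 0.14-era layout (the env var equivalent is PREFECT__LOGGING__LOG_TO_CLOUD=true):
    [logging]
    # ship task-run logs to Server/Cloud so they show in the UI
    log_to_cloud = true
    level = "DEBUG"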
    Kieran

    1 year ago
    Thanks @nicholas, yes that is the behaviour I am expecting. I will set that config and see if that impacts my logs. I have my logger set up within my tasks like your example. Fwiw I'm testing this locally with server and local executor but the prod environment is using Cloud.
    @nicholas so I got to the bottom of this. The underlying library, gql, that was being used for the GraphQL queries was exiting and swallowing all logs with it. That was the red herring which masked this problem. Swapping it out for the requests library has worked here. Thank you for your help!
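    For anyone who lands here later, the requests-based replacement looks roughly like this (a sketch; the endpoint URL, auth header, and run_query name are hypothetical):
    import requests

    def run_query(url: str, query: str, variables: dict, token: str) -> dict:
        # POST the query as JSON and raise on HTTP errors instead of
        # exiting silently, so failures surface in the Prefect task logs
        response = requests.post(
            url,
            json={"query": query, "variables": variables},
            headers={"Authorization": f"Bearer {token}"},
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        if "errors" in payload:
            raise RuntimeError(f"GraphQL errors: {payload['errors']}")
        return payload["data"]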