Kieran

01/29/2021, 5:24 PM
Has anyone had an issue with Prefect not doing this:
Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task
I have two mapped tasks, one feeding the other, then a third non-mapped task which expects a list. The logger context doesn't appear to be getting passed, so I can't get any insight. The UI is showing the child mapped tasks and I have logged out their content, but their results are not being gathered together as suggested in the docs. Any pointers would be amazing!

nicholas

01/29/2021, 5:25 PM
Hi @Kieran - would you mind providing some minimal code for us to look at?

Kieran

01/29/2021, 5:26 PM
Sure @nicholas Here is an example:
with Flow(
    name=extract_load_name,
    schedule=CronSchedule("0 8 * * *"),
    storage=default_flow_storage,
    run_config=default_flow_run_config
    ) as flow:
    token = auth()
    graphql_client = graphql_client(token)
    search_query_total = query_total(
        graphql_client,
        customer_search_total,
        customer_search_variables,
        customer_query_header
    )
    query_offsets = offsets(
        search_query_total,
        default_etl_limit_size
        )
    search_queries = generate_graphql_queries.map(
        offset=query_offsets,
        query=unmapped(customer_search_gql),
        chunk_size=unmapped(default_etl_limit_size)
    )
    extracted_data = extract.map(
        query=search_queries,
        graphql_client=unmapped(graphql_client),
        query_variables=unmapped(customer_search_variables),
        query_header=unmapped(customer_query_header)
    )
    deduped_data = dedupe(extracted_data)

nicholas

01/29/2021, 5:29 PM
Thank you @Kieran - could you try importing the logger from context in your task instead of in the flow?
👀 1

Kieran

01/29/2021, 5:29 PM
So, in this example I have extracted_data logging its return result out and that is as I expect. Then the dedupe() task should receive a nested list of results from the child mapped tasks (if I understand the docs correctly).
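For reference, here is a minimal plain-Python sketch of the data shape being described, with no Prefect involved. It assumes each mapped extract child returns a list of GraphQL edges shaped like {'node': {...}}, so the gathered input to the non-mapped task is a list of those lists. The sample records and the helper name flatten_and_dedupe are made up for illustration.

```python
from itertools import chain


def flatten_and_dedupe(gathered: list) -> list:
    """Flatten a list of per-child edge lists and keep the first node seen per id."""
    seen = set()
    unique_nodes = []
    for edge in chain.from_iterable(gathered):
        node = edge["node"]
        if node["id"] not in seen:
            seen.add(node["id"])
            unique_nodes.append(node)
    return unique_nodes


# Hypothetical results from three mapped children (one inner list per child).
gathered = [
    [{"node": {"id": 1, "name": "a"}}, {"node": {"id": 2, "name": "b"}}],
    [{"node": {"id": 2, "name": "b"}}],
    [],
]

print(flatten_and_dedupe(gathered))
```

If the reduce task receives anything other than a list of lists like this, the problem is upstream of the flattening step.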
@nicholas so I get the same behaviour after moving my logger. The UI logs show:
17:32:22 DEBUG CloudTaskRunner Task 'extract[3]': Handling state change from Running to Success
17:32:22 INFO CloudTaskRunner Task 'extract[3]': Finished task run for task with final state: 'Success'
17:32:22 INFO CloudTaskRunner Task 'extract[4]': Starting task run...
17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Handling state change from Pending to Running
17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Calling task.run() method...
Then nothing. The extract and dedupe tasks are being marked as successful but do the logs suggest that there is a silent failure somewhere? (as I would expect them to continue logging or show an error)
(extract[4] has a success state fwiw)

nicholas

01/29/2021, 5:59 PM
Hm, could you show the contents of the extract and dedupe tasks?

Kieran

01/29/2021, 6:09 PM
Sure:
@task
def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
    logger = prefect.context.get("logger")
    result = graphql_client.execute(
        gql(query.replace('\n','')),
        variable_values=query_variables
    )
    result_items = dict(result)[query_header]['edges']
    listed_items = list(result_items)
    logger.info(listed_items)  # logs exactly what I would expect in each of the child tasks
    return listed_items

@task
def dedupe(data: list) -> pd.DataFrame:
    logger = prefect.context.get("logger")
    logger.info(data)  # not showing anything
    flat_list = [item for sublist in data for item in sublist]
    result = [value['node'] for value in flat_list]
    # drop_duplicates(inplace=True) returns None, so keep the returned copy instead
    deduped = pd.DataFrame(data=result).drop_duplicates(subset=['id'])
    return deduped
@nicholas

nicholas

01/29/2021, 6:13 PM
Thanks @Kieran - let me look into this a bit, nothing stands out to me immediately

Kieran

01/29/2021, 6:38 PM
Thanks @nicholas. If I can't get it working then I will look at squashing my two maps together...

nicholas

01/30/2021, 12:05 AM
Hi @Kieran - I've put together a small example to show how this should work, but I haven't figured out what's happening in your flow. I can't tell if your logs are going to the API, but if you haven't already, make sure to configure your logger as mentioned in these docs (in particular, if you're using Server/Cloud you'll want to set log_to_cloud=true). Otherwise, here is a small example of what this mapping pattern should look like:
import prefect
from prefect import task, Flow


@task
def create_list():
    logger = prefect.context.get("logger")

    arr = [*range(0, 5, 1)]
    logger.info(arr)
    return arr


@task
def log_and_transform_value(value):
    logger = prefect.context.get("logger")

    transformed_value = value * 2
    logger.info(transformed_value)
    return transformed_value


@task
def log_values(values):
    logger = prefect.context.get("logger")
    logger.info(values)
    return values


with Flow("Mapping Test") as flow:
    data = create_list()

    collect = log_and_transform_value.map(data)

    log_values(collect)


flow.run()
which would output something like this:
[2021-01-29 19:00:00-0500] INFO - prefect.create_list | [0, 1, 2, 3, 4]
...
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[0] | 0
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[1] | 2
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[2] | 4
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[3] | 6
[2021-01-29 19:00:01-0500] INFO - prefect.log_and_transform_value[4] | 8
...
[2021-01-29 19:00:01-0500] INFO - prefect.log_values | [0, 2, 4, 6, 8]
which I think is what you're looking for

Kieran

01/30/2021, 7:00 PM
Thanks @nicholas, yes that is the behaviour I am expecting. I will set that config and see if that impacts my logs. I have my logger set up within my tasks like your example. Fwiw I'm testing this locally with server and local executor but the prod environment is using Cloud.
@nicholas so I got to the bottom of this. The underlying library gql that was being used for GraphQL queries was exiting, swallowing all logs with it. That was the red herring which masked this problem. Swapping it out for the requests library has worked here. Thank you for your help!
🚀 1
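The thread does not show the replacement code, so the following is only a sketch of what a requests-style GraphQL call could look like, not what was actually deployed. The function name run_graphql_query is hypothetical. It takes any object with a requests-like post() method, so it can be exercised without a network connection. It raises on both HTTP failures and GraphQL-level errors so that a failed query cannot pass silently.

```python
from typing import Any, Dict, Optional


def run_graphql_query(
    session: Any,
    url: str,
    query: str,
    variables: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """POST a GraphQL query and return its "data" payload, raising on any error."""
    payload = {"query": query, "variables": variables or {}}
    response = session.post(url, json=payload, timeout=30)
    response.raise_for_status()  # surface HTTP errors instead of hiding them
    body = response.json()
    if body.get("errors"):
        # GraphQL servers can report errors in the response body, often with HTTP 200
        raise RuntimeError(f"GraphQL errors: {body['errors']}")
    return body["data"]
```

With the real library this would be called as run_graphql_query(requests.Session(), url, query, variables). The endpoint URL and the auth token handling are left out because the thread does not show them.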