Kieran01/29/2021, 5:24 PM
I have two mapped tasks, one feeding the other, then a third non-mapped task which expects a list. The logger context doesn't appear to be getting passed, so I can't get any insight. The UI is showing the child mapped tasks and I have logged out their content, but their results are not being gathered together as suggested in the docs. Any pointers to help would be amazing!
Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task
Kieran01/29/2021, 5:26 PM
with Flow(
    name=extract_load_name,
    schedule=CronSchedule("0 8 * * *"),
    storage=default_flow_storage,
    run_config=default_flow_run_config
) as flow:
    token = auth()
    graphql_client = graphql_client(token)
    search_query_total = query_total(
        graphql_client,
        customer_search_total,
        customer_search_variables,
        customer_query_header
    )
    query_offsets = offsets(
        search_query_total,
        default_etl_limit_size
    )
    search_queries = generate_graphql_queries.map(
        offset=query_offsets,
        query=unmapped(customer_search_gql),
        chunk_size=unmapped(default_etl_limit_size)
    )
    extracted_data = extract.map(
        query=search_queries,
        graphql_client=unmapped(graphql_client),
        query_variables=unmapped(customer_search_variables),
        query_header=unmapped(customer_query_header)
    )
    deduped_data = dedupe(extracted_data)
Kieran01/29/2021, 5:29 PM
logging its return result out and that is as I expect. Then the
task should receive a nested list of results from the child mapped tasks (if I understand the docs correctly).
Then nothing. The extract and dedupe tasks are being marked as successful but do the logs suggest that there is a silent failure somewhere? (as I would expect them to continue logging or show an error)
17:32:22 DEBUG CloudTaskRunner Task 'extract': Handling state change from Running to Success
17:32:22 INFO CloudTaskRunner Task 'extract': Finished task run for task with final state: 'Success'
17:32:22 INFO CloudTaskRunner Task 'extract': Starting task run...
17:32:22 DEBUG CloudTaskRunner Task 'extract': Handling state change from Pending to Running
17:32:22 DEBUG CloudTaskRunner Task 'extract': Calling task.run() method...
Kieran01/29/2021, 6:09 PM
@task
def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
    logger = prefect.context.get("logger")
    result = graphql_client.execute(
        gql(query.replace('\n','')),
        variable_values=query_variables
    )
    result_items = dict(result)[query_header]['edges']
    listed_items = list(result_items)
    logger.info(listed_items)  # <-- logging out exactly what I would expect in each of the child tasks
    return listed_items

@task
def dedupe(data: list) -> pd.DataFrame:
    logger = prefect.context.get("logger")
    logger.info(data)  # <-- not showing anything
    flat_list = [item for sublist in data for item in sublist]
    result = [value['node'] for index, value in enumerate(flat_list)]
    deduped = pd.DataFrame(data=result).drop_duplicates(subset=['id'], inplace=True)
    return deduped
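One thing that may be worth checking in the dedupe task, separate from the logging issue: in pandas, `drop_duplicates(..., inplace=True)` modifies the frame in place and returns `None`, so `deduped` would be `None` as written. Below is a minimal sketch of the same flatten-and-dedupe logic as a plain function, assuming the gathered input is a list of per-child lists of `{"node": {...}}` edges. The name `dedupe_nodes` and the sample data are made up for illustration and are not from the original flow.

```python
import pandas as pd


def dedupe_nodes(data: list) -> pd.DataFrame:
    """Flatten a list of lists of edges and drop rows with a repeated 'id'."""
    # Each mapped child returns a list, so the gathered input is a list of lists.
    flat_list = [item for sublist in data for item in sublist]
    nodes = [edge["node"] for edge in flat_list]
    # Without inplace=True, drop_duplicates returns a new DataFrame.
    return pd.DataFrame(data=nodes).drop_duplicates(subset=["id"])


# Hypothetical sample shaped like the results of two mapped child tasks.
sample = [
    [{"node": {"id": 1, "name": "a"}}, {"node": {"id": 2, "name": "b"}}],
    [{"node": {"id": 2, "name": "b"}}, {"node": {"id": 3, "name": "c"}}],
]
deduped = dedupe_nodes(sample)

# For comparison: the inplace=True form returns None rather than a DataFrame.
in_place_result = pd.DataFrame({"id": [1, 1]}).drop_duplicates(subset=["id"], inplace=True)
```

Inside the task itself, the same change would be to drop `inplace=True` and return the result of `drop_duplicates` directly.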
Kieran01/29/2021, 6:38 PM
). Otherwise I've put together a small example of what this mapping pattern should look like:
import prefect
from prefect import task, Flow

@task
def create_list():
    logger = prefect.context.get("logger")
    arr = [*range(0, 5, 1)]
    logger.info(arr)
    return arr

@task
def log_and_transform_value(value):
    logger = prefect.context.get("logger")
    transformed_value = value * 2
    logger.info(transformed_value)
    return transformed_value

@task
def log_values(values):
    logger = prefect.context.get("logger")
    logger.info(values)
    return values

with Flow("Mapping Test") as flow:
    data = create_list()
    collect = log_and_transform_value.map(data)
    log_values(collect)

flow.run()
which would output something like this:
[2021-01-29 19:00:00-0500] INFO - prefect.create_list | [0, 1, 2, 3, 4]
...
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value | 0
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value | 2
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value | 4
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value | 6
[2021-01-29 19:00:01-0500] INFO - prefect.log_and_transform_value | 8
...
[2021-01-29 19:00:01-0500] INFO - prefect.log_values | [0, 2, 4, 6, 8]
which I think is what you're looking for
Kieran01/30/2021, 7:00 PM
that was being used for GraphQL queries was exiting, swallowing all logs with it. That was the red herring which masked this problem. Stripping it out for the
library has worked here. Thank you for your help!
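For anyone hitting something similar, here is a rough sketch of one way to surface a library call that exits without raising an ordinary error. It wraps the call and logs anything that escapes, including `SystemExit`, before re-raising. The names `call_with_logging` and `exiting_client_call` are hypothetical stand-ins, not part of Prefect or of any GraphQL client, and whether the library in this thread exited via `SystemExit` is an assumption. Inside a Prefect task you would presumably log through `prefect.context.get("logger")` rather than the stdlib logger used here.

```python
import logging

logger = logging.getLogger("extract_debug")


def call_with_logging(fn, *args, **kwargs):
    """Call fn and log anything that escapes it, then re-raise."""
    try:
        return fn(*args, **kwargs)
    except BaseException as exc:
        # BaseException also covers SystemExit and KeyboardInterrupt,
        # which a plain `except Exception` would let through unlogged.
        logger.error("call to %s aborted with %r", getattr(fn, "__name__", fn), exc)
        raise


def exiting_client_call(query):
    """Hypothetical stand-in for a client call that exits the process."""
    raise SystemExit(1)


# A normal call passes its return value straight through.
normal_result = call_with_logging(len, [1, 2])

# An exiting call is logged first, then the SystemExit propagates as before.
try:
    call_with_logging(exiting_client_call, "{ customers { id } }")
    exit_was_raised = False
except SystemExit:
    exit_was_raised = True
```

The wrapper re-raises rather than swallowing the exit, so the task's final state is unchanged; it only makes sure something is written to the log first.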