Kieran
01/29/2021, 5:24 PM
"Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task"
I have two mapped tasks, one feeding the other, then a third, non-mapped task that expects a list. The logger context doesn't appear to be getting passed, so I can't get any insight. The UI shows the mapped child tasks and I have logged their content, but their results are not being gathered together as the docs describe.
Any pointers would be amazing!
nicholas
Kieran
01/29/2021, 5:26 PM
from prefect import Flow, unmapped
from prefect.schedules import CronSchedule

with Flow(
    name=extract_load_name,
    schedule=CronSchedule("0 8 * * *"),  # every day at 08:00
    storage=default_flow_storage,
    run_config=default_flow_run_config
) as flow:
    token = auth()
    graphql_client = graphql_client(token)
    search_query_total = query_total(
        graphql_client,
        customer_search_total,
        customer_search_variables,
        customer_query_header
    )
    query_offsets = offsets(
        search_query_total,
        default_etl_limit_size
    )
    # First mapped task: one GraphQL query per offset
    search_queries = generate_graphql_queries.map(
        offset=query_offsets,
        query=unmapped(customer_search_gql),
        chunk_size=unmapped(default_etl_limit_size)
    )
    # Second mapped task: one extract run per generated query
    extracted_data = extract.map(
        query=search_queries,
        graphql_client=unmapped(graphql_client),
        query_variables=unmapped(customer_search_variables),
        query_header=unmapped(customer_query_header)
    )
    # Non-mapped task: should receive every extract result gathered into a list
    deduped_data = dedupe(extracted_data)
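A plain-Python sketch of the data shape dedupe should receive in this flow, assuming each extract child returns a list of GraphQL edges. The names offsets, fake_extract and run_mapped are hypothetical stand-ins for the thread's tasks; no Prefect or API calls are involved. Because every mapped child returns a list, the gathered input to the non-mapped task is a list of lists, one entry per child.

```python
from typing import Dict, List

# Hypothetical stand-ins for the thread's offsets/extract tasks; the real
# tasks call a GraphQL API, these only reproduce the shape of the data.

def offsets(total: int, chunk_size: int) -> List[int]:
    """Page start positions covering `total` records in `chunk_size` steps."""
    return list(range(0, total, chunk_size))

def fake_extract(offset: int, chunk_size: int, total: int) -> List[Dict]:
    """Return one page of GraphQL-style edges ({'node': {...}})."""
    stop = min(offset + chunk_size, total)
    return [{"node": {"id": i}} for i in range(offset, stop)]

def run_mapped(total: int, chunk_size: int) -> List[List[Dict]]:
    # Each mapped element yields one child result; a non-mapped downstream
    # task receives all of them gathered into a single list (one per child).
    return [fake_extract(o, chunk_size, total) for o in offsets(total, chunk_size)]

gathered = run_mapped(total=5, chunk_size=2)
print(len(gathered))  # 3
print(gathered[0])    # [{'node': {'id': 0}}, {'node': {'id': 1}}]
```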
nicholas
Kieran
01/29/2021, 5:29 PM
extracted_data is logging its return result, and that is as I expect.
Then the dedupe() task should receive a nested list of results from the mapped child tasks (if I understand the docs correctly).
Kieran
01/29/2021, 5:35 PM
17:32:22 DEBUG CloudTaskRunner Task 'extract[3]': Handling state change from Running to Success
17:32:22 INFO CloudTaskRunner Task 'extract[3]': Finished task run for task with final state: 'Success'
17:32:22 INFO CloudTaskRunner Task 'extract[4]': Starting task run...
17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Handling state change from Pending to Running
17:32:22 DEBUG CloudTaskRunner Task 'extract[4]': Calling task.run() method...
Then nothing.
The extract and dedupe tasks are being marked as successful, but do the logs suggest a silent failure somewhere? (I would expect them to keep logging or to show an error.)
Kieran
01/29/2021, 5:35 PM
nicholas
extract and dedupe tasks?
Kieran
01/29/2021, 6:09 PM
import pandas as pd
import prefect
from gql import Client, gql
from prefect import task

@task
def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
    logger = prefect.context.get("logger")
    result = graphql_client.execute(
        gql(query.replace('\n', '')),
        variable_values=query_variables
    )
    result_items = dict(result)[query_header]['edges']
    listed_items = list(result_items)
    logger.info(listed_items)  # logs exactly what I would expect in each of the child tasks
    return listed_items

@task
def dedupe(data: list) -> pd.DataFrame:
    logger = prefect.context.get("logger")
    logger.info(data)  # not showing anything
    # data is a list of lists (one per mapped extract run): flatten it
    flat_list = [item for sublist in data for item in sublist]
    result = [item['node'] for item in flat_list]
    # drop_duplicates(..., inplace=True) returns None, so keep the returned frame instead
    deduped = pd.DataFrame(data=result).drop_duplicates(subset=['id'])
    return deduped
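One thing to watch in dedupe regardless of the logging problem: pandas' drop_duplicates(..., inplace=True) modifies the frame in place and returns None, so assigning its result leaves deduped as None. Below is a stdlib-only sketch of the same flatten-and-dedupe step (dedupe_nodes is a hypothetical name), keeping the first node seen for each id, which mirrors drop_duplicates' default keep='first'.

```python
from itertools import chain
from typing import Dict, List

def dedupe_nodes(pages: List[List[Dict]]) -> List[Dict]:
    """Flatten gathered pages of edges and keep the first node per 'id'."""
    seen = set()
    unique = []
    # chain.from_iterable walks every edge of every page in order
    for edge in chain.from_iterable(pages):
        node = edge["node"]
        if node["id"] not in seen:
            seen.add(node["id"])
            unique.append(node)
    return unique

pages = [
    [{"node": {"id": 1, "name": "a"}}, {"node": {"id": 2, "name": "b"}}],
    [{"node": {"id": 2, "name": "b"}}, {"node": {"id": 3, "name": "c"}}],
]
print(dedupe_nodes(pages))
# [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}, {'id': 3, 'name': 'c'}]
```

If a DataFrame is still needed downstream, pd.DataFrame(dedupe_nodes(pages)) gives the same rows as the corrected pandas version.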
@nicholas
nicholas
Kieran
01/29/2021, 6:38 PM
nicholas
log_to_cloud=true). Otherwise, I've put together a small example of what this mapping pattern should look like:
import prefect
from prefect import task, Flow

@task
def create_list():
    logger = prefect.context.get("logger")
    arr = list(range(5))
    logger.info(arr)
    return arr

@task
def log_and_transform_value(value):
    logger = prefect.context.get("logger")
    transformed_value = value * 2
    logger.info(transformed_value)
    return transformed_value

@task
def log_values(values):
    # Non-mapped task: receives all mapped results gathered into one list
    logger = prefect.context.get("logger")
    logger.info(values)
    return values

with Flow("Mapping Test") as flow:
    data = create_list()
    collect = log_and_transform_value.map(data)  # one child task per element
    log_values(collect)

flow.run()
which would output something like this:
[2021-01-29 19:00:00-0500] INFO - prefect.create_list | [0, 1, 2, 3, 4]
...
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[0] | 0
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[1] | 2
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[2] | 4
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[3] | 6
[2021-01-29 19:00:01-0500] INFO - prefect.log_and_transform_value[4] | 8
...
[2021-01-29 19:00:01-0500] INFO - prefect.log_values | [0, 2, 4, 6, 8]
which I think is what you're looking for.
Kieran
01/30/2021, 7:00 PM
Kieran
02/02/2021, 7:29 AM
gql that was being used for GraphQL queries was exiting, swallowing all logs with it. That was the red herring that masked this problem.
Stripping it out in favour of the requests library has worked here. Thank you for your help!
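A minimal sketch of what a requests-based replacement for the gql call in extract might look like, assuming the endpoint follows the usual GraphQL-over-HTTP convention (POST a JSON body with query and variables; the response carries data and, on failure, errors). run_query, build_payload, extract_edges, the URL and the headers are hypothetical, not the code used in the thread. Raising on HTTP or GraphQL errors means a failure surfaces in the task logs instead of disappearing.

```python
from typing import Any, Dict, List, Optional

def build_payload(query: str, variables: Dict[str, Any]) -> Dict[str, Any]:
    """Conventional GraphQL-over-HTTP request body."""
    return {"query": query, "variables": variables}

def extract_edges(body: Dict[str, Any], query_header: str) -> List[Dict[str, Any]]:
    """Pull the edges list out of a GraphQL JSON response."""
    if body.get("errors"):
        # Raise instead of failing quietly so the error shows up in the logs
        raise RuntimeError(f"GraphQL errors: {body['errors']}")
    return list(body["data"][query_header]["edges"])

def run_query(
    url: str,
    query: str,
    variables: Dict[str, Any],
    query_header: str,
    headers: Optional[Dict[str, str]] = None,
    timeout: float = 30.0,
) -> List[Dict[str, Any]]:
    import requests  # imported here so the helpers above work without it installed

    response = requests.post(
        url, json=build_payload(query, variables), headers=headers, timeout=timeout
    )
    response.raise_for_status()  # HTTP 4xx/5xx become exceptions
    return extract_edges(response.json(), query_header)
```

For example, extract_edges({"data": {"customers": {"edges": [{"node": {"id": 1}}]}}}, "customers") returns [{'node': {'id': 1}}], where "customers" stands in for the real query_header. Inside the extract task, return run_query(...) would take the place of the graphql_client.execute call.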