Kieran
01/29/2021, 5:24 PM
Prefect automatically gathers mapped results into a list if they are needed by a non-mapped task.
nicholas
Kieran
01/29/2021, 5:26 PM
with Flow(
    # Flow settings. The cron expression "0 8 * * *" fires at minute 0 of hour 8 every day.
    name=extract_load_name,
    schedule=CronSchedule("0 8 * * *"),
    storage=default_flow_storage,
    run_config=default_flow_run_config
    ) as flow:
    token = auth()
    # NOTE(review): this rebinds the name `graphql_client` to the call's result,
    # so the original callable is no longer reachable under that name afterwards.
    graphql_client = graphql_client(token)
    # presumably the total number of records matching the search -- TODO confirm
    search_query_total = query_total(
        graphql_client,
        customer_search_total,
        customer_search_variables,
        customer_query_header
    )
    # Offsets are derived from the total and the chunk size
    # (presumably one offset per page -- confirm against offsets()).
    query_offsets = offsets(
        search_query_total,
        default_etl_limit_size
        )
    # Mapped over query_offsets: one child run per offset. Arguments wrapped in
    # unmapped() are passed whole to every child instead of being iterated.
    search_queries = generate_graphql_queries.map(
        offset=query_offsets,
        query=unmapped(customer_search_gql),
        chunk_size=unmapped(default_etl_limit_size)
    )
    # Mapped over search_queries: one extract run per generated query.
    extracted_data = extract.map(
        query=search_queries,
        graphql_client=unmapped(graphql_client),
        query_variables=unmapped(customer_search_variables),
        query_header=unmapped(customer_query_header)
    )
    deduped_data = dedupe(extracted_data)
nicholas
Kieran
01/29/2021, 5:29 PMextracted_datadedupe()Kieran
01/29/2021, 5:35 PM
17:32:22
DEBUG
CloudTaskRunner
Task 'extract[3]': Handling state change from Running to Success
17:32:22
INFO
CloudTaskRunner
Task 'extract[3]': Finished task run for task with final state: 'Success'
17:32:22
INFO
CloudTaskRunner
Task 'extract[4]': Starting task run...
17:32:22
DEBUG
CloudTaskRunner
Task 'extract[4]': Handling state change from Pending to Running
17:32:22
DEBUG
CloudTaskRunner
Task 'extract[4]': Calling task.run() method...
Kieran
01/29/2021, 5:35 PMnicholas
extractdedupeKieran
01/29/2021, 6:09 PM
@task
def extract(query: str, graphql_client: Client, query_variables: dict, query_header: str) -> list:
    """Run one GraphQL query and collect the ``edges`` found under ``query_header``.

    Args:
        query: GraphQL query text; newline characters are removed before parsing.
        graphql_client: Client whose ``execute`` method runs the parsed query.
        query_variables: Passed to ``execute`` as ``variable_values``.
        query_header: Top-level response key whose ``'edges'`` entry is read.

    Returns:
        The ``[query_header]['edges']`` entries of the response, as a list.
    """
    logger = prefect.context.get("logger")
    result = graphql_client.execute(
        gql(query.replace('\n', '')),
        variable_values=query_variables,
    )
    listed_items = list(dict(result)[query_header]['edges'])
    # Restored from Slack link markup, which is not valid Python.
    logger.info(listed_items)  # <-- logging out exactly what I would expect in each of the child tasks
    return listed_items
@task
def dedupe(data: list) -> pd.DataFrame:
    """Flatten the mapped extract results and drop rows with a duplicate ``id``.

    Args:
        data: List of lists of edge dicts; each edge carries a ``'node'`` entry.

    Returns:
        DataFrame built from the ``'node'`` values, with duplicate ``id`` rows
        removed (first occurrence kept).
    """
    logger = prefect.context.get("logger")
    # Restored from Slack link markup, which is not valid Python.
    logger.info(data)  # <-- not showing anything
    nodes = [edge['node'] for sublist in data for edge in sublist]
    frame = pd.DataFrame(data=nodes)
    # drop_duplicates(inplace=True) returns None, which made the task always
    # return None; use the returned copy instead. An empty frame has no 'id'
    # column, so it is passed through rather than raising KeyError.
    deduped = frame if frame.empty else frame.drop_duplicates(subset=['id'])
    return deduped
nicholas
Kieran
01/29/2021, 6:38 PMnicholas
log_to_cloud=true
import prefect
from prefect import task, Flow
@task
def create_list():
    """Log and return the integers 0 through 4 as a list."""
    logger = prefect.context.get("logger")
    arr = list(range(5))
    # Restored from Slack link markup, which is not valid Python.
    logger.info(arr)
    return arr
@task
def log_and_transform_value(value):
    """Double ``value``, log the result, and return it."""
    logger = prefect.context.get("logger")
    transformed_value = value * 2
    # Restored from Slack link markup, which is not valid Python.
    logger.info(transformed_value)
    return transformed_value
@task
def log_values(values):
    """Log ``values`` and return it unchanged."""
    logger = prefect.context.get("logger")
    # Restored from Slack link markup, which is not valid Python.
    logger.info(values)
    return values
# Minimal reproduction: a mapped task feeding a non-mapped task.
with Flow("Mapping Test") as flow:
    data = create_list()
    # .map fans out: one log_and_transform_value run per element of `data`.
    collect = log_and_transform_value.map(data)
    # log_values is not mapped, so it receives the mapped results as one list.
    log_values(collect)
flow.run()
[2021-01-29 19:00:00-0500] INFO - prefect.create_list | [0, 1, 2, 3, 4]
...
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[0] | 0
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[1] | 2
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[2] | 4
[2021-01-29 19:00:00-0500] INFO - prefect.log_and_transform_value[3] | 6
[2021-01-29 19:00:01-0500] INFO - prefect.log_and_transform_value[4] | 8
...
[2021-01-29 19:00:01-0500] INFO - prefect.log_values | [0, 2, 4, 6, 8]
Kieran
01/30/2021, 7:00 PMKieran
02/02/2021, 7:29 AMgqlrequests