# prefect-community
Hi everyone. After refactoring the code that builds my flow (by adding tasks), I seem to have a memory leak caused by an infinite loop/cycle. I would like to compare the flow generated by the new code with the one generated by the old code. Is there a way to do this? My dot visualization is too complex to inspect by hand (a very big graph, about a hundred nodes). I've tried calling `flow.sorted_tasks()` and writing the result to a CSV so I can compare the two projects, but it produces different files even for the same program (I guess the internal sorting doesn't produce the same order on every run). Any ideas would be appreciated. Thanks!
I have a snippet that may help you get a stable sort. I save a report to a CSV file ordered by task. Note that `sorted_tasks` returns a topological order of the graph (i.e. it orders tasks with respect to the edges/dependencies). The dependencies only define a partial order: tasks with no path between them may appear in either order, so several topological orders are valid, which is why the result is not always the same. Here is a snippet you can adapt to at least get the same order: I use the topological order first, then compare task names to get a total order.
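```python
import sys

import pandas as pd


def extract_state_exception(state):
    # Hypothetical helper: the original snippet calls it without defining it.
    # Assumes (Prefect 1.x) that a failed state's `result` holds the raised exception.
    result = getattr(state, 'result', None)
    return repr(result) if isinstance(result, Exception) else None


def state_report(flow_state, flow=None):
    """One row per task (plus one per mapped child), ordered by topological position, then name."""
    rows = []
    # Position of every task in the flow's topological order; tasks not found sort last.
    sorted_tasks = flow.sorted_tasks() if flow else ()
    position = {task: i for i, task in enumerate(sorted_tasks)}
    # flow_state.result maps each Task to its final State.
    for task, state in flow_state.result.items():
        rank = position.get(task, sys.maxsize)
        rows.append({
            'task class': type(task).__name__,
            'task name': task.name,
            'status': type(state).__name__,
            'message': state.message,
            'exception': extract_state_exception(state),
            # Sort key: topological position, then name, then map index (-1 = parent task).
            'order': (rank, task.name, -1),
        })
        if state.is_mapped():
            for i, mapped_state in enumerate(state.map_states):
                rows.append({
                    'task class': type(task).__name__,
                    'task name': f'{task.name}[{i}]',
                    'status': type(mapped_state).__name__,
                    'message': mapped_state.message,
                    'exception': extract_state_exception(mapped_state),
                    'order': (rank, task.name, i),
                })

    df = (
        pd.DataFrame.from_records(rows)
        # Show tasks by their topological order, then reset the index
        .sort_values(by='order')
        .reset_index(drop=True)
    )
    return df

# Example (Prefect 1.x): state_report(flow.run(), flow).to_csv('report.csv', index=False)
```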
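One caveat with this snippet: a task's index in `sorted_tasks` is already unique, so the task name only breaks ties among tasks missing from the topological order. If `sorted_tasks()` itself varies between runs, the report order will too. Below is a minimal sketch of a reproducible alternative aimed at the original goal of diffing two flows. It is plain Python, not a Prefect API, and the function names are illustrative. It assumes each flow has already been reduced to a list of task names plus `(upstream, downstream)` name pairs, and that task names are unique (otherwise substitute another stable, unique key). `stable_topological_order` is Kahn's algorithm with a min-heap, so among ready tasks it always emits the alphabetically smallest one; the same graph therefore always yields the same order, and a cycle raises an error instead of looping. `diff_edges` lists the dependencies that were removed or added.

```python
import heapq
from collections import defaultdict


def stable_topological_order(names, edges):
    """Kahn's algorithm with a min-heap: ties between ready tasks are broken by name."""
    edges = list(edges)
    nodes = set(names)
    for upstream, downstream in edges:
        nodes.update((upstream, downstream))

    indegree = {node: 0 for node in nodes}
    children = defaultdict(list)
    for upstream, downstream in edges:
        children[upstream].append(downstream)
        indegree[downstream] += 1

    # Tasks with no unfinished upstream dependencies; a sorted list is a valid min-heap.
    ready = sorted(node for node, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        node = heapq.heappop(ready)
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                heapq.heappush(ready, child)

    if len(order) != len(nodes):
        # Whatever never became ready sits on a cycle or downstream of one.
        stuck = sorted(node for node, deg in indegree.items() if deg > 0)
        raise ValueError(f"cycle detected; tasks on or downstream of it: {stuck}")
    return order


def diff_edges(old_edges, new_edges):
    """Return (removed, added) dependency pairs, each sorted for stable output."""
    old, new = set(old_edges), set(new_edges)
    return sorted(old - new), sorted(new - old)


# Assumption (Prefect 1.x, check against your version): the inputs could be built with
#   names = [t.name for t in flow.tasks]
#   edges = [(e.upstream_task.name, e.downstream_task.name) for e in flow.edges]
```

Writing the output of `stable_topological_order` for the old and new flow one name per line gives two files that an ordinary text diff can compare, while `diff_edges` points directly at the changed dependencies. If the new graph does contain a cycle, the `ValueError` lists the tasks that could not be scheduled.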