    Avi A

    2 years ago
    Hey, what would you say is the “prefect” way to implement a simple groupBy (or some other reduce operation)? Suppose I have data from several sources (for example, partitions) and would like to perform a groupBy+count on it. With Dask I’d simply load it into a Dask dataframe and do the operation on the dataframe. With Prefect, the only example shows how to do a simple sum, and you basically implement the map-reduce at a low level instead of specifying it logically. So, how would you recommend doing this?
    nicholas

    2 years ago
    Hi @Avi A - there's no specific task method like .map that handles group by. The way I'd recommend handling that use case is to implement the group by either with downstream filtering tasks that each run over the entire dataset, or with individual map-reduce tasks (as you've described). I'm curious about your use case for splitting the work into separate tasks, though, if this doesn't work for you.
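    One possible reading of the "downstream filtering tasks" suggestion, sketched as plain Python functions so it runs standalone (in a flow each would be wrapped in @task). The (key, value) row layout and the function names are assumptions for illustration: one filter-and-count step per key, each scanning the entire dataset.

```python
from typing import Hashable, List, Sequence, Tuple

# Assumed row layout for this sketch: (group_key, value) tuples.
Row = Tuple[Hashable, int]


def distinct_keys(data: Sequence[Row]) -> List[Hashable]:
    """Collect the distinct group keys in first-seen order (these could drive a .map call)."""
    return list(dict.fromkeys(key for key, _ in data))


def filter_and_count(data: Sequence[Row], key: Hashable) -> Tuple[Hashable, int]:
    """Downstream 'filtering' step: scan the entire dataset and count rows for one key."""
    return key, sum(1 for row_key, _ in data if row_key == key)


data = [("a", 1), ("b", 2), ("a", 3)]
# In a flow, each call below would be one mapped task over the full dataset.
results = [filter_and_count(data, k) for k in distinct_keys(data)]
print(results)  # [('a', 2), ('b', 1)]
```

    Each per-key step rescans everything, so the work grows with keys × rows; the payoff is that a failure or retry is isolated to a single group.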
    Avi A

    2 years ago
    I’m not sure what you mean by the filtering tasks.
    What I meant was something like this:
    from collections import defaultdict
    
    from prefect import Flow, task
    
    
    @task
    def extract_partition_pointers():
        # One pointer per partition (dummy: partition ids 0..9)
        return list(range(10))
    
    @task
    def extract_partition(p):
        return [(p, 1)]*10 + [(p+1, 2)]*10  # dummy data with 2 keys
    
    @task
    def groupby_sum_partition(data, key=0, value=1):
        # Map step: partial per-key sums within a single partition
        reduced_data = defaultdict(int)
        for row in data:
            reduced_data[row[key]] += row[value]
        return list(reduced_data.items())
    
    @task
    def groupby_sum_all(data, key=0, value=1):
        # Reduce step: merge the per-partition partial sums into global totals
        reduced_data = defaultdict(int)
        for rows in data:
            for row in rows:
                reduced_data[row[key]] += row[value]
        return list(reduced_data.items())
    
    with Flow("Groupby sum") as flow:
        partition_pointers = extract_partition_pointers()
        all_data = extract_partition.map(partition_pointers)
        grouped_partitions = groupby_sum_partition.map(all_data)
        grouped_data = groupby_sum_all(grouped_partitions)
    But since my groupby operation isn’t a simple sum, I’ll probably have one task that reads the data into a Dask dataframe and does the work there. It’s not the Prefect-ish way to do it, but it’s much simpler.
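    For an aggregation that isn't a simple sum, the same two-phase pattern can still work as long as each partition emits a partial state that merges associatively. A sketch for a per-key mean, where the partial state is (sum, count); the decorators are omitted so it runs without Prefect, and the function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Sequence, Tuple

# Partial state per key: (running sum, row count). Merging these is associative,
# unlike merging per-partition means directly.
Partial = Dict[Hashable, Tuple[float, int]]


def groupby_mean_partition(data: Sequence[tuple], key: int = 0, value: int = 1) -> Partial:
    """Map step: per-key (sum, count) for a single partition."""
    sums: Dict[Hashable, float] = defaultdict(float)
    counts: Dict[Hashable, int] = defaultdict(int)
    for row in data:
        sums[row[key]] += row[value]
        counts[row[key]] += 1
    return {k: (sums[k], counts[k]) for k in sums}


def groupby_mean_all(partials: Iterable[Partial]) -> Dict[Hashable, float]:
    """Reduce step: merge the partial states, then finalize each key's mean."""
    sums: Dict[Hashable, float] = defaultdict(float)
    counts: Dict[Hashable, int] = defaultdict(int)
    for partial in partials:
        for k, (s, c) in partial.items():
            sums[k] += s
            counts[k] += c
    return {k: sums[k] / counts[k] for k in sums}


# Same dummy layout as extract_partition: partition p holds keys p and p + 1.
partitions = [[(p, 1)] * 10 + [(p + 1, 2)] * 10 for p in range(3)]
means = groupby_mean_all(groupby_mean_partition(d) for d in partitions)
print(means)  # {0: 1.0, 1: 1.5, 2: 1.5, 3: 2.0}
```

    In a flow, groupby_mean_partition would be the mapped task and groupby_mean_all the single reduce task, mirroring the sum example.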
    nicholas

    2 years ago
    Got it; I think what you've described is fine. Is there some finer granularity you're hoping to get that you can't achieve with the standard Python conventions you're using?
    Avi A

    2 years ago
    Not sure yet, but I think it’s a bit tedious to write the flow this way, don’t you think? It’s much simpler to have Dask (or even Spark, for that matter) do all of that in one task and also manage the resources. But Prefect aims towards small tasks, and that kind of goes against it.
    nicholas

    2 years ago
    @Avi A I wouldn't say we encourage small tasks; rather, we allow them so that you can be as granular as you need. In this case, if a single task that handles your groupby by interfacing directly with Dask makes the most sense for your pipeline, by all means use it; that's a perfectly valid pattern in Prefect. This is especially true if the individual transformations/groupings in your data don't need Prefect features of their own (like retries or result handling) and are just as well served by having them at the group level. 👍
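    As a stdlib stand-in for the single-task pattern, the whole groupby+count can live in one function. In the thread this task would load the partitions into a Dask dataframe; collections.Counter is used here only so the sketch runs without Dask, and the function name is hypothetical.

```python
from collections import Counter
from typing import Iterable, Sequence


def groupby_count_single_task(partitions: Iterable[Sequence[tuple]], key: int = 0) -> Counter:
    """Whole groupby+count in one step; a retry would rerun every group together."""
    counts: Counter = Counter()
    for rows in partitions:
        counts.update(row[key] for row in rows)
    return counts


partitions = [[("x", 1), ("y", 1)], [("x", 2)]]
print(dict(groupby_count_single_task(partitions)))  # {'x': 2, 'y': 1}
```

    The trade-off is the one described above: retries and result handling apply to the task as a whole rather than to each group.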
    Avi A

    2 years ago
    Following up here: is there a way to pass the Dask scheduler/client details to a task so that it uses that scheduler when working with a Dask dataframe?
    I see the executor information appears in the context object, but the address itself is an empty string. Is it possible it’s a bug?
    My environment:
    return RemoteEnvironment(
        executor="prefect.engine.executors.DaskExecutor",
        executor_kwargs={
            'address': 'tcp://workstation-1:8786',
        },
    )
    The relevant part of the context:
    'engine': {'executor': {'dask': {'address': '',
                                     'cluster_class': 'distributed.deploy.local.LocalCluster'},
                            'default_class': 'prefect.engine.executors.LocalExecutor'},
               'flow_runner': {'default_class': 'prefect.engine.flow_runner.FlowRunner'},
               'task_runner': {'default_class': 'prefect.engine.task_runner.TaskRunner'}},
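    To inspect what that nested context snippet actually holds, a small helper can follow a dotted path through the dicts. This is a hypothetical helper, not a Prefect API; the dict below copies part of the snippet.

```python
from collections.abc import Mapping
from typing import Any


def get_nested(config: Mapping, dotted_key: str, default: Any = None) -> Any:
    """Follow a dotted path such as 'engine.executor.dask.address' through nested dicts."""
    node: Any = config
    for part in dotted_key.split("."):
        if not isinstance(node, Mapping) or part not in node:
            return default
        node = node[part]
    return node


context_snippet = {
    "engine": {
        "executor": {
            "dask": {"address": "", "cluster_class": "distributed.deploy.local.LocalCluster"},
            "default_class": "prefect.engine.executors.LocalExecutor",
        },
    },
}

address = get_nested(context_snippet, "engine.executor.dask.address")
# An empty string means "no scheduler configured", not a usable address.
print(repr(address) if address else "no scheduler address configured")
```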
    nicholas

    2 years ago
    Hi @Avi A, it's possible that's a bug; please open a GitHub issue with your details and a minimal reproducible code sample, if possible.
    Avi A

    2 years ago
    Sure, thanks!
    @nicholas I was about to open a bug report, but I'm not sure it’s a bug. The engine section here was part of the config section, so it’s just the default config, and there’s no bug here. The question goes back to: can I somehow find out the executor details from inside a task?