# prefect-community
a
Hey, what would you say is the “prefect” way to implement a simple groupBy (or some other reduce operation)? Suppose I have data from several sources (for example, partitions) and would like to perform a groupBy+count on it. With Dask I’d simply load it into a Dask dataframe and do the operation on the dataframe. With Prefect, the only example shows how to do a simple sum, and you basically implement the map-reduce at a low level instead of specifying it logically. So, how would you recommend doing this?
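For reference, the groupBy+count described here can be sketched in plain Python as a per-partition count followed by a merge. The function names and the tuple row layout are hypothetical; in a Prefect flow, `count_partition` would be the step mapped over partitions and `merge_counts` the single downstream reduce.

```python
from collections import Counter
from typing import Iterable, List, Sequence


def count_partition(rows: Iterable[Sequence], key: int = 0) -> Counter:
    # Map step: groupBy+count within a single partition
    return Counter(row[key] for row in rows)


def merge_counts(partials: Iterable[Counter]) -> Counter:
    # Reduce step: Counter.update adds counts key by key (it does not replace them)
    total: Counter = Counter()
    for partial in partials:
        total.update(partial)
    return total


partitions: List[List[tuple]] = [
    [("x", 1), ("y", 2), ("x", 3)],
    [("y", 4), ("z", 5)],
]
counts = merge_counts(count_partition(p) for p in partitions)
print(dict(counts))  # {'x': 2, 'y': 2, 'z': 1}
```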
n
Hi @Avi A, there's not a specific task like `.map` that handles group by. The way I'd recommend handling that use case is to implement the group by either with downstream filtering tasks on the entire dataset or with individual map-reduce tasks (as you've described). I'm curious about your use case for splitting them into separate tasks, though, if this doesn't work for you.
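The "downstream filtering tasks" option might look like the plain-Python sketch below; the function names are hypothetical and each function stands in for a Prefect task. In a Prefect 0.x flow, the list comprehension would presumably become something like `filter_and_count.map(unmapped(rows), keys)`, using `prefect.unmapped` so every mapped call receives the full dataset.

```python
from typing import List, Sequence, Tuple


def distinct_keys(rows: Sequence[tuple], key: int = 0) -> List:
    # One pass over the full dataset to discover the groups
    return sorted({row[key] for row in rows})


def filter_and_count(rows: Sequence[tuple], k, key: int = 0) -> Tuple:
    # Each filtering step sees the whole dataset but keeps only the rows for key k
    return (k, sum(1 for row in rows if row[key] == k))


rows = [("a", 1), ("b", 2), ("a", 3)]
# Stand-in for mapping filter_and_count over the keys with the dataset unmapped
counts = [filter_and_count(rows, k) for k in distinct_keys(rows)]
print(counts)  # [('a', 2), ('b', 1)]
```

Each filtering step scans the entire dataset, so the total work grows with (number of keys) × (number of rows); that is fine for a handful of groups but not for many.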
a
I’m not sure what you mean by the filtering tasks.
What I meant was something like this:
```python
from collections import defaultdict

from prefect import Flow, task


@task
def extract_partition_pointers():
    # One pointer per partition; here just the partition indices 0..9
    return list(range(10))


@task
def extract_partition(p):
    return [(p, 1)] * 10 + [(p + 1, 2)] * 10  # dummy data with 2 keys


@task
def groupby_sum_partition(data, key=0, value=1):
    # Partial groupby-sum within one partition
    reduced_data = defaultdict(int)
    for row in data:
        reduced_data[row[key]] += row[value]
    return list(reduced_data.items())


@task
def groupby_sum_all(data, key=0, value=1):
    # Final reduce: merge the partial results from every partition
    reduced_data = defaultdict(int)
    for rows in data:
        for row in rows:
            reduced_data[row[key]] += row[value]
    return list(reduced_data.items())


with Flow("Groupby sum") as flow:
    partition_pointers = extract_partition_pointers()
    all_data = extract_partition.map(partition_pointers)
    grouped_partitions = groupby_sum_partition.map(all_data)
    grouped_data = groupby_sum_all(grouped_partitions)

if __name__ == "__main__":
    flow.run()
```
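One way to sanity-check the flow above without running Prefect is to reproduce its dummy data in plain Python and confirm that the two-stage reduce (within each partition, then across partitions) matches a single pass over all rows. `dummy_partition` and `groupby_sum` are hypothetical stand-ins for the tasks.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def dummy_partition(p: int) -> List[Tuple[int, int]]:
    # Same dummy data as the flow: key p with value 1, key p+1 with value 2
    return [(p, 1)] * 10 + [(p + 1, 2)] * 10


def groupby_sum(rows: Iterable[Tuple[int, int]]) -> Dict[int, int]:
    totals: Dict[int, int] = defaultdict(int)
    for k, v in rows:
        totals[k] += v
    return dict(totals)


partitions = [dummy_partition(p) for p in range(10)]

# Two-stage: reduce each partition, then reduce the partial results
partials = [groupby_sum(rows) for rows in partitions]
two_stage = groupby_sum(item for partial in partials for item in partial.items())

# Single pass over every row, for comparison
single_pass = groupby_sum(row for rows in partitions for row in rows)

assert two_stage == single_pass
```

With this data, key 0 totals 10, keys 1 through 9 each total 30 (10 from their own partition plus 20 from the previous one), and key 10 totals 20.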
But since my groupby operation isn’t a simple sum, I’ll probably have one task that reads the data into a Dask dataframe and does the work there. It’s not the Prefect-ish way to do it, but it’s much simpler.
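With a non-sum aggregation, the catch in the hand-written map-reduce is that the per-partition step has to emit something that can be merged. A minimal sketch, using a per-key mean as a stand-in for such an aggregation (function names and row layout are hypothetical): each partition emits `(sum, count)` pairs, the reduce adds them, and the division happens only once at the end.

```python
from typing import Dict, Iterable, List, Tuple

Partial = Dict[str, Tuple[float, int]]  # key -> (running sum, running count)


def mean_partition(rows: Iterable[Tuple[str, float]]) -> Partial:
    # Map step: per-partition (sum, count) for each key
    acc: Partial = {}
    for k, v in rows:
        s, n = acc.get(k, (0.0, 0))
        acc[k] = (s + v, n + 1)
    return acc


def merge_means(partials: Iterable[Partial]) -> Dict[str, float]:
    # Reduce step: add sums and counts key by key, divide only at the end
    total: Partial = {}
    for partial in partials:
        for k, (s, n) in partial.items():
            ts, tn = total.get(k, (0.0, 0))
            total[k] = (ts + s, tn + n)
    return {k: s / n for k, (s, n) in total.items()}


partitions: List[List[Tuple[str, float]]] = [
    [("a", 1.0), ("a", 3.0), ("b", 10.0)],
    [("a", 5.0), ("b", 20.0), ("b", 30.0)],
]
means = merge_means(mean_partition(p) for p in partitions)
print(means)  # {'a': 3.0, 'b': 20.0}
```

Averaging the per-partition means instead would give 3.5 for `"a"` ((2.0 + 5.0) / 2), which is wrong. Working out this kind of decomposition is part of what Dask's dataframe `groupby` handles for you, and `dask.dataframe.Aggregation` lets you supply the chunk and aggregate steps for a custom reduction.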
n
Got it; I think what you've described is fine. Is there some more granularity you're hoping to get that you can't with the standard Python conventions you're using?
a
Not sure yet, but I think it’s a bit tedious to write the flow this way, don’t you think? It’s much simpler to have Dask (or even Spark, for that matter) do all of that in one task and manage the resources too. The thing is, Prefect aims towards small tasks, and that kind of goes against it.
n
@Avi A I wouldn't say we encourage small tasks; rather, we allow them so you can be as granular as you need. In this case, if a single task that handles your groupby by interfacing directly with Dask makes the most sense for your pipeline, by all means, that's a perfectly valid pattern in Prefect. Particularly if the individual transformations/groupings in your data don't each need Prefect features (like retries or result handling) and are just as well served by having them at the group level, it's just fine to do it in one task. 👍
a
Following up here: is there a way to pass the Dask scheduler/client details to a task so that it uses that scheduler when working with a Dask dataframe?
I see the executor information appears in the `context` object, but the address itself is an empty string. Is it possible it’s a bug?
My environment:
```python
from prefect.environments import RemoteEnvironment

environment = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "address": "tcp://workstation-1:8786",
    },
)
```
The relevant part of the `context`:
```
'engine': {'executor': {'dask': {'address': '',
                                 'cluster_class': 'distributed.deploy.local.LocalCluster'},
                        'default_class': 'prefect.engine.executors.LocalExecutor'},
           'flow_runner': {'default_class': 'prefect.engine.flow_runner.FlowRunner'},
           'task_runner': {'default_class': 'prefect.engine.task_runner.TaskRunner'}},
```
n
Hi @Avi A, it's possible that's a bug. Please open a GitHub issue with your details and a minimal reproducible code sample, if possible.
a
Sure, thanks!
@nicholas I was about to open an issue, but I’m not sure it’s a bug. The `engine` section here was part of the `config` section, so it’s just the default config; there’s no bug here. The question goes back to: can I somehow get the executor details from inside a task?
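The thread leaves this open. One workaround that doesn't depend on what `prefect.context` exposes is to hand the scheduler address to the task yourself, for example as a flow Parameter, and have the task build its own `distributed.Client(address)` before computing the Dask dataframe. The hypothetical helper below sketches only the address-resolution part; the `DASK_SCHEDULER_ADDRESS` fallback is an assumption chosen to follow Dask's `DASK_`-prefixed configuration convention.

```python
import os
from typing import Optional


def resolve_scheduler_address(explicit: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: decide which Dask scheduler a task should talk to.

    Precedence: an address passed in explicitly (e.g. from a flow Parameter),
    then the DASK_SCHEDULER_ADDRESS environment variable (assumed name), else None.
    """
    if explicit:
        return explicit
    return os.environ.get("DASK_SCHEDULER_ADDRESS") or None
```

Inside the task you would then do something like `client = distributed.Client(resolve_scheduler_address(address))`. Since `DaskExecutor` runs tasks on Dask workers, `distributed.worker_client()` may also be worth checking as a way to reach the same cluster without passing the address around.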