Avi A
06/08/2020, 7:10 PM
nicholas
06/08/2020, 7:47 PM
.map that handles group by; the way I'd recommend handling that use case would be to implement the group-by using either downstream filtering tasks on the entire dataset or individual map-reduce tasks (as you've described). I'm curious about your use case for splitting them into separate tasks, though, if this doesn't work for you.
Avi A
06/08/2020, 8:35 PM
from collections import defaultdict

from prefect import Flow, task


@task
def extract_partition_pointers():
    return list(range(10))


@task
def extract_partition(p):
    return [(p, 1)] * 10 + [(p + 1, 2)] * 10  # dummy data with 2 keys


@task
def groupby_sum_partition(data, key=0, value=1):
    # map step: partial sums for a single partition
    reduced_data = defaultdict(int)
    for row in data:
        reduced_data[row[key]] += row[value]
    return list(reduced_data.items())


@task
def groupby_sum_all(data, key=0, value=1):
    # reduce step: combine the partial sums from every partition
    reduced_data = defaultdict(int)
    for rows in data:
        for row in rows:
            reduced_data[row[key]] += row[value]
    return list(reduced_data.items())


with Flow("Groupby sum") as flow:
    partition_pointers = extract_partition_pointers()
    all_data = extract_partition.map(partition_pointers)
    grouped_partitions = groupby_sum_partition.map(all_data)
    grouped_data = groupby_sum_all(grouped_partitions)
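To check what `grouped_data` should contain, here is a Prefect-free sketch of the same pipeline in plain Python, under a few assumptions: the `.map` calls become list comprehensions, and `groupby_sum_partition`/`groupby_sum_all` are collapsed into one hypothetical `groupby_sum` helper. It also computes the totals the other way nicholas mentions, by filtering the whole dataset once per key, so the two results can be compared. Nothing here runs through Prefect.

```python
from collections import defaultdict


def extract_partition(p):
    # same dummy data as the flow: two keys per partition
    return [(p, 1)] * 10 + [(p + 1, 2)] * 10


def groupby_sum(rows, key=0, value=1):
    # sum `value` per `key`; used for both the per-partition and the final reduce
    reduced = defaultdict(int)
    for row in rows:
        reduced[row[key]] += row[value]
    return dict(reduced)


all_data = [extract_partition(p) for p in range(10)]  # stands in for extract_partition.map(...)

# 1) map-reduce: reduce each partition, then reduce the partial results
partials = [groupby_sum(rows) for rows in all_data]  # stands in for groupby_sum_partition.map(...)
map_reduce = groupby_sum([item for part in partials for item in part.items()])

# 2) filtering: flatten everything, then one filter-and-sum step per key
flat = [row for rows in all_data for row in rows]
keys = sorted({k for k, _ in flat})
filtered = {k: sum(v for rk, v in flat if rk == k) for k in keys}

print(map_reduce == filtered)  # True
print(map_reduce[0], map_reduce[5], map_reduce[10])  # 10 30 20
```

The two-stage version gives the right answer only because summing is associative: per-partition sums can simply be added together. An aggregation such as a median can't be rebuilt from per-partition medians, so it needs every row of a group in one place, which is the situation Avi describes next.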
But since my group-by operation isn’t a simple sum, I’m probably going to have one task that reads the data into a Dask dataframe and does the work there. It’s not the Prefect-ish way to do it, but it’s way simpler.
nicholas
06/08/2020, 9:54 PM
Avi A
06/08/2020, 9:58 PM
nicholas
06/08/2020, 10:28 PM
Avi A
06/21/2020, 11:07 AM
context object, but the address itself is an empty string. Is it possible it’s a bug?
return RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        'address': 'tcp://workstation-1:8786',
    },
)
the relevant part of the context:
'engine': {'executor': {'dask': {'address': '',
                                 'cluster_class': 'distributed.deploy.local.LocalCluster'},
                        'default_class': 'prefect.engine.executors.LocalExecutor'},
           'flow_runner': {'default_class': 'prefect.engine.flow_runner.FlowRunner'},
           'task_runner': {'default_class': 'prefect.engine.task_runner.TaskRunner'}},
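A small stdlib sketch for pulling a single value out of a nested dump like this one. `get_path` is a hypothetical helper, and `context_excerpt` just copies the values pasted above. The lookup only returns whatever the loaded configuration holds, here an empty Dask address and `LocalExecutor` as the default class. In Prefect 0.x the same values should also be reachable by attribute access on `prefect.config` (e.g. `prefect.config.engine.executor.dask.address`), though that is an assumption worth checking against the installed version.

```python
# the "engine" section exactly as it appeared in the task's context
context_excerpt = {
    "engine": {
        "executor": {
            "dask": {
                "address": "",
                "cluster_class": "distributed.deploy.local.LocalCluster",
            },
            "default_class": "prefect.engine.executors.LocalExecutor",
        },
        "flow_runner": {"default_class": "prefect.engine.flow_runner.FlowRunner"},
        "task_runner": {"default_class": "prefect.engine.task_runner.TaskRunner"},
    }
}


def get_path(tree, dotted, default=None):
    # walk a nested dict along a dotted path such as "engine.executor.dask.address"
    node = tree
    for part in dotted.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


print(repr(get_path(context_excerpt, "engine.executor.dask.address")))  # ''
print(get_path(context_excerpt, "engine.executor.default_class"))  # prefect.engine.executors.LocalExecutor
```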
nicholas
06/23/2020, 7:29 AM
Avi A
06/23/2020, 7:31 AM
The engine section here was part of the config section, so it’s just the default config. So there’s no bug here. The question goes back to: can I somehow know the executor details in a task?
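One possible workaround, offered only as a hypothetical sketch and not as a Prefect feature, is to keep the Dask address in a single constant and hand that same value both to the environment's `executor_kwargs` and, as an ordinary argument, to any task that needs it. `DASK_ADDRESS`, `environment_kwargs` and `describe_executor` are illustrative names, and the functions are plain Python so the sketch runs without Prefect.

```python
# Hypothetical workaround: one constant feeds both the environment and the task.
DASK_ADDRESS = "tcp://workstation-1:8786"  # address used in the RemoteEnvironment above


def environment_kwargs(address):
    # keyword arguments that would be passed to RemoteEnvironment(...)
    return {
        "executor": "prefect.engine.executors.DaskExecutor",
        "executor_kwargs": {"address": address},
    }


def describe_executor(executor_address):
    # body of a task that needs the executor details; inside a flow it would be
    # called with DASK_ADDRESS as an explicit argument instead of reading the context
    return f"running on Dask scheduler at {executor_address}"


env_kwargs = environment_kwargs(DASK_ADDRESS)
print(describe_executor(env_kwargs["executor_kwargs"]["address"]))
# running on Dask scheduler at tcp://workstation-1:8786
```

The trade-off is that the task reports the configured address, not necessarily the one it is actually running against; if the two can diverge, this approach won't catch it.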