Simone Cittadini
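10/01/2020, 9:50 AM
```python
from prefect import Flow, Task, task


class Process(Task):
    def fu(self, rows):
        # Lazily add 100 to each row
        for row in rows:
            yield row + 100

    def run(self, data):
        output = {}
        output["meta"] = data["meta"]
        # This value is a generator object, not a list
        output["data"] = self.fu(data["rows"])
        return output


@task
def pull(data):
    # Consume the generator produced by Process
    for row in data["data"]:
        print(row)


with Flow("test") as flow:
    input_data = {"rows": [1, 2, 3, 4], "meta": "fixedvalue"}
    process = Process()
    data = process(input_data)
    drain = pull(data)

flow.run()  # runs locally with the default LocalExecutor
```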
emre
10/01/2020, 11:33 AM
This works in a `LocalExecutor` setup without a Prefect Server/Cloud backend. The caveat is that generator objects aren't serializable by cloudpickle.
To communicate results between Dask workers, Prefect needs to serialize task outputs with cloudpickle, so a `DaskExecutor` won't work.
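A quick way to check the serialization claim outside Prefect: the sketch below tries cloudpickle and falls back to the standard `pickle` module if cloudpickle isn't installed; both refuse generator objects. It also sketches one way around the problem, assuming the generator can be rebuilt from plain data: the hypothetical `ReplayableRows` class stores only the input rows and creates a fresh generator each time it is iterated. This is an illustration of the idea, not Prefect's `Result` API.

```python
import pickle

try:
    import cloudpickle as serializer  # the serializer Prefect uses for results
except ImportError:
    serializer = pickle  # stdlib fallback; it also rejects generators


def add_100(rows):
    # Same generator as Process.fu in the question
    for row in rows:
        yield row + 100


class ReplayableRows:
    """Hypothetical picklable stand-in for add_100(rows).

    Stores the inputs needed to rebuild the generator instead of the
    generator itself, so it pickles and can be iterated more than once.
    """

    def __init__(self, rows):
        self.rows = list(rows)  # plain data, safe to pickle

    def __iter__(self):
        # Build a fresh generator on every iteration
        return add_100(self.rows)


def is_serializable(obj):
    """Return True if `serializer` can pickle obj."""
    try:
        serializer.dumps(obj)
    except (TypeError, pickle.PicklingError):
        return False
    return True


print(is_serializable(add_100([1, 2, 3, 4])))         # False
print(is_serializable(list(add_100([1, 2, 3, 4]))))   # True
print(is_serializable(ReplayableRows([1, 2, 3, 4])))  # True
```

Materializing with `list()` is the simplest fix but gives up laziness; a wrapper like `ReplayableRows` keeps iteration lazy while serializing only its inputs.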
AFAIK Prefect also needs to serialize inputs and outputs when it communicates with a backend server. I am not well versed in this matter, but I assume that if you keep `LocalExecutor()` and disable checkpointing, your code would still work when running against a backend server.
Alternatively, you can extend one of the `Result` classes to handle generators explicitly, if your generator can easily be recreated by some custom code. That would let you keep checkpointing for the rest of the flow.
Simone Cittadini
10/01/2020, 12:44 PM