# prefect-community
Hi, probably a dumb question, but just to be sure: are tasks returning generators as part of their output always guaranteed to work? Something like this (note: I will never use a Dask runner; this is a special case for working on huge files):
```python
from prefect import Flow, Task, task

class Process(Task):
    def fu(self, rows):
        # Lazily add 100 to each row; returns a generator
        for row in rows:
            yield row + 100

    def run(self, data):
        output = {}
        output["meta"] = data["meta"]
        output["data"] = self.fu(data["rows"])  # a generator, not a list
        return output

@task
def pull(data):
    for row in data["data"]:
        print(row)

with Flow("test") as flow:
    input_data = {"rows": [1, 2, 3, 4], "meta": "fixedvalue"}
    process = Process()
    data = process(input_data)
    drain = pull(data)
```
This will work for a basic `LocalExecutor` setup without a Prefect server/cloud backend. The caveat is the following: generator objects aren't serializable by cloudpickle. To communicate results between Dask workers, Prefect needs to serialize outputs with cloudpickle, so `DaskExecutor` won't work. AFAIK Prefect also needs to serialize inputs and outputs when it communicates with a backend server. I am not well versed on this matter, but disabling checkpointing may enable you to use a server backend.
From the docs (https://docs.prefect.io/core/advanced_tutorials/using-results.html): checkpointing is disabled for core runs by default, which is why your code works on `LocalExecutor()`. I assume that by disabling checkpointing when running with a backend server, your code would still work. Alternatively, you can extend one of the `Result` classes to handle generators explicitly, if your generator can easily be recreated by some custom code; that would let you keep using checkpointing in the remainder of the flow. A sketch of the checkpointing route is below.
Ty! Yes, it's a special/simple case where it's not a problem to give up Dask/checkpointing, but the ETL needs to work on huge files.