shekhar koirala
09/14/2021, 8:01 AM@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def fetch_data(count,check):
data = []
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(data)
return data
def run():
with Flow("fetch-data") as local_flow:
count = Parameter("count", default=100)
check = Parameter("limit", default=10)
fetch_data(count,check)
local_flow.register(project_name="test")
id = local_flow.run()
print(id)
if __name__ == "__main__":
run()
Kevin Kho
File "/usr/lib/python3.7/pickle.py", line 524, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.lock objects
Kevin Kho
cloudpickle
so you can test if something is serializeable by doing cloudpickle.dumps
. Normally, connections and clients are not serializeable.
If you do not plan to use Dask, there is one thing you can do to avoid serialization, you can check this and use script-based storage so that Prefect won’t serialize the Flow.