# ask-community
shekhar koirala:
My code is as simple as this:
```python
import datetime

import prefect
from prefect import task, Flow, Parameter


@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def fetch_data(count, check):
    data = []
    logger = prefect.context.get("logger")
    logger.info(data)
    return data


def run():
    with Flow("fetch-data") as local_flow:
        count = Parameter("count", default=100)
        check = Parameter("limit", default=10)
        fetch_data(count, check)

    local_flow.register(project_name="test")
    state = local_flow.run()  # flow.run() returns a State object
    print(state)


if __name__ == "__main__":
    run()
```
Hi @shekhar koirala, I deleted the traceback because it was taking too much space in the main channel. Adding it to the thread here:
```
File "/usr/lib/python3.7/pickle.py", line 524, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.lock objects
```
So I ran this code and it works for me. In general, task outputs need to be serializable because Prefect has to move data and functions around to Dask workers. Prefect uses `cloudpickle`, so you can test whether something is serializable by calling `cloudpickle.dumps` on it. Connections and clients are normally not serializable. If you do not plan to use Dask, there is one way to avoid serialization entirely: use script-based storage so that Prefect won't serialize the Flow.
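For illustration, here is a minimal sketch of that serializability check; the lock example mirrors the `_thread.lock` error in the traceback above:

```python
import threading

import cloudpickle

# Plain data structures round-trip fine through cloudpickle.
cloudpickle.dumps([1, 2, 3])

# A lock (or a client/connection that holds one) does not:
# this raises TypeError: can't pickle _thread.lock objects
cloudpickle.dumps(threading.Lock())
```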
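And a rough sketch of what script-based storage could look like with Prefect 1.x `Local` storage, assuming the flow above; the `path` is hypothetical, and `stored_as_script=True` is what tells Prefect to reference the script file rather than pickle the Flow:

```python
from prefect.storage import Local

# Hypothetical path to the script that defines local_flow. With
# stored_as_script=True, Prefect registers a pointer to this file
# instead of storing a cloudpickled Flow object.
local_flow.storage = Local(path="/path/to/flow.py", stored_as_script=True)
local_flow.register(project_name="test")
```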