Hi All! Prefect2.0b7 Dockerflowrunner, dasktaskrun...
# prefect-community
f
Hi All! Prefect 2.0b7, DockerFlowRunner, DaskTaskRunner. I am getting the following error
_pickle.PicklingError: Can't pickle <function db_con at 0x7f85c986f880>: it's not the same object as __main__.db_con
when I run a task that queries a database. The code runs fine without that db connection task.
I am using SQLAlchemy with sessionmaker for the db connection:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from prefect import task
import itertools

engine = create_engine('postgresql://admin:123@localhost:5432/postgres_db', echo=False)
Base = declarative_base()

class Customers(Base):  # _TEST
    __tablename__ = 'records'
    id = Column(String, primary_key=True)
    dt_ut = Column(Integer)

Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

def ConvertToDict(a):
    it = iter(a)
    res_dct = dict(zip(it, it))
    return res_dct

# Query the db table and convert the result to a dictionary
def query_db(x):
    qur = session.query(x.id, x.dt_ut).all()
    qur1 = list(itertools.chain(*qur))
    return ConvertToDict(qur1)

@task
def db_con():
    res = query_db(Customers)
    return res
a
Connections can't be pickled atm. Try moving the connection to the task that needs it without passing the connection between tasks.
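For reference, a minimal sketch of what this could look like (not from the thread), assuming the Postgres URL and records table from the snippet above: the engine is built inside the task, so only the picklable query result ever leaves the worker.
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner
from sqlalchemy import create_engine, text

@task
def db_con():
    # Create the connection inside the task instead of at module level,
    # so nothing unpicklable is shipped between processes.
    engine = create_engine("postgresql://admin:123@localhost:5432/postgres_db")
    with engine.connect() as conn:
        rows = conn.execute(text("SELECT id, dt_ut FROM records")).fetchall()
    engine.dispose()
    return dict(rows)  # a plain dict pickles fine

@flow(task_runner=DaskTaskRunner())
def main1():
    print(db_con().result())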
f
@Anna Geller still the same error. I created a separate task for querying the db.
@flow(task_runner=DaskTaskRunner())
def C():
    a1 = db_con()
a
could you build a minimal example I could use to reproduce the same error?
👍 1
f
from prefect import task, flow
from prefect import get_run_logger
from prefect.blocks.storage import FileStorageBlock
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
from prefect.task_runners import SequentialTaskRunner, DaskTaskRunner
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, VARCHAR, text
from sqlalchemy.engine.url import URL
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import sqlalchemy
import itertools

url = URL.create('postgresql', 'admin', '123', 'localhost', 5432, 'postgres_db')
engine = create_engine(url, echo=False)

Base = declarative_base()

class LRD(Base):
    __tablename__ = 'LastRecordedDate'
    lid = Column(String, primary_key=True)
    dtUtDownload = Column(String)
    DtUtProcessed = Column(String)
    Tmpcol = Column(String)

Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

def ConvertToDict(a):
    it = iter(a)
    res_dct = dict(zip(it, it))
    return res_dct

# Query the db table and convert the result to a dictionary
def query_db(x):
    qur = session.query(x.lid, x.dtUtDownload).all()
    qur1 = list(itertools.chain(*qur))
    return ConvertToDict(qur1)

@task
def db_con():
    LRD_dict = query_db(LRD)
    print(LRD_dict)
    return LRD_dict

@flow(task_runner=DaskTaskRunner())
def main1():
    a = db_con()
    print(a.result())

if __name__ == "__main__":
    main1()

DeploymentSpec(
    name="downloader_dask_01",
    flow=main1,
    flow_runner=DockerFlowRunner(),
)
@Anna Geller I have had a successful run in plain Python without Prefect.
@Anna Geller
Connections can't be pickled atm.
Try moving the connection to the task that needs it without passing the connection between tasks.
What I've found so far is that any db connection inside a task cannot be pickled. I tried SQLAlchemy and Pony; both give me the same error.
Tried Prefect 2.0b8, same result.
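As an aside, that behaviour is easy to check outside Prefect; a minimal sketch, assuming any SQLAlchemy engine (a throwaway in-memory SQLite one here):
import pickle
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite://")   # in-memory db, just for the demo
session = sessionmaker(bind=engine)()

# Both attempts typically fail: the engine's connection pool holds locks
# and sockets that pickle cannot serialize, and the session references
# the engine.
for obj in (engine, session):
    try:
        pickle.dumps(obj)
    except Exception as exc:
        print(type(exc).__name__, exc)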
a
Why are you using Dask here? Try without Dask, this will likely help fix your issue. You're not running anything in parallel here, so Dask causes only unnecessary trouble atm :)
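A self-contained sketch of that suggestion, reusing the SequentialTaskRunner already imported in the example above (or simply omitting task_runner to get the default runner); the stand-in return value is hypothetical:
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def db_con():
    return {"1004": "18-07-2022"}  # stand-in for the real query task above

@flow(task_runner=SequentialTaskRunner())
def main1():
    a = db_con()
    print(a.result())

if __name__ == "__main__":
    main1()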
f
@Anna Geller that's one section of the code. The original code scans nested directories, then downloads and processes the files. A Postgres database keeps track of the files. The snap below is the parent dir. I'm running a for loop over a list of the directory names to download the files (in parallel).
My code works like this:
scan dir >> find files >> check db whether the file is downloaded (Y/N) >> download the files >> update db with the latest download date per folder, i.e. 1004 18-07-2022
Before Prefect I was using Python multiprocessing, and it would spawn a new process per folder name (i.e. 1004), depending on the number of processors; with 4 processors there would be 4 processes running, each following the same logic I explained above.
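For context, a runnable sketch of that pre-Prefect pattern, with hypothetical stand-ins (find_files, already_downloaded, download, update_db) for the real helpers described above:
import multiprocessing

# Hypothetical stand-ins for the real helpers described above.
def find_files(folder): return [f"{folder}/file1.csv"]
def already_downloaded(path): return False      # "check db: downloaded Y/N"
def download(path): print("downloading", path)
def update_db(folder): print("recording latest download date for", folder)

def process_folder(folder):
    # per-folder logic: scan dir >> check db >> download >> update db
    for path in find_files(folder):
        if not already_downloaded(path):
            download(path)
    update_db(folder)

if __name__ == "__main__":
    # one worker process per folder name (e.g. "1004"), capped at 4 processors
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(process_folder, ["1001", "1002", "1003", "1004"])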
a