
kwmiebach

09/02/2022, 2:46 PM
Hello 🙂 I believe I ran into a threading problem yesterday. I am converting some data extraction pipelines to prefect 2 flows, which works fine for most of them. I just add the decorators and some logging. But one of the data pipelines uses sqlite for intermediate storage, and this is the error message I receive:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140193261803264 and this is thread id 140192735962880.
I can also paste the part of the code where sqlite is called. I am also trying to guess the reason behind the error. There is an sqlite object created in a function. Within the function I define another function which uses this object. But neither of these python functions is a prefect flow or task. They live within a bigger prefect flow. So here comes my first question: Does prefect 2 create a different thread for each nested function inside a flow or a task? Otherwise I cannot explain why the 2 parts of the code would run in different threads.

Khuyen Tran

09/02/2022, 2:53 PM
Which version of Prefect are you in?

kwmiebach

09/02/2022, 2:59 PM
let me check
@Khuyen Tran 2.0.3

Khuyen Tran

09/02/2022, 3:01 PM
Can you upgrade to the latest version of Prefect and see if this error still persists?

Ryan Peden

09/02/2022, 3:03 PM
If the error still happens on 2.3.1, are you able to post any code? Or if not, even example code that shows how you are using the function and nested function would be useful. I have an idea of why this is happening but seeing code would make it easier to confirm. 🙂

kwmiebach

09/02/2022, 3:04 PM
Yes I will upgrade and I just made a photo:
There are 3 parts. Part 1 creates the DB_FIBU object, then there is a function which uses this object. But I am not passing the object, I use the python nonlocal keyword.
In the third part the function is used, and this goes into a petl data pipeline, which is only executed later. The code in the photo only constructs the pipeline, but it is not yet running.

Ryan Peden

09/02/2022, 3:08 PM
does the pipeline execution happen in a Prefect task?

kwmiebach

09/02/2022, 3:09 PM
yes it does
I just upgraded to prefect 2.3.1 and the error is still the same. It is telling me it happens in lines 68-76 from the photo.

Ryan Peden

09/02/2022, 3:12 PM
I think that's why you are seeing this. Prefect 2's default task runner is ConcurrentTaskRunner, which uses a thread pool to run tasks. Your nested function creates a closure around the sqlite3.Row created on the main thread, so you see this error when you execute the function later on inside a task.
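For readers following along, the mechanism described above can be reproduced without Prefect. This is a minimal sketch using only the standard library: a `ThreadPoolExecutor` stands in for a thread-pool task runner, and `build_lookup`, `lookup`, and the `accounts` table are hypothetical names, not the code from the photo.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def build_lookup():
    # The connection is created on the calling (main) thread.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE accounts (id INTEGER, name TEXT)")
    conn.execute("INSERT INTO accounts VALUES (1, 'cash')")

    def lookup(account_id):
        # The closure captures `conn`, which belongs to the main thread.
        row = conn.execute(
            "SELECT name FROM accounts WHERE id = ?", (account_id,)
        ).fetchone()
        return row[0]

    return lookup


lookup = build_lookup()
main_result = lookup(1)  # works: same thread that created the connection

# Run the same closure on a worker thread, as a thread-pool runner would.
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(lookup, 1)
    try:
        future.result()
        worker_error = None
    except sqlite3.ProgrammingError as exc:
        worker_error = exc

print(main_result)
print(type(worker_error).__name__)
```

The call on the main thread succeeds, while the call on the worker thread raises `sqlite3.ProgrammingError`, because `sqlite3.connect` checks the calling thread by default.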

kwmiebach

09/02/2022, 3:14 PM
ah I see. yes the closure was somehow necessary because of the lambda I believe. but another option, SequentialTaskRunner, also uses threads? but in sequence?
I am just looking at the code. Maybe I can get rid of the closure and just pass DB_FIBU as a normal parameter.
I just tried and it did not help.

Ryan Peden

09/02/2022, 3:24 PM
A couple of things to try:
• Call the function that returns this nested function from inside the task that executes the pipeline. Even if this isn't how you want to structure your code in your final flow, it would help narrow down the cause.
• Try running the flow using SequentialTaskRunner and see if the problem goes away. Again, it might not be the solution you want to use for your production flow but would provide useful information.

kwmiebach

09/02/2022, 3:31 PM
Thank you very much for looking into this. In the meantime I tried some things, and I believe now that the reason was in the step that executes the pipeline, which is further below and cannot be seen in the photo. This was using a prefect task. I switched this to a non prefect function and now it seems to work. It works with both sequential and concurrent task runner. I am still trying to recreate the initial condition just to be sure.
OK, this seems to be the solution. When I recreated the initial state with the prefect task that runs the pipeline, the error is back. And this somehow makes sense. This is a photo further below, where the pipeline is executed:
I tried again back and forth several times now. I have the main flow configured with SequentialTaskRunner the whole time now. When I comment the 'task' line, where the pipeline is executed, it works. When I uncomment it, which means the pipeline is executed in a prefect 2 task, then the 'different threads' error re-appears.
So my guess would be that a task runs in its own thread, no matter how the flow it lives in is configured. @Ryan Peden
I can live with that, I will change the code so it does not happen. 🙂 Thanks again
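A guess like this can be checked by comparing thread ids. The sketch below is a standard-library analogy, not a view into Prefect internals: a pool with a single worker runs its jobs strictly one after another, yet still on a thread other than the main one. `current_thread_id` is a hypothetical helper; inside a real flow, the same `threading.get_ident()` call could be logged from the flow and from the task.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def current_thread_id():
    # Identifier of whichever thread executes this call.
    return threading.get_ident()


main_id = current_thread_id()

# One worker means sequential execution, but not execution on the main thread.
with ThreadPoolExecutor(max_workers=1) as pool:
    worker_ids = [pool.submit(current_thread_id).result() for _ in range(3)]

print("main thread:  ", main_id)
print("worker thread:", worker_ids)
```

Running work in sequence and running it on the thread that created an object are separate properties, which would be consistent with the error appearing under both task runners.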

Ryan Peden

09/02/2022, 3:48 PM
Thanks for sharing all of that - it is much appreciated
I'm not sure if it fits your use case (or if you've tried it already) but using aiosqlite might help as well. It is one of Prefect's dependencies, so it will already be installed in your environment.

kwmiebach

09/02/2022, 4:04 PM
Yes, this looks good. I made a note in my source, and next time I work on the sqlite part, which I will soon, I would love to swap it out. 🙂
I had not noticed it exists