
Michael Reeves

07/17/2020, 8:09 PM
hey quick question: I want to send data to my database over the course of a flow run, updating multiple database entries for objects that are created/used during various tasks within the flow. What I'm planning to do is create a database object at the beginning of my flow and store it in my flow's context, probably inside a task. Is there a better way to do this? That way, throughout the flow I can send/update database entries by grabbing a reference to this database connection from the context, or maybe just using it directly. What I really want is for this database to be a property of the flow, so that I have a flow-level database object referencable across all my tasks, but that doesn't seem to be possible at the moment, and the context variable seems like the best way to do this. Another solution I just thought of is to break all these database connections into smaller tasks, each with a callback function, but then I'll essentially have WAY too many tasks in my flow. I'm curious if others have had this problem, and what solutions they found.

Chris White

07/17/2020, 8:14 PM
Hey Michael - we’ve been exploring this pattern recently; on Prefect master we recently introduced a “Resource Manager” that you could use to create a database connection and clean it up: https://github.com/PrefectHQ/prefect/pull/2913 This will get released with 0.12.5 next week. Note that if you open up a database connection as your resource, then this flow will not be able to run on Dask because open DB connections are typically not serializable, but it will be able to run with a local executor.
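A minimal sketch of the resource-manager shape that PR describes: a class with setup/cleanup hooks. The decorator name and flow usage shown in the comments are assumptions based on the PR (check the 0.12.5 docs); the class is shown undecorated here, with the stdlib sqlite3 module standing in for a real database client, so the sketch runs on its own.

```python
import sqlite3

# Sketch of a resource manager's shape: setup() opens the resource,
# cleanup() tears it down. In Prefect 0.12.5+ this class would be
# decorated with @prefect.resource_manager (an assumption based on
# PR #2913 -- check the release docs); sqlite3 is a stand-in client.
class DBConnection:
    def __init__(self, url):
        self.url = url

    def setup(self):
        # Called once when the flow enters the resource block.
        return sqlite3.connect(self.url)

    def cleanup(self, conn):
        # Called when the block exits, even if a task inside fails.
        conn.close()

# Inside a flow this would read roughly (hypothetical usage):
#   with Flow("etl") as flow:
#       with DBConnection("my.db") as conn:
#           my_task(conn)
```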

Michael Reeves

07/17/2020, 8:20 PM
Yeah, I need to run it on Dask, so I thought if I open the database connection in a task and save it to the context, then it would be serializable? I'm guessing the resource manager is serializable? This will actually be pretty useful for other areas I'll need to manage

Chris White

07/17/2020, 8:23 PM
Unfortunately no, there is no way to make an open database connection a serializable object; you’ll have to open a new connection in the new process / machine that is running the code. We’re looking at how to support this pattern (e.g., you tell us “this is an unpickleable object” and we will create it in every new process / machine) but it breaks our state semantics so we haven’t quite nailed it down yet
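The "no way to make an open database connection serializable" point can be checked directly; a quick sketch using the stdlib sqlite3 module as a stand-in for a real database client:

```python
import pickle
import sqlite3

# Open connections hold OS-level state (sockets, file handles),
# so pickle refuses them -- which is why they can't be shipped
# to Dask workers as task inputs or outputs.
conn = sqlite3.connect(":memory:")
try:
    pickle.dumps(conn)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False
conn.close()

# picklable is False: the connection cannot cross process boundaries
```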

Michael Reeves

07/17/2020, 8:31 PM
dang 😕 not even if I create the connection as part of a task in the flow? That way the db object is never instantiated until the flow is sent to the Dask runner. Or maybe I don't understand how flows are run across a Dask cluster. To my understanding, flow.run() sends the FlowRunner object to a Dask scheduler on a Dask cluster (I have the DaskExecutor set up as my default executor), which serializes all the tasks within that flow, and a Dask worker executes each task. So if I open the db in a task it won't need to be serialized to the Dask worker, since it's only instantiated on the worker when the FlowRunner is executed? Is there a piece I'm missing? Sorry if this explanation is unclear
thanks for the help btw 😄
👍 1

Chris White

07/17/2020, 8:34 PM
Ah yes, so if you actually instantiate / create the connection within each task that needs it, and you don’t return the connection from the task, then you should be good!
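A sketch of that pattern, with sqlite3 standing in for the real database client and the Prefect @task decorator omitted so it runs standalone. The key point is that the connection is opened and closed inside the function body and never returned:

```python
import sqlite3

DB_URL = ":memory:"  # hypothetical connection target

def operation(payload):
    # In a real flow this body would sit inside a Prefect @task.
    # The connection is created on whichever Dask worker runs the
    # task and closed before the task returns, so nothing
    # unpicklable ever needs to cross a process boundary.
    conn = sqlite3.connect(DB_URL)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS events (body TEXT)")
        conn.execute("INSERT INTO events VALUES (?)", (payload,))
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    finally:
        conn.close()  # closed before any task result is serialized
```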

Michael Reeves

07/17/2020, 8:37 PM
ah so I can't store it in the context? I'm guessing this operation below is what the resource manager was created for? What I planned on doing was the following:
@task
def connection():
    prefect.context['db'] = DATABASE(url, user, password)

@task
def operation():
    db = prefect.context['db']
    # do stuff...
    db.send('{"asdf": "aaaa"}')

@task
def close_connection():
    prefect.context['db'].close()

with Flow('stuff') as flow:
    connection()
    operation()
    close_connection()

Chris White

07/17/2020, 8:42 PM
Ah no, context is not mutable in that way; it will get refreshed with each task run. At this exact moment you’ll most likely need to duplicate your DATABASE code within each task that requires the connection (but we hope to improve this soon!)

Michael Reeves

07/17/2020, 8:50 PM
Darn 😕 I suppose I could do something like that, passing a reference to the connection into every task; it would just be really annoying. It would be great if I could do something similar with a resource manager and instantiate the resource in a task and pass it across tasks, but I suppose I would still want something similar to using context, which doesn't seem possible yet per your first comment (since resource managers are instantiated with the flow, which would make them un-serializable :/ )
I technically could just make API calls with requests, but then that defeats the point of the wrapper database class that I'm using
Unless there's something useful in master, or a better way to achieve the desired functionality, I suppose for now I'll stick to passing references to the connections as a parameter to each task

Chris White

07/17/2020, 8:54 PM
Yea, if you’d like to open an issue requesting the ability to share non-serializable objects across tasks, please do! It’s something we’ve had internal discussions about, so it’d be nice to have a place to track this

Michael Reeves

07/17/2020, 8:55 PM
Great, glad there's not a better workaround I may have missed! I can open one up
💯 2

Chris White

07/17/2020, 9:08 PM
Awesome, thanks Michael! Anytime!
👍 1