Michael Reeves
07/17/2020, 8:09 PMChris White
07/17/2020, 8:14 PMMichael Reeves
07/17/2020, 8:20 PMChris White
07/17/2020, 8:23 PMMichael Reeves
07/17/2020, 8:31 PMChris White
07/17/2020, 8:34 PMMichael Reeves
07/17/2020, 8:37 PM@task()
def connection():
prefect.context['db'] = DATABASE(url, user, pass)
@task()
def operation():
db = prefect.context['db']
# do stuf....
db.send('{"asdf": aaaa}')
with Flow('stuff') as flow:
connection()
operation()
close_connection()
Chris White
07/17/2020, 8:42 PMcontext
is not mutable in that way; it will get refreshed with each task run; at this exactly moment you’ll most likely need to duplicate your DATABASE
code within each task that requires the connectionMichael Reeves
07/17/2020, 8:50 PMcontext
, which doesn't seem possible yet per your first comment (since resource managers would be instantiated with the flow, which would make it un-serializable :/Chris White
07/17/2020, 8:54 PMMichael Reeves
07/17/2020, 8:55 PMChris White
07/17/2020, 9:08 PM