Federico Zambelli

04/20/2023, 1:37 PM
Hey folks, I was wondering if it's possible to make our task and flow functions methods of a class without having the linter scream at me (and without disabling type checking). Here's an example of what I mean; try this:
from prefect import flow, task


class MyClass:
    @task
    def doStuffWithArgs(self, myArg: str):
        ...

    def doStuffWithoutArgs(self):
        ...

    @flow
    def doMoreStuff(self):
        self.doStuffWithArgs("stuff")
        self.doStuffWithoutArgs()
You will see that for doStuffWithArgs, Pylance complains that no overload matches that call (see screenshot):

flapili

04/20/2023, 1:39 PM
in my opinion this design is not easy to maintain anyway
and probably not Prefect-friendly

Federico Zambelli

04/20/2023, 1:41 PM
Thanks for the feedback; could you elaborate on that? I wanted a class because I have certain objects that I want to reuse across different tasks. In my head it made more sense to be able to do this.my_object instead of either passing it as an arg to each task or making it global
what would you suggest as an alternative?

flapili

04/20/2023, 1:41 PM
the issue is that the args must be picklable
in order to retry failed tasks, for example
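A quick way to see whether a given argument would survive that requirement is to try serializing it with the standard library. This is a minimal sketch using plain pickle as a rough proxy (Prefect may use its own serializers); is_picklable and Holder are hypothetical names, and threading.Lock stands in for a live resource such as a connection or session.

```python
import pickle
import threading


def is_picklable(obj) -> bool:
    """Return True if obj can be serialized with the standard pickle module."""
    try:
        pickle.dumps(obj)
    except Exception:  # TypeError, AttributeError or PicklingError depending on the object
        return False
    return True


class Holder:
    """Hypothetical class that keeps a live resource on self."""

    def __init__(self):
        self.lock = threading.Lock()  # stand-in for a connection/session


print(is_picklable({"result_key": "abc", "value": 1}))  # True: plain data pickles fine
print(is_picklable(threading.Lock()))                   # False: live resources usually don't
print(is_picklable(Holder()))                           # False: neither does an object holding one
```

The last case is the relevant one for the class-based design: any object that carries a live resource on itself inherits that resource's serialization problems.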

Federico Zambelli

04/20/2023, 1:41 PM
uh dayum you're right

flapili

04/20/2023, 1:42 PM
passing the args is probably the best way to work with prefect

Federico Zambelli

04/20/2023, 1:43 PM
what if the arg itself ends up not being picklable, but the result is? Such as, idk, a duckdb connection object
or an aiohttp session, for instance

flapili

04/20/2023, 1:44 PM
I believe tasks will not even run without the validate_parameters kwarg in the flow decorator

Federico Zambelli

04/20/2023, 1:45 PM
ah, fair enough
so my only option in that case is a variable set at the module level, I guess?

flapili

04/20/2023, 1:47 PM
the fewer contextual objects tasks have, the better 😅

Federico Zambelli

04/20/2023, 1:48 PM
pardon my ignorance, I'm not familiar with the term "contextual object"; what do you mean by that?

flapili

04/20/2023, 1:48 PM
tasks should be idempotent

Federico Zambelli

04/20/2023, 1:48 PM
> tasks should be idempotent
this I'm aware of

flapili

04/20/2023, 1:50 PM
by contextual object I mean a global/shared object
you could create a pure function that returns an aiohttp session, for example
and in this function you could implement pools of sessions or even singleton sessions

Federico Zambelli

04/20/2023, 1:53 PM
uhm ok, I understand, although I'm not sure what the difference would be in this case between a singleton session and a global shared session object. It's not like I'm going to initialize it more than once

flapili

04/20/2023, 1:54 PM
you could also use a global shared session object
but you can't pass it as args

Federico Zambelli

04/20/2023, 1:55 PM
seems fine by me, I don't see any drawbacks

flapili

04/20/2023, 1:58 PM
mostly DX issues

Federico Zambelli

04/20/2023, 1:58 PM
yeah, just that, but it's a personal project so I'm not too worried about it

flapili

04/20/2023, 1:58 PM
like typing, autocomplete, etc.
from prefect import task, flow


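def get_global_var():
    # builds (or looks up) the shared object inside the task, so it is never passed as an argument
    return 42


@task
def some_task():
    var = get_global_var()
    # do something with var
    return var


@flow
def main():
    r = some_task()
    print(r)


if __name__ == "__main__":
    main()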
imagine get_global_var returns an unpicklable object

Federico Zambelli

04/20/2023, 1:59 PM
yup 👍

flapili

04/20/2023, 1:59 PM
and if you want to reuse it, you just have to implement a singleton
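A minimal way to get that singleton behaviour in Python is to cache the getter with functools.lru_cache: the first call builds the object and later calls return the same one. In this sketch get_shared_resource is a hypothetical variant of get_global_var above, and threading.Lock stands in for an unpicklable object such as a connection; since callers fetch it themselves, it is never passed as a task argument.

```python
import functools
import threading


@functools.lru_cache(maxsize=None)
def get_shared_resource() -> threading.Lock:
    """Build the shared (unpicklable) object on first use, then reuse it."""
    print("initializing shared resource")
    return threading.Lock()


def some_step() -> bool:
    # Each caller fetches the resource instead of receiving it as an argument.
    resource = get_shared_resource()
    with resource:
        return True


a = get_shared_resource()
b = get_shared_resource()
print(a is b)  # True: the object was created only once
```

The "singleton" is per process, and lru_cache does not lock around the first call, so two threads racing on that first call could each build an object; add a lock if that matters.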

Federico Zambelli

04/20/2023, 2:00 PM
Since you're here, I have one last question if you don't mind: is it possible to cache a task that returns nothing? E.g. imagine a task writes its results to some external storage, and I want it to skip execution if the passed args are the same. Does that make sense?
(the purpose is idempotency ofc)

flapili

04/20/2023, 2:04 PM
you can, and Prefect already implements it
I don't remember the name, but there is a func that takes the inputs and hashes them into a cache key
but I'm wondering whether you'll need storage anyway
because it keeps more than the result
the date, for example
like you could tell Prefect "cache by args, use the cache if the result is less than X hours old"
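The mechanism described here (hash the inputs into a cache key, reuse the cached run while it is younger than some expiration) can be sketched without Prefect. This is not Prefect's implementation; in Prefect it is configured through the task decorator's cache_key_fn and cache_expiration options. cache_by_inputs, _input_key and the in-memory dict are hypothetical stand-ins for Prefect's result storage. In this sketch a function returning None is still cached, because the store records that the call happened, not just its value.

```python
import functools
import hashlib
import json
import time
from datetime import timedelta
from typing import Any, Callable, Dict, Optional, Tuple


def _input_key(fn: Callable, args: tuple, kwargs: dict) -> str:
    """Hash the function name and its inputs into a cache key."""
    payload = json.dumps(
        {"fn": fn.__qualname__, "args": args, "kwargs": kwargs},
        sort_keys=True,
        default=repr,  # fall back to repr() for non-JSON values
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def cache_by_inputs(expiration: Optional[timedelta] = None):
    """Skip re-running fn when the same inputs were seen within `expiration`."""
    store: Dict[str, Tuple[float, Any]] = {}  # key -> (created_at, value)

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = _input_key(fn, args, kwargs)
            hit = store.get(key)
            if hit is not None:
                created_at, value = hit
                age = time.time() - created_at
                if expiration is None or age < expiration.total_seconds():
                    return value  # cache hit, even when value is None
            value = fn(*args, **kwargs)
            store[key] = (time.time(), value)
            return value

        return wrapper

    return decorator


calls = []


@cache_by_inputs(expiration=timedelta(hours=1))
def write_to_storage(result_key: str) -> None:
    calls.append(result_key)  # pretend this writes to external storage


write_to_storage("abc")
write_to_storage("abc")  # same input within the hour: skipped
write_to_storage("def")
print(calls)  # ['abc', 'def']
```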

Federico Zambelli

04/20/2023, 2:07 PM
task(cache_key_fn=task_input_hash, cache_expiration=None)
I assume you mean this?
> but I'm wondering if you will not need a storage anyway
I'm mostly playing around. The idea behind my last question is as follows: imagine I'm reading data from some API that returns unpredictable results, and I'm writing them to S3. I don't want duplicate results, so if the write_to_s3 function receives the same input (e.g. result_key), it should skip execution.

flapili

04/20/2023, 2:22 PM
not sure I understand
do you mean you can't predict the result but it's idempotent?

Federico Zambelli

04/20/2023, 2:23 PM
sorry, lemme explain better

flapili

04/20/2023, 2:24 PM
like if the API is a function f, f(5) will always return the same result?
but the result is hard/impossible to guess?

Federico Zambelli

04/20/2023, 2:26 PM
> like if the API is a function f, f(5) will always return the same result?
It should in theory, but in practice it doesn't because the API itself is sometimes unavailable. f(5) can sometimes return 1000 results and sometimes only 100, and those 100 can be a subset of the 1000. Each result, however, has a unique key. So imagine I have two tasks:
get_results_from_api ---> write_result_to_storage
For each result from get_results_from_api, write to storage ONLY if its result_key hasn't been seen before.
So I was thinking: if I use cache on the 2nd task, does it skip writing to storage when the passed result_key was seen before?
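Independently of any task cache, the "write only unseen result_key values" rule in this scenario can be expressed as a filter against the keys already stored. A minimal sketch, assuming each result is a dict with a result_key field; select_new_results is a hypothetical helper, and the stored set stands in for listing the keys already present in S3.

```python
from typing import Dict, Iterable, List, Set


def select_new_results(results: Iterable[Dict], seen_keys: Set[str]) -> List[Dict]:
    """Return only results whose result_key has not been stored yet, and record them."""
    new = []
    for result in results:
        key = result["result_key"]
        if key in seen_keys:
            continue  # already written by an earlier (possibly partial) API call
        seen_keys.add(key)
        new.append(result)
    return new


stored: Set[str] = set()
first_call = [{"result_key": f"r{i}"} for i in range(1000)]  # full response
second_call = [{"result_key": f"r{i}"} for i in range(100)]  # partial subset of the first

print(len(select_new_results(first_call, stored)))   # 1000
print(len(select_new_results(second_call, stored)))  # 0
```

Checking per key handles the "100 is a subset of the 1000" case directly, which a cache keyed on the whole API input would not.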

flapili

04/20/2023, 2:33 PM
caches are mostly for retry stuff
you could instead name the S3 object after the input hash
because if the Prefect database or the storage block that caches results is unavailable for some reason
you could have idempotency issues
but you could certainly play "belt and suspender"
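Naming the object after a hash of its inputs makes the write idempotent by itself: re-running with the same input overwrites the same object instead of creating a duplicate, even when no cache is available. A minimal sketch, assuming JSON-serializable inputs; object_key_for, write_result, the results/ prefix, and the dict standing in for an S3 bucket are all hypothetical.

```python
import hashlib
import json
from typing import Any, Dict


def object_key_for(inputs: Dict[str, Any], prefix: str = "results/") -> str:
    """Derive a deterministic object name from the task inputs."""
    canonical = json.dumps(inputs, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}{digest}.json"


def write_result(bucket: Dict[str, str], inputs: Dict[str, Any], body: str) -> str:
    """Write body under the hashed key; repeating the call just overwrites it."""
    key = object_key_for(inputs)
    bucket[key] = body  # a real implementation would upload to S3 here
    return key


bucket: Dict[str, str] = {}
k1 = write_result(bucket, {"query": 5, "page": 1}, "payload")
k2 = write_result(bucket, {"page": 1, "query": 5}, "payload")  # same inputs, different order
print(k1 == k2, len(bucket))  # True 1
```

sort_keys makes the key independent of argument order; without that canonicalization, equal inputs could hash to different names.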

Federico Zambelli

04/20/2023, 2:36 PM
ok, I understand. I have no idea what "belt and suspender" is, though 😅

flapili

04/20/2023, 2:36 PM
Prefect cache + checks in the write tasks to ensure the job wasn't already done in the past
IDK, I literally translated the French expression "ceinture et bretelle"

Federico Zambelli

04/20/2023, 2:37 PM
ahaha, is this a programming pattern or something you just made up 😄 ?

flapili

04/20/2023, 2:37 PM
the true translation is belt and braces 😅

Federico Zambelli

04/20/2023, 2:38 PM
got it 👍 , thanks !
I had no idea this term existed
well thanks a lot for the help !

flapili

04/20/2023, 2:38 PM
np