https://prefect.io logo
Title
b

Bob Colner

07/02/2020, 9:58 PM
hey hey, I'm a bit stumped trying to implement a data ingestion pattern in prefect. Is it possible to access task parameter values when templating the result 'target' filename? I'm trying to have my results stored in a directory structure (aka 'hive' dir partitioning). Thanks for any insight
👀 1
the target path should reflect the task parameters if that makes sense
d

Dylan

07/02/2020, 10:06 PM
Hey @Bob Colner! That definitely makes sense. I don’t believe it’s possible at this time, but let me double-check with the rest of the team before I say that definitively
b

Bob Colner

07/02/2020, 10:07 PM
cool
d

Dylan

07/02/2020, 10:08 PM
@Bob Colner Great news! You totally can!
The task kwargs are available just like the items in context
b

Bob Colner

07/02/2020, 10:09 PM
oh nice, is there an example of how?
d

Dylan

07/02/2020, 10:09 PM
Something like:
@task(target="{param}/file.txt")
def my_task(param):
    pass
Should work on Prefect version 0.12.0 and above
👍 1
There is not
I’m going to open a ticket to better document this now
@Marvin open “Better document the fact that task kwargs can be used to template result targets”
b

Bob Colner

07/02/2020, 10:11 PM
I'll give this a try.
d

Dylan

07/02/2020, 10:12 PM
👍 let us know how it goes!
b

Bob Colner

07/02/2020, 10:12 PM
FYI I think this would make a good additional idiom -especially when we can do custom serializers (parquet etc.) common use case (API calls -> dir partitioned csv/parquet
d

Dylan

07/02/2020, 10:13 PM
That’s a great suggestion
b

Bob Colner

07/02/2020, 10:15 PM
speaking of, do you have an idea of when the custom/passthrough serialization will be ready?
d

Dylan

07/02/2020, 10:40 PM
I’m not sure
Let me see if I can find the issue
@Bob Colner looks like it’s on 0.12.1, so you’re good to go!
b

Bob Colner

07/02/2020, 10:44 PM
very cool
so this example in the docs in would work as a 'passthrough'?
class MySerializer(prefect.engine.serializers.Serializer):

    def serialize(self, value: Any) -> bytes:
        # transform a Python object into bytes
        pass

    def deserialize(self, value: Union[bytes, str]) -> Any:
        # recover a Python object from bytes
        pass