Jeremy Phelps
09/28/2021, 6:28 PMtask(result=OurGCSResult(bucket='our-bucket')), where OurGCSResult is a copied-and-modified version of the GCSResult class found in Prefect. The difference is that OurGCSResult is compatible with the old version of the Google Cloud Storage library that our code uses. We would not be able to use the standard GCSResult without first rewriting a significant portion of our code.
This class works fine for tasks that are not mapped, but something goes wrong for mapped tasks.
I dug around using the GraphQL client and noticed that all task runs with a non-negative map_index seem to fail to have any storage information:
{
"id": "eba36675-c15a-43ea-ad5b-2540936477b5",
"map_index": 0,
"name": null,
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": null, // WAT?!
"__version__": "0.14.22+9.g61192a3ee"
},
"context": {
"tags": []
},
"message": "Task run succeeded.",
"__version__": "0.14.22+9.g61192a3ee",
"cached_inputs": {}
}
}
As far as I can tell from the logs, no errors are happening, and the only possible result of running the task in question is a possibly-empty array being returned, or an exception being thrown (which would be logged).Kevin Kho
Jeremy Phelps
09/28/2021, 6:44 PM{
"id": "1ba64aab-482a-4e81-93c9-f01287cff21d",
"map_index": -1,
"name": null,
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": "2021/9/24/d13c57e3-63d0-4b00-b8ad-8899b843aac5.prefect_result",
"__version__": "0.14.22+9.g61192a3ee"
},
"context": {
"tags": [
"mysql-write"
]
},
"message": "Task run succeeded.",
"__version__": "0.14.22+9.g61192a3ee",
"cached_inputs": {}
}
}Kevin Kho
LocalResult class. Made it LocalResultCopy and I did get a location:
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": "/Users/kevinkho/Work/scratch/4.txt",
"__version__": "0.14.22+9.g61192a3ee"
},
But if the task returns nothing:
@task(result=LocalResultCopy(location='/Users/kevinkho/Work/scratch/{x}.txt'))
def abc(x):
if x == 2:
return
return x
The result is not persisted and the location ends up as null
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": null,
"__version__": "0.14.22+9.g61192a3ee"
},
"context": {
"tags": []
},
While the mapped runs with an output do return something:
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": "/Users/kevinkho/Work/scratch/3.txt",
"__version__": "0.14.22+9.g61192a3ee"
},Kevin Kho
Jeremy Phelps
09/28/2021, 6:50 PMKevin Kho
@task(result=LocalResultCopy(location='/Users/kevinkho/Work/scratch/{x}.txt'))
def abc(x):
if x == 2:
return
return x
and doing `abc.map([1,2,3,4])`will create 1.txt, 2.txt, 3.txt, 4.txt becuase for the {x}.txt where x is the input variable of the taskJeremy Phelps
09/28/2021, 6:52 PMKevin Kho
Jeremy Phelps
09/28/2021, 6:54 PMJeremy Phelps
09/28/2021, 6:55 PMJeremy Phelps
09/28/2021, 6:55 PM2021/9/24/d13c57e3-63d0-4b00-b8ad-8899b843aac5.prefect_resultKevin Kho
nullJeremy Phelps
09/28/2021, 6:58 PM[].Kevin Kho
Kevin Kho
Jeremy Phelps
09/28/2021, 7:15 PMJeremy Phelps
09/30/2021, 6:25 PMKevin Kho
null location since empty lists work for persisting results. I would need a minimum example to really be able to look into it.Jeremy Phelps
09/30/2021, 7:09 PMResult class and found several ways that the SDK can ask for a Result with a null location. I'll try overriding the behavior in OurGCSResult so it's impossible to have a null location.