Morning. I'm looking for a way to manually trigger a Cached state from within a task. Is that possible at all?
Kevin Kho
04/14/2021, 2:15 PM
Hi @tash lai, could you give me a bit more details about what you’re trying to do?
tash lai
04/14/2021, 4:23 PM
@Kevin Kho well basically it's just a way to save memory
import requests
from prefect import task
from pymongo import MongoClient


@task
def download_and_process_big_json(url, mongo_url):
    client = MongoClient(mongo_url)
    db = client.get_default_database()
    # Check whether this URL's JSON was already stored.
    data = db['big_json'].find_one({'url': url})
    if data:
        pass  # <set task state to Cached>
    else:
        data = requests.get(url).json()
        db['big_json'].insert_one({'url': url, **data})
    client.close()
    process_big_json(data)  # defined elsewhere
So I want to simulate output caching using Mongo, but without actually returning anything from the task. Of course I can live without changing the task state to Cached and it will work fine, but I'd kind of like to see in the Prefect UI whether the data already existed in the database, you know
Cached: 198 Success: 300
etc
tried that, it says something like "cannot find signal for state 'Cached'"
Kevin Kho
04/14/2021, 4:38 PM
Ok will look more.
Kevin Kho
04/14/2021, 5:42 PM
I think the best thing to do here is to raise SUCCESS with a custom message.
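A rough sketch of what that suggestion could look like, assuming the Prefect 1.x signal import path prefect.engine.signals.SUCCESS. The names download_and_process, store, fetch and process are hypothetical stand-ins (a dict replaces the Mongo collection so the example runs on its own), and the fallback SUCCESS class exists only so the sketch executes where that Prefect version is not installed. In a real flow the function would carry the @task decorator.

```python
try:
    # Prefect 1.x import path (assumption: that version is installed).
    from prefect.engine.signals import SUCCESS
except ImportError:
    class SUCCESS(Exception):
        """Stand-in signal so the sketch runs without Prefect 1.x."""

        def __init__(self, message=None):
            super().__init__(message)


def download_and_process(url, store, fetch, process):
    """Process data for `url`, downloading it only when `store` lacks it.

    `store` is a dict standing in for the Mongo collection; `fetch` and
    `process` are hypothetical callables supplied by the caller.
    """
    data = store.get(url)
    was_cached = data is not None
    if not was_cached:
        data = fetch(url)
        store[url] = data
    process(data)
    if was_cached:
        # Finish in a Success state, but record the cache hit in the message.
        raise SUCCESS(message=f"{url} already in store; download skipped")
```

Raising the signal only after process has run matters here, because a raised signal ends the task immediately. The run should then still count as Success, with the cache hit visible only in the state message rather than as a separate Cached count.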
Zanie
04/14/2021, 5:54 PM
Hey @tash lai -- this doesn't look possible right now because the task runner coerces Cached states (which are Success subclasses) into Success states. I'm not quite sure what the actual mechanism is here so we'll have to look into it. I'll open an issue to track supporting this.