p

    Phoebe Bright

    2 years ago
    Apologies for asking what is somewhere in the docs and I know I saw it somewhere but can't find it again. (is there some reason the community is not using stackoverflow?) . How can I get the value that was returned from a task? I'm passing a dict that get's added to by each task, and at the end I want to update my django model with the final dict. I could create another task that runs some sql but it would be really nice to just assign the values and save. For anyone familiar with django, this is my code - feedback very welcome! ''' def run_workflow(request=None): for item in Dataset.objects.pending(): # only handling these types if not (item.batch_mode == "PASS" or item.batch_type in ("IMG", "EDT")): raise ValueError("Unknown batch type %s" % item.batch_type) with Flow("GasCloud processing batch %s" % item.batchid[:4]) as flow: batchid = Parameter("batchid") # get details of batch batch_details = get_batch_details(batchid) # download the zip file to local drive downloaded = get_from_datastore(batch_details) # unzip unzipped = unzip(downloaded) if item.batch_mode == settings.BATCH_MODE_PASS: run_details = add_run(unzipped, "PASS") run_result = complete_run(run_details) elif item.batch_type == "IMG": run_details = add_run(unzipped, "IMG") processing = process_image(run_details) run_result = complete_run(processing) elif item.batch_type == "EDT": write_initial_readings_to_db(unzipped) run_details = add_run(unzipped, "EDT") # binary to csv csvs = binary2csv(run_details) processing = apply_temperature_compensation(csvs) run_result = complete_run(processing) # mark as done processed_dataset = processed_done(run_result) #rezip rezipped = rezip(processed_dataset) # push to datastore back_in_datastore = put_to_datastore(rezipped) # done final = alldone(back_in_datastore) item.batch_details = final . <----- this doesn't work of course! item.save() flow.run(batchid=item.batchid) #flow.visualize() if request: return HttpResponse("Done") '''
    Chris White

    Chris White

    2 years ago
    Hi @Phoebe Bright - thanks for sharing! To retrieve the return value from a task after the flow has run you’ll need to extract the value from the various Prefect State objects as follows:
    flow_state = flow.run()
    flow_state.result[final].result
    I think the main reason people haven’t used stackoverflow is just because our Slack channel has become so popular, most people usually get their issues resolved really quickly — but there’s definitely nothing wrong with getting a
    prefect
    tag started on stackoverflow so it’s more discoverable by search engines!
    Jeremiah

    Jeremiah

    2 years ago
    ^ Hear, hear!
    p

    Phoebe Bright

    2 years ago
    Thanks Chris for your fast response! This is the flow variable after doing flow.run(batchid=item.batchid). Where do I find result?
    Chris White

    Chris White

    2 years ago
    anytime! The
    flow
    object itself is never altered;
    flow.run
    returns a Prefect State object will contain all of the task results. So using your pattern:
    state = flow.run(batchid=item.batchid)
    state.result[final].result # the result of the final task
    p

    Phoebe Bright

    2 years ago
    Thanks for restating that so I read your comment! That has me mostly there. Here is my final bit of code now . '''flow.set_reference_tasks([processing]) #fails - reference tasks must be part of the flow. flow_state = flow.run(batchid=item.batchid) if flow_state.is_successful(): # do stuff'''
    And I have access to the successfully completed tasks, but strangely not to the last one.
    Have result of put_to_datastore but not alldone - is that what you would expect?
    Chris White

    Chris White

    2 years ago
    Hey Phoebe - so it sounds like you’re running into one of the subtleties of the functional API; anytime you “call” a Task when building a Flow, Prefect actually creates a copy of the task and returns that copy, which is what is added to the flow. Check out these docs here to learn more: https://docs.prefect.io/core/tutorials/task-guide.html#adding-tasks-to-flows
    so in the case above, the actual
    alldone
    object is not a key of the result dictionary, but rather the
    final
    object
    p

    Phoebe Bright

    2 years ago
    OK that's got it all running now - many thanks.
    Chris White

    Chris White

    2 years ago
    anytime!