# ask-community
l
hi folks, what is the best way to pass results from one task to another? making a cache?
a
You might find these two doc pages useful: https://docs.prefect.io/core/concepts/persistence.html and https://docs.prefect.io/core/concepts/results.html. From my experience using Prefect for a while, the easiest approach is to return the output of one task, pass it to the downstream task, and let checkpointing do its work. If memory consumption turns out to be too high, it may be better to persist the result directly instead of returning it, and have the downstream task retrieve the persisted result.
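The two options in that answer can be sketched in plain Python, without Prefect. All function names here are hypothetical. Option 1 passes the return value straight to the downstream step. Option 2 pickles the upstream output to a file and passes only the path, which is roughly what "persist the result, then let the downstream task retrieve it" amounts to.

```python
import pickle
import tempfile
from pathlib import Path


def fetch_contracts():
    # Hypothetical upstream step standing in for the API call.
    return [{"Id": "c-1"}, {"Id": "c-2"}]


def last_contract_id(contracts):
    return contracts[-1]["Id"]


# Option 1: pass the return value directly to the downstream step.
def run_in_memory():
    return last_contract_id(fetch_contracts())


# Option 2: persist the upstream output and pass only a reference to it.
def fetch_contracts_to_file(directory: Path) -> Path:
    path = directory / "contracts.pkl"
    path.write_bytes(pickle.dumps(fetch_contracts()))
    return path


def last_contract_id_from_file(path: Path):
    contracts = pickle.loads(path.read_bytes())  # downstream step reloads the data
    return last_contract_id(contracts)


def run_with_persistence():
    with tempfile.TemporaryDirectory() as tmp:
        path = fetch_contracts_to_file(Path(tmp))
        return last_contract_id_from_file(path)
```

Option 2 only pays off when the data is large. The downstream step then holds a small reference instead of the whole object until it actually needs it.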
l
hey, thanks for your answer! does the PrefectResult result type store the result in the Prefect database when the flow is run from the UI?
```python
import datetime

import requests
from oauthlib.oauth2 import LegacyApplicationClient
from requests_oauthlib import OAuth2Session
from prefect import Flow, task
from prefect.engine.results import PrefectResult
from prefect.run_configs import KubernetesRun
from prefect.storage import Docker
from prefect.tasks.secrets import PrefectSecret


@task
def oauth(username, password, client_id, client_secret):
    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(
        token_url="https://app.billwerk.com/oauth/token",
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
    return token

@task
def creating_json(token):
    # One request is enough (the same GET was previously sent twice).
    response = requests.get(
        "https://app.billwerk.com/api/v1/contracts?access_token=" + token["access_token"]
    )
    return response.json()

@task(cache_for=datetime.timedelta(hours=1), result=PrefectResult)
def last_entry(json):
    # Getting last contract id for while loop
    return json[199]['Id']

with Flow("billwerk-contracts") as flow:
    username = PrefectSecret("BW_Username")
    password = PrefectSecret("BW_Password")
    client_id = PrefectSecret("BW_Client_Id")
    client_secret = PrefectSecret("BW_Client_Secret")
    token = oauth(username=username, password=password, client_id=client_id, client_secret=client_secret)
    json = creating_json(token=token)
    last_id = last_entry(json)  # don't rebind the name of the task itself

# datetime is in the standard library, so it is not listed as a pip dependency.
flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows", image_name="billwerk-contracts", python_dependencies=["pandas", "oauthlib", "requests", "requests_oauthlib", "oauth2client", "snowflake"])
flow.run_config = KubernetesRun()
flow.register('Billwerk')
```
this is my flow, but it gives me this message:
@Amanda Wee
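The `cache_for=datetime.timedelta(hours=1)` argument in the flow above asks Prefect to keep a task's output for an hour so that reruns can reuse it instead of recomputing it. The toy decorator below is plain Python and is *not* Prefect's implementation. It only illustrates the idea of a time-window cache. `time_window_cache`, `last_entry_id`, and the fake clock are hypothetical names used for this sketch.

```python
import time
from datetime import timedelta
from functools import wraps


def time_window_cache(duration: timedelta, clock=time.monotonic):
    """Toy cache: reuse a result computed for the same arguments within `duration`."""
    def decorator(fn):
        cache = {}  # maps call arguments -> (timestamp, value)

        @wraps(fn)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))  # arguments must be hashable
            now = clock()
            hit = cache.get(key)
            if hit is not None and now - hit[0] < duration.total_seconds():
                return hit[1]  # still inside the window: skip the work
            value = fn(*args, **kwargs)
            cache[key] = (now, value)
            return value

        return wrapper
    return decorator


# Usage with a fake clock so the expiry can be shown without waiting an hour.
fake_now = [0.0]  # a list, so the lambda below sees updates
calls = []


@time_window_cache(timedelta(hours=1), clock=lambda: fake_now[0])
def last_entry_id(page):
    calls.append(page)  # records every real (non-cached) computation
    return f"last-id-of-{page}"


last_entry_id("page-1")  # computed
fake_now[0] = 30 * 60
last_entry_id("page-1")  # 30 minutes later: served from the cache
fake_now[0] = 61 * 60
last_entry_id("page-1")  # 61 minutes later: window expired, computed again
```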
s
are you using server or cloud @Laura Vaida?
l
cloud
i switched to the result type google cloud storage now
s
ah okay, never mind, I see you're using Docker storage. I was going to say that I think result persistence is handled automatically (as per this documentation), but that's not the case for Docker storage.
l
is the result automatically stored where i put my flow storage?
s
I'm by no means an expert, but could it be that instead of
```python
@task(cache_for=datetime.timedelta(hours=1), result=PrefectResult)
```
you need to do
```python
@task(cache_for=datetime.timedelta(hours=1), result=PrefectResult())
```
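The difference between the two lines is that the first passes the class `PrefectResult` and the second passes an instance of it. The toy below is plain Python, not Prefect's code. It shows one plausible way passing the class goes wrong: a runner that calls `.write(value)` on whatever it was given. `ToyPrefectResult` and `run_task` are hypothetical. The JSON-in-`location` behaviour is a loose model of how `PrefectResult` is documented to keep small, JSON-serializable values directly in Prefect's database.

```python
import json


class ToyPrefectResult:
    """Toy result type: stores the JSON-encoded value as its own `location`."""

    def __init__(self):
        self.location = None

    def write(self, value):
        self.location = json.dumps(value)  # value must be JSON-serializable
        return self


def run_task(fn, result, *args):
    # Toy runner: like a task runner, it calls .write() on the configured result.
    value = fn(*args)
    return result.write(value)


def last_entry_id(contracts):
    return contracts[-1]["Id"]


contracts = [{"Id": "a"}, {"Id": "b"}]

# Instance: write() is bound, so the value is stored.
stored = run_task(last_entry_id, ToyPrefectResult(), contracts)

# Class: write() is unbound, so "b" is taken as `self` and `value` is missing.
try:
    run_task(last_entry_id, ToyPrefectResult, contracts)
    class_error = None
except TypeError as exc:
    class_error = exc  # complains about a missing positional argument
```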
l
let me try
s
yep, for example I'm using S3 storage for my flows and my Flow results get pickled and stored in the same S3 bucket that my flows are being stored in
l
s
Prefect has its own default format for the S3 key/filename it uses for storage. I don't know what it looks like for other storage mechanisms, but I changed it because I didn't find the default easy to read.
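In Prefect 1.x, result classes such as `S3Result` and `GCSResult` take a `location` argument that can be a template, filled in at runtime from context values. That is how the default key can be changed. The exact set of available keys should be checked in the Results docs; `flow_name`, `task_name` and `date` below are assumptions. This sketch only shows the plain `str.format` mechanics such a template relies on. `render_location` is a hypothetical helper, not a Prefect function.

```python
from datetime import datetime

# Hypothetical location template; placeholder names are assumptions.
LOCATION_TEMPLATE = "{flow_name}/{task_name}/{date:%Y-%m-%d}.prefect"


def render_location(template: str, **context) -> str:
    # str.format hands the spec after ":" to datetime.__format__,
    # which applies it as a strftime pattern.
    return template.format(**context)


key = render_location(
    LOCATION_TEMPLATE,
    flow_name="billwerk-contracts",
    task_name="last_entry",
    date=datetime(2021, 3, 15, 9, 30),
)
```

A human-readable prefix like `flow_name/task_name/date` makes it easy to browse results per flow in the bucket. The trade-off is that two runs on the same day would write to the same key unless something more specific, such as a run id, is added.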
l
just getting this error message now
s
okay, so that looks to me like Prefect is now actually trying to write the results to your Google Cloud Storage bucket, but you're not authorized to write to the location you're trying to store them in
l
yes, but i declared my gcp credentials in the ui
also my api account is a storage administrator
s
sorry what is that a screenshot of?
l
google cloud
s
okay, I've never used GCS before, so I won't be much help here. But it does seem to me like Prefect is now doing what you want it to do, and this is just some auth issue with Google... although that list of roles does look like it should give you full administrative access to your GCS resources
maybe there's something in the error message that's cut off in your second-to-last screenshot (where it says `Request failed with status code 403 Expected one of...`)
l
```
Unexpected error: Forbidden('POST https://storage.googleapis.com/upload/storage/v1/b/billwerk/o?uploadType=multipart: {\n  "error": {\n    "code": 403,\n    "message": "Insufficient Permission",\n    "errors": [\n      {\n        "message": "Insufficient Permission",\n        "domain": "global",\n        "reason": "insufficientPermissions"\n      }\n    ]\n  }\n}\n: (\'Request failed with status code\', 403, \'Expected one of\', <HTTPStatus.OK: 200>)')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2343, in upload_from_file
    created_json = self._do_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2165, in _do_upload
    response = self._do_multipart_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 1728, in _do_multipart_upload
    response = upload.transmit(
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/upload.py", line 149, in transmit
    self._process_response(response)
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/_upload.py", line 116, in _process_response
    _helpers.require_status_code(response, (http_client.OK,), self._get_status_code)
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/_helpers.py", line 99, in require_status_code
    raise common.InvalidResponse(
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 403, 'Expected one of', <HTTPStatus.OK: 200>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 898, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
    self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2567, in upload_from_string
    self.upload_from_file(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2359, in upload_from_file
    _raise_from_invalid_response(exc)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 3886, in _raise_from_invalid_response
    raise exceptions.from_http_status(response.status_code, message, response=response)
google.api_core.exceptions.Forbidden: 403 POST https://storage.googleapis.com/upload/storage/v1/b/billwerk/o?uploadType=multipart: {
  "error": {
    "code": 403,
    "message": "Insufficient Permission",
    "errors": [
      {
        "message": "Insufficient Permission",
        "domain": "global",
        "reason": "insufficientPermissions"
      }
    ]
  }
}
: ('Request failed with status code', 403, 'Expected one of', <HTTPStatus.OK: 200>)
```
s
okay yeah, I'm sorry I can't be of any further help, but Prefect is definitely doing its job here: it's getting to this line of the GCSResult class... maybe confirm in a separate Python script that you can upload things to the GCS bucket you're trying to work with?
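A minimal sketch of that separate check, assuming the `google-cloud-storage` package is installed. The bucket name `billwerk` is taken from the upload URL in the traceback. `check_gcs_upload` and `probe_blob_name` are hypothetical helpers. The check calls `upload_from_string`, the same method that fails in the traceback. It is most informative when run with the same credentials the flow run actually uses. If no key file is given, the client falls back to Application Default Credentials.

```python
import uuid
from typing import Optional


def probe_blob_name(prefix: str = "prefect-permission-check") -> str:
    # Unique object name so the probe never overwrites a real result.
    return f"{prefix}/{uuid.uuid4().hex}.txt"


def check_gcs_upload(bucket_name: str, credentials_json: Optional[str] = None) -> str:
    # Imported lazily so this file loads even where the package isn't installed.
    from google.cloud import storage

    if credentials_json:
        client = storage.Client.from_service_account_json(credentials_json)
    else:
        client = storage.Client()  # Application Default Credentials
    blob = client.bucket(bucket_name).blob(probe_blob_name())
    blob.upload_from_string("ok")  # raises google.api_core.exceptions.Forbidden on a 403
    blob.delete()  # clean up the probe object (needs delete permission as well)
    return blob.name
```

Usage: `check_gcs_upload("billwerk", "service-account.json")`, where the key-file path is a placeholder. If this succeeds locally but the flow still gets a 403, the difference probably lies in which credentials the flow run picks up, not in the roles themselves.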
l
ok thanks for helping me! 🙂
s
no problem! I assume you've already seen this, but just in case you haven't: https://docs.prefect.io/api/latest/engine/results.html#gcsresult ... good luck
l
yes, already seen it but thanks! 🙂