hi, what is that pointing at? ```Unexpected error:...
# ask-community
l
hi, what is that pointing at?
Copy code
Unexpected error: TypeError("'NoneType' object is not subscriptable")
z
Hi @Laura Vaida -- this means you have a variable that is
None
that you are attempting to access like a list ie
x = None; x[0]
l
Copy code
gcs_result = GCSResult(bucket='billwerk')



@task
def oauth(username,password,client_id,client_secret):
    oauth = OAuth2Session(client=LegacyApplicationClient(client_id=client_id))
    token = oauth.fetch_token(
        token_url="<https://app.billwerk.com/oauth/token>",
        username=username,
        password=password,
        client_id=client_id,
        client_secret=client_secret,
    )
    return token

@task
def creating_json(token):
    bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
    return requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()

@task(cache_for=datetime.timedelta(hours=1), result=gcs_result)
def last_entry(json):
    # Getting last contract id for while loop
    last_entry = json[199]['Id']
    return last_entry
here the flow
it gives the error for last_entry
z
I would presume
json
is
None
or
json[199]
is None
👀 1
l
Copy code
with Flow("billwerk-contracts") as flow:
    username = PrefectSecret("BW_Username")
    password = PrefectSecret("BW_Password")
    client_id = PrefectSecret("BW_Client_Id")
    client_secret = PrefectSecret("BW_Client_Secret")
    token = oauth(username=username, password=password, client_id=client_id, client_secret=client_secret)
    json = creating_json(token=token)
that's the flow
mmh but it worked like that before locally 😕
z
I would try logging the
json
object in the
last_entry
task so you can inspect it
l
ok can u give an example or reference how that would work?
l
ahh great thanks! this means print results should be visible then in the logs directly?
hi @Zanie do i have to specify any context here?
z
I'm not sure what you mean
l
Unexpected error: ValueError('Could not infer an active Flow context.') Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner new_state = method(self, state, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 863, in get_task_run_state value = prefect.utilities.executors.run_task_with_timeout( File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout return task.run(*args, **kwargs) # type: ignore File "<input>", line 17, in creating_json File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 955, in getitem return prefect.tasks.core.operators.GetItem( File "/usr/local/lib/python3.8/site-packages/prefect/core/task.py", line 656, in bind raise ValueError("Could not infer an active Flow context.") ValueError: Could not infer an active Flow context.
z
It sounds like you're trying to run your flow by starting the docker image manually?
l
Copy code
@task(log_stdout=True)
def creating_json(token):
    bw_contracts_call = requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token'])
    print(requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json())
    return requests.get('<https://app.billwerk.com/api/v1/contracts?access_token=>' + token['access_token']).json()

@task(cache_for=datetime.timedelta(hours=1), result=gcs_result, log_stdout=True)
def last_entry(json):
    # Getting last contract id for while loop
    last_entry = json[199]['Id']
    return last_entry
thats the task
Copy code
flow.storage = Docker(registry_url="<http://gcr.io/keller-steering-enabling/flows|gcr.io/keller-steering-enabling/flows>", image_name="billwerk-contracts",
                      python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "datetime", "oauth2client", "snowflake"])
flow.run_config = KubernetesRun()
flow.register('Billwerk')
that the flow run
and then i start it from the ui
z
Whats your flow code look like?
l
Copy code
with Flow("billwerk-contracts") as flow:
    username = PrefectSecret("BW_Username")
    password = PrefectSecret("BW_Password")
    client_id = PrefectSecret("BW_Client_Id")
    client_secret = PrefectSecret("BW_Client_Secret")
    token = oauth(username=username, password=password, client_id=client_id, client_secret=client_secret)
    json = creating_json(token=token)
    last_entry = last_entry(json)
z
Hmm. It looks like you're overriding your task definition with the output of the task at
last_entry = last_entry(json)
which could cause problems.
l
mmh okey then i should rename the task?
that helped 🙂
now still getting this error with the last_entry task
Copy code
Unexpected error: Forbidden('POST <https://storage.googleapis.com/upload/storage/v1/b/billwerk/o?uploadType=multipart>: {\n  "error": {\n    "code": 403,\n    "message": "Insufficient Permission",\n    "errors": [\n      {\n        "message": "Insufficient Permission",\n        "domain": "global",\n        "reason": "insufficientPermissions"\n      }\n    ]\n  }\n}\n: (\'Request failed with status code\', 403, \'Expected one of\', <HTTPStatus.OK: 200>)')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2343, in upload_from_file
    created_json = self._do_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2165, in _do_upload
    response = self._do_multipart_upload(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 1728, in _do_multipart_upload
    response = upload.transmit(
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/requests/upload.py", line 149, in transmit
    self._process_response(response)
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/_upload.py", line 116, in _process_response
    _helpers.require_status_code(response, (http_client.OK,), self._get_status_code)
  File "/usr/local/lib/python3.8/site-packages/google/resumable_media/_helpers.py", line 99, in require_status_code
    raise common.InvalidResponse(
google.resumable_media.common.InvalidResponse: ('Request failed with status code', 403, 'Expected one of', <HTTPStatus.OK: 200>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 898, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
    self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2567, in upload_from_string
    self.upload_from_file(
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2359, in upload_from_file
    _raise_from_invalid_response(exc)
  File "/usr/local/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 3886, in _raise_from_invalid_response
    raise exceptions.from_http_status(response.status_code, message, response=response)
google.api_core.exceptions.Forbidden: 403 POST <https://storage.googleapis.com/upload/storage/v1/b/billwerk/o?uploadType=multipart>: {
  "error": {
    "code": 403,
    "message": "Insufficient Permission",
    "errors": [
      {
        "message": "Insufficient Permission",
        "domain": "global",
        "reason": "insufficientPermissions"
      }
    ]
  }
}
: ('Request failed with status code', 403, 'Expected one of', <HTTPStatus.OK: 200>)
but i have given all storage admin righty to my gc service account
z
I don't see where you've set a Flow/task
result
type in the code you gave me but it looks like this is where the error is occuring
l
sorry! here it is
Copy code
from prefect.engine.results import GCSResult


gcs_result = GCSResult(bucket='billwerk')
Copy code
@task(cache_for=datetime.timedelta(hours=1), result=gcs_result, log_stdout=True)
def defining_last_entry(json):
    # Getting last contract id for while loop
    last_entry = json[199]['Id']
    return last_entry
in this task
z
Ah I see.
It looks like your credentials aren't setup per https://docs.prefect.io/api/latest/engine/results.html#gcsresult
l
you mean that? i set it up in the secrets as the json file
You can also optionally provide your service account key to 
prefect.context.secrets.GCP_CREDENTIALS
 for automatic authentication - see Third Party Authentication for more information.
z
l
i had a look several times at that but how could i speify that in the context? im lacking of an example
z
One second let me make sure that applies to results as well as storage
🙏 1
Hmm it doesn't take that arg-- can you try adding
flow.add_task(PrefectSecret("GCP_CREDENTIALS"))
to your flow?
l
yes but outside the flow? or inside
z
inside
l
Copy code
with Flow("billwerk-contracts") as flow:
    username = PrefectSecret("BW_Username")
    password = PrefectSecret("BW_Password")
    client_id = PrefectSecret("BW_Client_Id")
    client_secret = PrefectSecret("BW_Client_Secret")
    token = oauth(username=username, password=password, client_id=client_id, client_secret=client_secret)
    json = creating_json(token=token)
    flow.add_task(PrefectSecret("GCP_CREDENTIALS"))
    last_entry = defining_last_entry(json)
like that?
z
Yep!
l
well the gcp task worked
but still the same error
sorry for the small screenshot
sould i define that as upstream?
mmh still that error
ans thats gcp...i dont get it
Copy code
/opt/prefect/healthcheck.py:153: UserWarning: Task <Task: defining_last_entry> has cache settings but some upstream dependencies do not have result types. See <https://docs.prefect.io/core/concepts/results.html> for more details.
  result_check(flows)
just getting this error in the flow registration part
z
Can you confirm that you updated the secret in the Prefect UI with the new key after you added the permissions?
l
yes i did
do you know what else i could test?
z
I would recommend running
flow.run()
locally and printing your credentials then testing a file upload using them to make sure the credentials that the flow uses are indeed permissioned correctly.
Ah I've heard back from someone else on the team and I think it'll work if you pass the secret name to Docker storage, like
Docker(... secrets=["GCP_CREDENTIALS"])
l
thanks that helped! 😄 😄