# prefect-community
c
Hi Team - For Prefect 2.0, we would like to use our S3 bucket to store some allocations files. These will be emailed/sftp to clients. I'm not sure what I'm doing wrong with the state and await from the write_path function.
Copy code
aws_credentials_block = AwsCredentials.load("****-user")
s3_block = S3.load("***-s3")

s3_bucket_block = S3Bucket(
    bucket_name=s3_block.bucket_path,
    aws_credentials=aws_credentials_block,
    basepath=f"Allocations/{date.today().year}"
)
output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
bytes_to_write = df_alloc.to_csv(None).encode()
csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)

logging.info('Filtering for **** Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))

return csv_file

@flow(name="***** Allocations")
def ****AllocationsFlow():
    try:
        slack_webhook_block = SlackWebhook.load("****-webhook")
        state = allocations_process(return_state=True)
        excel_file = state.result()
        if state.name == 'Completed':
            slack_webhook_block.notify("**** Allocations was successful")
        else:
            slack_webhook_block.notify("***** Allocations failed")
Error: cannot pickle 'coroutine' object

11:23:31.736 | INFO  | Task run 'Fidelity Allocations-e728df66-0' - Crash detected! Execution was interrupted by an unexpected exception.
11:23:32.443 | ERROR | Flow run 'visionary-cat' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
    return state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
TypeError: cannot pickle 'coroutine' object
Copy code
state = asyncio.run(allocations_process(return_state=True))
excel_file = state.result()
I tried this with no joy.
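A hedged sketch of what is going on here, using stand-in names rather than the real Prefect/AWS objects: `S3Bucket.write_path` is an async method, so calling it without `await` from a sync task returns an unstarted coroutine object, and a coroutine cannot be pickled when Prefect persists the task result. The failure mode reproduces with nothing but the standard library:

```python
import asyncio
import pickle

async def write_path(path, content):
    """Stand-in for an async block method like S3Bucket.write_path."""
    return path

def sync_task():
    # Calling the async method without await does not run it; it just
    # returns a coroutine object.
    return write_path("file.csv", b"data")

result = sync_task()
print(type(result).__name__)  # coroutine

try:
    pickle.dumps(result)  # roughly what happens when the result is persisted
except TypeError as exc:
    print(exc)  # cannot pickle 'coroutine' object
result.close()  # silence the "coroutine was never awaited" warning

# Resolving the coroutine first yields an ordinary, picklable value
resolved = asyncio.run(write_path("file.csv", b"data"))
print(pickle.loads(pickle.dumps(resolved)))  # file.csv
```

This is also why wrapping the task call itself in `asyncio.run` does not help: the coroutine that fails to pickle is the one created inside the task body, not the task call.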
n
what does your `allocations_process` look like?
c
I am querying the database and creating a data frame. The data frame is converted to a csv file and the content is the data of the csv file.
The data frame is hydrated and csv file data is created successfully.
Copy code
@task(name='***Allocations',
      retries=2,
      retry_delay_seconds=60)
def allocations_process():
    srseConn = srseController()
    df_exec = srseConn.query(db='spiderrock',
                             table='msgspdrparentexecution',
                             where=[f"""
                             accnt like 'A.F%' or accnt like 'A.W%'
                             """]
                             )
n
what is the return type of `allocations_process` (if any)?
c
🤦‍♂️ I didn't have one. It looks like `bytes` is appropriate, given `return s3_bucket_block.read_path(csv_file)`.
Trying that now.
Still getting this error related to the state
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
    return state.result()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
TypeError: cannot pickle 'coroutine' object
Copy code
state = allocations_process(return_state=True)
excel_file = state.result()
Copy code
@task(name='Fidelity Allocations',
      retries=2,
      retry_delay_seconds=60)
def allocations_process() -> bytes:

    #code to populate the dataframe
    aws_credentials_block = AwsCredentials.load("***-user")
    s3_block = S3.load("***-s3")

    s3_bucket_block = S3Bucket(
        bucket_name=s3_block.bucket_path,
        aws_credentials=aws_credentials_block,
        basepath=f"Allocations/{date.today().year}"
    )
    output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
    bytes_to_write = df_alloc.to_csv(None).encode()
    csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)

    logging.info('Filtering for Fido Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))

    return s3_bucket_block.read_path(csv_file)
When I use async and await there is an issue getting the database secret with the load function.
n
I was able to reproduce that error, but I got it to work by making my task async (and then awaiting my block load and block method calls). Will look into why this error is occurring in a sync context.
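Nate's workaround, sketched with a hypothetical stand-in class (`FakeS3Bucket` below is not a Prefect class; it replaces the real `S3Bucket` and credentials blocks): declare the task function itself `async` and `await` every async block call, so the task result is a resolved value rather than a coroutine:

```python
import asyncio

class FakeS3Bucket:
    """Hypothetical stand-in for prefect_aws.s3.S3Bucket."""

    async def write_path(self, path, content):
        # the real method uploads the bytes and returns the written key
        return path

    async def read_path(self, path):
        # the real method downloads the object's bytes
        return b"csv-bytes"

# With Prefect this function would carry the @task decorator; the key
# point is that it is async and every async block call is awaited.
async def allocations_process() -> bytes:
    bucket = FakeS3Bucket()
    csv_file = await bucket.write_path("SR Allocations.csv", b"data")
    return await bucket.read_path(csv_file)

print(asyncio.run(allocations_process()))  # b'csv-bytes'
```

Because each call is awaited inside the task, nothing coroutine-shaped ever reaches the result-pickling step.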
c
Like this: `self.__secret_name = await String.load("***-database")`
n
is that line of code in a class's `__init__` function?
c
Yes, so the `__init__` cannot be async.
I'm creating a new task that is async, so it is separate from the other function
👍 1
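Since `__init__` cannot be async, one common pattern for this (a sketch under assumptions: `load_secret` and `SrseController` below are hypothetical stand-ins for `String.load` and the real controller) is an async classmethod factory that does the awaiting and then constructs the instance synchronously:

```python
import asyncio

async def load_secret(name):
    """Stand-in for an awaitable loader such as String.load."""
    return f"secret-for-{name}"

class SrseController:
    """Hypothetical controller; the real one queries the database."""

    def __init__(self, secret):
        # __init__ stays synchronous; it only stores already-loaded values
        self._secret = secret

    @classmethod
    async def create(cls):
        # all awaiting happens here, outside __init__
        secret = await load_secret("***-database")
        return cls(secret)

controller = asyncio.run(SrseController.create())
print(controller._secret)  # secret-for-***-database
```

A separate async task that loads the secret and passes it in, as described above, accomplishes the same thing: the await is simply moved out of object construction.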
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
14:45:50.617 | ERROR | Task run 'Create Allocations File-8f8b0ae0-0' - Finished in state Failed('Task run encountered an exception.') An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
^^ I'll need to ask a team member for permissions on a user to write to S3.
@Nate I was able to write the csv file to S3 after modifying the permissions. I just need to send an email attachment from S3 now. Has the Prefect UI project been deployed or when is the release? There was a defect for the EmailServerCredentials that needed to be deployed. https://github.com/PrefectHQ/prefect-email/issues/28
n
I believe that issue has been resolved and merged but is not yet released - not exactly sure when that will be included.
As for the issue you raised with `S3Bucket` above (thanks for raising that, by the way), I've gone ahead and opened an issue (my PR to resolve it is being reviewed now) which will allow you to call `S3Bucket` methods in a sync context without running into that `TypeError: cannot pickle 'coroutine' object` error - you can track the progress here
c
Thanks Nate!