Chris Gunderson
09/26/2022, 4:27 PM
aws_credentials_block = AwsCredentials.load("****-user")
s3_block = S3.load("***-s3")
s3_bucket_block = S3Bucket(
    bucket_name=s3_block.bucket_path,
    aws_credentials=aws_credentials_block,
    basepath=f"Allocations/{date.today().year}"
)
output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
bytes_to_write = df_alloc.to_csv(None).encode()
csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
logging.info('Filtering for **** Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))
return csv_file
@flow(name = "***** Allocations")
def ****AllocationsFlow():
    try:
        slack_webhook_block = SlackWebhook.load("****-webhook")
        state = allocations_process(return_state = True)
        excel_file = state.result()
        if 'Completed' == state.name:
            slack_webhook_block.notify("**** Allocations was successful")
        else:
            slack_webhook_block.notify("***** Allocations failed")
Error:
cannot pickle 'coroutine' object
11:23:31.736 | INFO | Task run 'Fidelity Allocations-e728df66-0' - Crash detected! Execution was interrupted by an unexpected exception.
11:23:32.443 | ERROR | Flow run 'visionary-cat' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
return state.result()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
state.result()
File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
TypeError: cannot pickle 'coroutine' object

state = asyncio.run(allocations_process(return_state = True))
excel_file = state.result()

I tried this with no joy.

Nate
09/26/2022, 6:19 PM
What does allocations_process look like?

Chris Gunderson
09/26/2022, 6:20 PM
@task(name = '***Allocations',
      retries = 2,
      retry_delay_seconds = 60)
def allocations_process():
    srseConn = srseController()
    df_exec = srseConn.query(db='spiderrock',
                             table='msgspdrparentexecution',
                             where=[f"""
                                 accnt like 'A.F%' or accnt like 'A.W%'
                             """]
                             )
Nate
09/26/2022, 6:22 PM
How are you calling allocations_process, and what does it return (if any)?

Chris Gunderson
09/26/2022, 6:26 PM
state = allocations_process(return_state = True)
excel_file = state.result()

@task(name = 'Fidelity Allocations',
      retries = 2,
      retry_delay_seconds = 60)
def allocations_process() -> bytes:
    # code to populate the dataframe
    aws_credentials_block = AwsCredentials.load("***-user")
    s3_block = S3.load("***-s3")
    s3_bucket_block = S3Bucket(
        bucket_name=s3_block.bucket_path,
        aws_credentials=aws_credentials_block,
        basepath=f"Allocations/{date.today().year}"
    )
    output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
    bytes_to_write = df_alloc.to_csv(None).encode()
    csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
    logging.info('Filtering for Fido Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))
    return s3_bucket_block.read_path(csv_file)
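
For context on where the coroutine in the error likely comes from: if S3Bucket.write_path is an async-only method in this version of prefect-aws, calling it from the synchronous task above hands back an un-awaited coroutine instead of the written path, and Prefect then fails to pickle it when persisting the result. A quick sketch of how one could confirm that (assumes the same s3_bucket_block, output_file, and bytes_to_write as in the task above):

import inspect

# Sketch only: if write_path is a coroutine function, calling it without awaiting it
# returns a coroutine object rather than the written path.
maybe_coro = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
if inspect.iscoroutine(maybe_coro):
    # An un-awaited coroutine cannot be pickled, which would match the
    # "TypeError: cannot pickle 'coroutine' object" in the traceback above.
    print("write_path returned a coroutine - it needs to be awaited")
    maybe_coro.close()  # avoid a "coroutine was never awaited" warning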
Nate
09/26/2022, 6:56 PM

Chris Gunderson
09/26/2022, 7:07 PM

Nate
09/26/2022, 7:09 PM
What does your __init__ function look like?

Chris Gunderson
09/26/2022, 7:10 PM

Nate
09/26/2022, 8:38 PM
Regarding the trouble with S3Bucket above (thanks for raising that by the way), I've gone ahead and opened an issue (my PR to resolve it is being reviewed now) which will allow you to call S3Bucket methods in a sync context without running into that TypeError: cannot pickle 'coroutine' object error - you can track the progress here.
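
Until that change ships, one interim pattern (a sketch based on the snippets above, not something from the thread) is to make the task and flow async so the S3Bucket coroutines are awaited explicitly. The block names are the redacted ones used earlier, and df_alloc here is a placeholder for the DataFrame built from the srse query:

from datetime import date, datetime

import pandas as pd
from prefect import flow, task
from prefect.filesystems import S3
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

@task(name='Fidelity Allocations', retries=2, retry_delay_seconds=60)
async def allocations_process() -> bytes:
    # Placeholder: in the real task this DataFrame comes from the srse query shown earlier.
    df_alloc = pd.DataFrame({"custodianAccnt": []})

    # Block loads are also awaitable inside async code.
    aws_credentials_block = await AwsCredentials.load("***-user")
    s3_block = await S3.load("***-s3")
    s3_bucket_block = S3Bucket(
        bucket_name=s3_block.bucket_path,
        aws_credentials=aws_credentials_block,
        basepath=f"Allocations/{date.today().year}",
    )

    output_file = f"SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv"
    bytes_to_write = df_alloc.to_csv(None).encode()
    # Awaiting returns the written path / file contents instead of an un-picklable coroutine.
    csv_file = await s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
    return await s3_bucket_block.read_path(csv_file)

@flow(name="***** Allocations")
async def allocations_flow():
    state = await allocations_process(return_state=True)
    excel_file = state.result()

Once the prefect-aws change lands, the original synchronous calls should work as written.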
Chris Gunderson
09/26/2022, 9:57 PM