mithalee mohapatra
07/21/2020, 7:15 PM
Hi, I am trying to work with the same version of the flow I have already uploaded to my S3 bucket. The issue I found is that `storage.flows` is empty and does not find my existing flow in the S3 bucket. If I explicitly set the flows mapping, then I can access my flow through the `get_flow` method; otherwise I get the error "Flow not contained in the storage". Please let me know if I am missing anything.

```python
from prefect import Flow
from prefect.environments.storage import S3  # Prefect 0.x import path

# Define the flow name and flow location globally, since storage.flows
# is not able to find my flow in S3.
dictFlows = {'flows': {'ETL': 'etl/testflow'}, 'flow_location': 'etl/testflow'}

def test_add_flow_to_S3():
    storage = S3(bucket="test", key="etl/testflow")
    f = Flow("ETL")
    assert f.name not in storage
    with Flow("ETL") as f:
        e = extract()
        t = transform(e)
        l = load(t)
    flow_location = storage.add_flow(f)
    assert f.name in storage
    storage.build()

def test_get_flow_S3(dictFlows):
    print("i am in get flow")
    storage = S3(bucket="test", key="etl/testflow")
    storage.flows = dictFlows['flows']
    newflow = storage.get_flow('etl/testflow')
    print("S3 FLOW OUTPUT")
    newflow.run()
```
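[Editor's note: the symptom above can be reproduced without AWS. The sketch below is a hypothetical stand-in for Prefect's S3 storage class, not its actual implementation; it only mirrors the relevant detail, namely that `get_flow` consults an in-memory `flows` dict that starts empty on every freshly constructed storage object.]

```python
# Minimal stand-in mimicking the relevant behavior of Prefect 0.x S3 storage.
# Illustrative only -- class and messages are assumptions, not Prefect's code.

class MockS3Storage:
    def __init__(self, bucket, key=None):
        self.bucket = bucket
        self.key = key
        # A freshly constructed storage object starts with an EMPTY
        # registry: nothing in the bucket is scanned automatically.
        self.flows = {}  # maps flow name -> storage key

    def add_flow(self, flow_name):
        # Register the flow under the configured key.
        self.flows[flow_name] = self.key
        return self.key

    def get_flow(self, flow_location):
        # get_flow only consults the in-memory registry, so a new storage
        # object cannot find flows uploaded in an earlier session.
        if flow_location not in self.flows.values():
            raise ValueError("Flow is not contained in this Storage")
        return f"downloaded flow from s3://{self.bucket}/{flow_location}"

# Session 1: add_flow populates the registry, so get_flow succeeds.
s1 = MockS3Storage(bucket="test", key="etl/testflow")
s1.add_flow("ETL")
print(s1.get_flow("etl/testflow"))

# Session 2: a brand-new object has an empty registry, so get_flow fails.
s2 = MockS3Storage(bucket="test", key="etl/testflow")
try:
    s2.get_flow("etl/testflow")
except ValueError as e:
    print(e)  # prints: Flow is not contained in this Storage
```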
Zachary Hughes
07/21/2020, 7:25 PM
Hi @mithalee mohapatra! Just to clarify, the `test_get_flow_S3` function defined in your code snippet is the access pattern that does work for you, correct? Do you mind posting the code that doesn't work?
mithalee mohapatra
07/21/2020, 8:20 PM
This function only works for me after I set the flows mapping in this step: `storage.flows = dictFlows['flows']`. If I skip that part, `get_flow()` fails with the error "Flow not in the Storage".

```python
def test_get_flow_S3(dictFlows):
    print("i am in get flow")
    storage = S3(bucket="test", key="etl/testflow")
    storage.flows = dictFlows['flows']
    newflow = storage.get_flow('etl/testflow')
    print("S3 FLOW OUTPUT")
    newflow.run()
```
Zachary Hughes
07/21/2020, 8:35 PM
Great, thank you. Do you mind seeing what the value of `storage.flows.values()` is?
mithalee mohapatra
07/21/2020, 8:51 PM
```python
def test_get_flow_S3(dictFlows):
    # storage.flows = flows
    print("i am in get flow")
    storage = S3(bucket="lightbox-sandbox-dev", key="etl/testflow")
    storage.flows = dictFlows['flows']
    flow_value = storage.flows.values()
    newflow = storage.get_flow('etl/testflow')
    print("S3 FLOW OUTPUT")
    newflow.run()
```
Zachary Hughes
07/21/2020, 9:10 PM
Ah, sorry -- should have been more specific. Can you run `storage.flows.values()` before modifying `storage.flows`?
mithalee mohapatra
07/21/2020, 9:22 PM
Commented out the `storage.flows` assignment:

```python
def test_get_flow_S3(dictFlows):
    # storage.flows = flows
    print("i am in get flow")
    storage = S3(bucket="lightbox-sandbox-dev", key="etl/testflow")
    # storage.flows = dictFlows['flows']
    flow_value = storage.flows.values()
    newflow = storage.get_flow('etl/testflow')
    print("S3 FLOW OUTPUT")
    newflow.run()
```
Zachary Hughes
07/21/2020, 9:34 PM
Okay, I think I know what's going on here. As it stands right now, you will need to manually set `storage.flows` like you are now. That said, I'm not sure there's a good reason for us to enforce that. If you don't mind, could you open an issue on GitHub explaining your use case?
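[Editor's note: in other words, the registry `get_flow()` consults is just an in-memory name-to-key dict that starts empty on each new storage object. A minimal sketch of the membership check and the manual workaround, using a plain dict as a stand-in for the real storage attribute:]

```python
# The registry get_flow() consults is an in-memory {flow name: key} dict,
# which starts empty on every freshly constructed storage object.
flows = {}
flow_location = "etl/testflow"

# get_flow's membership check (simplified): with an empty registry,
# this is what produces "Flow is not contained in this Storage".
assert flow_location not in flows.values()

# Workaround from the thread: restore the mapping by hand first.
flows = {"ETL": "etl/testflow"}
assert flow_location in flows.values()  # now get_flow() can locate the flow
```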
mithalee mohapatra
07/21/2020, 9:49 PM
Thank you, Zach, for your quick response. I will open an issue on GitHub. Thank you so much again.