Karim Zaghw
04/11/2022, 6:27 PM

Anna Geller
04/11/2022, 6:30 PM
Client.create_flow_run_from_deployment()
more on that here

Karim Zaghw
04/11/2022, 6:40 PM
>>> from prefect.client import get_client
>>> async with get_client() as client:
... response = await client.create_flow_run_from_deployment('842124ce-d421-4a2c-8775-95b4127ecd41')
... print(response)
...
Traceback (most recent call last):
File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "<console>", line 1, in <module>
File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/site-packages/prefect/client.py", line 88, in get_client
profile = prefect.context.get_profile_context()
File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/site-packages/prefect/context.py", line 282, in get_profile_context
raise MissingContextError("No profile context found.")
prefect.exceptions.MissingContextError: No profile context found.
Do you have an idea what might be missing? Thanks!

Anna Geller
04/12/2022, 1:13 PM

Karim Zaghw
04/12/2022, 1:18 PM
Version: 2.0b2
API version: 0.3.0
Python version: 3.8.13
Git commit: b2a048c2
Built: Thu, Mar 17, 2022 2:24 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: sqlite
SQLite version: 3.38.2
It says here that "Profile: default". Is this not right?

Anna Geller
04/12/2022, 1:22 PM

Karim Zaghw
04/12/2022, 1:24 PM

Anna Geller
04/12/2022, 1:27 PM

Karim Zaghw
04/12/2022, 1:52 PM

Anna Geller
04/12/2022, 1:56 PM
import asyncio
from prefect.client import get_client
async def main():
    async with get_client() as client:
        depl_id = "074db2e5-229a-460e-85ad-fca31b379fd2"
        response = await client.create_flow_run_from_deployment(depl_id)
        print(response)

if __name__ == "__main__":
    asyncio.run(main())
It seems to work on both local and Cloud API - LMK if this doesn't work for you

Karim Zaghw
04/12/2022, 2:19 PM
id=UUID('394af40d-c9be-43e8-badd-44f5cfadb843') name='garnet-vole' flow_id=UUID('d3ed22a1-c334-4c51-9ff3-ef6e2b61b150') state_id=UUID('474c63ba-4d9b-45dc-b2b2-c6b0096e27c6') deployment_id=UUID('44ef15ad-cfc4-4b47-9d4d-af1af2b9aea8') flow_version=None parameters={} idempotency_key=None context={} empirical_policy={} empirical_config={} tags=[] parent_task_run_id=None state_type=StateType.SCHEDULED run_count=0 expected_start_time=datetime.datetime(2022, 4, 12, 14, 21, 3, 630180, tzinfo=datetime.timezone.utc) next_scheduled_start_time=datetime.datetime(2022, 4, 12, 14, 21, 3, 630180, tzinfo=datetime.timezone.utc) start_time=None end_time=None total_run_time=datetime.timedelta(0) estimated_run_time=datetime.timedelta(0) estimated_start_time_delta=datetime.timedelta(microseconds=36167) auto_scheduled=False flow_runner=FlowRunnerSettings(type='subprocess', config={'env': {}, 'stream_output': True, 'condaenv': 'prefect1', 'virtualenv': None}) state=Scheduled(message=None, type=SCHEDULED, result=None, flow_run_id=394af40d-c9be-43e8-badd-44f5cfadb843)
However, there are two issues:
1. The printed response seems to arrive before the flow is actually executed, so I don't know if there's an issue with the await.
2. I can't seem to find the output or state of the flow in the above response.

Anna Geller
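One way to address both points is to poll the flow run by its ID until it reaches a terminal state. The sketch below uses a stub client for illustration only: in real code you would open the client with `async with get_client() as client:` and call its read-flow-run method (the exact method name and the `state_name` attribute are assumptions about the 2.0b2 API, not confirmed here). The scheduled-then-executed behavior in point 1 is expected: `create_flow_run_from_deployment` only creates the run; an agent picks it up later, which is why the response shows `state_type=StateType.SCHEDULED`.

```python
import asyncio
from dataclasses import dataclass

# Stand-ins for the real Prefect client and FlowRun objects, so this
# sketch is runnable without a Prefect server. Names are hypothetical.
@dataclass
class StubFlowRun:
    state_name: str

class StubClient:
    def __init__(self):
        self.polls = 0

    async def read_flow_run(self, flow_run_id):
        # Pretend the run finishes after three polls.
        self.polls += 1
        name = "Completed" if self.polls >= 3 else "Running"
        return StubFlowRun(state_name=name)

TERMINAL_STATES = {"Completed", "Failed", "Cancelled"}

async def wait_for_flow_run(client, flow_run_id, poll_interval=0.01):
    """Poll the API until the flow run reaches a terminal state."""
    while True:
        run = await client.read_flow_run(flow_run_id)
        if run.state_name in TERMINAL_STATES:
            return run
        await asyncio.sleep(poll_interval)

final = asyncio.run(wait_for_flow_run(StubClient(), "dummy-flow-run-id"))
print(final.state_name)
```

The same loop shape works against the real client; only the stub objects would be replaced.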
04/12/2022, 2:27 PM

Karim Zaghw
04/12/2022, 2:29 PM

Anna Geller
04/12/2022, 2:40 PM

Karim Zaghw
04/13/2022, 12:29 AM

Anna Geller
04/13/2022, 10:35 AM
> be able to get the result of the flow if it returns something
Since in Prefect 2.0 you can run any Python code in your flow, you could easily call some function that sends you info about this result before you return it. This seems like the easiest approach. And for a notification on failure, you could use something similar to this function.
> I basically want to be able to run a flow where each individual task can be run in its own environment and maybe even on its own server.
I can definitely understand your use case - in Prefect 1.0, this was such a common scenario that we even introduced a special name for it, the "orchestrator pattern", to show that one central flow orchestrates other deployments/deployed flow runs that may run on different machines. Here is one example for 1.0. We don't have fully fleshed-out docs on that, but the pattern you are describing is definitely possible, and Client.create_flow_run_from_deployment is the right way to approach it.
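The orchestrator pattern described above can be sketched as an asyncio fan-out: one flow run created per deployment, concurrently. The `trigger_deployment` function below is a hypothetical stand-in for `await client.create_flow_run_from_deployment(deployment_id)` so the sketch runs without a Prefect server; the real call would live inside `async with get_client() as client:`.

```python
import asyncio

# Hypothetical stand-in for client.create_flow_run_from_deployment();
# simulates the API round trip and returns an identifier for the new run.
async def trigger_deployment(deployment_id: str) -> str:
    await asyncio.sleep(0.01)  # simulate network latency
    return f"flow-run-for-{deployment_id}"

async def orchestrate(deployment_ids):
    # The central "orchestrator" flow fans out: one flow run per
    # deployment, created concurrently; each may execute on a
    # different machine, depending on where agents run.
    return await asyncio.gather(*(trigger_deployment(d) for d in deployment_ids))

runs = asyncio.run(orchestrate(["depl-a", "depl-b", "depl-c"]))
print(runs)
```

Combining this with the polling loop from earlier would let the orchestrator also wait for each downstream run to finish before proceeding.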