# prefect-community

Karim Zaghw

04/11/2022, 6:27 PM
Hi everyone! I just downloaded Prefect 2.0 and was wondering if it's possible to create a dependency between two different flows. The reason I'm asking is because I want each flow to run in a different conda environment so it needs to be on its own deployment, but I also want it to be possible for a flow to run depending on the state of another flow in a different environment. Any help would be really appreciated. Thanks!

Anna Geller

04/11/2022, 6:30 PM
We don't have the orchestrator pattern (calling a flow from another flow via an API call) fully exposed yet but you can already call a deployed flow from another flow using:
```python
Client.create_flow_run_from_deployment()
```
more on that here

Karim Zaghw

04/11/2022, 6:40 PM
@Anna Geller Thanks, I'll try it out.
👍 1
@Anna Geller I tried using the create_flow_run_from_deployment() function like you suggested but got the following error:
```python
>>> from prefect.client import get_client
>>> async with get_client() as client:
...     response = await client.create_flow_run_from_deployment('842124ce-d421-4a2c-8775-95b4127ecd41')
...     print(response)
...
Traceback (most recent call last):
  File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "<console>", line 1, in <module>
  File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/site-packages/prefect/client.py", line 88, in get_client
    profile = prefect.context.get_profile_context()
  File "/home/raisa_gpu/anaconda3/envs/prefect2/lib/python3.8/site-packages/prefect/context.py", line 282, in get_profile_context
    raise MissingContextError("No profile context found.")
prefect.exceptions.MissingContextError: No profile context found.
```
Do you have an idea what might be missing? Thanks!

Anna Geller

04/12/2022, 1:13 PM
Can you share the output of the "prefect version" command? What seems to be missing is a default profile. You can try upgrading your Orion version, resetting the DB, recreating the deployment, and trying again.
For more on profiles and configs, check https://orion-docs.prefect.io/concepts/settings/

Karim Zaghw

04/12/2022, 1:18 PM
@Anna Geller This is the output when I ran the "prefect version" command:
```text
Version:             2.0b2
API version:         0.3.0
Python version:      3.8.13
Git commit:          b2a048c2
Built:               Thu, Mar 17, 2022 2:24 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.38.2
```
It says "Profile: default" here. Is this not right?

Anna Geller

04/12/2022, 1:22 PM
Thanks. If this doesn't work for you with the ephemeral server, you may also try using Cloud 2.0: beta.prefect.io (free to use, no credit card required). I'll try to reproduce your issue and get back to you.

Karim Zaghw

04/12/2022, 1:24 PM
@Anna Geller Thanks! Do I use the same function "Client.create_flow_run_from_deployment()" to create a flow run using the cloud or is there another function I should use?

Anna Geller

04/12/2022, 1:27 PM
Everything is the same; you just connect to the remote API. Sorry, I should have shared the docs for Cloud directly: https://orion-docs.prefect.io/ui/cloud/

Karim Zaghw

04/12/2022, 1:52 PM
@Anna Geller Thanks a lot!
👍 1

Anna Geller

04/12/2022, 1:56 PM
Btw, since you've already tagged me 5 times here, I have to say it 😄 we have a policy of not tagging anyone unless it's some sort of emergency. We check all the threads anyway, so there's no need for tagging.
I was able to create a flow run from deployment using this code:
```python
import asyncio
from prefect.client import get_client


async def main():
    async with get_client() as client:
        depl_id = "074db2e5-229a-460e-85ad-fca31b379fd2"
        response = await client.create_flow_run_from_deployment(depl_id)
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
```
It seems to work on both local and Cloud API - LMK if this doesn't work for you

Karim Zaghw

04/12/2022, 2:19 PM
Sorry about the tagging, I didn't know this was an issue 😅
👍 1
I ran the code snippet you gave me and got the following response:
```text
id=UUID('394af40d-c9be-43e8-badd-44f5cfadb843') name='garnet-vole' flow_id=UUID('d3ed22a1-c334-4c51-9ff3-ef6e2b61b150') state_id=UUID('474c63ba-4d9b-45dc-b2b2-c6b0096e27c6') deployment_id=UUID('44ef15ad-cfc4-4b47-9d4d-af1af2b9aea8') flow_version=None parameters={} idempotency_key=None context={} empirical_policy={} empirical_config={} tags=[] parent_task_run_id=None state_type=StateType.SCHEDULED run_count=0 expected_start_time=datetime.datetime(2022, 4, 12, 14, 21, 3, 630180, tzinfo=datetime.timezone.utc) next_scheduled_start_time=datetime.datetime(2022, 4, 12, 14, 21, 3, 630180, tzinfo=datetime.timezone.utc) start_time=None end_time=None total_run_time=datetime.timedelta(0) estimated_run_time=datetime.timedelta(0) estimated_start_time_delta=datetime.timedelta(microseconds=36167) auto_scheduled=False flow_runner=FlowRunnerSettings(type='subprocess', config={'env': {}, 'stream_output': True, 'condaenv': 'prefect1', 'virtualenv': None}) state=Scheduled(message=None, type=SCHEDULED, result=None, flow_run_id=394af40d-c9be-43e8-badd-44f5cfadb843)
```
However, there are two issues:
1. The printed response seems to arrive before the flow is actually executed, so I don't know if there's an issue with the await command.
2. I can't seem to find the output or state of the flow in the above response.

Anna Geller

04/12/2022, 2:27 PM
1. This output shows that your flow run got scheduled for execution; the Scheduled state confirms that.
2. The state is Scheduled.
It moves to a Running state only if you have a work queue and an agent - check these docs to see how to set this up: https://orion-docs.prefect.io/concepts/work-queues/
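As a rough sketch of that setup (commands from the 2.0-beta-era CLI; "my-queue" is an example name and `<queue-id>` is a placeholder you'd fill in, so check `prefect work-queue --help` for your version):

```shell
# Create a work queue; the command prints the new queue's UUID
# (assumption: 2.0b2-era CLI, where agents poll a queue by ID).
prefect work-queue create my-queue

# Start an agent that polls that queue and executes scheduled runs,
# substituting the UUID printed by the previous command.
prefect agent start '<queue-id>'
```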

Karim Zaghw

04/12/2022, 2:29 PM
Yes, I already have a work queue and an agent running. Is there a way to wait for the flow to finish running and then get the response, along with the results, after it finishes?

Anna Geller

04/12/2022, 2:40 PM
what's your end goal here? do you want to get notified when the flow run finishes?

Karim Zaghw

04/13/2022, 12:29 AM
Yes, and I'd also like to be able to tell whether the flow finished successfully or errored, and to get the result if the flow returns something. The reason I'm doing it this way is that I basically want to run a flow where each individual task can run in its own environment, and maybe even on its own server. To do this, I was thinking of placing each small task into its own flow and deployment, then calling them from one main flow using the Client.create_flow_run_from_deployment function.

Anna Geller

04/13/2022, 10:35 AM
be able to get the result of the flow if it returns something
Since in Prefect 2.0, you can run any Python code in your flow, you could easily call some function sending you info about this result before you return it. This seems like the easiest approach. And for the notification on Failure, you could use something similar to this function.
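That "call a function with the result before you return it" idea is plain Python, so it can be sketched independently of Prefect; `notify` below is a hypothetical stand-in for whatever channel you actually use (Slack webhook, email, ...):

```python
def notify(message: str) -> None:
    # Hypothetical notification hook; swap in a Slack webhook,
    # an email sender, etc.
    print(f"[notification] {message}")


def report_result(fn):
    """Wrap a function so its result (or its failure) is reported
    before being returned / re-raised."""
    def wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            notify(f"{fn.__name__} failed: {exc}")
            raise
        notify(f"{fn.__name__} returned: {result!r}")
        return result
    return wrapper


@report_result
def my_flow_body(x: int) -> int:
    # Stands in for the body of your flow function.
    return x * 2
```

Inside a Prefect 2.0 flow you could just call `notify(...)` directly before the `return`; the decorator is the same idea packaged up for reuse.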
I basically want to be able to run a flow where each individual task can be run in its own environment and maybe even on its own server.
I can definitely understand your use case - in Prefect 1.0, this was such a common scenario that we even introduced a special name for it called the "orchestrator pattern" to show that some central flow orchestrates other deployments/deployed flow runs that may run on different machines. Here is one example for 1.0. We don't have fully fleshed out docs on that, but the pattern you are describing is definitely possible and the Client.create_flow_run_from_deployment function is the right way of approaching it.
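For the "wait until the orchestrated run finishes" part of the use case, a generic polling helper can be sketched. Assumptions: you would wrap something like the beta client's `client.read_flow_run(flow_run_id)` in the `fetch_state` callable and map its state to a name; the terminal-state names below are illustrative, not an exhaustive Prefect list:

```python
import asyncio

# Illustrative set of terminal state names for a flow run.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


async def wait_for_terminal_state(fetch_state, interval=1.0, timeout=60.0):
    """Poll `fetch_state` (an async callable returning the current
    state name) until the run reaches a terminal state, or raise
    TimeoutError once `timeout` seconds of polling have elapsed."""
    elapsed = 0.0
    while elapsed <= timeout:
        state = await fetch_state()
        if state in TERMINAL_STATES:
            return state
        await asyncio.sleep(interval)
        elapsed += interval
    raise TimeoutError("flow run did not reach a terminal state in time")
```

The orchestrating flow would pass a closure around the client call, then inspect the finished run (e.g. via another `read_flow_run`) for its final state and any result metadata.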