# marvin-ai
b
I appear to be up and running, but noticed the flow run does not appear on the Prefect web console. I am authenticated and was under the impression that even if I execute a flow locally, the logs/results of the flow would go to Prefect. Is that not the case with ControlFlow? I apologize if this is a basic question, but I'm still trying to wrap my head around Prefect + ControlFlow.
j
That should be correct and CF will inherit whatever profile you use for Prefect locally
You may need to check “prefect profile ls” to see what profile is being used
b
Thanks for such a quick reply.
prefect profile ls
shows
```
┏━━━━━━━━━━━━━━━━━━━━━┓
┃ Available Profiles: ┃
┡━━━━━━━━━━━━━━━━━━━━━┩
│             * local │
└─────────────────────┘
```
and in the same shell I execute `python myflow.py` and I see the Prefect flow firing with data being logged to the web console. I am still learning, but not sure what the difference is.
b
Hey Brock! Is the flow run not making it to the UI at all? Or is it just that the logs are missing? Ah, you already mentioned the flow run being absent, apologies.
If you run `prefect profile inspect`, what is your PREFECT_API_URL set to?
b
Thanks for reaching out.
```
No name provided, defaulting to 'local'
PREFECT_API_URL='https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}'
PREFECT_API_KEY='key_here'
```
I am certain there is a very fundamental concept I am still not grasping, but it runs locally and I just don't see the output in the cloud, though I do if it's a regular Prefect flow.
b
Hmm... that all looks right to me, especially since you're able to run a regular flow and have it show up in the UI. Can you share the CF code that you're running?
b
Sure thing:
```
import vertexai
from vertexai.generative_models import GenerativeModel
from google.oauth2.service_account import Credentials

from prefect_gcp import GcpCredentials
gcp_credentials_block = GcpCredentials.load("GCP_PROJECT_ID")

service_account_info = gcp_credentials_block.service_account_info.get_secret_value()
credentials = Credentials.from_service_account_info(service_account_info)

from langchain_google_vertexai import ChatVertexAI
model = ChatVertexAI(model="gemini-1.5-flash", credentials=credentials)

import controlflow as cf 


cf.defaults.model = model



emails = [
    "Hello, I need an update on the project status.",
    "Subject: Exclusive offer just for you!",
    "Urgent: Project deadline moved up by one week.",
]


reply = cf.run(
    "Write a polite reply to an email",
    context=dict(email=emails[0])
)

print(reply)
```
I see the output in the shell, it's just not flowing through. In part, I suppose that's OK, but I am trying to learn as I go. 🙂
n
hi @Brock! does this show the same api url you're seeing above? also there's `prefect config view`, which is the best way I know to ask "what prefect backend am I hooked up to right now?"
```
python -c "from prefect.settings import Settings; print(Settings().api.url)"
```
b
Hey Nate.
`prefect config view` ->
```
🚀 you are connected to:
<https://app.prefect.cloud/account/{account_id}>
PREFECT_PROFILE='local'
PREFECT_API_KEY='********' (from profile)
PREFECT_API_URL='<https://api.prefect.cloud/api/accounts/{account_id}/workspaces/{workspace_id}>' (from profile)
```
The python command above also yields the same PREFECT_API_URL.
n
interesting! and you're still not seeing any activity in your dashboard in that workspace?
b
Yeah, the code sample above runs. I see the output in the shell, but no dice in the Cloud console.
```
$ python gcp-flow2.py 
23:42:32.521 | WARNING | controlflow.llm.models - The default LLM model could not be created. ControlFlow will continue to work, but you must manually provide an LLM model for each agent. For more information, please see <https://controlflow.ai/guides/configure-llms>. The error was:
The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable
23:42:34.357 | WARNING | langchain_google_vertexai.functions_utils - Key 'additionalProperties' is not supported in schema, ignoring
23:42:34.358 | WARNING | langchain_google_vertexai.functions_utils - Key 'additionalProperties' is not supported in schema, ignoring
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  ✅ Tool call: "mark_task_036cc2c9_successful"                                                   │
│                                                                                                  │
│     Tool args: {'task_result': 'Hello, thank you for reaching out. I will provide an update on   │
│     the project status as soon as possible.'}                                                    │
│                                                                                                  │
│     Tool result: Task #036cc2c9 ("Write a polite reply to an email") marked successful.          │
│                                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────── 11:42:35 PM ─╯
Hello, thank you for reaching out. I will provide an update on the project status as soon as possible.
23:42:35.367 | WARNING | EventsWorker - Still processing items: 2 items remaining...
```
n
can you try the following?
```
PREFECT_LOGGING_LEVEL=DEBUG python gcp-flow2.py
```
j
Wait, I think I know why: cf.run on its own won't create a flow run, it's an individual task run.
In the UI, if you toggle to task runs you'll see it (this is suboptimal).
If you put it inside a Prefect or ControlFlow flow, I think it would show up as expected.
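For anyone following along, a minimal sketch of that suggestion might look like this (assuming ControlFlow's `@cf.flow` decorator and reusing the objective from the script above; the Vertex AI model/credentials setup is omitted):
```
import controlflow as cf

# Wrapping cf.run inside a flow should produce a flow run in the UI
# rather than only a task run.
@cf.flow
def email_reply_flow(email: str) -> str:
    return cf.run(
        "Write a polite reply to an email",
        context=dict(email=email),
    )

if __name__ == "__main__":
    print(email_reply_flow("Hello, I need an update on the project status."))
```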
b
This is what I saw for output with the logging level added
```
$ PREFECT_LOGGING_LEVEL=DEBUG python gcp-flow2.py
00:22:29.395 | DEBUG   | prefect.profiles - Using profile 'local'
00:22:30.923 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:31.401 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:32.828 | WARNING | controlflow.llm.models - The default LLM model could not be created. ControlFlow will continue to work, but you must manually provide an LLM model for each agent. For more information, please see <https://controlflow.ai/guides/configure-llms>. The error was:
The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable
00:22:33.046 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:33.071 | DEBUG   | Task run 'run_tasks' - Created task run 'run_tasks' for task 'run_tasks'
00:22:33.075 | DEBUG   | Task run 'Run task: Task #73e6ba09 ("Write a polite reply to an email")' - Renamed task run 'run_tasks' to 'Run task: Task #73e6ba09 ("Write a polite reply to an email")'
00:22:33.078 | DEBUG   | Task run 'Run task: Task #73e6ba09 ("Write a polite reply to an email")' - Executing task 'run_tasks' for task run 'Run task: Task #73e6ba09 ("Write a polite reply to an email")'...
00:22:33.137 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:33.192 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:33.199 | DEBUG   | Task run 'run' - Created task run 'run' for task 'run'
00:22:33.201 | DEBUG   | Task run 'Orchestrator.run()' - Renamed task run 'run' to 'Orchestrator.run()'
00:22:33.204 | DEBUG   | Task run 'Orchestrator.run()' - Executing task 'run' for task run 'Orchestrator.run()'...
00:22:33.249 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:33.255 | DEBUG   | Task run 'run_agent_turn' - Created task run 'run_agent_turn' for task 'run_agent_turn'
00:22:33.257 | DEBUG   | Task run 'Agent turn: Marvin' - Renamed task run 'run_agent_turn' to 'Agent turn: Marvin'
00:22:33.261 | DEBUG   | Task run 'Agent turn: Marvin' - Executing task 'run_agent_turn' for task run 'Agent turn: Marvin'...
00:22:33.611 | DEBUG   | prefect.client - Connecting to API at <https://api.prefect.cloud/api/accounts/996e7e98-1ad6-4529-a9cc-13a5d71928e7/workspaces/f6e0b7c4-93ed-4ac9-b92f-7e3d71868aff/>
00:22:33.616 | DEBUG   | Task run '_run_model' - Created task run '_run_model' for task '_run_model'
00:22:33.618 | DEBUG   | Task run 'Call LLM' - Renamed task run '_run_model' to 'Call LLM'
00:22:33.623 | DEBUG   | Task run 'Call LLM' - Executing task '_run_model' for task run 'Call LLM'...
00:22:33.626 | WARNING | langchain_google_vertexai.functions_utils - Key 'additionalProperties' is not supported in schema, ignoring
00:22:33.628 | WARNING | langchain_google_vertexai.functions_utils - Key 'additionalProperties' is not supported in schema, ignoring
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  ⠼ Tool call: "mark_task_73e6ba09_successful"                                                    │
│                                                                                                  │
│    Tool args: {'task_result': 'Dear [Name], \\n\\nThank you for your email. I am working on      │
│    getting you an update as soon as possible. Please let me know if you have any other           │
│    questions. \\n\\nSincerely, \\nMarvin'}                                                       │
│                                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────── 12:22:34 AM ─╯
╭─ Agent: Marvin ──────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                  │
│  ✅ Tool call: "mark_task_73e6ba09_successful"                                                   │
│                                                                                                  │
│     Tool args: {'task_result': 'Dear [Name], \\n\\nThank you for your email. I am working on     │
│     getting you an update as soon as possible. Please let me know if you have any other          │
│     questions. \\n\\nSincerely, \\nMarvin'}                                                      │
│                                                                                                  │
│     Tool result: Task #73e6ba09 ("Write a polite reply to an email") marked successful.          │
│                                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────── 12:22:34 AM ─╯
00:22:34.738 | INFO    | Task run 'Orchestrator.run()' - Finished in state Completed()
00:22:34.745 | INFO    | Task run 'Run task: Task #73e6ba09 ("Write a polite reply to an email")' - Finished in state Completed()
Dear [Name], \n\nThank you for your email. I am working on getting you an update as soon as possible. Please let me know if you have any other questions. \n\nSincerely, \nMarvin
```
n
yeah, I think Jeremiah had the right idea: you're sending metadata to the right workspace, it's just that they're task runs
b
I don't see the data anywhere, including
I don't know if it's related, but I started playing with ControlFlow in earnest last night. Starting yesterday, my scheduled flows, which had been running without issues for weeks, began crashing. I am on Eastern time, and the first job fires at 7:15. It crashed about 15 minutes ago, second day in a row.
n
bear with me for a potentially silly/annoying question, but did you check both the Flow Runs and Task Runs tabs on that Runs page?
b
Not a silly question, I am learning. And holy moly I have never noticed that tab! Yes, they are there.
n
catjam
b
Wow, what an oversight. Thank you all for your patience
n
no worries! feel free to share logs from those crashes if you have them!
b
I am on the free tier with a managed work pool, so I don't believe I have access to those.
j
Nice! And nah, I think this just isn't good UX; it's unclear.
n
yeah 💯 we should surface infra level issues that prevent the flow run from starting up, regardless of the worker type (when possible, which is possible in many more cases than we show at this time)
b
I just manually fired the scheduled flow as a quick run, and it ran as expected e2e. This is what I see on the interface in case it helps.
n
wherever you have `.deploy()`, are you able to change that to simply `.serve()` (like this)? if it runs fine like that, then we can rule out a class of problems RE
> infra level issues that prevent the flow run from starting up
that "exited with a status code 1" is sort of an information black hole when you're not running the worker yourself (and able to look at logs), which is something we're actively working on. most commonly this happens because:
• missing 3rd party deps remotely that aren't present on the prefect base image
• failure to retrieve source code
b
I deploy with scripts, so in my case,
```
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="<https://github.com/>",
        entrypoint="prefect/flows/etl.py:etl_flow",
    ).deploy(
        name="aws-blogs-etl",
        work_pool_name="brock-pool1",
        job_variables={"env": {"BROCK": "loves-to-code"},
                       "pip_packages": ["pandas", "requests"]},
        cron="15 0 * * *",
        tags=["prod"],
        description="The pipeline to populate the stage schema with the newest posts.  Version is just for illustration",
        version="1.0.0",
    )
```
would become
```
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="<https://github.com/git>",
        entrypoint="prefect/flows/etl.py:etl_flow",
    ).serve(
        name="aws-blogs-etl",
        work_pool_name="brock-pool1",
        job_variables={"env": {"BROCK": "loves-to-code"},
                       "pip_packages": ["pandas", "requests"]},
        cron="15 0 * * *",
        tags=["prod"],
        description="The pipeline to populate the stage schema with the newest posts.  Version is just for illustration",
        version="1.0.0",
    )
```
n
yep that's right, except you can comment out the `work_pool_name` and `job_variables` kwargs
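A sketch of what that trimmed-down version might look like (same redacted source and entrypoint as above; `.serve()` runs the flow from the local process, so the work-pool and job-variable settings no longer apply):
```
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/",  # same (redacted) repo as above
        entrypoint="prefect/flows/etl.py:etl_flow",
    ).serve(
        name="aws-blogs-etl",
        # work_pool_name and job_variables are worker/infra settings,
        # so they are dropped when serving from the local process
        cron="15 0 * * *",
        tags=["prod"],
        description="The pipeline to populate the stage schema with the newest posts.",
        version="1.0.0",
    )
```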
b
```
$ python deploy-etl-serve.py 
Your flow 'aws-blogs-etl-flow' is being served and polling for scheduled runs!

To trigger a run for this flow, use the following command:

        $ prefect deployment run 'aws-blogs-etl-flow/aws-blogs-etl'

You can also run your flow via the Prefect UI: https://app.prefect.cloud/account/{account}/workspace/{workspace}/deployments/deployment/cef3eeec-ab3f-4a1a-b0f9-ab463975e429
```
and the trick here is that my terminal stays open.
n
yep yep, that's the thing with serve: it's basically a process worker coupled to some flows it can run, rolled into one. I was just thinking, as a debugging step, if that runs when serving it, you can rule out some problems. I bet it's related to requirements, so I'd guess the locally served flow works fine.
b
I also opened a new terminal, ran the deployment, e2e all good.
Not sure that's helpful, but that flow isn't yelling at me either, just when it gets triggered for the cron job.
n
ooh can you elaborate on this?
> just when it gets triggered for the cron job
b
The deployment is scheduled to run 1x/day. It's been working for weeks. Yesterday's run was the first that crashed. It crashed again tonight, but if I manually set off a quick run, it runs e2e.
All on the free tier with a managed work pool; nothing changed.
That's what I have been having the course use, it's easy to grok. We develop locally, deploy to the cloud, and leverage the free tier/managed work pool with a schedule to orchestrate our pipelines.
n
gotcha. If I were you I'd be itching to reproduce this in a situation where I'm using `.serve` or running my own worker with `prefect worker start`, so I could see the logs it spits out, bc they likely contain the cause of the error (again, in lieu of the better observability for managed workers that we're working on)
b
I'll see what happens tomorrow. What's curious to me is that I haven't changed anything in the github repo or the logic. I will follow up with a new thread if it crashes again. I am trying to keep the surface area small for the students, and until last night, everything was working perfectly. Thanks again for everything, I really appreciate it.
n
sure thing!
> What's curious to me is that I haven't changed anything in the github repo or the logic
This is a common thing: upstream dependencies can break things unexpectedly if you don't have everything pinned. When you use `pip_packages` on that pool, you're opting into installing deps at runtime, which can be more risky in that regard. For example, just today 😅
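A sketch of what pinning could look like in the deployment script from earlier (the version numbers here are illustrative placeholders, not recommendations):
```
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/",  # same (redacted) repo as above
        entrypoint="prefect/flows/etl.py:etl_flow",
    ).deploy(
        name="aws-blogs-etl",
        work_pool_name="brock-pool1",
        job_variables={
            "env": {"BROCK": "loves-to-code"},
            # pin exact versions so a new upstream release can't silently
            # change what gets installed at runtime on the managed pool
            "pip_packages": ["pandas==2.2.2", "requests==2.32.3"],
        },
        cron="15 0 * * *",
        tags=["prod"],
    )
```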
b
I saw you post that, dependencies are fun! 🤣 All that's needed in my flow is requests, as it invokes Google Cloud Functions that encapsulate the logic. But to your point, it's not pinned in my deployment script, which I should add.