
Federico Zambelli

03/06/2023, 11:12 AM
Hello folks, say there's a pipeline running in a docker container on some EC2 instance. If Prefect is added to said pipeline, is it then possible to schedule/trigger the executions through Prefect Cloud?

Christopher Boyd

03/06/2023, 2:10 PM
yes
the agent is outbound only, so it works as long as your container has the ability to call out to the internet (api.prefect) to reach Prefect Cloud. Even if you didn’t, you would still have the option to run it locally, but without outbound communication you wouldn’t see the runs reflected anywhere unless you were self-hosting inside

Federico Zambelli

03/06/2023, 2:15 PM
I understood the 2nd part, but not entirely the 1st. The agent is the process that runs when one starts
prefect agent start --work-queue "default"
so this would be running inside my docker container, correct? I still don't fully get how Prefect Cloud is supposed to interact with said container and say "run this specific flow now".

Christopher Boyd

03/06/2023, 2:27 PM
you don’t, the agent is outbound only and retrieves flow runs from the cloud
they would exist in the queue “default”
that the agent polls to retrieve and execute
the cloud schedules and orchestrates, so if it schedules 50 flow runs, they are just sitting in a queue cloud side on your workspace, and the agent(s) continuously poll to search for matching flow runs they need to execute

Federico Zambelli

03/06/2023, 2:54 PM
Ahhh I see ok, thanks for the explanation. I assume the same would apply in a situation where a successful flow run triggers a different flow (e.g. the one sitting in the container), did I get it right?

Christopher Boyd

03/06/2023, 2:56 PM
triggers in what sense? If they are sub-flows within the flows, yes; if it’s through something like an automation, it will technically spawn a new flow run. In other words, if it’s a subflow, it will run as part of the main flow run, unless you call/create a new flow run (either programmatically, or via an automation), but the behavior is the same, yes

Federico Zambelli

03/06/2023, 2:56 PM
I wasn't thinking of a subflow, I was thinking of a completely separate deployment
Lemme try to come up with an example: say I have 2 containers. One extracts some data from an API and saves it to S3. The other reads from S3, does some transformations, and then loads the data somewhere else, e.g. Snowflake. I want to execute the code in the 2nd container only when the 1st one finishes successfully. The situation you described above would work even in this case, I assume

Christopher Boyd

03/06/2023, 3:00 PM
Yep, you can set a trigger automation in the Cloud UI, on Completed, OR you can use the Prefect Python API to create a flow run of the 2nd deployment
Either would work
Automations are easy, but are a cloud feature only
For the deployment create flow run , I can share the url in a minute

Federico Zambelli

03/06/2023, 3:02 PM
got it, ok thanks. And I have one last doubt, if you don't mind. Does the DockerContainer infrastructure have anything to do with this? I followed a video that showcased an example of this feature. In the video, the teacher created a custom image, copied the flow code into it, and uploaded said image to DockerHub. To be completely fair, I didn't exactly understand the point.

Christopher Boyd

03/06/2023, 3:15 PM
it sort of depends, and this is a personal decision based on your environment and infrastructure. If all your code is in the same location as where you are running the agent (inside the docker container) then it’s perfectly okay to run it as a process (that just happens to be in a docker container)
but if you separate out your code, so one image contains one flow, and another image contains another flow, then you might need to move up a layer and either use a VM and the docker infrastructure block, or kubernetes to create multiple containers
in other words, I don’t think you can create containers in containers by running the agent inside docker
but if all your code is co-located, you don’t need to

Federico Zambelli

03/06/2023, 3:18 PM
so in other (very dumbed down) words, storage + infra blocks are reusable shortcuts to tell Prefect "take this code from <some-storage> and run it in <some-infrastructure>"

Christopher Boyd

03/06/2023, 3:18 PM
exactly

Federico Zambelli

03/06/2023, 3:19 PM
Ahhhhhh

Christopher Boyd

03/06/2023, 3:19 PM
with nothing specified (no storage block) it defaults to local
with nothing specified (no infrastructure block) it defaults to process

Federico Zambelli

03/06/2023, 3:19 PM
*enlightenment moment intensifies*
ok the puzzle pieces are starting to connect
thanks for taking the time to explain

Christopher Boyd

03/06/2023, 3:20 PM
For sure, glad to help!

Maikel Penz

03/07/2023, 6:51 AM
@Christopher Boyd I deployed an EKS cluster with the 2.0 agent in it. I've given 2 private subnets to the cluster (internet access through a NAT gateway). The agent starts fine, but it cannot do anything on Prefect Cloud, like creating a work queue that doesn't exist. See the error in the image. Does the agent need to live on public subnets (with an internet gateway)?
I've actually moved the agent to public subnets but still can't get it to work
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/httpcore/_exceptions.py", line 10, in map_exceptions
    yield
  File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 111, in connect_tcp
    stream: anyio.abc.ByteStream = await anyio.connect_tcp(
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py", line 189, in connect_tcp
    gai_res = await getaddrinfo(
  File "/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py", line 496, in getaddrinfo
    gai_res = await get_asynclib().getaddrinfo(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1754, in getaddrinfo
    result = await get_running_loop().getaddrinfo(
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 825, in getaddrinfo
    return await self.run_in_executor(
  File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.8/socket.py", line 918, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -3] Temporary failure in name resolution
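The final socket.gaierror means DNS resolution is failing inside the pod before any TCP connection is attempted, so this points at the cluster's DNS rather than the subnets themselves. A quick stdlib check can confirm it (the hostname in the usage comment is assumed to be the Cloud API endpoint):

```python
import socket

def can_resolve(host: str, port: int = 443) -> bool:
    """Return True if DNS resolution succeeds for host.

    socket.getaddrinfo is the exact call raising socket.gaierror
    ([Errno -3] Temporary failure in name resolution) in the traceback.
    """
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False

# e.g. run can_resolve("api.prefect.cloud") from inside the agent pod;
# if it returns False, look at cluster DNS (CoreDNS) and the NAT gateway
# route tables rather than the agent configuration itself
```

If this returns False from the pod but True from the node, the problem is pod-level DNS configuration rather than subnet internet access.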