# prefect-community
m
Hello all, I have a more conceptual question today: My company has existing ETL tasks that each live in their own Docker container. I have been successful in having Prefect run the container as is. My question, however, is how we can get the most out of Prefect. Right now each ETL task lives in its own Docker container with its own logging, handled by Python's logging module. We want to move everything over to being handled by Prefect, and I am looking for input and advice on how best to use Prefect's capabilities for flow deployment. I know that flows can be restarted from whichever stage they fail at, but how could we take advantage of this if the task the flow is running is inside a Docker container? Are there any docs I could read, as well as examples? We want to be able to get alerts at any point in the process chain upon failure, e.g. if the API call times out or the task that loads the data into our databases fails. Thank you in advance for your help and advice!
n
Hi @Matthew Blau, there's a lot to unpack here so I'll try to break down your question into some Prefect-y concepts:

Logging: Prefect makes it dead simple to log to Prefect Server/Cloud using the logger shipped with the context of any flow/task. You can use the logger like this:
import prefect
from prefect import task

@task
def some_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
(Notice that you can set the logging level there, which will be reflected in the Prefect UI.) Any Python logging you've got can probably be switched over with minimal effort. In addition, you can pass `log_stdout=True` to your task (`@task(log_stdout=True)`) to capture print statements 🙂 You can read more on logging here.

Restarting / Using Results: One of the really nice things about Prefect is that you can configure results external to the execution environment through the result interface. This pattern enables retries and restarts even when using short-lived workers and/or containers. You can configure results at the flow level OR on a per-task basis, giving you tons of flexibility. When you've configured results on your tasks, restarting flow runs from failure points becomes as simple as clicking the restart button in the UI. More on using results here.

Orchestration: You may already be using these, but if not I'd encourage you to take a look at the Kubernetes agent (which submits your runs as Kubernetes jobs) or the ECS agent (which deploys flow runs as ECS tasks), and the Dask executor, which unlocks run parallelization (particularly useful when using mapping on large ETL pipelines). For more on this, Jim wrote an excellent blog post walking through some of the specifics of the new run config interface. Hopefully that answers some of your questions!
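One way to keep the "switch over with minimal effort" promise is to have existing ETL functions accept a `logging.Logger` instead of creating their own, so the same function logs through plain Python logging in today's container and through Prefect's context logger inside a task. This is a sketch under assumptions, not the only approach: the `create_csv` signature and the `"etl"` fallback logger name are hypothetical.

```python
import csv
import logging
from typing import Optional, Sequence


def create_csv(row: Sequence[str], path: str = "my.csv",
               logger: Optional[logging.Logger] = None) -> str:
    """Write one CSV row, logging through whichever logger is passed in."""
    # Fall back to a plain module-level logger when none is injected
    # (hypothetical name "etl"; use whatever your code already uses).
    log = logger or logging.getLogger("etl")
    log.info("Writing %d fields to %s", len(row), path)
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow(row)
    return path
```

Inside a Prefect task you could then call `create_csv(data, logger=prefect.context.get("logger"))`, assuming (as in Prefect 0.14) that the context logger is a standard `logging.Logger`; outside Prefect the existing logging keeps working unchanged.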
m
@nicholas Thank you for the detailed reply! I am looking through the links you have provided and have at least one follow-up question: it seems that to implement the logging, and possibly the results interface, I would need to re-engineer the ETL task itself, which would require the Docker image to be rebuilt with Prefect installed in it. Is that correct? Our tasks are pretty straightforward right now: grab data from an API call -> throw it into a CSV -> load it into a database on a recurring basis. Ideally we would want to get success/failure info from each step.
n
That sounds correct @Matthew Blau - you'll need Prefect one way or another for that, though I'd say it could be more additive than a refactor: Prefect logging can live alongside your current logging, and results can be configured just at the task level. It sounds like you could get away with changing very little of your current code and instead adopt the parts of Prefect that make the most sense for the current pipeline.
m
@nicholas so what you're saying is that I can swap out the logging logic we have built, in place, for Prefect's so it can be exposed in the UI, correct? And that I can add results configuration to the existing ETL task logic where it makes sense to do so, correct?
n
Yup, that's exactly right. Note that when you don't configure some sort of result for a task, restarting from failure is unlikely to succeed (unless of course that task doesn't need a result, for example if nothing downstream is relying on it)
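To see why a persisted result is what makes restart-from-failure work, here is a plain-Python sketch of the idea. It is not Prefect's API: `run_step` and the local `results_dir` are hypothetical, and it assumes each step's output is JSON-serialisable. A step that already finished is not re-run on restart; its stored output is read back instead, so downstream steps still get their inputs even though the original process (or container) is gone.

```python
import json
from pathlib import Path
from typing import Any, Callable


def run_step(name: str, fn: Callable[..., Any], *args: Any, results_dir: Path) -> Any:
    """Run fn(*args) unless a persisted result for `name` already exists."""
    result_file = results_dir / f"{name}.json"
    if result_file.exists():
        # An earlier run finished this step: reuse its stored output instead
        # of calling the (possibly expensive or non-repeatable) step again.
        return json.loads(result_file.read_text())
    value = fn(*args)
    # Persist outside the step's own process so a later run can pick it up.
    result_file.write_text(json.dumps(value))
    return value
```

In Prefect 0.14 itself, this role is played by a `Result` class (for example `LocalResult` or `S3Result` from `prefect.engine.results`) configured on the task or the flow; without one, a restarted downstream task has nothing to read, which is why restarts are unlikely to succeed.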
m
@nicholas Ah, awesome! So it's pretty minimal re-engineering effort outside of rebuilding our docker container images to incorporate prefect into it so it can expose itself to the flow run, is that correct? Or is there some form of configuration that would need to happen on the Prefect core side of things? In either case, is there some documentation that I can read on how to accomplish this task?
n
Hm, actually would you be able to provide a minimal example of what you're doing? I want to make sure I understand your question a little better before I say definitively
m
right now we have an integration that makes some API calls to get information on ad performance and various metrics, loads the information from the API call into a CSV file, and then takes the data in the CSV and loads it into a database
It is in a Docker container and currently run with docker-compose up
The task itself handles the date range and everything, which we can remove with Prefect
It is run on a schedule determined by cron
@nicholas does that help?
n
I see. To give a minimal example of what that might look like, I'll assume something like this:
import csv

import requests

API_URL = "https://example.com/api/ad-metrics"  # hypothetical endpoint


def make_api_call():
  return requests.get(API_URL).json()


def create_csv(data):
  with open("my.csv", "w", newline="") as f:
    print(data)  # in place of a python logger
    csv.writer(f).writerow(data)

  return "my.csv"


def load_to_db(ref):
  # some db logic here
  pass


data = make_api_call()
ref = create_csv(data)
load_to_db(ref)
Does that make sense, only with all of that running in a container?
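The `# some db logic here` placeholder depends on your database. As a hypothetical stand-in, this sketch loads the CSV written by `create_csv` into SQLite using only the standard library; the `ad_metrics` table, the generic `c0, c1, ...` column names and the `etl.db` path are all assumptions.

```python
import csv
import sqlite3


def load_to_db(ref: str, db_path: str = "etl.db") -> int:
    """Insert every row of the CSV file `ref` into a hypothetical ad_metrics table."""
    with open(ref, newline="") as f:
        rows = list(csv.reader(f))
    if not rows:
        return 0
    width = len(rows[0])
    # Generic TEXT columns c0, c1, ... because the real schema is unknown here.
    columns = ", ".join(f"c{i} TEXT" for i in range(width))
    placeholders = ", ".join("?" for _ in range(width))
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(f"CREATE TABLE IF NOT EXISTS ad_metrics ({columns})")
            conn.executemany(f"INSERT INTO ad_metrics VALUES ({placeholders})", rows)
    finally:
        conn.close()
    return len(rows)
```

Returning the row count gives the step something concrete to log or check, which fits the "success/failure info from each step" goal.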
m
@nicholas yes that's pretty much how the set up of it goes
n
Great, what's nice about that is how easy it is to turn those into immediately observable Prefect tasks (with logging!):
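import csv

import prefect
import requests
from prefect import Flow, task

API_URL = "https://example.com/api/ad-metrics"  # hypothetical endpoint


@task
def make_api_call():
  return requests.get(API_URL).json()


@task
def create_csv(data):
  logger = prefect.context.get("logger")
  with open("my.csv", "w", newline="") as f:
    logger.info(data)  # Prefect's logger instead of print(); INFO shows at the default level
    csv.writer(f).writerow(data)

  return "my.csv"


@task
def load_to_db(ref):
  # some db logic here
  pass


with Flow("ETL") as flow:
  data = make_api_call()
  ref = create_csv(data)
  load_to_db(ref)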
^that would immediately create your data dependencies. Then if you wanted to take it further, you could define the storage/execution you'd like to see:
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import DockerRun
from prefect.storage import Docker
flow.run_config = DockerRun() # says the flow should be executed in a Docker container
flow.executor = LocalDaskExecutor() # uses Dask in the container to allow task parallelization
flow.storage = Docker(registry_url="<<my registry>>", image_name="<<flow image>>") # defines how your flow is stored and allows you to add external dependencies to your container
m
Hello @nicholas, I have spent a lot of today digesting the info that you had provided on Friday. In order to rebuild the existing docker images I should be following https://docs.prefect.io/orchestration/recipes/configuring_storage.html#building-a-custom-base-image, correct? Or should I be just adding prefect to my requirements.txt?
n
Hi @Matthew Blau - if you're continuing to use Docker as a containerization method it makes sense to build out a custom image
m
@nicholas awesome; all our containers are local right now so that should be pretty easy to work with then, yes? I can build out a custom image with a python version, have it register to the UI and run on a schedule without much trouble, if I am understanding correctly.
n
Just to tweak your statement slightly, you'd build out a flow that uses a custom image for storage and then register that with the API (thereby it'll show up in the UI)
m
@nicholas ah so I would be converting our existing ETL tasks into a Prefect flow directly, that uses docker as the storage medium. And from there I would register the docker container similar to here? https://docs.prefect.io/api/latest/cli/register.html#flow or would I need to add a flow.register line to the ETL task in the docker container for it to register to the UI?
n
Either one of those works just fine 🙂 (I recommend using `flow.register` as part of your flow script though, just for visibility to your team)
m
@nicholas awesome! I think I am getting the hang of this now, Prefect is pretty straightforward to work with. Could I trouble you for another question, unrelated to the current thread? my config.toml looks like
[server]
  [server.ui]
  apollo_url="http://localhost:4201/graphql"
as a test to see if I can change the ports it listens on and when I start prefect with
prefect server start
the apollo_url is still set to the default port of 4200. Am I doing something wrong with the config file?
n
Hm, that particular port won't work because it's being used by the graphql container (which is a little confusing, but those are separate services). When making changes to your config.toml you'll need to restart Server, and for UI-specific settings you can check http://localhost:8080/settings.json to see what settings the UI is picking up (in this instance the `server_url`).
m
@nicholas hmm, that settings.json page doesn't load for me, though localhost:8080 is up. We have a service that already uses hasura's default port, 3000, and I wanted to change the port in the config.toml to something like 3001 to avoid the conflict
n
Hm... can you confirm a few things for me?
1. What version of Prefect are you running to start your Server?
2. Are you running Server locally or on another machine?
m
0.14 locally, 0.14.1 on a remote server. Testing the changes on local before making the same changes on the remote server
n
Hm ok, and you don't get anything when visiting your local UI address + `/settings.json`?
m
@nicholas correct, I am navigating to localhost:8080/settings.json and it just says unable to connect
n
But visiting localhost:8080 brings up the UI? That's odd to me. Are you modifying the webserver at all?
m
@nicholas I had restarted the server and settings.json pulls up, but the graphql server is still listening on port 4200 despite
server_url	"http://localhost:4201/graphql"
being in the settings.json page. Am I doing something wrong?
n
I'm not sure what you mean by the graphql server, can you post your config.toml?
m
@nicholas right now it is just
[server]
host = "http://localhost"
port = "4200"
host_port = "4200"

endpoint = "${server.host):${server.port}"
[server.graphql]
host = "http://localhost"
port = "4201"
host_port = "4201"
debug = false
path = "/graphql/"
and when I bring up the server, the address I go to is localhost:4200 and it brings up the graphql page. Perhaps I am misunderstanding? My understanding is that adjusting the port info will change the URL to localhost:4201
@nicholas okay so I have determined that I have an incorrect understanding of this. I have a better understanding now. However, when I attempt to override the defaults for hasura, as that is the problem port, in my config.toml I get ECONNREFUSED on port 3001. If I start prefect with --hasura-port 3001 I do not have this issue. Additionally, I get an error that shows that it cannot connect to http://localhost:4200/graphql on the remote server I am using for prefect. In UI I can update the ip to 192.168.1.xxx and it connects just fine. However, when I attempt to put those values in the config.toml I encounter errors. What am I doing wrong? Here is my config toml for reference:
[server]
  [server.graphql]
  host = "192.168.1.xxx"
  port = "4201"
  host_port = "4201"
  debug = false
  path = "/graphql"

  [server.hasura]
  host = "http:/192.168.1.xxx"
  port = "3001"
  host_port = "3001"
  graphql_url = "http://${server.hasura.host}:$server.hasura.port}/v1alpha1/graphql"
  ws_url = "ws://${server.hasura.host}:${server.hasura.port}/v1alpha1/graphql"
  execute_retry_seconds = 10
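Two of the config.toml snippets in this thread contain malformed interpolation references: `"${server.host):${server.port}"` (a `)` where `}` belongs) and `$server.hasura.port}` (missing `{`). Below is a minimal sketch, assuming Prefect's `${section.key}` interpolation syntax, that flags such slips in an already-parsed config dict before restarting Server; `find_bad_references` and `check_config` are hypothetical helpers, not part of Prefect.

```python
import re
from typing import Any, Dict, List

# Well-formed references look like ${section.key} (letters, digits, _ and .).
WELL_FORMED_REF = re.compile(r"\$\{[A-Za-z0-9_.]+\}")


def find_bad_references(value: str) -> List[str]:
    """Return '$...' fragments left over after removing well-formed ${...} references."""
    leftover = WELL_FORMED_REF.sub("", value)
    return re.findall(r"\$\S*", leftover)


def check_config(config: Dict[str, Any], prefix: str = "") -> Dict[str, List[str]]:
    """Walk a nested config dict and report dotted keys whose values hold malformed references."""
    issues: Dict[str, List[str]] = {}
    for key, value in config.items():
        dotted = f"{prefix}{key}"
        if isinstance(value, dict):
            issues.update(check_config(value, dotted + "."))
        elif isinstance(value, str):
            bad = find_bad_references(value)
            if bad:
                issues[dotted] = bad
    return issues
```

On Python 3.11+ the dict can come from the standard library, e.g. `check_config(tomllib.load(open(path, "rb")))`; `tomllib` will also raise on outright TOML syntax errors such as an unterminated string.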
n
@Matthew Blau - where is that config.toml file stored? That config looks fine to me from the hasura point of view. For the UI you'll need to add another section like this:
[server.ui]
apollo_url = "http://192.168.1.xxx:4200/graphql"
and then restart the server.
m
@nicholas It lives in ~/.prefect. I just need to add a section for apollo_url then is what you are saying?
n
Yup, that should make sure the UI will connect to the API correctly. This article might help you with this whole deployment process: https://medium.com/the-prefect-blog/prefect-server-101-deploying-to-google-cloud-platform-47354b16afe2
m
@nicholas hmm, playing around with the config.toml even stripping everything down to
[server]
  
  [server.ui]
    
    apollo_url = "http://YOUR_MACHINES_PUBLIC_IP:4200/graphql"
as shown in the Medium article you provided does not let it connect, although it does show up in the UI as seen here. I cannot even set it manually through the UI. Only removing the config file entirely allows me to at least set it correctly in the UI
n
You've got an extra forward slash (`/`) in your settings for that IP address:
192.168.1.39/:4200/graphql
should be:
192.168.1.39:4200/graphql
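A quick way to catch this kind of slip before restarting Server is to parse the value with the standard library's `urllib.parse.urlsplit`. This sketch assumes the UI expects a full `http(s)://host:port/graphql` URL, like the default `http://localhost:4200/graphql`; `check_apollo_url` is a hypothetical helper, not part of Prefect.

```python
from typing import List
from urllib.parse import urlsplit


def check_apollo_url(url: str) -> List[str]:
    """Return a list of problems with an apollo_url value (empty list = looks fine)."""
    problems: List[str] = []
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https"):
        problems.append(f"missing or unexpected scheme: {parts.scheme!r}")
    if not parts.hostname:
        problems.append("no host found")
    try:
        port = parts.port
    except ValueError:  # e.g. a non-numeric or out-of-range port
        port = None
    if port is None:
        problems.append("no port found")
    # Assumes the API is served under /graphql, as in the default URL.
    if parts.path.rstrip("/") != "/graphql":
        problems.append(f"unexpected path: {parts.path!r}")
    return problems
```

With the extra slash, `urlsplit` treats `:4200/graphql` as part of the path, so the port check fails; a value without `http://` fails the scheme and host checks.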
m
@nicholas ahh I see! I have corrected that and modified my config.toml and am getting this error:
apollo_1    |    'request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001',
apollo_1    |   type: 'system',
apollo_1    |   errno: 'ECONNREFUSED',
apollo_1    |   code: 'ECONNREFUSED' }
apollo_1    | Trying again in 3 seconds...
apollo_1    | Building schema...
apollo_1    | { FetchError: request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001
apollo_1    |     at ClientRequest.<anonymous> (/apollo/node_modules/node-fetch/lib/index.js:1455:11)
apollo_1    |     at ClientRequest.emit (events.js:182:13)
apollo_1    |     at Socket.socketErrorListener (_http_client.js:392:9)
apollo_1    |     at Socket.emit (events.js:182:13)
apollo_1    |     at emitErrorNT (internal/streams/destroy.js:82:8)
apollo_1    |     at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
apollo_1    |     at process._tickCallback (internal/process/next_tick.js:63:19)
apollo_1    |   message:
apollo_1    |    'request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001',
apollo_1    |   type: 'system',
apollo_1    |   errno: 'ECONNREFUSED',
apollo_1    |   code: 'ECONNREFUSED' }
apollo_1    | Trying again in 3 seconds...
this is my config:
[server]
  [server.hasura]
     host = "http://192.168.1.39"
     port = "3001"
     host_port = "3001
  [server.ui]
    apollo_url = "192.168.1.39:4200/graphql"
What am I doing wrong?
n
Hi @Matthew Blau - it looks like your hasura container isn’t starting or isn’t accessible for some reason. Does anything in the logs indicate that any other container is unhealthy?
m
Hello @nicholas after much digging around I have determined that prefect parses everything named config.toml.*. I had a config.toml and config.toml.bak and as a result it had caused some oddities that were resolved after I removed the config.toml.bak file. I will be filing a report on github for this a little later today
n
Interesting, glad you got to the bottom of it!
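Following Matthew's report that a stray `config.toml.bak` next to `config.toml` in `~/.prefect` caused the oddities, here is a minimal sketch for listing such files before starting Server. `stray_config_files` is a hypothetical helper: it only lists files whose names start with `config.toml`, and it does not inspect how Prefect actually loads configuration.

```python
from pathlib import Path
from typing import List, Optional


def stray_config_files(config_dir: Optional[Path] = None) -> List[Path]:
    """List files beside config.toml whose names start with 'config.toml' (e.g. backups)."""
    directory = config_dir or Path.home() / ".prefect"
    return sorted(
        p for p in directory.glob("config.toml*")
        if p.is_file() and p.name != "config.toml"
    )
```

Moving anything it reports out of `~/.prefect` (rather than renaming it in place) mirrors the fix described above.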