Matthew Blau
01/15/2021, 4:17 PM

nicholas
import prefect
from prefect import task

@task
def some_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
(notice that you can set the logging level there, which will be reflected in the Prefect UI). Any Python logging you've got can probably be switched over with minimal effort. In addition, you can pass log_stdout=True to your task (@task(log_stdout=True)) to capture print statements 🙂 You can read more on logging here.
Restarting / Using Results
One of the really nice things about Prefect is that you can configure results external to the execution environment, through the result interface. This pattern enables retries and restarts even when using short lived workers and/or containers. You can configure results at the flow level OR on a per-task basis, giving you tons of flexibility. When you've configured results on your tasks, restarting flow runs from failure points becomes as simple as clicking the restart button in the UI. More on using results here.
Orchestration
You may already be using these, but if not I'd encourage you to take a look at the Kubernetes agent (which submits your runs as Kubernetes jobs) or the ECS agent (which will deploy flow runs as ECS tasks), and the Dask executor, which will unlock run parallelization (particularly useful when using mapping on large ETL pipelines). For more on this, Jim wrote an excellent blog post walking through some of the specifics of the new run config interface.
Hopefully that answers some of your questions!

Matthew Blau
01/15/2021, 8:08 PM

nicholas

Matthew Blau
01/15/2021, 9:41 PM

nicholas

Matthew Blau
01/15/2021, 9:50 PM

nicholas

Matthew Blau
01/15/2021, 9:54 PM

nicholas
import csv
import requests

API_URL = "https://example.com/data"  # placeholder endpoint

def make_api_call():
    return requests.get(API_URL).json()

def create_csv(data):
    with open("my.csv", "w") as f:
        print(data)  # in place of a python logger
        csv.writer(f).writerow(data)
    return "my.csv"

def load_to_db(ref):
    # some db logic here
    ...

data = make_api_call()
ref = create_csv(data)
load_to_db(ref)
Matthew Blau
01/15/2021, 10:27 PM

nicholas
import csv
import requests
from prefect import task, Flow, context

API_URL = "https://example.com/data"  # placeholder endpoint

@task
def make_api_call():
    return requests.get(API_URL).json()

@task
def create_csv(data):
    logger = context.get("logger")
    with open("my.csv", "w") as f:
        logger.debug(data)  # in place of a python logger
        csv.writer(f).writerow(data)
    return "my.csv"

@task
def load_to_db(ref):
    # some db logic here
    ...

with Flow("ETL") as flow:
    data = make_api_call()
    ref = create_csv(data)
    load_to_db(ref)
^that would immediately create your data dependencies. Then if you wanted to take it further, you could define the storage/execution you'd like to see:
from prefect.run_configs import DockerRun
from prefect.executors import LocalDaskExecutor
from prefect.storage import Docker

flow.run_config = DockerRun()  # says the flow should be executed in a Docker container
flow.executor = LocalDaskExecutor()  # uses Dask in the container to allow task parallelization
flow.storage = Docker(registry_url="<<my registry>>", image_name="<<flow image>>")  # defines how your flow is stored and allows you to add external dependencies to your container
Matthew Blau
01/18/2021, 9:55 PM

nicholas

Matthew Blau
01/19/2021, 3:49 PM

nicholas

Matthew Blau
01/19/2021, 3:54 PM

nicholas
flow.register
as part of your flow script though, just for visibility to your team)

Matthew Blau
01/19/2021, 4:00 PM
[server]
[server.ui]
apollo_url = "http://localhost:4201/graphql"
as a test to see if I can change the ports it listens on and when I start prefect with
prefect server start
the apollo_url is still set to the default port of 4200. Am I doing something wrong with the config file?

nicholas
server_url
)

Matthew Blau
01/19/2021, 4:21 PM

nicholas
Matthew Blau
01/19/2021, 4:40 PM

nicholas
/settings.json
?

Matthew Blau
01/19/2021, 4:50 PM

nicholas
Matthew Blau
01/19/2021, 4:56 PM
server_url "http://localhost:4201/graphql"
being in the settings.json page. Am I doing something wrong?

nicholas
Matthew Blau
01/19/2021, 7:19 PM
[server]
host = "http://localhost"
port = "4200"
host_port = "4200"
endpoint = "${server.host}:${server.port}"
[server.graphql]
host = "http://localhost"
port = "4201"
host_port = "4201"
debug = false
path = "/graphql/"
and when I bring up the server the address I go to is localhost://4200 and it brings up the graphql page. Perhaps I am misunderstanding? My understanding is to adjust the port info and it will change the url to localhost://4201
[server]
[server.graphql]
host = "192.168.1.xxx"
port = "4201"
host_port = "4201"
debug = false
path = "/graphql"
[server.hasura]
host = "http://192.168.1.xxx"
port = "3001"
host_port = "3001"
graphql_url = "http://${server.hasura.host}:${server.hasura.port}/v1alpha1/graphql"
ws_url = "ws://${server.hasura.host}:${server.hasura.port}/v1alpha1/graphql"
execute_retry_seconds = 10
nicholas
[server.ui]
apollo_url = "http://192.168.1.xxx:4200/graphql"
and then restart the server.

Matthew Blau
01/19/2021, 9:14 PM

nicholas
Matthew Blau
01/19/2021, 9:58 PM
[server]
[server.ui]
apollo_url = "http://YOUR_MACHINES_PUBLIC_IP:4200/graphql"
as shown in the medium article you provided does not allow it to connect, although it does show up in the UI as seen here. I cannot even manually set it through the UI. Only removing the config file entirely allows me to at least set it correctly in the UI.

nicholas
/
forward slash in your settings for that IP address:
192.168.1.39/:4200/graphql
should be:
192.168.1.39:4200/graphql
Matthew Blau
01/20/2021, 1:28 PM
apollo_1 | 'request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001',
apollo_1 | type: 'system',
apollo_1 | errno: 'ECONNREFUSED',
apollo_1 | code: 'ECONNREFUSED' }
apollo_1 | Trying again in 3 seconds...
apollo_1 | Building schema...
apollo_1 | { FetchError: request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001
apollo_1 | at ClientRequest.<anonymous> (/apollo/node_modules/node-fetch/lib/index.js:1455:11)
apollo_1 | at ClientRequest.emit (events.js:182:13)
apollo_1 | at Socket.socketErrorListener (_http_client.js:392:9)
apollo_1 | at Socket.emit (events.js:182:13)
apollo_1 | at emitErrorNT (internal/streams/destroy.js:82:8)
apollo_1 | at emitErrorAndCloseNT (internal/streams/destroy.js:50:3)
apollo_1 | at process._tickCallback (internal/process/next_tick.js:63:19)
apollo_1 | message:
apollo_1 | 'request to http://hasura:3001/v1alpha1/graphql failed, reason: connect ECONNREFUSED 192.168.112.3:3001',
apollo_1 | type: 'system',
apollo_1 | errno: 'ECONNREFUSED',
apollo_1 | code: 'ECONNREFUSED' }
apollo_1 | Trying again in 3 seconds...
this is my config:
[server]
[server.hasura]
host = "http://192.168.1.39"
port = "3001"
host_port = "3001"
[server.ui]
apollo_url = "192.168.1.39:4200/graphql"
What am I doing wrong?

nicholas
Matthew Blau
01/20/2021, 4:29 PM

nicholas