Ajeel Ahmed
07/10/2023, 1:10 PM
task_runners?
Taylor Curran
07/10/2023, 1:34 PM
task.submit() syntax will result in the task runner being used.
Ajeel Ahmed
07/10/2023, 1:55 PM
import datetime
import json
import logging
import shlex
import subprocess
from subprocess import STDOUT, CalledProcessError, check_output
from urllib.parse import urlparse

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from sqlalchemy.orm import aliased
from prefect import flow, get_run_logger, task
from prefect.blocks.system import JSON, Secret, String

from .utils import get_service

logger = logging.getLogger(__name__)


def upload_to_folder(service, folder_id, file_path, file_name):
    file_metadata = {"name": file_name, "parents": [folder_id]}
    media = MediaFileUpload(file_path)
    file = (
        service.files()
        .create(body=file_metadata, media_body=media, fields="id, webViewLink")
        .execute()
    )
    logger.info(f'File ID: "{file.get("id")}".')
    return file


@flow
def backup_precog_database_flow(
):
    global logger
    logger = get_run_logger()
    backup_fname_prefix = String.load(backup_fname_prefix_block).value
    secret_block = Secret.load(database_uri_block)
    database_uri = secret_block.get()
    gdrive_token_block = JSON.load(gdrive_token_block)
    gdrive_token = gdrive_token_block.value
    scope = "https://www.googleapis.com/auth/drive"
    service = get_service(
        api_name="drive",
        api_version="v3",
        scopes=[scope],
        key_json=gdrive_token,
    )
    logging.info(f"Authorized google drive service")
    parsed = urlparse(database_uri)
    host = parsed.hostname
    port = parsed.port
    user = parsed.username
    password = parsed.password
    database = parsed.path[1:]
    logger.info("pfpfpf")
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )
    dt = datetime.datetime.utcnow().isoformat()
    fname = f"{backup_fname_prefix}_{database}_{dt}.dump"
    fpath = f"/tmp/{fname}"
    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath}"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)
        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise
    logger.info(f"Dumping done")
    logger.info(f"Uploading to google drive")
    result = upload_to_folder(service, gdrive_folder_id, fpath, fname)
    logger.info(f"Upload done: {result}")
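A side note on the dump step above: the command is built as one shell string and logged in full, so the database password ends up in the run logs. A minimal sketch of one alternative, assuming pg_dump is on PATH. The helper names build_pg_dump_call and run_pg_dump are hypothetical and not part of the original flow. It passes PGPASSWORD through the process environment and uses pg_dump's -f option instead of a shell redirect:

```python
import os
import subprocess


def build_pg_dump_call(host, port, user, password, database, fpath):
    """Return (args, env) for pg_dump without going through a shell.

    Hypothetical helper, not part of the original flow.
    """
    args = [
        "pg_dump",
        "-Fc",             # custom-format archive, as in the original command
        "-h", str(host),
        "-p", str(port),
        "-U", str(user),
        "-f", str(fpath),  # write to this file instead of using "> fpath"
        str(database),
    ]
    # Copy the current environment and add the password there, so it never
    # appears in the argument list or in a logged command string.
    env = {**os.environ, "PGPASSWORD": str(password)}
    return args, env


def run_pg_dump(host, port, user, password, database, fpath):
    """Run pg_dump; raises CalledProcessError on a non-zero exit status."""
    args, env = build_pg_dump_call(host, port, user, password, database, fpath)
    return subprocess.run(
        args,
        env=env,
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
```

With this shape, logging the args list is safe because the password only lives in env.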
Taylor Curran
07/10/2023, 1:56 PM
Ajeel Ahmed
07/10/2023, 1:59 PM
sequential task runner and then not using tasks at all
Taylor Curran
07/10/2023, 2:01 PM
Ajeel Ahmed
07/10/2023, 2:05 PM
Loading flow for deployment 'backup_precog_database_flow'...
11:15:51 AM
Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
11:15:52 AM
Executing flow 'backup-precog-database-flow' for flow run 'cunning-boar'...
11:15:52 AM
Beginning execution...
11:15:53 AM
Created task run 'backup_to_server-d0ca3167-0' for task 'backup_to_server'
11:15:53 AM
Executing 'backup_to_server-d0ca3167-0' immediately...
11:15:53 AM
Beginning execution...
11:15:53 AM
backup_to_server-d0ca3167-0
Dumping Postgres DB. Host: postgresql:5432, user: postgres, database: db
11:15:53 AM
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/jobs/flows/backup_precog_database_flow.py", line 118, in backup_to_server
    logging.info()
TypeError: info() missing 1 required positional argument: 'msg'
11:15:53 AM
backup_to_server-d0ca3167-0
Finished in state Failed("Task run encountered an exception: TypeError: info() missing 1 required positional argument: 'msg'\n")
11:15:53 AM
backup_to_server-d0ca3167-0
Created task run 'backup_to_drive-9749801d-0' for task 'backup_to_drive'
11:15:54 AM
Executing 'backup_to_drive-9749801d-0' immediately...
backup_to_drive and backup_to_server started at the same time
Christopher Boyd
07/10/2023, 2:08 PM
Ajeel Ahmed
07/10/2023, 2:09 PM
Christopher Boyd
07/10/2023, 2:09 PM
Ajeel Ahmed
07/10/2023, 2:14 PM
Loading flow for deployment 'backup_precog_database_flow'...
02:47:50 PM
Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
02:47:51 PM
Executing flow 'backup-precog-database-flow' for flow run 'groovy-earthworm'...
02:47:53 PM
Beginning execution...
02:47:53 PM
Dumping Postgres DB. Host: 1.1.1.1, user: postgres, database: db
02:47:54 PM
Executing command: export PGPASSWORD=1.1.1.1. && pg_dump -Fc -h 1.1.1.1 -p 16436 -U postgres db > /tmp/dev_precog_db_2023-07-10T13:47:54.281547.dump
import datetime
import json
import logging
import shlex
import subprocess
from subprocess import STDOUT, CalledProcessError, check_output
from urllib.parse import urlparse

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from sqlalchemy.orm import aliased
from prefect import flow, get_run_logger, task
from prefect.blocks.system import JSON, Secret, String
from prefect.task_runners import SequentialTaskRunner

from .utils import get_service

logger = logging.getLogger(__name__)


def upload_to_folder(service, folder_id, file_path, file_name):
    file_metadata = {"name": file_name, "parents": [folder_id]}
    media = MediaFileUpload(file_path)
    file = (
        service.files()
        .create(body=file_metadata, media_body=media, fields="id, webViewLink")
        .execute()
    )
    logger.info(f'File ID: "{file.get("id")}".')
    return file


def get_file_name(backup_fname_prefix_block, database):
    backup_fname_prefix = String.load(backup_fname_prefix_block).value
    dt = datetime.datetime.utcnow().isoformat()
    fname = f"{backup_fname_prefix}_{database}_{dt}.dump"
    return fname


def get_db_creds(database_uri_block):
    secret_block = Secret.load(database_uri_block)
    database_uri = secret_block.get()
    parsed = urlparse(database_uri)
    host = parsed.hostname
    port = parsed.port
    user = parsed.username
    password = parsed.password
    database = parsed.path[1:]
    return host, port, user, password, database


@task()
def backup_to_drive(
    gdrive_folder_id,
    gdrive_token_block,
    database_uri_block,
    backup_fname_prefix_block,
):
    gdrive_token_block = JSON.load(gdrive_token_block)
    gdrive_token = gdrive_token_block.value
    scope = "https://www.googleapis.com/auth/drive"
    service = get_service(
        api_name="drive",
        api_version="v3",
        scopes=[scope],
        key_json=gdrive_token,
    )
    logging.info("Authorized google drive service")
    host, port, user, password, database = get_db_creds(database_uri_block)
    fname = get_file_name(backup_fname_prefix_block, database)
    fpath = f"/tmp/{fname}"
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )
    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath}"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)
        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise
    logger.info("Dumping done")
    logger.info("Uploading to google drive")
    result = upload_to_folder(service, gdrive_folder_id, fpath, fname)
    logger.info(f"Upload done: {result}")


@task()
def backup_to_server(
    database_uri_block,
    backup_fname_prefix_block,
):
    host, port, user, password, database = get_db_creds(database_uri_block)
    fname = get_file_name(backup_fname_prefix_block, database)
    fpath_server = f"/mnt/bkp001pf/source/{fname}"
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )
    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath_server} && touch new.pid"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)
        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise
    logger.info("Dumping done")


@flow
def backup_precog_database_flow(
    database_uri_block="precog-database-uri",
    gdrive_folder_id="10_aDBuvOC3KFubzz2WmdlwSRPtTXlFMT",
    gdrive_token_block="gdrive-service-account-token",
    backup_fname_prefix_block="dev-backup-fname-prefix",
):
    global logger
    logger = get_run_logger()
    backup_to_server(
        database_uri_block, backup_fname_prefix_block, return_state=True
    )
    # Keyword arguments: the task's parameter order differs from the flow's,
    # so positional passing would hand the URI block in as the folder ID.
    backup_to_drive(
        gdrive_folder_id=gdrive_folder_id,
        gdrive_token_block=gdrive_token_block,
        database_uri_block=database_uri_block,
        backup_fname_prefix_block=backup_fname_prefix_block,
    )
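For context on the 'ConcurrentTaskRunner' log line: as noted earlier in the thread, the task runner is only used with task.submit(), while a direct call such as backup_to_server(...) runs in place and blocks until it finishes (the "Executing ... immediately..." lines in the logs). A rough standard-library analogy, where ThreadPoolExecutor stands in for a concurrent task runner. This is an illustration only, not how Prefect is implemented, and slow_step, run_direct and run_submitted are made-up names:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_step(name, log):
    """Pretend task: record start and end around a short sleep."""
    log.append(f"{name} start")
    time.sleep(0.2)
    log.append(f"{name} end")
    return name


def run_direct():
    """Direct calls: each step finishes before the next one starts."""
    log = []
    slow_step("a", log)
    slow_step("b", log)
    return log


def run_submitted():
    """Submitted calls: the executor may run the steps at the same time."""
    log = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow_step, n, log) for n in ("a", "b")]
        results = [f.result() for f in futures]
    return log, results
```

In run_direct() the log is strictly a-then-b; in run_submitted() both steps normally start before either ends. Creating the executor costs nothing if nothing is submitted, which matches the observation that the runner's startup message appears whether or not tasks are used.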
Christopher Boyd
07/10/2023, 2:20 PM
.submit() actions anywhere right?
Ajeel Ahmed
07/10/2023, 2:20 PM
Christopher Boyd
07/10/2023, 2:20 PM
Ajeel Ahmed
07/10/2023, 2:24 PM
def flow():
    task_1()
    task_2()
example 2:
def flow():
    task_1(return_state=True)
    task_2()
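The two examples above differ in what a direct task call does when the task fails. In Prefect 2, as far as the documented behaviour goes, a plain call re-raises the task's exception inside the flow, which stops the flow before the next line, while return_state=True hands back a state object instead of raising. The sketch below is a plain-Python model of that behaviour, not Prefect's implementation; FakeState and call_task are made-up names:

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class FakeState:
    """Stand-in for a task run state (hypothetical, not Prefect's State)."""
    name: str
    result: Any = None
    error: Optional[BaseException] = None


def call_task(fn: Callable[[], Any], return_state: bool = False):
    """Model of a direct task call: raise on failure unless return_state=True."""
    try:
        state = FakeState("Completed", result=fn())
    except Exception as exc:
        state = FakeState("Failed", error=exc)
    if return_state:
        return state       # caller gets the state; nothing is raised
    if state.error is not None:
        raise state.error  # failure propagates and stops the caller
    return state.result


def task_1():
    raise TypeError("info() missing 1 required positional argument: 'msg'")


def task_2():
    return "task_2 ran"


def flow_example_1():
    call_task(task_1)  # raises, so the next line never runs
    return call_task(task_2)


def flow_example_2():
    call_task(task_1, return_state=True)  # failure is captured in the state
    return call_task(task_2)
```

In this model flow_example_1() raises the TypeError before task_2 is reached, while flow_example_2() goes on to run task_2.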
Christopher Boyd
07/10/2023, 2:24 PM
() on your task decorators?
Ajeel Ahmed
07/10/2023, 2:26 PM
task_1 fails task_2 doesn’t run either
but in example 2 if task_1 fails task_2 does run even without it
why is this? why does my passing the return_state parameter cause such a difference in the output
return_state
I want them to run regardless of the previous one's state
Christopher Boyd
07/10/2023, 2:29 PM
Ajeel Ahmed
07/10/2023, 2:31 PM
Christopher Boyd
07/10/2023, 2:36 PM
Ajeel Ahmed
07/10/2023, 2:54 PM
concurrently running log message comes on even in the example I posted above (with the emoji 2️⃣) even though I’m not using tasks at all in that one
submit and explicitly call the sequential task_runner
Christopher Boyd
07/10/2023, 2:56 PM
Ajeel Ahmed
07/10/2023, 2:58 PM
concurrently running message comes on regardless of whether I use the tasks or not
Jeff Hale
07/10/2023, 3:49 PM
prefect version?