
Ajeel Ahmed

07/10/2023, 1:10 PM
I’m just calling two simple tasks in a flow without using
task_runners
?

Taylor Curran

07/10/2023, 1:34 PM
The concurrent task runner is the default; if you are submitting tasks at all, the concurrent task runner will be used. Could you share your flow code? Using
task.submit()
syntax will result in the task runner being used.
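For example (a minimal sketch with a hypothetical task; a plain call runs in the flow itself, while .submit() hands the task to the runner):
Copy code
from prefect import flow, task

@task
def say_hello(name):
    print(f"Hello, {name}!")

@flow
def greet():
    say_hello("direct")         # plain call: executed in the flow run itself
    say_hello.submit("runner")  # .submit(): handed to the task runner (ConcurrentTaskRunner by default)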

Ajeel Ahmed

07/10/2023, 1:55 PM
okay, what if it’s just a flow? Because I’m still getting it
Copy code
import datetime
import json
import logging
import shlex
import subprocess
from subprocess import STDOUT, CalledProcessError, check_output
from urllib.parse import urlparse

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from sqlalchemy.orm import aliased

from prefect import flow, get_run_logger, task
from prefect.blocks.system import JSON, Secret, String

from .utils import get_service

logger = logging.getLogger(__name__)


def upload_to_folder(service, folder_id, file_path, file_name):
    file_metadata = {"name": file_name, "parents": [folder_id]}
    media = MediaFileUpload(file_path)
    file = (
        service.files()
        .create(body=file_metadata, media_body=media, fields="id, webViewLink")
        .execute()
    )
    logger.info(f'File ID: "{file.get("id")}".')
    return file


@flow
def backup_precog_database_flow(
    database_uri_block="precog-database-uri",
    gdrive_folder_id="10_aDBuvOC3KFubzz2WmdlwSRPtTXlFMT",
    gdrive_token_block="gdrive-service-account-token",
    backup_fname_prefix_block="dev-backup-fname-prefix",
):
    global logger
    logger = get_run_logger()

    backup_fname_prefix = String.load(backup_fname_prefix_block).value

    secret_block = Secret.load(database_uri_block)
    database_uri = secret_block.get()

    gdrive_token_block = JSON.load(gdrive_token_block)
    gdrive_token = gdrive_token_block.value
    scope = "https://www.googleapis.com/auth/drive"

    service = get_service(
        api_name="drive",
        api_version="v3",
        scopes=[scope],
        key_json=gdrive_token,
    )
    logging.info("Authorized google drive service")

    parsed = urlparse(database_uri)
    host = parsed.hostname
    port = parsed.port
    user = parsed.username
    password = parsed.password
    database = parsed.path[1:]
    logger.info("pfpfpf")
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )

    dt = datetime.datetime.utcnow().isoformat()

    fname = f"{backup_fname_prefix}_{database}_{dt}.dump"
    fpath = f"/tmp/{fname}"

    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath}"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)
        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise

    logger.info("Dumping done")

    logger.info("Uploading to google drive")
    result = upload_to_folder(service, gdrive_folder_id, fpath, fname)
    logger.info(f"Upload done: {result}")
2️⃣ 1
here is the code

Taylor Curran

07/10/2023, 1:56 PM
I’m not sure, but a task runner may still be instantiated even if it’s not used. Is it causing any problems?

Ajeel Ahmed

07/10/2023, 1:59 PM
hm no, not really. Right now I’m running into one issue: one of the tasks (in this scenario I’m not using a task_runner and just putting tasks as-is into the flow) is taking too long. I don’t know why, and I’m investigating it, first by using a
sequential
task runner and then not using
tasks
at all
while doing that, I realised the task_runner was getting called and running tasks concurrently even in the scenarios where I’m not using the task_runner

Taylor Curran

07/10/2023, 2:01 PM
that’s unusual. In your timeline diagram, can you verify that the tasks are running around the same time?

Ajeel Ahmed

07/10/2023, 2:05 PM
Copy code
Loading flow for deployment 'backup_precog_database_flow'...
11:15:51 AM

Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
11:15:52 AM

Executing flow 'backup-precog-database-flow' for flow run 'cunning-boar'...
11:15:52 AM

Beginning execution...
11:15:53 AM

Created task run 'backup_to_server-d0ca3167-0' for task 'backup_to_server'
11:15:53 AM

Executing 'backup_to_server-d0ca3167-0' immediately...
11:15:53 AM

Beginning execution...
11:15:53 AM
backup_to_server-d0ca3167-0

Dumping Postgres DB. Host: postgresql:5432, user: postgres, database: db
11:15:53 AM

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1449, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/jobs/flows/backup_precog_database_flow.py", line 118, in backup_to_server
    logging.info()
TypeError: info() missing 1 required positional argument: 'msg'
11:15:53 AM
backup_to_server-d0ca3167-0

Finished in state Failed("Task run encountered an exception: TypeError: info() missing 1 required positional argument: 'msg'\n")
11:15:53 AM
backup_to_server-d0ca3167-0

Created task run 'backup_to_drive-9749801d-0' for task 'backup_to_drive'
11:15:54 AM

Executing 'backup_to_drive-9749801d-0' immediately...
1️⃣ 1
you can see
backup_to_drive
and
backup_to_server
started at the same time

Christopher Boyd

07/10/2023, 2:08 PM
I don’t see “backup_to_drive” or “backup_to_server” tasks

Ajeel Ahmed

07/10/2023, 2:09 PM
Screenshot 2023-07-10 at 3.09.21 PM.png

Christopher Boyd

07/10/2023, 2:09 PM
where is that task written / defined in code

Ajeel Ahmed

07/10/2023, 2:14 PM
my apologies, the logs for the code I sent above are these:
Copy code
Loading flow for deployment 'backup_precog_database_flow'...
02:47:50 PM

Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
02:47:51 PM

Executing flow 'backup-precog-database-flow' for flow run 'groovy-earthworm'...
02:47:53 PM

Beginning execution...
02:47:53 PM

Dumping Postgres DB. Host: 1.1.1.1, user: postgres, database: db
02:47:54 PM

Executing command: export PGPASSWORD=1.1.1.1. && pg_dump -Fc -h 1.1.1.1 -p 16436 -U postgres db > /tmp/dev_precog_db_2023-07-10T13:47:54.281547.dump
2️⃣ 1
and this is the code for the logs I pasted before:
Copy code
import datetime
import json
import logging
import shlex
import subprocess
from subprocess import STDOUT, CalledProcessError, check_output
from urllib.parse import urlparse

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from sqlalchemy.orm import aliased

from prefect import flow, get_run_logger, task
from prefect.blocks.system import JSON, Secret, String
from prefect.task_runners import SequentialTaskRunner

from .utils import get_service

logger = logging.getLogger(__name__)


def upload_to_folder(service, folder_id, file_path, file_name):
    file_metadata = {"name": file_name, "parents": [folder_id]}
    media = MediaFileUpload(file_path)
    file = (
        service.files()
        .create(body=file_metadata, media_body=media, fields="id, webViewLink")
        .execute()
    )
    logger.info(f'File ID: "{file.get("id")}".')
    return file


def get_file_name(backup_fname_prefix_block, database):
    backup_fname_prefix = String.load(backup_fname_prefix_block).value
    dt = datetime.datetime.utcnow().isoformat()

    fname = f"{backup_fname_prefix}_{database}_{dt}.dump"
    return fname


def get_db_creds(database_uri_block):
    secret_block = Secret.load(database_uri_block)
    database_uri = secret_block.get()

    parsed = urlparse(database_uri)
    host = parsed.hostname
    port = parsed.port
    user = parsed.username
    password = parsed.password
    database = parsed.path[1:]

    return host, port, user, password, database


@task()
def backup_to_drive(
    gdrive_folder_id,
    gdrive_token_block,
    database_uri_block,
    backup_fname_prefix_block,
):
    gdrive_token_block = JSON.load(gdrive_token_block)
    gdrive_token = gdrive_token_block.value
    scope = "https://www.googleapis.com/auth/drive"

    service = get_service(
        api_name="drive",
        api_version="v3",
        scopes=[scope],
        key_json=gdrive_token,
    )
    logging.info("Authorized google drive service")

    host, port, user, password, database = get_db_creds(database_uri_block)
    fname = get_file_name(backup_fname_prefix_block, database)
    fpath = f"/tmp/{fname}"
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )

    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath}"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)
        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise

    logger.info("Dumping done")
    logger.info("Uploading to google drive")
    result = upload_to_folder(service, gdrive_folder_id, fpath, fname)
    logger.info(f"Upload done: {result}")


@task()
def backup_to_server(
    database_uri_block,
    backup_fname_prefix_block,
):
    host, port, user, password, database = get_db_creds(database_uri_block)
    fname = get_file_name(backup_fname_prefix_block, database)
    fpath_server = f"/mnt/bkp001pf/source/{fname}"
    logger.info(
        f"Dumping Postgres DB. Host: {host}:{port}, user: {user}, database:"
        f" {database}"
    )

    command = (
        f"export PGPASSWORD={password} && pg_dump -Fc -h {host} -p {port} -U"
        f" {user} {database} > {fpath_server} && touch new.pid"
    )
    logger.info(f"Executing command: {command}")
    try:
        output = check_output(command, stderr=STDOUT, shell=True)

        logger.debug(f"Dumping output: {output}")
    except CalledProcessError as e:
        logger.error(f"Dumping failed: {e.output}")
        raise

    logger.info("Dumping done")


@flow
def backup_precog_database_flow(
    database_uri_block="precog-database-uri",
    gdrive_folder_id="10_aDBuvOC3KFubzz2WmdlwSRPtTXlFMT",
    gdrive_token_block="gdrive-service-account-token",
    backup_fname_prefix_block="dev-backup-fname-prefix",
):
    global logger
    logger = get_run_logger()

    backup_to_server(
        database_uri_block, backup_fname_prefix_block, return_state=True
    )

    backup_to_drive(
        gdrive_folder_id,
        gdrive_token_block,
        database_uri_block,
        backup_fname_prefix_block,
    )
1️⃣ 1
I’ll mark them with the same emojis so it’s a bit easier to understand

Christopher Boyd

07/10/2023, 2:20 PM
in short: it’s using the concurrent task runner, and it shouldn’t be, because you aren’t performing any
.submit()
actions anywhere, right?

Ajeel Ahmed

07/10/2023, 2:20 PM
yes

Christopher Boyd

07/10/2023, 2:20 PM
I believe the imports for your blocks use .submit natively, I need to check

Ajeel Ahmed

07/10/2023, 2:24 PM
that’s interesting. Also, I discussed this with you before but I’m still not sure, so I’ll just quickly ask again. Example 1:
Copy code
def flow(): 
  task_1()
  task_2()
example 2:
Copy code
def flow(): 
  task_1(return_state=True)
  task_2()

Christopher Boyd

07/10/2023, 2:24 PM
I could be wrong there, I’d need to look - you can probably define the sequential task runner in the flow params, but yeah, I don’t see anything in particular that should be requiring the task runner
ah
take off the
()
on your task decorators?
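For what it’s worth, both decorator forms should be equivalent in Prefect 2 (a quick sketch with hypothetical tasks):
Copy code
from prefect import task

@task          # bare decorator
def task_1():
    ...

@task()        # decorator factory called with no arguments; behaves the same
def task_2():
    ...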

Ajeel Ahmed

07/10/2023, 2:26 PM
in example 1, if
task_1
fails,
task_2
doesn’t run either, but in example 2, if
task_1
fails,
task_2
does run anyway. Why is this? Why does my passing the
return_state
parameter cause such a difference in the output?
oh wow I didn’t realise that could have made a difference!
I’m asking the above question because I don’t actually want to know my tasks’
return_state
; I just want them to run regardless of the previous one’s state

Christopher Boyd

07/10/2023, 2:29 PM
when you pass return_state, you are returning the state of the task, not the output of the task
so it sees the task has failed, and returns the Failed state
if you don’t add return_state, it just raises the error instead of capturing it
if you want to return futures of both state/data, you need to .submit() and return a future
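In other words, something like this (a minimal sketch with hypothetical tasks):
Copy code
from prefect import flow, task

@task
def might_fail():
    raise ValueError("boom")

@task
def always_runs():
    print("still running")

@flow
def demo():
    state = might_fail(return_state=True)  # returns the Failed state instead of raising
    print(state.is_failed())               # True, and the flow keeps going
    always_runs()                          # runs regardless of the failure above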

Ajeel Ahmed

07/10/2023, 2:31 PM
yeah, I think I’ll have to use submit after all, but I don’t need the future objects for anything. I’m only putting these functions as tasks because even if one fails I want the other to keep going
I’m basically making back ups in two different places
so I want them to run independently of each other

Christopher Boyd

07/10/2023, 2:36 PM
you don’t need to do anything with the future object, you just have to .submit() to enable the task runner, which allows them to run independently like that
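So the flow body could look roughly like this (a sketch using the task and block names from the code posted above; the returned futures can simply be ignored):
Copy code
@flow
def backup_precog_database_flow(
    database_uri_block="precog-database-uri",
    gdrive_folder_id="10_aDBuvOC3KFubzz2WmdlwSRPtTXlFMT",
    gdrive_token_block="gdrive-service-account-token",
    backup_fname_prefix_block="dev-backup-fname-prefix",
):
    # .submit() hands each task to the task runner; a failure in one
    # submitted task does not stop the other from running
    backup_to_server.submit(database_uri_block, backup_fname_prefix_block)
    backup_to_drive.submit(
        gdrive_folder_id,
        gdrive_token_block,
        database_uri_block,
        backup_fname_prefix_block,
    )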
a

Ajeel Ahmed

07/10/2023, 2:54 PM
so removing the brackets from the task decorators doesn’t make a difference, and the thing is, the
concurrently running
log message comes on even in the example I posted above (with the emoji 2️⃣) even though I’m not using tasks at all in that one
haha so I think I’m just going to use the
submit
and explicitly call the
sequential
task_runner
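That would look something like this (a sketch, assuming the task_runner argument on the flow decorator):
Copy code
from prefect import flow
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())  # replaces the default ConcurrentTaskRunner
def backup_precog_database_flow():
    # submitted tasks now run one at a time, in submission order
    ...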

Christopher Boyd

07/10/2023, 2:56 PM
hrmm

Ajeel Ahmed

07/10/2023, 2:58 PM
AHA
seems like that
concurrently running
message comes on regardless of whether I use the tasks or not
wait nvm that’s not true, disregard above message

Jeff Hale

07/10/2023, 3:49 PM
What’s the result of running
prefect version
?