Joshua Grant
07/18/2023, 4:28 PMasyncio.exceptions.TimeoutErrorException in ASGI applicationasyncpg/protocol/protocol.pyxFile "/usr/local/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 407, in run_asgiPREFECT_API_DATABASE_PASSWORDPREFECT_API_DATABASE_PASSWORD is set but not included in the PREFECT_API_DATABASE_CONNECTION_URL. The provided password will be ignored.Marvin
07/18/2023, 4:29 PMPREFECT_API_DATABASE_PASSWORDPREFECT_API_DATABASE_CONNECTION_URLasyncio.exceptions.TimeoutErrorException in ASGI applicationPREFECT_API_DATABASE_PASSWORD is set but not included in the PREFECT_API_DATABASE_CONNECTION_URL. The provided password will be ignored.PREFECT_API_DATABASE_CONNECTION_URLJoshua Grant
07/18/2023, 4:33 PMMarvin
07/18/2023, 4:33 PMJoshua Grant
07/18/2023, 4:45 PMMarvin
07/18/2023, 4:46 PMPREFECT_API_DATABASE_PASSWORD is set but not included in the PREFECT_API_DATABASE_CONNECTION_URL. The provided password will be ignored.PREFECT_API_DATABASE_PASSWORDPREFECT_API_DATABASE_CONNECTION_URLmysecretPREFECT_API_DATABASE_PASSWORDbash
export PREFECT_API_DATABASE_PASSWORD=mysecretPREFECT_API_DATABASE_CONNECTION_URLbash
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql://USER:mysecret@localhost:5432/mydatabase"USERmysecretlocalhost5432mydatabasePREFECT_API_DATABASE_CONNECTION_URLPREFECT_API_DATABASE_PASSWORDJoshua Grant
07/18/2023, 4:55 PMPREFECT_API_DATABASE_CONNECTION_URLPREFECT_API_DATABASE_PASSWORDasyncpgMarvin
07/18/2023, 4:55 PMPREFECT_API_DATABASE_CONNECTION_URLPREFECT_API_DATABASE_PASSWORDasyncpgasyncpgpostgresql://asyncpgpython
import asyncpg
conn = await asyncpg.connect('postgresql://USER:mysecret@localhost:5432/mydatabase')Joshua Grant
07/20/2023, 2:05 PMMarvin
07/20/2023, 2:05 PMJoshua Grant
07/20/2023, 5:54 PMMarvin
07/20/2023, 5:54 PMpython
from prefect import Flow
# create your subflow
with Flow("My Subflow") as subflow:
    # define your subflow tasks here
# use your subflow in another flow
with Flow("My Main Flow") as main_flow:
    subflow_task = subflow(name="My Subflow Task")
# now, "My Subflow Task" will show up in your logs and UIJoshua Grant
07/20/2023, 6:32 PM"""sample flow to test the tasks."""
from prefect import flow, task
from prefect_dask import DaskTaskRunner
from prefect.task_runners import SequentialTaskRunner
from concurrent.futures import ProcessPoolExecutor, as_completed
@task
def say_goodbye_task(names):
    """Task wrapper that hands the whole batch of names to run_concurrently.

    Returns whatever list run_concurrently produces for ``names``.
    """
    farewells = run_concurrently(names)
    return farewells
def run_concurrently(names):
    """Run say_goodbye for every name in a two-worker process pool.

    Returns the results as a list, ordered by completion (via
    ``as_completed``), not by the order of ``names``.
    """
    with ProcessPoolExecutor(max_workers=2) as pool:
        pending = [pool.submit(say_goodbye, who) for who in names]
        # Collect each result as soon as its future finishes.
        return [done.result() for done in as_completed(pending)]
def say_goodbye(name):
    """Print a goodbye message for ``name`` and return the same message."""
    farewell = f"Goodbye {name}"
    print(farewell)
    return farewell
@task
def say_hello(name):
    """Task that prints a hello message for ``name``; it returns nothing."""
    greeting = f"hello {name}"
    print(greeting)
# Sub flow: runs with the sequential task runner
@flow(task_runner=SequentialTaskRunner)
def seq_greetings(names):
    """Forward ``names`` to say_goodbye_task and return what it returns."""
    goodbyes = say_goodbye_task(names)
    return goodbyes
# Main flow
@flow(task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 5}
    ))
def greetings(names):
    """Greet every name, run the goodbye sub flow, then greet its results.

    Args:
        names: collection of names; each one is mapped over ``say_hello``
            and the whole collection is passed to ``seq_greetings``.
    """
    # Calling some task
    hs = say_hello.map(names)
    # Wait for every mapped hello run before starting the sub flow.
    for h in hs:
        h.wait()
    # Calling sub flow
    # ``seq_greetings`` declares only ``names``; passing ``name=`` in the call
    # is treated as a flow parameter and fails binding. Apply the custom name
    # through ``with_options`` and call with the declared argument only.
    seq_result = seq_greetings.with_options(name="my_flow_")(names)
    # Calling task using result from sub flow
    say_hello.map(seq_result)
if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])Marvin
07/20/2023, 6:32 PMnameJoshua Grant
07/20/2023, 6:35 PMMarvin
07/20/2023, 6:35 PMJoshua Grant
07/20/2023, 6:36 PMMarvin
07/20/2023, 6:36 PMJoshua Grant
07/21/2023, 11:54 AMMarvin
07/21/2023, 11:54 AMJoshua Grant
07/21/2023, 12:31 PMMarvin
07/21/2023, 12:31 PMJoshua Grant
07/24/2023, 1:50 PMMarvin
07/24/2023, 1:51 PMalembic historyalembic currentJoshua Grant
07/24/2023, 3:49 PMMarvin
07/24/2023, 3:50 PMJoshua Grant
07/24/2023, 3:55 PM