Charles
07/02/2024, 6:19 PM
Marvin
07/02/2024, 6:20 PM
- The `GitHub` file system from `prefect.filesystems` has been removed. Use `prefect_github.GitHubRepository` instead.
- The `Infrastructure` class from `prefect.infrastructure.base` has also been removed. Use the `BaseWorker` class for custom infrastructure integrations, and see the upgrade guide for moving from agents to workers for details.
2. Updated Documentation and Guides:
   - Documentation and guides are still being updated to reflect the 3.0 changes, so check the Prefect documentation regularly for the latest information.
3. Issue Tracking and Additional Help:
   - Refer to existing GitHub issues and pull requests for real-world examples and community contributions:
     - Migration helper improvements
     - Removing agents and updating to workers
     - Ordering of flow run parameters
For detailed steps and the latest updates, please refer to the Prefect 3.0 documentation and the corresponding migration guides.
If you have specific parts of your Prefect 2.0 setup that you'd like help with migrating, providing those details would allow for more tailored guidance.
Sources:
- Migration helper improvements: https://github.com/PrefectHQ/prefect/pull/14348
- Remove agents and update to workers: https://github.com/PrefectHQ/prefect/pull/13631
- Ordering of flow run parameters: https://github.com/PrefectHQ/prefect/issues/8596
Charles
07/02/2024, 8:21 PM
Marvin
07/02/2024, 8:22 PM
`PREFECT_API_URL` might have undergone some changes. Here's some pertinent information from recent GitHub issues:
1. Setting `PREFECT_API_URL`:
   - When you start the server and `PREFECT_API_URL` is not set, you'll be prompted to set it.
   - Example interaction:
```bash
❯ prefect server start
`PREFECT_API_URL` is not set. You need to set it to communicate with the server.
? How would you like to proceed? [Use arrows to move; enter to select]
  Create a new profile with `PREFECT_API_URL` set and switch to it
> Set `PREFECT_API_URL` in the current profile: 'local_postgres'
? Enter the `PREFECT_API_URL` value (http://127.0.0.1:4200/api):
Set `PREFECT_API_URL` to 'http://127.0.0.1:4200/api' in the current profile 'local_postgres'
```
2. Prompt to Set when Not Set:
   - The prompt appears only if `PREFECT_API_URL` has not already been set.
   - Setting it up front ensures that clients such as the CLI and your flows can reach the server, so you won't run into unexpected errors or connectivity issues.
3. Error Handling Improvement:
   - If `PREFECT_API_URL` is not set and an operation fails, the error message now says so explicitly and guides you to set it, which helps avoid misconfigurations.
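Point 3 describes the clearer error Prefect now raises when `PREFECT_API_URL` is missing. If you want a script to fail fast with a similar message before it starts any flows, a small pre-flight check is enough. `require_api_url` below is a hypothetical helper, not part of Prefect, and it only reads the environment variable: a value stored in a Prefect profile with `prefect config set` is invisible to it, so use `prefect config view` to inspect the effective setting in that case.

```python
import os
from urllib.parse import urlparse


def require_api_url(environ=None) -> str:
    """Return PREFECT_API_URL from the environment, failing fast if it is unusable.

    Hypothetical helper: it inspects only the environment variable, not values
    stored in a Prefect profile.
    """
    env = os.environ if environ is None else environ
    url = env.get("PREFECT_API_URL", "").strip()
    if not url:
        raise RuntimeError(
            "PREFECT_API_URL is not set; set it first, e.g. "
            "`prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api`"
        )
    parsed = urlparse(url)
    # Require an explicit http(s) scheme and a host, e.g. http://127.0.0.1:4200/api
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"PREFECT_API_URL does not look like an HTTP URL: {url!r}")
    # Drop a trailing slash so ".../api/" and ".../api" compare equal
    return url.rstrip("/")
```

For example, `require_api_url({"PREFECT_API_URL": "http://127.0.0.1:4200/api/"})` returns `"http://127.0.0.1:4200/api"`, while an empty environment raises `RuntimeError` before any client call is attempted.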
For detailed settings and configurations, you may refer to the Prefect documentation or review the GitHub issues and pull requests related to this feature:
- Prompt user to set `PREFECT_API_URL`
- Improve exception message when `PREFECT_API_URL` is unset
If you still face issues, feel free to provide specific details, and I’ll assist you further.
Charles
07/02/2024, 11:42 PM
```
import asyncio
import os
import subprocess
import time
from time import sleep

import numpy as np
from kdbconn.kdbconn import KdbConn
from kdbconn.qpython import qconnection
from prefect import flow, get_run_logger, task
from prefect.context import get_run_context
from prefect.runtime import task_run

GET_KDB_CONN_RETRY_DELAY = 10  # secs
GET_KDB_CONN_MAX_RETRIES = 30  # 30 * 10sec = 5min


@task()
async def grab_hmaster_port(env):
    jogger = get_run_logger()
    host = get_kdb_env_host(env)
    user = os.getlogin()
    # new_port = None
    for retry in range(GET_KDB_CONN_MAX_RETRIES):
        try:
            with qconnection.QConnection(host, 5000, user, timeout=3.0) as hmaster:
                desc = "Access Port"
                q_string = 'getClientPort[`%s;`%s;`$"%s"]' % (env, "access", desc)
                new_port = hmaster(q_string)
            break
        except Exception as e:
            if retry + 1 < GET_KDB_CONN_MAX_RETRIES:
                jogger.warning(
                    "Exception while getting access port from hmaster, retrying in {} seconds (Attempt {}/{})".format(
                        GET_KDB_CONN_RETRY_DELAY, retry + 1, GET_KDB_CONN_MAX_RETRIES
                    )
                )
                sleep(GET_KDB_CONN_RETRY_DELAY)
            else:
                raise e
    return new_port


@task()
async def grab_access_port_handle(env, port):
    jogger = get_run_logger()
    host = get_kdb_env_host(env)
    user = os.getlogin()
    for retry in range(GET_KDB_CONN_MAX_RETRIES):
        try:
            conn = KdbConn(host, port=port, user=user)
            break
        except Exception as e:
            if retry + 1 < GET_KDB_CONN_MAX_RETRIES:
                jogger.warning(
                    "Exception while connecting to access port, retrying in {} seconds (Attempt {}/{})".format(
                        GET_KDB_CONN_RETRY_DELAY, retry + 1, GET_KDB_CONN_MAX_RETRIES
                    )
                )
                sleep(GET_KDB_CONN_RETRY_DELAY)
            else:
                raise e
    return conn


def get_kdb_env_host(env):
    return (
        subprocess.Popen(
            "\\\\ccl\\data\\ExtLib\\kdbenv.exe " + env, stdout=subprocess.PIPE
        )
        .communicate()[0]
        .decode("utf-8")
        .split("\r")[0]
    )


def generate_task_name():
    parameters = task_run.parameters
    task_name = parameters["task_name"]
    return f"{task_name}"


@task(
    retries=3,
    retry_delay_seconds=5,
    task_run_name=generate_task_name,
    persist_result=True,
)
async def async_query(query, conn, task_name, delay=0):
    jogger = get_run_logger()
    jogger.info("Starting task: " + task_name)
    full_query = query
    result = conn.query_async_wait([np.string_("sync_query"), full_query])
    jogger.info("Completed task: " + task_name)
    return result


@flow
async def set_flags(env, injected_code):
    logger = get_run_logger()
    hmaster_port_number = await grab_hmaster_port("qdevd")
    conn = grab_access_port_handle.submit(env, hmaster_port_number)
    conn1 = grab_access_port_handle.submit(env, hmaster_port_number)
    conn2 = grab_access_port_handle.submit(env, hmaster_port_number)
    test = get_run_context()
    logger.info("All dates used are .z.D-1, as the testing environment is qdevd")
    sample = time.ctime()
    sample_2 = time.ctime()
    print("CURRENT TIME ", time.ctime())
    initial_cache_posted = async_query.with_options(
        tags=["data", "dataloader"], name="1_data_wait_initial_cache_posted_77"
    ).submit(
        ".wf.wait[`INITIAL_CACHE_POSTED_77;1f;1f;120f]",
        conn,
        task_name="1_data_wait_initial_cache_posted_77",
    )
    fundamental_extracted = async_query.with_options(
        tags=["data", "dataloader"], name="2_data_wait_fundamental_extracted"
    ).submit(
        ".wf.waitS[`FUNDAMENTAL_EXTRACTED;5f;1f;120f];",
        conn1,
        task_name="2_data_wait_fundamental_extracted",
    )
    injected_code = async_query.with_options(
        tags=["adhoc", "kdb"], name="3_injected_code (setting flags)"
    ).submit(injected_code, conn2, task_name="3_injected_code (setting flags)")
    return


# def deploy():
#     deployment = Deployment.build_from_flow(
#         flow=set_flags,
#         name="kdb_earnings_demo",
#         parameters={
#             "env": "qdevd",
#             "injected_code": ".wf.set[`INITIAL_CACHE_POSTED_77]; .wf.setS[`FUNDAMENTAL_EXTRACTED]; .wf.set[`IBES_Cycle6_Loaded_77];",
#         },
#     )
#     deployment.apply()

if __name__ == "__main__":
    # deploy()
    asyncio.run(
        set_flags(
            env="qdevd",
            injected_code=".wf.set[`INITIAL_CACHE_POSTED_77]; .wf.setS[`FUNDAMENTAL_EXTRACTED]; .wf.set[`IBES_Cycle6_Loaded_77];",
        )
    )
```
Error Message:
```
$ python Script/earnings_demo/test.py
16:40:20.597 | DEBUG | prefect.client - Connecting to API at http://v-tm-qeq-05:4200/api/
16:40:20.626 | INFO | Task run 'grab_access_port_handle-0' - Finished in state Completed()
16:40:20.674 | INFO | Task run 'grab_access_port_handle-1' - Finished in state Completed()
16:40:20.705 | ERROR | Task run '1_data_wait_initial_cache_posted_77' - Task run was aborted: The enclosing flow must be running to begin task execution.
16:40:20.709 | ERROR | Task run '1_data_wait_initial_cache_posted_77' - Finished in state Pending()
Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
Example:
from prefect import flow, task
@task
def say_hello(name):
    print f"Hello, {name}!"
@flow
def example_flow():
    say_hello.submit(name="Marvin)
    say_hello.wait()
example_flow()
16:40:20.736 | ERROR | Task run '2_data_wait_fundamental_extracted' - Task run was aborted: The enclosing flow must be running to begin task execution.
16:40:20.740 | ERROR | Task run '2_data_wait_fundamental_extracted' - Finished in state Pending()
Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.
Example:
from prefect import flow, task
@task
def say_hello(name):
    print f"Hello, {name}!"
@flow
def example_flow():
    say_hello.submit(name="Marvin)
    say_hello.wait()
example_flow()
```
Marvin
07/02/2024, 11:42 PM
My code is returning an error with Prefect 3.0 that did not error out with Prefect 2.x.
Charles
07/02/2024, 11:45 PM
Marvin
07/02/2024, 11:45 PM
Can you explain why this code snippet returns an error in Prefect 3.0 but not with 2.0?
Charles
07/02/2024, 11:48 PM
Charles
07/02/2024, 11:52 PM
Marvin
07/02/2024, 11:52 PM
Can you explain why this code snippet returns an error in Prefect 3.0 but not with 2.0?
Charles
07/02/2024, 11:53 PM
```
@flow
async def set_flags(env, injected_code):
    logger = get_run_logger()
    hmaster_port_number = await grab_hmaster_port("qdevd")
    conn = grab_access_port_handle.submit(env, hmaster_port_number)
    conn1 = grab_access_port_handle.submit(env, hmaster_port_number)
    conn2 = grab_access_port_handle.submit(env, hmaster_port_number)
    test = get_run_context()
    logger.info("All dates used are .z.D-1, as the testing environment is qdevd")
    sample = time.ctime()
    sample_2 = time.ctime()
    print("CURRENT TIME ", time.ctime())
    initial_cache_posted = async_query.with_options(
        tags=["data", "dataloader"], name="1_data_wait_initial_cache_posted_77"
    ).submit(
        ".wf.wait[`INITIAL_CACHE_POSTED_77;1f;1f;120f]",
        conn,
        task_name="1_data_wait_initial_cache_posted_77",
    )
    fundamental_extracted = async_query.with_options(
        tags=["data", "dataloader"], name="2_data_wait_fundamental_extracted"
    ).submit(
        ".wf.waitS[`FUNDAMENTAL_EXTRACTED;5f;1f;120f];",
        conn1,
        task_name="2_data_wait_fundamental_extracted",
    )
    injected_code = async_query.with_options(
        tags=["adhoc", "kdb"], name="3_injected_code (setting flags)"
    ).submit(injected_code, conn2, task_name="3_injected_code (setting flags)")
```
Marvin
07/02/2024, 11:53 PM
Can you explain why this throws an error in Prefect 3.0 but not 2.0?
Charles
07/02/2024, 11:54 PM
```
@flow
async def set_flags(env, injected_code):
    logger = get_run_logger()
    hmaster_port_number = await grab_hmaster_port("qdevd")
    conn = grab_access_port_handle.submit(env, hmaster_port_number)
    conn1 = grab_access_port_handle.submit(env, hmaster_port_number)
    conn2 = grab_access_port_handle.submit(env, hmaster_port_number)
    test = get_run_context()
    logger.info("All dates used are .z.D-1, as the testing environment is qdevd")
    sample = time.ctime()
    sample_2 = time.ctime()
    print("CURRENT TIME ", time.ctime())
    initial_cache_posted = async_query.with_options(
        tags=["data", "dataloader"], name="1_data_wait_initial_cache_posted_77"
    ).submit(
        ".wf.wait[`INITIAL_CACHE_POSTED_77;1f;1f;120f]",
        conn,
        task_name="1_data_wait_initial_cache_posted_77",
    )
    fundamental_extracted = async_query.with_options(
        tags=["data", "dataloader"], name="2_data_wait_fundamental_extracted"
    ).submit(
        ".wf.waitS[`FUNDAMENTAL_EXTRACTED;5f;1f;120f];",
        conn1,
        task_name="2_data_wait_fundamental_extracted",
    )
```
Marvin
07/02/2024, 11:54 PM
Can you explain why this throws an error in Prefect 3.0 but not Prefect 2.0?
Charles
07/02/2024, 11:55 PM
```
@flow
async def set_flags(env, injected_code):
    logger = get_run_logger()
    hmaster_port_number = await grab_hmaster_port("qdevd")
    conn = grab_access_port_handle.submit(env, hmaster_port_number)
    conn1 = grab_access_port_handle.submit(env, hmaster_port_number)
    conn2 = grab_access_port_handle.submit(env, hmaster_port_number)
    test = get_run_context()
    logger.info("All dates used are .z.D-1, as the testing environment is qdevd")
    sample = time.ctime()
    sample_2 = time.ctime()
    print("CURRENT TIME ", time.ctime())
    initial_cache_posted = async_query.with_options(
        tags=["data", "dataloader"], name="1_data_wait_initial_cache_posted_77"
    ).submit(
        ".wf.wait[`INITIAL_CACHE_POSTED_77;1f;1f;120f]",
        conn,
        task_name="1_data_wait_initial_cache_posted_77",
    )
```
Marvin
07/02/2024, 11:55 PM
`grab_access_port_handle.submit` might have changed in Prefect 3.0. Ensure that these methods are invoked according to the new API.
3. Exception Handling and Context Awareness:
   - Prefect 3.0 may be stricter about context management and the state of tasks and flows, especially with asynchronous operations.
Here is a closer look at the potential causes, revisiting the flow and its context:
### Potential Issue in Your Code:
- Context Management:
  Prefect 3.0 might be stricter about ensuring that you use the correct context within async operations.
- Task and Flow API Changes:
  In Prefect 3.0, a flow no longer waits for the tasks it submitted with `.submit()` (such as `grab_access_port_handle.submit` and `async_query.with_options(...).submit`) before it finishes. `set_flags` returns immediately after submitting, so tasks that have not started yet are aborted, which matches the "The enclosing flow must be running to begin task execution" errors in your log.
- Logger and Context:
  `get_run_logger` and `get_run_context` might behave somewhat differently inside async functions.
### Example of Code Analysis:
```python
@flow
async def set_flags(env, injected_code):
    logger = get_run_logger()
    hmaster_port_number = await grab_hmaster_port("qdevd")
    conn = grab_access_port_handle.submit(env, hmaster_port_number)
    conn1 = grab_access_port_handle.submit(env, hmaster_port_number)
    conn2 = grab_access_port_handle.submit(env, hmaster_port_number)
    test = get_run_context()  # Ensure this is properly used in async context
    logger.info("All dates used are .z.D-1, as the testing environment is qdevd")
    sample = time.ctime()
    sample_2 = time.ctime()
    print("CURRENT TIME ", time.ctime())
    initial_cache_posted = async_query.with_options(
        tags=["data", "dataloader"],
        name="1_data_wait_initial_cache_posted_77"
    ).submit(
        ".wf.wait[`INITIAL_CACHE_POSTED_77;1f;1f;120f]",
        conn,
        task_name="1_data_wait_initial_cache_posted_77",
    )
```
### Steps to Resolve:
1. Review Prefect 3.0 Documentation:
   - Consult the Prefect 3.0 documentation for changes that may affect your flow.
2. Verify Async Task Execution:
   - Before the flow returns, call `.wait()` or `.result()` on every `PrefectFuture` returned by `.submit()` (here `initial_cache_posted`, `fundamental_extracted`, and `injected_code`), as the error message itself instructs.
3. Check Logger Usage:
   - Verify that `get_run_logger` and `get_run_context` are used correctly within your async context.
4. Update Imports and Methods:
   - Review any changes in method names, signatures, or required parameters between Prefect 2.0 and 3.0.
5. Community and Issue Tracking:
   - Check GitHub issues such as "Handling of signals in Prefect 3.0" and related discussions for community-driven solutions.
For further assistance, providing specific error messages or stack traces you encounter can help identify the exact cause and necessary adjustments.