Hedgar
10/08/2022, 2:01 PM
Aaron Goebel
10/09/2022, 3:34 AM
get_run_logger. I've seen some references to PREFECT_LOGGING_EXTRA_LOGGERS, but no legit examples for code that isn't library code. I have code like this at the top of all my utility function modules: logging.getLogger("utils"), and I have set PREFECT_LOGGING_EXTRA_LOGGERS='utils' to no avail. Logs don't get streamed up to the Prefect UI.
Birkir Björnsson
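For the logging question above, a minimal stdlib-only sketch of the mechanism the setting relies on: a handler attached to a named logger receives records from that logger and from its children. `CapturingHandler` is a hypothetical stand-in for whatever handler Prefect attaches to the loggers listed in PREFECT_LOGGING_EXTRA_LOGGERS; it is not Prefect's API. As far as I know, Prefect only forwards such records to the UI when they are emitted inside a flow or task run, and the variable has to be set in the environment of the process that actually runs the flow, so both are worth checking.

```python
import logging


class CapturingHandler(logging.Handler):
    """Hypothetical stand-in for a handler that forwards records elsewhere."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        # Store the logger name and the formatted message for inspection.
        self.messages.append(f"{record.name}: {record.getMessage()}")


handler = CapturingHandler()
utils_logger = logging.getLogger("utils")
utils_logger.setLevel(logging.INFO)
utils_logger.addHandler(handler)

# A child logger such as "utils.db" propagates its records up to "utils".
logging.getLogger("utils.db").info("connected")
utils_logger.info("hello")

# A logger outside the "utils" hierarchy never reaches the handler.
logging.getLogger("other").info("ignored")

print(handler.messages)
```

The point of the sketch is that the name passed to logging.getLogger has to match (or be a child of) the name listed in the setting.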
10/09/2022, 3:28 PM
sulfredlee
10/10/2022, 4:52 AM
set_dependencies
I am trying to have this example:
from typing import Tuple
from prefect import Flow, task

# nout=2 lets the tuple returned by task_A be unpacked inside the Flow block
@task(name="task_A", nout=2)
def task_A() -> Tuple[list, int]:
    test_output_list = list()
    test_output_int = 10
    return test_output_list, test_output_int

@task(name="task_B")
def task_B(input_a: list, input_b: int, input_c: float):
    print(f"{input_a}, {input_b}, {input_c},")

with Flow(name="test_flow") as flow:
    ret_a_list, ret_a_int = task_A()
    task_B(ret_a_list, ret_a_int, 10.05)
I would like to change the implementation from the with Flow() as flow block to:
test_flow = Flow(name="test_flow")
test_flow.set_dependencies([task_A, task_B], upstream_task=[task_A], downstream_tasks=[task_B])
I have 2 questions:
- Is set_dependencies the correct function to use?
- How do I use set_dependencies correctly to pass the output from task_A to task_B?
Thanks
Imre Kerr
10/10/2022, 8:33 AM
Rajvir Jhawar
10/10/2022, 1:21 PM
"path": "/spec/template/spec/containers/0/env/-"
Which means the path for the volume mount should be:
"path": "/spec/template/spec/containers/0/volumeMounts/-"
When I use that path it doesn't work, any ideas?
Mark
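On the volume-mount patch above, one thing worth ruling out: JSON Patch (RFC 6902) can only append with `/-` to an array that already exists. The sketch below is a tiny stdlib-only implementation of the `add` operation, not the library Prefect uses. The `manifest` dict and the `mount` value are hypothetical stand-ins, and the sketch assumes the base job manifest defines `env` but has no `volumeMounts` key. Under that assumption, appending to `volumeMounts/-` fails, while adding the whole `volumeMounts` array succeeds.

```python
def json_patch_add(doc, path, value):
    """Minimal RFC 6902 'add' (no ~0/~1 escapes): the parent must already exist."""
    *parents, last = path.lstrip("/").split("/")
    node = doc
    for token in parents:
        # Lists are indexed by integer, objects by key; a missing key raises KeyError.
        node = node[int(token)] if isinstance(node, list) else node[token]
    if isinstance(node, list):
        if last == "-":
            node.append(value)  # "-" means "append to the end of the array"
        else:
            node.insert(int(last), value)
    else:
        node[last] = value


# Hypothetical base manifest: the container has "env" but no "volumeMounts".
manifest = {
    "spec": {"template": {"spec": {"containers": [{"name": "job", "env": []}]}}}
}
mount = {"name": "data", "mountPath": "/data"}  # hypothetical mount

# Appending to the existing env array works.
json_patch_add(
    manifest, "/spec/template/spec/containers/0/env/-", {"name": "A", "value": "1"}
)

# Appending to a volumeMounts array that does not exist fails.
try:
    json_patch_add(manifest, "/spec/template/spec/containers/0/volumeMounts/-", mount)
    appended = True
except KeyError:
    appended = False

# Adding the whole array as the value succeeds.
json_patch_add(manifest, "/spec/template/spec/containers/0/volumeMounts", [mount])
container = manifest["spec"]["template"]["spec"]["containers"][0]
print(appended, container["volumeMounts"])
```

If that turns out to be the cause, the patch would add the whole array in one operation. A volume mount also needs a matching entry under the pod spec's `volumes`.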
10/10/2022, 1:28 PM
from prefect import flow
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command

@flow
def trigger_dbt_cli_commands_flow():
    dbt_cli_profile = DbtCliProfile.load("dbtbase")
    trigger_kwargs = dict(
        profiles_dir="../dbt/data-dbt",
        project_dir="../dbt/data-dbt/dbt-project/",
        overwrite_profiles=False,
        dbt_cli_profile=dbt_cli_profile,
    )
    # Capture the command's return value so the flow has something to return
    result = trigger_dbt_cli_command(
        "dbt deps",
        **trigger_kwargs
    )
    return result

trigger_dbt_cli_commands_flow()
Jai P
10/10/2022, 2:34 PM
prefect-snowflake library to wrap snowpark APIs?
Kun Situ
10/10/2022, 2:41 PM
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url
Does anyone know how I can fix it? Any help would be much appreciated!
Jon
10/10/2022, 2:51 PM
AttributeError: 'Parameter' object ...
Why is the flow trying to run when I register it?
The more unclear thing is that when I declare a parameter, it returns a Parameter object when I really just want the value that's passed to the flow.
David Cupp
10/10/2022, 4:20 PM
submit() method that lets you create task objects and then submit them directly, using wait_for to specify dependencies.
Is there any similar mechanism for sub flows? I am interested in launching a "subflow" but without directly calling the annotated flow method, because I don't know which method I'm calling until runtime. For example, I would like to do this:
result = make_rpc_call()
newflow = Flow(
    name=result.name,
    parameters=result.parameters,
).submit(wait_for=[...])
Is this possible?
David Elliott
10/10/2022, 4:36 PM
David Cupp
10/10/2022, 5:48 PM
1 A = Task(...).submit()
...
5 B = Task(...).submit(wait_for=[A])
...
9 _ = Task(...).submit(wait_for=[B])
If an exception is thrown after line 1 and before line 9, is there an easy way to ensure that Task "A" does not start?
Alexandru Anghel
10/10/2022, 6:06 PM
Georgiana Ogrean
10/10/2022, 6:38 PM
ConnectionRefusedError: [Errno 111] Connect call failed ('0.0.0.0', 4200)
The Orion UI comes up just fine when I do not try to build a deployment with:
prefect deployment build /app/my_package/flows/my_python_script.py:my_flow_name --name generate_deployment --tag dev --infra docker-container
Any ideas what I could be missing? More details in the thread.
Kun Situ
10/10/2022, 8:04 PM
Luca Schneider
10/10/2022, 8:16 PM
Jacqueline Garrahan
10/10/2022, 8:19 PM
Taylor Babin
10/10/2022, 9:44 PM
Seth Coussens
10/10/2022, 9:47 PM
Deepanshu Aggarwal
10/10/2022, 10:29 PM
Homesh Wathsalya
10/10/2022, 11:28 PM
Thar Htet San
10/11/2022, 4:49 AM
Zac Hooper
10/11/2022, 5:11 AM
description = """
This is an example description
\n\n\n
Schedule: Every 3 minutes
"""
How it looks in the UI:
Vadym Dytyniak
10/11/2022, 7:49 AM
Andreas Nigg
10/11/2022, 8:45 AM
max
10/11/2022, 9:27 AM
Hieu Tran
10/11/2022, 9:53 AM
import asyncio
from prefect.client import get_client
from prefect.orion.schemas.states import StateType
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateType

async def main():
    client = get_client()
    state_type_filter = FlowRunFilterStateType(any_=[StateType.RUNNING])
    state_filter = FlowRunFilterState(state_type_filter)
    flow_filter = FlowRunFilter(state_filter)
    flow_runs = await client.read_flow_runs(flow_filter)
    print(flow_runs)

if __name__ == '__main__':
    asyncio.run(main())
Traceback (most recent call last):
File "get_client_context.py", line 21, in <module>
asyncio.run(main())
File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "get_client_context.py", line 9, in main
state_filter = FlowRunFilterState(state_type_filter)
File "pydantic/main.py", line 333, in pydantic.main.BaseModel.__init__
TypeError: __init__() takes exactly 1 positional argument (2 given)
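The TypeError in the traceback above is what pydantic models raise when a field is passed positionally, since the model constructor accepts keyword arguments only. A minimal stdlib-only sketch of that behaviour follows. `KeywordOnlyModel` is a hypothetical stand-in, not a Prefect or pydantic class. In the snippet above, the likely fix is to pass each filter by field name. I believe the fields are `type` on FlowRunFilterState and `state` on FlowRunFilter, but that is an assumption to check against the schema definitions.

```python
class KeywordOnlyModel:
    """Hypothetical stand-in that mimics a keyword-only model constructor."""

    def __init__(self, **data):
        # Every field must be supplied by name, as with a pydantic model.
        self.__dict__.update(data)


# Positional use, analogous to FlowRunFilterState(state_type_filter), fails.
try:
    KeywordOnlyModel(["RUNNING"])
    positional_ok = True
except TypeError:
    positional_ok = False

# Keyword use works; "type" here is an assumed field name for illustration.
model = KeywordOnlyModel(type=["RUNNING"])
print(positional_ok, model.type)
```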
Stephen Lloyd
10/11/2022, 11:02 AM
Adam Eury
10/11/2022, 1:45 PM