Joshua Greenhalgh
04/20/2023, 4:27 PM

Joseph Thickpenny Ryan
04/20/2023, 4:50 PM
…on_completion and on_failure that would be sent via the subflows? (Or if it's possible at all?)
There are some cases where the subflow is itself useful, but we've incorporated it into a flow of flows as well, so we would want notifications from both places, but not in both situations.

Theo Platt
04/20/2023, 7:03 PM
from prefect.client.client import Client

c = Client(api_token='xxxxxxxxxx')
params = {'param1': 'Foo', 'param2': 'Bar'}
c.create_flow_run(
    version_group_id='xxxx-xxxxxx-xxxxxx-xxxxx',
    run_name='my_run_name',
    parameters=params)

But I can't see a good example of how to do the same with v2! I think I'm missing something fundamental! Thanks
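For reference, a minimal Prefect 2 sketch of the closest equivalent, assuming the flow has already been deployed; the deployment name and parameters below are placeholders:

from prefect.deployments import run_deployment

# "my-flow/my-deployment" stands in for "<flow name>/<deployment name>"
flow_run = run_deployment(
    name="my-flow/my-deployment",
    parameters={"param1": "Foo", "param2": "Bar"},
)
print(flow_run.id, flow_run.state)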
Choenden Kyirong
04/20/2023, 8:56 PM

Michał Augoff
04/20/2023, 10:01 PM

Matt Alhonte
04/21/2023, 3:20 AM
We register flows with the prefect register CLI command, using the ECSRun run config. The schedule is defined in the .py file and passed as an argument to the flow (using a list of CronClock objects passed to a Schedule object).
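A rough sketch of the Prefect 1 setup described above (the flow name and cron strings are placeholders):

from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# A schedule built from a list of CronClock objects
schedule = Schedule(clocks=[CronClock("0 6 * * *"), CronClock("0 18 * * *")])

with Flow("my-flow", schedule=schedule, run_config=ECSRun()) as flow:
    ...  # tasks go here; the flow is then registered with `prefect register`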
Stephen Lloyd
04/21/2023, 4:43 AM

Michail Melonas
04/21/2023, 8:55 AM

Justin Trautmann
04/21/2023, 9:42 AM

Federico Zambelli
04/21/2023, 10:00 AM
@flow
def first_flow():
    ...

@flow
def second_flow():
    ...

def main():
    first_flow()
    second_flow()

What if in certain situations I want to run only the 2nd flow? Is that possible / reasonable?
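One lightweight way to do that is to pick the flow from a command-line argument; a sketch with plain argparse, reusing the two flows defined above (the flag name is arbitrary):

import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--only", choices=["first", "second", "all"], default="all")
    args = parser.parse_args()

    # e.g. `python flows.py --only second` runs just the second flow
    if args.only in ("first", "all"):
        first_flow()
    if args.only in ("second", "all"):
        second_flow()

if __name__ == "__main__":
    main()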
sandeep kumar
04/21/2023, 10:49 AM
from prefect import task, flow, get_run_logger, allow_failure
from prefect.task_runners import SequentialTaskRunner

@flow
def flow_1(p: int) -> int:
    logger = get_run_logger()
    logger.info(f"\n\n ----------------Flow 1 --{p}-------------\n\n")
    return 5

@flow
def flow_2(m: int):
    raise ValueError("Non-deterministic error has occurred.")

@flow
def combine_data(p: list) -> int:
    logger = get_run_logger()
    logger.info(f"\n\n------------Combining data -- {p}-------------\n\n")
    return sum(p)

@flow
def additional_flow():
    logger = get_run_logger()
    logger.info("\n\n------------additional_flow-------------\n\n")

@flow
def clean_up_task(result: int):
    logger = get_run_logger()
    logger.info(f"\n\nCleaning up 🧹 --- {result}-----\n\n")
    logger.info("\n\nCleaning ------------------------------ up 🧹\n\n")

@flow(task_runner=SequentialTaskRunner)
def allow_flaky_transformation_to_pass():
    f1 = flow_1(1)
    f2 = flow_2(2)
    result = combine_data.submit(
        [f1, f2], wait_for=[allow_failure(f1), allow_failure(f2)])  # wait for all upstream flows
    additional_flow()
    clean_up_task.submit()  # This should always run even if all of the above flows fail

if __name__ == "__main__":
    allow_flaky_transformation_to_pass()
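For comparison, a minimal sketch of the pattern the Prefect docs show for this, using tasks (which have .submit) with wait_for and allow_failure; the task names here are placeholders:

from prefect import allow_failure, flow, task
from prefect.task_runners import SequentialTaskRunner

@task
def extract(x: int) -> int:
    return x

@task
def flaky_transform(x: int) -> int:
    raise ValueError("non-deterministic error")

@task
def clean_up():
    print("cleaning up")

@flow(task_runner=SequentialTaskRunner())
def pipeline():
    a = extract.submit(1)
    b = flaky_transform.submit(2)
    # clean_up still runs after the upstream failure because the failed
    # future is wrapped in allow_failure.
    clean_up.submit(wait_for=[allow_failure(a), allow_failure(b)])

if __name__ == "__main__":
    pipeline()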
Blue Radar
04/21/2023, 11:34 AM
prefect agent start -p <WORK_POOL> --api <WORK_SPACE_API>
But when I try to do the same in the Kubernetes deployment, it tries to create a new work queue even though the work queue is already present in the work pool, and fails.

Slackbot
04/21/2023, 1:15 PM

Giacomo Chiarella
04/21/2023, 1:23 PM

Qin XIA
04/21/2023, 1:48 PM

Giacomo Chiarella
04/21/2023, 1:50 PM
get_run_logger().info(get_client().read_deployment(FlowRunContext.get().flow_run.deployment_id).name)
in a flow and I get AttributeError: 'coroutine' object has no attribute 'name'. I think this is something related to async coroutines. How can I solve it?
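A sketch of one fix, assuming the flow can be made async: read_deployment is a coroutine, so it has to be awaited (with get_client() used as an async context manager) before .name is accessed:

from prefect import flow, get_client, get_run_logger
from prefect.context import FlowRunContext

@flow
async def report_deployment_name():
    logger = get_run_logger()
    # deployment_id is only set when the run was started from a deployment
    deployment_id = FlowRunContext.get().flow_run.deployment_id
    async with get_client() as client:
        deployment = await client.read_deployment(deployment_id)
    logger.info(deployment.name)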
Phil Maffettone
04/21/2023, 3:18 PM

Ben Ayers-Glassey
04/21/2023, 5:50 PM
2023-04-21T15:16:04.920982508Z /usr/local/lib/python3.8/runpy.py:127: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
2023-04-21T15:16:04.921075059Z warn(RuntimeWarning(msg))
2023-04-21T15:16:04.997977913Z 15:16:04.996 | INFO | Flow run 'phi321-lysenko-tensor' - Created task run 'generate_operation_id-a9e17ed9-0' for task 'generate_operation_id'
2023-04-21T15:16:04.999103995Z 15:16:04.997 | INFO | Flow run 'phi321-lysenko-tensor' - Executing 'generate_operation_id-a9e17ed9-0' immediately...
2023-04-21T15:16:05.043778806Z 15:16:05.042 | INFO | Task run 'generate_operation_id-a9e17ed9-0' - Task run '43baff18-8292-4cd7-a694-2dcf6d1a0c77' already finished.
2023-04-21T15:16:05.119806977Z 15:16:05.044 | ERROR | Flow run 'phi321-lysenko-tensor' - Encountered exception during execution:
2023-04-21T15:16:05.119866174Z Traceback (most recent call last):
2023-04-21T15:16:05.119875789Z File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
2023-04-21T15:16:05.119882275Z result = await run_sync(flow_call)
2023-04-21T15:16:05.119888470Z File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
2023-04-21T15:16:05.119896590Z return await anyio.to_thread.run_sync(
2023-04-21T15:16:05.119904639Z File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
2023-04-21T15:16:05.119911921Z return await get_asynclib().run_sync_in_worker_thread(
2023-04-21T15:16:05.119917612Z File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
2023-04-21T15:16:05.119924062Z return await future
...etc...
2023-04-21T15:16:05.120058513Z File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
2023-04-21T15:16:05.120063946Z raise self._exception
2023-04-21T15:16:05.120069588Z File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
2023-04-21T15:16:05.120078014Z return await future._result()
2023-04-21T15:16:05.120085086Z File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
2023-04-21T15:16:05.120091377Z return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
2023-04-21T15:16:05.120097177Z File "/usr/local/lib/python3.8/site-packages/prefect/states.py", line 100, in _get_state_result
2023-04-21T15:16:05.120103360Z raise MissingResult(
2023-04-21T15:16:05.120110136Z prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
2023-04-21T15:16:05.141538481Z 15:16:05.140 | INFO | prefect.engine - Engine execution of flow run '5c996762-8edc-4584-9f70-4d1d6adf2b3b' aborted by orchestrator: This run has already terminated.
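The MissingResult error above points at disabled result persistence; in case that is the root cause, a minimal sketch of enabling it (the return value is a placeholder):

from prefect import flow, task

# Persist results so the engine can rehydrate state from storage instead of
# raising MissingResult when it re-reads state from the API.
@task(persist_result=True)
def generate_operation_id() -> str:
    return "op-123"

@flow(persist_result=True)
def my_flow() -> str:
    return generate_operation_id()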
Chris Goddard
04/21/2023, 7:20 PM

scott
04/21/2023, 10:07 PM
Are on_completion and on_failure hooks ignored when a task state is Cached? That seems to be the case, but AFAIK there are no docs on it (https://github.com/PrefectHQ/prefect/issues/8967).

Faheem Khan
04/21/2023, 10:54 PM

Kevin Takano
04/22/2023, 6:46 PM

David Frischer
04/23/2023, 9:20 AM
prefect deployment build \
  my_flow.py:my_func_flow \
  --name super-cool-deployment \
  --path /usr/src/app \
  --work-queue test \
  --skip-upload \
  --apply
I would like to add a 30m interval with a max of 15 runs.
Can I do it through the terminal, or do I need to use a yml file?
Thanks in advance, community!
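A sketch of how the schedule might be added from the terminal: --interval takes seconds but has no run cap, so limiting to 15 runs would need an RRULE schedule with COUNT (assuming --rrule is available in your version):

# 30-minute interval (in seconds), added to the existing build command
prefect deployment build my_flow.py:my_func_flow \
  --name super-cool-deployment \
  --work-queue test \
  --interval 1800 \
  --apply

# 30-minute interval capped at 15 occurrences via an RRULE schedule
prefect deployment build my_flow.py:my_func_flow \
  --name super-cool-deployment \
  --work-queue test \
  --rrule 'FREQ=MINUTELY;INTERVAL=30;COUNT=15' \
  --apply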
shekhar koirala
04/23/2023, 10:52 PM
prefect_client.create_flow_run(flow_id=pf['flow_id_prefect'], parameters=pf["parameters"], run_name=pf["run_name"], idempotency_key=run_key)
Now I have updated the logic for a v2 deployment and deployed it using a .yaml file.
I can see the deployment in prefect deployment ls and can trigger it from the CLI using prefect deployment run --params, but I want to trigger it using the Python API.
I found the run_deployment method (from prefect.deployments import run_deployment), but it waits until the whole process finishes, which is not what I want. I don't want polling.
Please suggest how I could move forward with this.
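A sketch that may cover the non-blocking case: run_deployment accepts a timeout argument, and timeout=0 returns as soon as the flow run is created instead of polling for completion (the deployment name and parameters are placeholders):

from prefect.deployments import run_deployment

# timeout=0 -> create the flow run and return immediately, no polling
flow_run = run_deployment(
    name="my-flow/my-deployment",
    parameters={"param1": "Foo"},
    timeout=0,
)
print(flow_run.id)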
Deceivious
04/24/2023, 7:48 AM

Tarek
04/24/2023, 8:41 AM

Tuomas Heiskanen
04/24/2023, 9:38 AM

Emma Rizzi
04/24/2023, 11:57 AM

Jari Rosti
04/24/2023, 12:24 PM
val = tasks.my_task.with_options(timeout_seconds=600)()
val2 = tasks.my_2nd_task(val)
It seems that .with_options, .submit, etc. do not define their return types. Prefect version is 2.9.0.
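If the concern is mainly type-checker output, one workaround sketch is an explicit cast (tasks here is the same module as in the snippet above, and the int return type is hypothetical):

from typing import cast

# Tell the type checker what the task call returns, since with_options()/
# submit() are not annotated with the task's return type (per the report above).
val = cast(int, tasks.my_task.with_options(timeout_seconds=600)())
val2 = tasks.my_2nd_task(val)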
shekhar koirala
04/24/2023, 12:33 PM
from prefect.deployments import run_deployment
I have a weird problem: when I use this line of code, the logger in my script only works for warning and error, not for info and debug. Any hints on how to solve this?
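A sketch of one thing to check, assuming the script uses the standard logging module: the root logger defaults to WARNING, so INFO/DEBUG records are dropped unless a level is configured explicitly (force=True re-applies the configuration even if handlers were already attached):

import logging

from prefect.deployments import run_deployment  # the import from the question

logging.basicConfig(level=logging.DEBUG, force=True)

logger = logging.getLogger(__name__)
logger.info("info messages show up now")
logger.debug("debug messages too")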