Michał Augoff
10/19/2022, 7:43 PMscikit-learn
providing a flow to train a machine learning model and you can deploy it with your own model class like
Deployment.build_from_flow(flow=sklearn.flows.training, …, parameters={"model": SomePythonClass})
Is that even feasible?
Trevor Kramer
10/19/2022, 7:51 PMfrom prefect import flow, task
@task
def task1(x: int) -> int:
return x + 10
@task
def task2(x: int) -> int:
return -x
@flow()
def run_my_flow(n: int):
task2.map(task1.map(range(n)))
if __name__ == "__main__":
n = 500
print(run_my_flow(n))
David Beck
10/19/2022, 8:13 PMis_anonymous
set to True
. Is it important to also set the flag for overwrite=True
if the anonymous is one that is saved numerous times ala a CI/CD process?Atul Vig
10/19/2022, 8:44 PMDavid Beck
10/19/2022, 9:39 PMMichał
10/19/2022, 10:40 PMJohn Ramey
10/20/2022, 1:19 AMTypeError: task() got an unexpected keyword argument 'nout'
— is the nout
concept longer required?Thomas Pedersen
10/20/2022, 7:27 AMVadym Dytyniak
10/20/2022, 8:16 AMwonsun
10/20/2022, 9:41 AMFailed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Traceback (most recent call last):
File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
state = self.client.set_task_run_state(
File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/client/client.py", line 1604, in set_task_run_state
result = self.graphql(
File "/home/da/enviorments/bdi/lib/python3.10/site-packages/prefect/client/client.py", line 464, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'State payload is too large.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Actually, it wasn't the first time that task failed, and before that, it ran for about 3 minutes and then informed me that no heartbeat detected. (No heartbeat detected from the remote task; marking the run as failed.
) So, the solution I found was configure heartbeats to use threads instead of processes and worte about the flow run config in the .py file. When I did that, the task of receiving parameters was performed longer than the first... (1st try : 3 minutes running -> 2nd try: 12 minutes running) Although the task was executed for a longer time, it was still a failure. 😞
How can i solve this problem? What's the problem of my engineering? This flow may have been written in the wrong way, so I also wrote the flow code below..
import...
from prefect.run_configs import UniversalRun
def custom_function():
'''some works'''
return output
@task
def parsing_waveforms(download):
processing_target = download
'''some works by using above custom_function'''
with Flow('flow_waveforms')as flow:
heir = Parameter('download')
task1 = parsing_waveforms(download=heir)
flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE'}:'thread')
flow.register(project_name='data_factory')
Sudharshan B
10/20/2022, 11:23 AMKlemen Strojan
10/20/2022, 12:02 PMdev-prefect-2
.
Which makes no sense - why would I need privileges in the default
namespace? I can run this with Prefect 2.3 without issues.Lennert Van de Velde
10/20/2022, 12:06 PMTrevor Kramer
10/20/2022, 1:42 PMJessica Smith
10/20/2022, 3:14 PMAshoka Sangapallar
10/20/2022, 4:28 PMAshoka Sangapallar
10/20/2022, 4:28 PMAshoka Sangapallar
10/20/2022, 4:34 PMKelvin DeCosta
10/20/2022, 4:42 PMECSTask
.
My question: Do I need to install dependencies like prefect-aws
, s3fs
etc. and register the blocks (eg: prefect block register -m prefect_aws.ecs
) in the container for the agent? Or can I just use the latest prefecthq/prefect
image?Kalise Richmond
10/20/2022, 4:47 PMAlix Cook
10/20/2022, 4:52 PMStefan Rasmussen
10/20/2022, 5:00 PMJohn Ramey
10/20/2022, 5:05 PMget_run_logger
be called in each task/flow? i have a flow with ~20 tasks. and after upgrading to Prefect 2, I have 20 different calls to get_run_logger
— is there a better way to adhere to DRY?Iuliia Volkova
10/20/2022, 7:41 PMBen Muller
10/20/2022, 10:30 PMcreate_flow_run
if that subflow then triggers more "subsubflows" if one of them fail, will my master flow know about it ? is there a way to propagate any of these signales?Tomás Emilio Silva Ebensperger
10/21/2022, 12:14 AMresult = self.result.write(value, **formatting_kwargs)
TypeError: LocalResult.write() got multiple values for argument 'self'
Deepanshu Aggarwal
10/21/2022, 5:17 AMNic
10/21/2022, 8:37 AMredsquare
10/21/2022, 9:19 AMRobin Weiß
10/21/2022, 9:33 AMRunning
state and start a K8s job & pod, the other 100 Flow Runs just idle around in “Pending”. I have searched everywhere to find hints about why they won’t start. There is definitely enough K8s computing resources for them to be scheduled, there is no concurrency limits set via tags or on the work queue directly. The K8s work queue just shows them as “Pending” in the UI. Does anyone have any idea where else to look? Cheers 🙂