Jason Carter
07/23/2020, 7:22 PM
prefect backend server, prefect server start and prefect agent start in that order but no flow in the UI.
import prefect
from prefect import task, Flow
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
flow = Flow("hello-flow", tasks=[hello_task])
# flow.run()
flow.register()
Ashish Arora
07/23/2020, 11:08 PM
Jeremy Knickerbocker
07/24/2020, 5:31 AM
Jason Carter
07/24/2020, 5:44 PM
first_flow.py do I have to run python first_flow.py in order to get it to register the change? (flow.register() is in my py file)
2. I'm now seeing my flow in the UI, but when I try to run it via the UI (as a one-off run) it goes into "scheduled to run", shows as late, and never runs (I just mark it as cancelled after 3 minutes).
import prefect
from prefect import task, Flow
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
flow = Flow("hello-flow", tasks=[hello_task])
# flow.run()
flow.register()
https://prefect-community.slack.com/archives/CL09KU1K7/p1595532174143800
Jan Feřtek
07/24/2020, 5:57 PM
from prefect import task, Flow, Parameter
from prefect.tasks.templates import StringFormatter
from prefect.tasks.shell import ShellTask
msg_task = StringFormatter(template='{my_string}')
shell_task = ShellTask()
with Flow("string-template") as flow:
    name = Parameter('name')
    msg_output = msg_task(my_string=name)
    shell_output_static = shell_task(command=f"echo Arthur")
    shell_output_dynamic = shell_task(command=f"echo {msg_output}")
flow_state = flow.run(name='Marvin')
print(flow_state.result[msg_output].result)
# Marvin
print(flow_state.result[shell_output_static].result)
# Arthur
print(flow_state.result[shell_output_dynamic].result)
# Command failed with exit code 2
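A likely reading of the failure above: the f-string in the dynamic command is evaluated while the flow is being built, when msg_output is still a task object rather than the string "Marvin". The sketch below mimics that in plain Python. FakeTask and build_command are hypothetical stand-ins, not Prefect's API, and the repr format is an assumption about how a task object prints itself.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; not part of Prefect."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        # Assumed to resemble how a task object prints itself.
        return f"<Task: {self.name}>"

    def run(self, **kwargs):
        # At run time the task produces its actual value.
        return kwargs["my_string"]


msg_output = FakeTask("StringFormatter")

# Build time: the f-string is evaluated immediately, so the task object
# itself (not its future result) is formatted into the command.
command_at_build_time = f"echo {msg_output}"
print(command_at_build_time)  # echo <Task: StringFormatter>


def build_command(value):
    """Format the command from an already-computed value."""
    return f"echo {value}"


# Run time: only once the task has run is the real value available.
command_at_run_time = build_command(msg_output.run(my_string="Marvin"))
print(command_at_run_time)  # echo Marvin
```

If this is what is happening, the shell would receive the angle brackets as redirection operators, which may explain the exit code 2. In a flow, the equivalent of build_command would be a task of its own whose result is passed as command, so the string is formatted at run time.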
Avi A
07/24/2020, 7:21 PM
Pedro Machado
07/24/2020, 8:12 PM
Tsang Yong
07/24/2020, 8:48 PM
Matt Wong-Kemp
07/24/2020, 8:58 PM
DaskExecutor is it safe to share the dask cluster between the execution of the tasks and direct use of dask?
The use case here is I have a large number of small tasks I'd like to do in parallel, followed by some large joined data analysis. I'd like to gain the concurrency from running on a dask cluster in my flows, but at the same time I want to perform some data analysis using the dask Dataframe class and distribute this across a cluster as well. If I provision the cluster myself, is it safe to share the scheduler between the flow and the dataframe library? Or should I expect to need to provision my own dask cluster inside a task to run my Dataframe code on?
Mike Ninov
07/24/2020, 11:16 PM
Avi A
07/25/2020, 8:39 PM
DaskExecutor. The flow fails due to high memory usage by the dask worker which is being killed again and again endlessly.
Questions:
1. It seems that Dask/Prefect don’t serialize the completed tasks' outputs when the worker reaches high memory usage. I’m not that proficient in Dask; I used Spark a lot and I know that it dumps to disk whenever memory is running low. How can I configure Dask to do the same (or is it a Prefect thing)? Worth mentioning that the Prefect agent, Dask scheduler and worker are all on the same machine.
2. Is there some way to have the flow fail in this case? It keeps restarting the worker but not failing, so I don’t get a message that this failure is happening.
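On question 1, a minimal sketch of the worker memory thresholds that Dask's distributed workers read from configuration. The values are illustrative assumptions, expressed as fractions of the worker's memory limit, and they only matter if they are in effect in the environment where the worker process starts.

```python
# Illustrative thresholds (assumed values), as fractions of the worker memory limit.
memory_settings = {
    "distributed.worker.memory.target": 0.60,     # begin moving stored results to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on total process memory
    "distributed.worker.memory.pause": 0.80,      # stop starting new tasks on the worker
    "distributed.worker.memory.terminate": 0.95,  # the nanny restarts the worker
}

try:
    import dask

    # Applies the settings to the current process only.
    dask.config.set(memory_settings)
except ImportError:
    # Dask is not installed here; the dictionary still documents the keys.
    pass
```

The terminate threshold is the one that matches a worker being killed and restarted repeatedly; whether lowering target and spill helps depends on whether the memory is held by task results that Dask is able to spill.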
Thanks!
Daniel
07/25/2020, 10:41 PM
Alfie
07/26/2020, 10:07 AM
import prefect
from prefect import task, Flow
@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")
flow = Flow("hello-flow", tasks=[hello_task])
flow.register(labels=['public'])
But the label of a flow run is:
"type": "LocalEnvironment",
"labels": [
"public",
"AppledeMacBook-Pro.local"
],
Is there a way to get rid of the label “AppledeMacBook-Pro.local”, since it affects whether the agent picks up the flow run? Thanks
Vikram Iyer
07/27/2020, 5:45 AM
PREFECT_SERVER__GRAPHQL_URL is set to http://localhost:4200/graphql but I want to change it to some other URL. How can I achieve this?
delphi
07/27/2020, 7:09 AM
delphi
07/27/2020, 7:26 AM
Robin
07/27/2020, 8:45 AM
Adam
07/27/2020, 9:05 AM
Arsenii
07/27/2020, 10:18 AM
Matthew Maldonado
07/27/2020, 12:06 PM
Matthew Maldonado
07/27/2020, 1:12 PM
karteekaddanki
07/27/2020, 1:18 PM
flow.run() locally it seems to work. Here is the code snippet I am using for registering my flow. I am using Docker storage.
if args.base_image:
    storage = Docker(
        base_image=args.base_image,
        registry_url=args.registry_url,
        local_image=args.local_image)
    for flow in flows:
        path = storage.add_flow(flow)
        print(f"storing flow {flow.name} at {path} in the image.")
    storage = storage.build()
    for flow in flows:
        flow.storage = storage
        flow.environment = LocalEnvironment(
            executor=LocalDaskExecutor(),
            labels=args.labels)
        flow.register(project_name=args.project, build=False)
Am I doing something wrong? I am on the developer version of the cloud backend.
EDIT: To be more precise, when I run flow.run() I see concurrency on the mapped tasks, but not when using the cloud API (even for a single flow).
Adam
07/27/2020, 2:50 PM
Leo Meyerovich (Graphistry)
07/27/2020, 3:13 PM
Leo Meyerovich (Graphistry)
07/27/2020, 3:14 PM
Jared
07/27/2020, 3:21 PM
Zach
07/27/2020, 5:17 PM
Tarcisio
07/27/2020, 6:01 PM
Sven Teresniak
07/27/2020, 6:41 PM
$PREFECT__HOME_DIR. I see my pickled flows and a lot of pickled results. But Prefect seems not to be aware of that.
The last log line is btw Flow 'session-creation-watchdog': Handling state change from Scheduled to Running. In a successful run, the next log line would be the start of my first task.
I don't know where to start to debug that. What can I do to better understand the problem? Is there a timeout or can I force a "PING" to the flow to identify the real state?
mithalee mohapatra
07/27/2020, 7:07 PM
mithalee mohapatra
07/27/2020, 7:07 PM
Kyle Moon-Wright
07/27/2020, 7:13 PM
mithalee mohapatra
07/27/2020, 7:25 PM