kevin
09/10/2020, 7:36 PM
John Song
09/10/2020, 8:07 PM
John Song
09/10/2020, 8:08 PM
Zach
09/10/2020, 9:01 PM
Zach
09/10/2020, 9:02 PM
Zach
09/10/2020, 9:05 PM
`cloudpickle<1.5`, but prefect needs `distributed==2.25.0`, and `distributed` needs `cloudpickle>=1.5.0`
Dylan
09/10/2020, 9:07 PM
Julian
09/10/2020, 9:44 PM
with Flow("scale_presto_cluster", result=get_default_local_result()) as flow:
    scale_down = Parameter("scale_down_capacity", required=False)
    scale_up = Parameter("scale_up_capacity", required=False)
    with PrestoCluster(n_worker_low=scale_down, n_worker_high=scale_up) as client:
        run_flow_task_stats_weekly = FlowRunTask(flow_name="dummy-weekly", project_name='default', wait=True)
        run_flow_task_stats_weekly(parameters=get_weekly_fromto())
        with case(check_if_monthly_run, True):
            run_flow_task_stats_monthly = FlowRunTask(flow_name="dummy-monthly", project_name='default', wait=True)
            run_flow_task_stats_monthly(parameters=get_monthly_fromto(), upstream_tasks=[run_flow_task_stats_weekly])
I used a ResourceManager as documented in https://docs.prefect.io/api/latest/tasks/resources.html#functions
I noticed that the cleanup task wasn't called after I cancelled a scale_presto_cluster flow run during its execution; the cleanup task was cancelled as well.
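[Editor's note: since the cleanup runs as a task, a cancelled flow-run can cancel it too. If the guarantee only needs to hold while the Python process itself survives, one fallback is a plain context manager whose `__exit__` runs even when the body raises. A minimal sketch, using a hypothetical `ClusterHandle` class (not Prefect's ResourceManager API); a hard kill of the worker process would still skip this, so a scheduled scale-down safety net is prudent either way:]

```python
class ClusterHandle:
    """Hypothetical stand-in for a scalable cluster client."""

    def __init__(self):
        self.scaled_up = False
        self.cleaned_up = False

    def __enter__(self):
        self.scaled_up = True   # stands in for scaling the cluster up
        return self

    def __exit__(self, exc_type, exc, tb):
        self.cleaned_up = True  # always runs: success, error, or unwind
        return False            # do not swallow the exception

handle = ClusterHandle()
try:
    with handle:
        raise RuntimeError("simulated cancellation mid-run")
except RuntimeError:
    pass

print(handle.cleaned_up)  # True: __exit__ ran despite the failure
```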
Since this flow will run nightly, I must ensure that the cleanup task of the Flow is triggered no matter what. How can I achieve this? Is a task even the right approach?
Rob Fowler
09/10/2020, 10:05 PM
Rob Fowler
09/10/2020, 10:08 PM
Gitanshu Sardana
09/11/2020, 1:20 AM
backend = "server"
[server]
host = "http://172.18.1.3"
[server.ui]
graphql_url = "http://172.18.1.3:4200/graphql"
[logging]
# The logging level: NOTSET, DEBUG, INFO, WARNING, ERROR, or CRITICAL
level = "WARNING"
this is what my config.toml looks like
Gitanshu Sardana
09/11/2020, 2:33 AM
Vipul
09/11/2020, 7:22 AM
Marvin
09/11/2020, 7:22 AM
Leonard Marcq
09/11/2020, 7:57 AM
Do you grab the docker-compose.yml from prefect/cli/, modify it to point to your db, and `up` it; or do you pull the whole repo (prefect or prefect_server?); or install prefect via pip, pass it a custom .toml config somehow, and spin up your server with `prefect server start`; or do you do something else entirely?
karteekaddanki
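[Editor's note: the pip route can be sketched as a short CLI sequence (assuming Prefect 0.x tooling and Docker available locally; the config path shown is the default one Prefect reads):]

```shell
# Install the CLI plus the server/UI tooling
pip install prefect

# Point the CLI at a self-hosted server instead of Prefect Cloud
prefect backend server

# Custom settings (e.g. the [server] section) go in ~/.prefect/config.toml,
# which Prefect reads automatically; no flag is needed to pass it.

# Renders Prefect's bundled docker-compose.yml and brings the stack up
prefect server start
```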
09/11/2020, 8:28 AM
Sven Teresniak
09/11/2020, 8:49 AM
There are tags for tasks and labels for flows, but it is not possible to use tags or labels for flow-runs, right?
Now the idea is to write a ResourceManager that acts as a kind of Lock (reentrant or not, semaphore, or whatever flavor). Then you could (hopefully) enter a ResourceManager with-block the same way you would use a Lock in Python code.
Something like:
1. setup() searches (as in "asking Prefect") for all flow-runs with a given tag (that is, a run of a flow that has this certain tag set)
2. if setup() does NOT find another flow-run with that tag, the tag is set (I don't know yet where and how)
3. if setup() DOES find another matching flow-run (or RM): then do some polling, or return with a successful error (configurable)
4. then all the tasks in the RM's with-block run (this is the part that should be synchronized/locked)
5. on exit, the RM's cleanup() is called to remove the tag/label/whatever from the current flow (to allow other flows to set the tag)
Do you think this is a good idea? What/where is the best way to store that tag: the Context, Parameters, Tags, Labels? If you think this could be a nice feature, we could create a prototype and pull request.
We need to prevent a race condition here, and for that we need to make the execution of several tasks isolated (the I in ACID in an RDBMS).
Jordan
09/11/2020, 9:49 AM
Marvin
09/11/2020, 9:49 AM
Juan
09/11/2020, 1:50 PM
Kyle Pierce
09/11/2020, 2:08 PM
Marvin
09/11/2020, 2:09 PM
Nate Joselson
09/11/2020, 2:24 PM
Marvin
09/11/2020, 2:24 PM
Joe Lyman
09/11/2020, 4:15 PM
with Flow("myflow") as myflow:
    # tasks
myflow.register(project_name="myproj")
Now, separately in b.py I run a file watcher that needs to trigger "myflow" (in a non-blocking fashion) when a file arrives. I stumbled across:
FlowRunTask(project_name="myproj", flow_name="myflow")
but I don't know if this is correct. I also considered from a import myflow; myflow.run() in b.py; although the flows seemed to run, they didn't appear in the UI.
ARun
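[Editor's note: FlowRunTask is the usual way to start a registered flow through the backend API, which is why those runs show up in the UI, whereas myflow.run() executes locally and never contacts the backend, matching what was observed. The watcher side can stay Prefect-agnostic; a minimal polling sketch, where `trigger` is a hypothetical callback standing in for the API call and `watch_once` is not part of Prefect:]

```python
import tempfile
import threading
import time
from pathlib import Path

def watch_once(directory, trigger, poll_interval=0.01, timeout=2.0):
    """Poll `directory` until a new file appears, then call trigger(path).

    `trigger` is where the non-blocking kick-off would go, e.g. asking
    the orchestration backend to create a flow run; here it is just a
    callback, so the sketch stays self-contained.
    """
    seen = set(Path(directory).iterdir())
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        new_files = set(Path(directory).iterdir()) - seen
        if new_files:
            path = new_files.pop()
            trigger(path)          # fire-and-forget: watcher is not blocked
            return path
        time.sleep(poll_interval)
    return None

with tempfile.TemporaryDirectory() as d:
    triggered = []
    # Simulate a file arriving shortly after the watcher starts
    threading.Timer(0.05, lambda: (Path(d) / "data.csv").write_text("x")).start()
    result = watch_once(d, triggered.append)

print(result.name if result else None)  # data.csv
```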
09/11/2020, 4:17 PM
Skip Breidbach
09/11/2020, 6:12 PM
Marvin
09/11/2020, 6:12 PM
Nick McCullum
09/11/2020, 7:06 PM
Marvin
09/11/2020, 7:06 PM
Marvin
09/11/2020, 7:06 PM