alexandre kempf
01/07/2020, 1:39 PM
```python
from prefect import task, Parameter, Flow

@task
def load_data(c, b):
    return c

@task
def bugtask(c):
    return c

with Flow("training") as flowModel:
    init = Parameter("c")
    data = load_data(init, b={"f": 4})
    # data = load_data(init, b={"f": bugtask})

state_model = flowModel.run(c=5)
```
Now, if you use the commented-out line instead of the current `load_data` call, there is an error. Basically, whenever a task appears in the arguments, even nested inside another structure and never called, the flow fails.
Is this expected?
I have the feeling that it tries to run all the tasks, even the ones that are never called!
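The error is consistent with how Prefect 0.x builds the graph. When a task is called inside a `Flow` block, its arguments go through `prefect.utilities.tasks.as_task`, which appears to turn any collection containing a task into a collection task. As a result, `bugtask` becomes an upstream dependency and is run without its required `c` argument. The pure-Python sketch below uses a hypothetical `FakeTask` stand-in, not Prefect's real class, to show the kind of recursive walk that finds a task nested in a dict. If that is the mechanism, one workaround is to pass configuration that contains no `Task` objects at all.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass(eq=False)
class FakeTask:
    """Hypothetical stand-in for prefect.Task; only the name matters here."""
    name: str


def collect_tasks(obj: Any) -> List[FakeTask]:
    """Recursively find task objects inside nested dicts, lists, tuples and sets."""
    if isinstance(obj, FakeTask):
        return [obj]
    if isinstance(obj, dict):
        # Both keys and values are searched, since either could hold a task.
        items = list(obj.keys()) + list(obj.values())
    elif isinstance(obj, (list, tuple, set)):
        items = list(obj)
    else:
        # Plain values (ints, strings, ...) never add graph dependencies.
        return []
    found: List[FakeTask] = []
    for item in items:
        found.extend(collect_tasks(item))
    return found


bugtask = FakeTask("bugtask")
print(collect_tasks({"f": 4}))        # []
print(collect_tasks({"f": bugtask}))  # [FakeTask(name='bugtask')]
```

Note that the task is found no matter how deeply it is nested. That is why "not executed" does not help: being referenced is enough to become part of the graph.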
@josh @Dylan, this is a problem for subflows, since I must pass the configuration of the subflow as an argument to my `run_flow` task :s
Kamil Okáč
01/07/2020, 2:01 PM
```python
from prefect import Flow
from prefect.engine.executors.dask import DaskExecutor
import mytask

mt = mytask.MyTask()

with Flow("Flow") as flow:
    t1 = mt(1)

executor = DaskExecutor(address='tcp://....:8786')
flow.run(executor=executor)
```
mytask.py:
```python
from prefect import Task

class MyTask(Task):
    def run(self, x):
        return x
```
This leads to an error on the worker: `ModuleNotFoundError: No module named 'mytask'`
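Standard `pickle` serializes a class by reference, storing only the module name and the class name. Unpickling on a Dask worker therefore needs that module to be importable on the worker. As far as I know, the cloudpickle library used by Dask behaves the same way for classes in importable modules, while it serializes functions and classes defined in `__main__` by value. That may explain why the decorated version works, if it was defined in the flow script itself. The stdlib-only sketch below reproduces the failure mode. It uses a hypothetical in-memory module `mytask_demo` in place of `mytask.py`, and removing that module simulates a worker that lacks the file.

```python
import pickle
import sys
import types

# Build a throwaway in-memory module, standing in for mytask.py on the client.
module = types.ModuleType("mytask_demo")
exec(
    "class MyTask:\n"
    "    def run(self, x):\n"
    "        return x\n",
    module.__dict__,
)
sys.modules["mytask_demo"] = module

# pickle records a reference ("mytask_demo" + "MyTask"), not the class body.
payload = pickle.dumps(module.MyTask())
print(b"mytask_demo" in payload)  # True

# Simulate a worker process where the module cannot be imported.
del sys.modules["mytask_demo"]
try:
    pickle.loads(payload)
    error_message = None
except ModuleNotFoundError as exc:
    error_message = str(exc)
print(error_message)  # No module named 'mytask_demo'
```

Under that assumption, the usual fix is to make `mytask` importable on every worker, for example by installing it as a package in the worker environment or image, rather than changing the task itself.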
If I use the `@task` decorator instead of subclassing, there's no problem.
John Ramirez
01/07/2020, 4:09 PM
`task.copy()`, the upstream dependencies are removed, but are task results copied as well?
matt_innerspace.io
01/07/2020, 5:17 PM
Jake Schmidt
01/07/2020, 7:58 PM
`prefect.Parameter` in a call to a `prefect.ShellTask`? Would I use a `StringFormatter` task?
matt_innerspace.io
01/07/2020, 9:05 PM
`hello_world.py` script? There are lots of examples of what can happen inside the Python script, but I'm missing how it would be deployed as a flow. I can see that the flow runs inside a Docker container, on an agent.
I can see how the `prefect` CLI tool can run a flow, but it seems deployment and running are linked somehow?
For comparison, I use OpenFaaS Python functions, which also run in Docker containers and are deployed wherever there is room to run them, but there is an explicit `faas-cli push ...` to deploy to the cloud/cluster when you're ready.
Sean
01/07/2020, 11:17 PM
`CreateSshTunnel` task does not work in general, as it is stateful and local. For example, I don't think it would work with the Dask executor, because the tunnel task could execute on a different node than the Postgres task. So what would be a correct way to structure this?
Jeff Brainerd
01/09/2020, 2:22 AM
`msgpack` to `v1.0.0rc1`, which broke Prefect on Dask. The symptom is errors like this when a flow finishes:
```
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/jeff/.local/share/virtualenvs/jellyfish-u52nBq9x/lib/python3.7/site-packages/distributed/protocol/core.py", line 106, in loads
    header = msgpack.loads(header, use_list=False, **msgpack_opts)
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: tuple is not allowed for map key
```
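The `tuple is not allowed for map key` message is consistent with msgpack 1.0 rejecting non-string map keys by default, while `distributed` here decodes with `use_list=False`, which turns arrays into tuples. If it helps to fail fast instead of hitting the error at the end of a run, a small guard could check the installed version before the flow starts. This is a sketch. `is_pre_1_0` and `check_msgpack` are hypothetical helpers, and the "below 1.0" bound only reflects the report in this thread, not a documented compatibility range. It requires Python 3.8+ for `importlib.metadata`.

```python
import re
from importlib import metadata


def is_pre_1_0(version: str) -> bool:
    """Return True if a version string such as '0.6.2' or '1.0.0rc1' has major version 0."""
    match = re.match(r"(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return int(match.group(1)) < 1


def check_msgpack() -> None:
    """Raise early if the installed msgpack is 1.0 or newer (hypothetical guard)."""
    try:
        installed = metadata.version("msgpack")
    except metadata.PackageNotFoundError:
        return  # msgpack is not installed, so there is nothing to check
    if not is_pre_1_0(installed):
        raise RuntimeError(
            f"msgpack {installed} is installed; this setup was only reported "
            "to work with msgpack 0.6.x"
        )
```

Calling `check_msgpack()` at the top of the script that builds and runs the flow would surface the mismatch immediately.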
Reverting to `msgpack` 0.6.x solved the issue. Easy to repro on `prefect` 0.8.1 with `dask` 2.8.1 and `dask` 2.9.1. Not sure if this is something wonky in my environment or something real…
matt_innerspace.io
01/09/2020, 3:16 PM
matt_innerspace.io
01/09/2020, 4:45 PM
yuvipanda
01/09/2020, 6:28 PM
yuvipanda
01/09/2020, 6:28 PM
yuvipanda
01/09/2020, 6:28 PM
yuvipanda
01/09/2020, 6:28 PM
yuvipanda
01/09/2020, 6:29 PM
Alexander Kolev
01/09/2020, 9:44 PM
Arsenii
01/10/2020, 2:09 AM
`flow.schedule = daily_schedule if daily else monthly_schedule` is acceptable inside the Flow itself, or if it's better to do this outside it instead.
Thanks!
David Mellor
01/10/2020, 4:02 PM
alexandre kempf
01/13/2020, 3:53 PM
josh
01/13/2020, 10:10 PM
`0.9.0` sometime tomorrow or the day after, and I'm giving an advance heads-up because there are some breaking changes around the default behavior of Result Handlers. If you have been registering flows with `Docker` storage, then you should not see any difference in behavior 🙂
There are also some new features and a bunch of enhancements! The full changelog is available here: https://github.com/PrefectHQ/prefect/blob/master/CHANGELOG.md#unreleased
Bryan Whiting
01/14/2020, 9:12 PM
Scott Brownlie
01/14/2020, 9:22 PM
`LocalResultHandler` does a similar thing; however, I can't seem to get it to work as I expected.
I have implemented the following toy example:
```python
from prefect.engine.result_handlers import LocalResultHandler
from prefect import task, Flow

results_dir = 'prefect_results/'

@task(checkpoint=True)
def set_input():
    return 10

@task(checkpoint=True)
def square(x):
    return x**2

with Flow("test", result_handler=LocalResultHandler(dir=results_dir)) as flow:
    task1 = set_input()
    task2 = square(task1)

flow.run()
```
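As far as I recall, Prefect 0.8.x only calls result handlers for `checkpoint=True` tasks when checkpointing is also switched on globally. That is controlled by the `flows.checkpointing` config key, which is off by default for local `flow.run()` calls, while Prefect Cloud turns it on. Prefect config keys can be overridden with `PREFECT__SECTION__KEY` environment variables. The minimal sketch below works under those assumptions. `env_var_for` is a hypothetical helper, and the variable has to be set before `prefect` is imported, since the config is loaded at import time. Running `export PREFECT__FLOWS__CHECKPOINTING=true` in the shell should be equivalent.

```python
import os


def env_var_for(config_key: str) -> str:
    """Hypothetical helper: map a dotted Prefect config key to its env var name."""
    # "flows.checkpointing" -> "PREFECT__FLOWS__CHECKPOINTING"
    return "PREFECT__" + "__".join(part.upper() for part in config_key.split("."))


# Set this before `import prefect` so the value is seen when Prefect loads its config.
os.environ[env_var_for("flows.checkpointing")] = "true"

print(env_var_for("flows.checkpointing"))  # PREFECT__FLOWS__CHECKPOINTING
```

With the variable set, rerunning the toy flow unchanged should be enough to test whether files start appearing in `prefect_results/`.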
I would expect the output of each task to be saved to the specified directory automatically, but it isn't. Is that not what is supposed to happen?
Bryan Whiting
01/15/2020, 6:30 PM
Zach
01/15/2020, 7:50 PM
alexsis
01/16/2020, 11:58 AM
```
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 238, in get_context
    return super().get_context(method)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 193, in get_context
    ctx._check_available()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 306, in _check_available
    raise ValueError('forkserver start method not available')
```
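The traceback shows `multiprocessing.get_context('forkserver')` failing its availability check, which presumably comes from the eventlet environment. One defensive pattern is to probe the start methods before passing one to the code that spawns processes. The stdlib sketch below does that. `pick_start_method` is a hypothetical helper, and feeding its result into Dask is an assumption to verify, for example through a `distributed.worker.multiprocessing-method` setting if your `distributed` version has one.

```python
import multiprocessing


def pick_start_method(preferred=("forkserver", "spawn")):
    """Return the first start method whose context can actually be created here."""
    for method in preferred:
        try:
            # Raises ValueError for unknown methods and for methods that fail their
            # availability check, e.g. 'forkserver start method not available'.
            multiprocessing.get_context(method)
        except ValueError:
            continue
        return method
    raise RuntimeError(f"none of the start methods {preferred} is available")


print(pick_start_method())
```

`spawn` is listed as the fallback because it is available on every platform, at the cost of slower process startup than `fork` or `forkserver`.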
Has anybody had experience using Prefect in eventlet-based frameworks?
Jackson Maxfield Brown
01/17/2020, 7:26 PM
`flow.run(executor=DaskExecutor(address=dask_scheduler_address))`, does it serialize the flow / computation graph and send it as a task to hold onto? Basically, if I have a Dask scheduler spun up and I am working on my local machine, can I call `flow.run(...)` and then shut down my machine?
Sebastian
01/18/2020, 4:45 AM
Ryan Connolly
01/18/2020, 8:38 PM
Ryan Connolly
01/19/2020, 4:17 PM
`LOOP` mechanism from the functional API... but I was having a harder time grokking that concept.
I'll attach an example of the Prefect Core code if anyone is interested.
Ryan Connolly
01/19/2020, 4:18 PM