John Ramirez
05/06/2020, 1:12 PM
I'm looking at the Dask Cloud Provider Environment and want to know if this env would be the best way to accomplish this goal.
Jie Lou
05/06/2020, 2:24 PM
I use CronClock("00 16 * * *", parameter_defaults=MY_PARAMETER_1) to schedule one flow. I also have another flow with a different batch of parameters to be scheduled at the same time, with CronClock("00 16 * * *", parameter_defaults=MY_PARAMETER_2). I then set flow.schedule = Schedule(clocks=[clock1, clock2]) and register the flow. In the Cloud UI, I only see one flow run scheduled instead of two. If I tweak the time a bit, i.e. set CronClock("05 16 * * *", parameter_defaults=MY_PARAMETER_2), then two flow runs are scheduled as expected. It seems that if two runs are scheduled at the same time, only one will be picked. It would be better if multiple flow runs could be scheduled at the same time.
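For reference, a minimal sketch of the setup being described; the flow name, tasks, and parameter dictionaries are placeholders:
from prefect import Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# two clocks firing at the same cron time, each with its own default parameters
clock1 = CronClock("00 16 * * *", parameter_defaults={"batch": "A"})
clock2 = CronClock("00 16 * * *", parameter_defaults={"batch": "B"})

with Flow("my-flow") as flow:
    ...  # tasks go here

flow.schedule = Schedule(clocks=[clock1, clock2])
# the flow would then be registered as usual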
Jim Crist-Harif
05/06/2020, 3:45 PM
Right now there's an Environment class per dask-cluster class, and it seems a bit untenable. I've been thinking about making a generic Dask environment that takes the cluster-manager class and kwargs and uses that to create a Dask cluster. Since Dask already has a spec'd interface for this, it seems significantly simpler than maintaining a mirror of each of these in Prefect. Something like (class name not decided):
environment = DaskClusterEnvironment(
    cls=dask_yarn.YarnCluster,
    kwargs={
        "queue": "engineering",
        "environment": "hdfs://...",
    },
)
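For comparison, a minimal sketch of the underlying Dask cluster-manager interface this would wrap, using the same placeholder kwargs (the queue name and environment path are illustrative):
import dask_yarn
from dask.distributed import Client

# the cluster-manager class already encapsulates cluster creation
cluster = dask_yarn.YarnCluster(queue="engineering", environment="hdfs://...")
client = Client(cluster)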
David Ojeda
05/06/2020, 4:29 PM
I get this warning:
[2020-05-06 18:24:40] WARNING - py.warnings | /home/david/.virtualenvs/iguazu-env/lib/python3.8/site-packages/prefect/client/client.py:576: UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly.
but the flow constructor docstring says:
- result_handler (ResultHandler, optional, DEPRECATED): the handler to use for
retrieving and storing state results during execution
Should I add a no-op result handler to quiet that warning, or just ignore it?
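For reference, a minimal sketch of what an explicit handler might look like on Prefect 0.10.x; the flow name and the choice of JSONResultHandler are placeholders:
from prefect import Flow
from prefect.engine.result_handlers import JSONResultHandler

# passing a handler explicitly silences the "No result handler" warning
with Flow("my-flow", result_handler=JSONResultHandler()) as flow:
    ...  # tasks go here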
Dan DiPasquo
05/06/2020, 9:10 PM
Matthias
05/06/2020, 10:13 PM
Do I have to run prefect server start manually? Or is there a way to run prefect server start inside another Docker container? I tried to add the Docker socket as a volume, but that did not work.
Manuel Mourato
05/06/2020, 11:04 PM
f_run = test_flow1.run()
test_flow1.visualize(flow_state=f_run)
Has anyone had this issue before?
Chris Vrooman
05/07/2020, 6:09 AM
from prefect import task, Flow

@task
def my_function(x, y):
    print(x + y)

with Flow(name="my_flow") as flow:
    # Run 1st
    my_function(1, 2)
    # Run 2nd
    my_function(3, 4)
    # Run 3rd
    my_function(5, 6)
Ketan Bhikadiya
05/07/2020, 8:33 AM
Manuel Mourato
05/07/2020, 8:36 AM
env = RemoteEnvironment(
    executor="prefect.engine.executors.DaskExecutor",
    executor_kwargs={
        "local_processes": True,
    },
)
The execution DAG is in the attached image.
My understanding is that, using multiprocessing, Task 2 and Task 4 would execute at the same time after Task 1, in different processes, because they do not depend on each other. But the behaviour I see is that they execute sequentially, as if wait() were being called between each process.
Is my understanding incorrect?
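For reference, a minimal self-contained sketch of the DAG shape being described, run with a process-backed DaskExecutor; the task and flow names here are placeholders:
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def task_1():
    return 1

@task
def task_2(x):
    return x + 1

@task
def task_4(x):
    return x * 2

with Flow("parallel-branches") as flow:
    a = task_1()
    b = task_2(a)  # depends only on task_1
    c = task_4(a)  # depends only on task_1, so it could run alongside task_2

# with a process-backed executor, task_2 and task_4 should be able to run concurrently
flow.run(executor=DaskExecutor(local_processes=True))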
Zviri
05/07/2020, 8:47 AM
I'm running into a memory issue with the CloudTaskRunner but not the plain TaskRunner (using a Dask deployment).
I was observing that during the "mapping" procedure the worker doing the actual mapping was continuously using more and more memory.
That seemed reasonable, since mapping involves copying the mapped task. However, I noticed that when using the CloudTaskRunner memory consumption is much, much higher during this step.
To be specific, mapping over a list of only about 8,000 elements ate up more than 4 GB of memory on the worker.
I did some debugging and found that the same mapped task has a serialized size of 15,200 bytes using TaskRunner, but 122,648 bytes using the CloudTaskRunner.
This is almost a 10-fold increase, which makes the mapping functionality pretty unusable for me. The increased size ultimately comes from pickling this function: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/engine/task_runner.py#L788
and I think the serialized size of the CloudTaskRunner class is the cause of the difference.
Is this behavior known, or is it worth a bug report?
I will stick to the plain TaskRunner for now, which will, unfortunately, prevent me from using the Cloud UI, which I really like. It would be great if this could be fixed.
I'm using the latest Prefect (v0.10.7).
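For reference, a rough sketch of how the serialized-size comparison above could be reproduced; the trivial task is a placeholder, and this assumes a prefect Client can be constructed in the local environment:
import cloudpickle
from prefect import task
from prefect.engine.task_runner import TaskRunner
from prefect.engine.cloud import CloudTaskRunner

@task
def dummy():
    pass

# compare the pickled size of each runner wrapping the same task
print(len(cloudpickle.dumps(TaskRunner(task=dummy))))
print(len(cloudpickle.dumps(CloudTaskRunner(task=dummy))))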
Adrien Boutreau
05/07/2020, 9:36 AM
Matthias
05/07/2020, 1:49 PM
Matthias
05/07/2020, 2:01 PM
When I run prefect agent start in my main Docker container, the process picks up the flows.
David Ojeda
05/07/2020, 2:53 PM
I am using a DaskKubernetesEnvironment and a Kubernetes agent on a GCP k8s cluster that has auto-scaling (I am not sure if this is relevant).
The strange situation is that, sometimes, the flow stays running indefinitely because the last 1 to 3 tasks of the map are "still running". I will try to collect some logs when this happens, but it does not look like the workers are stuck; it seems more like they were killed a bit too soon and could not inform the Prefect server (or the Dask scheduler?) that they finished…
Has anyone encountered this strange situation?
Matias Godoy
05/07/2020, 2:55 PM
Manuel Mourato
05/07/2020, 4:00 PM
I created a flow with:
task_sequence = [load_data, task1, task2]
test_flow1.chain(*task_sequence)
which I saved to a file locally and then loaded via the test_flow2 = Flow.load(path) method.
Now I want to add a new task3 to this flow, and I want to make load_data an upstream dependency of this new task, like this:
test_flow2.set_dependencies(
    task=task3,
    upstream_tasks=[load_data],
)
But I get the error: A task with the slug "Task1" already exists in this flow
It seems to complain about load_data already being defined in the flow, which it is. But what I want is to say that load_data is a dependency of task3.
What am I doing wrong?
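For reference, one possible way (not necessarily the fix) to reference the task instance that is already inside the loaded flow, reusing test_flow2 and task3 from the message above:
# look up the existing task by name instead of reusing the original object
load_data_task = test_flow2.get_tasks(name="load_data")[0]

test_flow2.set_dependencies(
    task=task3,
    upstream_tasks=[load_data_task],
)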
Manuel Mourato
05/07/2020, 6:08 PM
When I try to save this flow, I get the following error:
rv = reduce(self.proto)
File "stringsource", line 2, in jnius.JavaClass.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
As far as I was able to find, a Cython object cannot be pickled unless specific code is added, like so:
https://stackoverflow.com/questions/12646436/pickle-cython-class
My goal here is to store this flow so that I can run an agent which will then execute it when I trigger a run via the UI. If I just register the flow in the UI but don't save it, I get this error:
[2020-05-07 17:43:48,846] INFO - agent | flow = storage.get_flow(storage.flows[flow_data.name])
[2020-05-07 17:43:48,846] INFO - agent | KeyError: 'Test-Flow55'
My question is: has anyone had an issue like this before? Can I get around this in some way?
John Ramirez
05/07/2020, 8:10 PM
I'm trying to use DaskCloudProviderEnvironment and I keep getting this error:
botocore.exceptions.NoRegionError: You must specify a region.
distributed.deploy.spec - WARNING - Cluster closed without starting up
Any ideas?
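For context, botocore raises NoRegionError when no AWS region is configured anywhere; a minimal sketch of one way to set it (the region name is a placeholder):
import os

# botocore looks for a region in AWS_DEFAULT_REGION, ~/.aws/config,
# or an explicit region_name argument; the environment variable is one option
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"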
John Ramirez
05/07/2020, 8:49 PM
I'm on 0.10.7 and found this error:
Traceback (most recent call last):
  File "main.py", line 4, in <module>
    from prefect.environments import DaskCloudProviderEnvironment
ImportError: cannot import name 'DaskCloudProviderEnvironment' from 'prefect.environments' (/Users/johnramirez/projects/client/superset/aws-fargate-poc/venv/lib/python3.7/site-packages/prefect/environments/__init__.py)
Manuel Mourato
05/07/2020, 9:50 PM
matta
05/08/2020, 12:56 AM
matta
05/08/2020, 12:57 AM
Zviri
05/08/2020, 8:03 AM
Simon Basin
05/08/2020, 1:01 PM
John Ramirez
05/08/2020, 1:10 PM
When using DaskKubernetesEnvironment with a custom YAML spec, can you still specify min & max workers in the class?
Adrien Boutreau
05/08/2020, 3:24 PM{
"graphQLErrors": [],
"networkError": {},
"message": "Network error: NetworkError when attempting to fetch resource."
}
From a Python job:
http://localhost:4200/graphql/alpha
{'data': {'flow': [{'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}, {'name': 'Zoom_test'}]}}
I updated the hosts file with
xxx.eu-west-2.compute.amazonaws.com localhost:4200
but without success.
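For reference, a minimal sketch of the kind of query issued from Python against the local GraphQL endpoint shown above; the query itself is illustrative:
import requests

# query flow names from the Prefect Server GraphQL API
response = requests.post(
    "http://localhost:4200/graphql/alpha",
    json={"query": "query { flow { name } }"},
)
print(response.json())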
John Ramirez
05/08/2020, 4:01 PM
I'm using DaskKubernetesEnvironment to run my workflows. There will be times when I need to submit a high volume of flows to the cluster. My expectation is that the auto-scaler will spin up the additional EC2 instances required to handle the temporary volume increase, but I am concerned about the latency (that is, the time it takes to make the additional resources available). Is there a setting to retry the entire flow if it fails? That way it reduces the gap.
Jeremiah
Matthias
05/08/2020, 10:24 PM
I have task1 and task2 (defined by the @task decorator), which both store data in a database. For integrity I need to make sure that task1 finishes before task2 starts. The tasks do not need to return anything. How do I make the dependency explicit and tell task2 to wait for task1?
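For reference, a minimal sketch of one way to express that ordering in Prefect 0.x, assuming the tasks are called inside a Flow context; the flow name and task bodies are placeholders:
from prefect import task, Flow

@task
def task1():
    ...  # write to the database

@task
def task2():
    ...  # write to the database

with Flow("db-writes") as flow:
    t1 = task1()
    # upstream_tasks creates a pure state dependency without passing any data
    t2 = task2(upstream_tasks=[t1])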