Marcos San Miguel
11/02/2021, 9:41 AM
Klemen Strojan
11/02/2021, 11:14 AM
Ondřej Melichar
11/02/2021, 11:32 AM
Billy McMonagle
11/02/2021, 2:28 PM
haf
11/02/2021, 2:32 PM
Benjamin Bonhomme
11/02/2021, 3:20 PM
Vamsi Reddy
11/02/2021, 4:29 PM
ale
11/02/2021, 5:27 PM
Cannot provide `task_definition_arn` when using `Docker` storage
Looking at the code at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py, this seems to be a deliberate design choice.
What are the alternatives for running a flow on ECS while providing the task definition ARN?
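For context, a minimal sketch of the combination being asked about. The bucket name and ARN are placeholders, and pairing the pre-registered task definition with a non-Docker storage option such as S3 is my assumption, based only on the error being specific to Docker storage:
from prefect import Flow
from prefect.run_configs import ECSRun
from prefect.storage import S3

with Flow("ecs-flow") as flow:
    ...

# The ECS agent rejects task_definition_arn when the flow uses Docker storage,
# so here the flow code lives in S3 while the task definition is referenced by ARN.
flow.storage = S3(bucket="my-flow-bucket")
flow.run_config = ECSRun(
    task_definition_arn="arn:aws:ecs:eu-west-1:123456789012:task-definition/my-flow:1"
)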
Greg Adams
11/02/2021, 7:23 PM
Will
11/02/2021, 8:08 PM
If not provided, the default on the agent will be used (if configured). This does not happen for the task role, which is instead passed from the agent. This behaviour is unexpected; I would have assumed that when passing a custom task definition, the roles I have defined (task role and execution role) would not be overridden by Prefect.
So, I've now modified the naming convention for the task role for a particular flow, to test passing it via:
`task_role_arn (str, optional)`: The name or full ARN for the IAM role to use for this task. If not provided, the default on the agent will be used (if configured).
The role I have passed has full S3 permissions:
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
}
]
}
The flow itself uses the [S3Upload](https://docs.prefect.io/api/latest/tasks/aws.html#s3upload) action.
It fails with: `Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied`
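A minimal sketch of how the roles might be passed on the run config (the ARNs and flow name are placeholders, not taken from the thread). Per the docstring quoted above, values provided here should take precedence over the agent defaults:
from prefect import Flow
from prefect.run_configs import ECSRun

with Flow("s3-upload-flow") as flow:
    ...

# Placeholder ARNs: both the task role and the execution role can be set
# explicitly on the run config instead of relying on the agent defaults.
flow.run_config = ECSRun(
    task_role_arn="arn:aws:iam::123456789012:role/my-flow-task-role",
    execution_role_arn="arn:aws:iam::123456789012:role/my-flow-execution-role",
)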
Jacob Bedard
11/03/2021, 4:52 AM
Freddie
11/03/2021, 10:36 AM
Architha Rao
11/03/2021, 11:35 AM
Brett Naul
11/03/2021, 12:48 PM
`edge_on_conflict` in the prefect or server repos:
File "/src/util/prefect.py", line 575, in register_flow
flow_id = client.register(flow, project_name, idempotency_key=flow.serialized_hash())
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1227, in register
self.graphql(
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 569, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['register_edges'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'edge_on_conflict\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Lukas N.
11/03/2021, 1:23 PM
Thomas Nyegaard-Signori
11/03/2021, 1:51 PM
`scale-down-delay-after-add` to 1m and similarly the `scale-down-unneeded-time` to 1m.
The issue we are facing is that sometimes the task pods fail, seemingly without reason, and the logs are quite unhelpful. My hunch is that it has something to do with scaling of the cluster, potentially destroying pods or losing networking between the flow and task pods in the process? We are already setting `cluster-autoscaler.kubernetes.io/safe-to-evict: false` on all pods, so eviction shouldn't be the issue. Has anyone else had any experience with k8s autoscaler settings leading to weird, intermittent task failures?
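For reference, a sketch of one way that annotation is typically attached to the flow-run pods via a custom job template. This assumes a Prefect 1.x `KubernetesRun` and is not necessarily how the poster set it:
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow("autoscaled-flow") as flow:
    ...

# Annotate the flow-run job's pod template so the cluster autoscaler
# will not evict the pod while the flow is running.
job_template = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "metadata": {
                "annotations": {
                    "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
                }
            }
        }
    },
}
flow.run_config = KubernetesRun(job_template=job_template)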
Maxwell Varlack
11/03/2021, 3:16 PM
Steve s
11/03/2021, 3:24 PM
`create_flow_run`. Today it started giving me the traceback pasted below. I commented out everything in the flow except for the very first `create_flow_run` and it's still giving me this error (see thread):
Any ideas?
Jake Place
11/03/2021, 4:21 PM
`StartFlowRun` storage information? Or a better way to store flows that need to operate in multiple environments?
Will
11/03/2021, 4:42 PM
saml
11/03/2021, 5:56 PM
Harish
11/03/2021, 7:04 PM
Khuyen Tran
11/03/2021, 9:17 PM
`flow.run(parameters={"x": 8, "y": 9})`
I wonder if there is a way we can do that with `flow.register` as well? Something similar to this:
`flow.register(project_name="My Project", parameters={"x": 8, "y": 9})`
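As far as I know, `flow.register` itself doesn't take a `parameters` argument; a sketch of how default values are usually attached instead (the flow shown is a made-up example):
from prefect import Flow, Parameter, task

@task
def add(x, y):
    return x + y

with Flow("My Flow") as flow:
    # Defaults live on the Parameter tasks and travel with the registered flow.
    x = Parameter("x", default=8)
    y = Parameter("y", default=9)
    add(x, y)

flow.register(project_name="My Project")
# Per-run overrides are then supplied when the run is created, e.g. from the UI
# or via Client().create_flow_run(flow_id=..., parameters={"x": 8, "y": 9}).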
m
11/03/2021, 11:49 PM
import mlflow
from prefect import task

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
and here is my MLflow experiment script, where I would like to put the Prefect logging:
# (initialize/compose come from hydra, OmegaConf from omegaconf, set_env is a project helper)
initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    set_env(cfg)
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)
The logging from print() appears only in the task, and not in the MLflow experiment.
Thank you in advance
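A sketch of one way to get messages into both places (my assumption, not a confirmed answer from the thread): `print` with `log_stdout=True` only reaches the Prefect task logs, so anything destined for the MLflow experiment has to go through the MLflow API while a run is active.
import mlflow
import prefect
from prefect import task

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    logger = prefect.context.get("logger")   # ends up in the Prefect logs/UI
    logger.info("Starting MLflow project run for %s", experiment)

    with mlflow.start_run(nested=True):
        # Explicit MLflow calls end up in the MLflow experiment.
        mlflow.log_param("project_path", project_path)
        mlflow.projects.run(project_path, experiment_name=experiment)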
Karol
11/04/2021, 4:33 AM
`script` and `pickle` based storage as described here:
https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage
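For reference, a sketch of the two modes the linked page contrasts (registry and paths are placeholders): pickle-based storage serializes the flow object at registration time, while script-based storage re-imports the flow from a .py file at runtime.
from prefect.storage import Docker

# Pickle-based (the default): the flow object itself is serialized into the image.
pickle_storage = Docker(registry_url="registry.example.com", image_name="my-flow")

# Script-based: the .py file is copied into the image and re-executed at runtime,
# so the flow is rebuilt from source each run.
script_storage = Docker(
    registry_url="registry.example.com",
    image_name="my-flow",
    files={"/local/path/flow.py": "/flow.py"},  # placeholder paths
    stored_as_script=True,
    path="/flow.py",
)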
AsH
11/04/2021, 8:18 AM
Aqib Fayyaz
11/04/2021, 9:30 AM
Martin T
11/04/2021, 1:27 PM
`host_config` to our `DockerRun` config:
import docker
from prefect.storage import Docker
from prefect.run_configs import DockerRun
from prefect.executors import LocalDaskExecutor

# (flow defined elsewhere)
flow.storage = Docker(
    registry_url=...,
    image_name=...,
    files={...},
    env_vars={...},
    python_dependencies=[...],
)

# Build a docker-py host config with memory limits for the flow-run container
client = docker.APIClient()
host_config = client.create_host_config(
    mem_limit=12345,
    mem_reservation=1234,
)
flow.run_config = DockerRun(host_config=host_config)
flow.executor = LocalDaskExecutor()
When registered to Cloud, this seems to be ok, since starting a Run shows the following default in Host Config:
{
"Memory": 12345,
"MemoryReservation": 1234
}
However, this seems to have no effect on the newly created flow containers.
`docker stats` shows that MEM USAGE keeps growing, with a LIMIT equal to the total server memory.
docker inspect <CONTAINER> | grep \"Memory[\"R]
gives
"Memory": 0,
"MemoryReservation": 0,
What are we missing here?
Louis Auneau
11/04/2021, 1:31 PM
`DeploymentSpec` with the location and name and register it; the output seems OK.
• `prefect deployment ls` does list my deployment.
But:
• The UI does not show my deployment.
• The command `prefect deployment inspect my_deployment` does nothing.
Let me know if you need more details or logs to investigate 🙂!
Thank you in advance and have an excellent day!
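For reference, a minimal sketch of the registration step being described, assuming the early Orion `DeploymentSpec` API (file path and deployment name are placeholders):
from prefect.deployments import DeploymentSpec

# My recollection is that a file containing this spec is then registered with
# `prefect deployment create`; treat that detail as an assumption.
DeploymentSpec(
    flow_location="./my_flow.py",
    name="my-deployment",
)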
Chris Arderne
11/04/2021, 2:14 PM
`Agent` or `Executor` (or both). I read this SO answer already, along with this blog post on using an ECS Agent, which doesn't mention Dask.
Background: our workflows are heterogeneous between tasks (some just moving some stuff around, some very parallel, some requiring GPU) and between runs (sometimes they will be much more parallel), and very bursty. Some tasks are very long-lived and will spawn cloud containers of their own, but I assume Prefect doesn't need to care about that (unless we wanted those to be able to log back to Prefect).
VertexAgent/VertexRun
Following the blog post, we could set up a Vertex `Agent` (just merged on GitHub, not on PyPI yet) as the agent. We could then set up a Flow-of-Flows (as we can only specify `VertexRun` parameters at the Flow level), specifying the image, CPU etc. for each flow. This would by default use a `LocalExecutor`, so any parallelism within that Flow would simply be run sequentially? But each separate run of that Flow would automatically spin up the needed instances.
DaskExecutor
If we wanted Task-level parallelism, we'd need to use the `DaskExecutor`? So then we'd have a Vertex instance running the `Agent`, a bunch of Vertex instances running `Flow`s, and then a Dask cluster running parallelised `Task`s?
Have I understood that correctly? Would there be a way to avoid Dask by e.g. parallelising at the flow-to-flow interface?
Kevin Kho
11/04/2021, 2:20 PM
`LocalExecutor`, but we commonly see `LocalDaskExecutor` + `ECSRun` to utilize all of the cores of the machine that the flow is running on. It also helps to specify the `num_workers`. `LocalDaskExecutor` is already enough to utilize the cores of your machine.
If you are using Vertex + `DaskExecutor`, I think it would normally be an external cluster like `DaskExecutor(cluster_address_here)`. `DaskExecutor` on the Vertex compute would not yield anything over the `LocalDaskExecutor`.
On the last question of avoiding Dask to parallelize at the Flow level: when you do `StartFlowRun` or `create_flow_run`, it won't wait for the subflow to complete by default. So the behavior that you see should be kicking off the Flow run, and then moving on to kicking off the next Flow runs. So yes, this spins up a bunch of Vertex jobs running simultaneously.
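A quick sketch of the two executor options mentioned above (the worker count and scheduler address are placeholders):
from prefect import Flow
from prefect.executors import LocalDaskExecutor, DaskExecutor

with Flow("vertex-flow") as flow:
    ...

# Use all cores of the single machine the flow run lands on.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# Or point at an existing external Dask cluster instead.
# flow.executor = DaskExecutor(address="tcp://dask-scheduler:8786")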
Chris Arderne
11/04/2021, 3:01 PM
`LocalDask` wouldn't help much. I haven't played much with the flow-to-flow idiom, but it seems like it should be possible to do the following four `Flow`s:
1. “Master” flow gets kicked off by the Agent
2. Starts a simple sequential flow that prepares a bunch of things and determines how parallel subsequent steps will be.
3. Result from that is passed to a `StartFlowRun` with `.map()` so a bunch of parallel `Flow` runs are kicked off on separate Vertex instances
4. Results from that are reduced back down to a single sequential `Flow` that summarises and e.g. inserts results into our database
So all of these would use `wait=True`, but the middle Flow would be parallelised… Does that make sense/should that work?
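That plan could look roughly like the sketch below (flow and project names are placeholders, and whether the mapped middle step behaves exactly as hoped is the question being asked):
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

# wait=True makes each orchestration task block until its child flow finishes.
prepare = StartFlowRun(flow_name="prepare", project_name="my-project", wait=True)
middle = StartFlowRun(flow_name="parallel-step", project_name="my-project", wait=True)
summarise = StartFlowRun(flow_name="summarise", project_name="my-project", wait=True)

@task
def build_chunk_parameters(n=4):
    # Placeholder: in practice this would be derived from the prepare flow's output.
    return [{"chunk": i} for i in range(n)]

with Flow("master") as flow:
    prep = prepare()
    chunks = build_chunk_parameters(upstream_tasks=[prep])
    # One child flow run per parameter dict; these only actually run in parallel
    # under a Dask-based executor (e.g. LocalDaskExecutor).
    runs = middle.map(parameters=chunks)
    summarise(upstream_tasks=[runs])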
Kevin Kho
11/04/2021, 3:58 PM
`create_flow_run`, `wait_for_flow_run` and `get_task_run_result`. You would use `get_task_run_result` to fetch the result from another flow, but you need to know the task slug of the task with the result you want to fetch ahead of time.
`wait_for_flow_run` and others: you can just create and not wait.
Instead of `get_task_run_result`, you can also have those subflows persist that data somewhere and then load it in the main flow after waiting for the completion.
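A sketch of those three tasks wired together (the flow/project names and the task slug are placeholders; as noted above, the slug has to be looked up ahead of time):
from prefect import Flow
from prefect.tasks.prefect import (
    create_flow_run,
    wait_for_flow_run,
    get_task_run_result,
)

with Flow("parent") as flow:
    # Kick off the child flow (this task does not wait by itself).
    child_run_id = create_flow_run(
        flow_name="child-flow", project_name="my-project", parameters={"x": 1}
    )
    # Block until the child flow run reaches a finished state.
    finished = wait_for_flow_run(child_run_id, raise_final_state=True)
    # Pull a result out of the child run by its task slug (placeholder slug).
    data = get_task_run_result(
        child_run_id, task_slug="my_task-1", upstream_tasks=[finished]
    )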
Chris Arderne
11/04/2021, 6:25 PM
Kevin Kho
11/04/2021, 6:33 PM