Benjamin Bonhomme
11/02/2021, 3:20 PM
Vamsi Reddy
11/02/2021, 4:29 PM
ale
11/02/2021, 5:27 PM
Cannot provide `task_definition_arn` when using `Docker` storage
Looking at the code at https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py, this appears to be a deliberate design choice.
What are the alternatives for running a flow on ECS while providing the task definition ARN?
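One alternative is to move the flow code out of the Docker image, so the agent no longer needs to rewrite the task definition. A minimal sketch, assuming a Prefect 1.x release where ECSRun accepts task_definition_arn; the bucket name and ARN below are placeholders:
from prefect import Flow, task
from prefect.storage import S3
from prefect.run_configs import ECSRun

@task
def say_hello():
    print("hello from ECS")

with Flow("ecs-flow") as flow:
    say_hello()

# Flow code lives in S3, so the agent does not need to inject a
# Docker image reference into the registered task definition.
flow.storage = S3(bucket="my-flow-bucket")  # placeholder bucket
flow.run_config = ECSRun(
    task_definition_arn="arn:aws:ecs:us-east-1:123456789012:task-definition/my-task:1",  # placeholder ARN
)
Greg Adams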
11/02/2021, 7:23 PM
Will
11/02/2021, 8:08 PM
"If not provided, the default on the agent will be used (if configured)."
This does not happen for the task role, which is instead passed from the agent. This behaviour is unexpected; I would have assumed that, when passing a custom task definition, the roles I have defined (task role and execution role) would not be overridden by Prefect.
So, I've now modified the naming convention for the task role for a particular flow, to test passing it via:
`task_role_arn (str, optional)`: The name or full ARN for the IAM role to use for this task. If not provided, the default on the agent will be used (if configured).
The role I have passed has full S3 permissions:
{
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": "s3:*",
      "Resource": "*"
    }
  ]
}
The flow itself uses the [S3Upload](https://docs.prefect.io/api/latest/tasks/aws.html#s3upload) task.
It fails with: Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied
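For reference, a minimal sketch of passing both roles through the run config rather than the task definition (Prefect 1.x; the ARNs are placeholders):
from prefect.run_configs import ECSRun

# Both roles can be set per flow on the run config; per the docstrings
# quoted above, agent-level defaults apply only when these are omitted.
flow.run_config = ECSRun(
    task_role_arn="arn:aws:iam::123456789012:role/my-flow-task-role",  # placeholder
    execution_role_arn="arn:aws:iam::123456789012:role/my-ecs-execution-role",  # placeholder
)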
Jacob Bedard
11/03/2021, 4:52 AM
Freddie
11/03/2021, 10:36 AM
Architha Rao
11/03/2021, 11:35 AM
Brett Naul
11/03/2021, 12:48 PM
edge_on_conflict in the prefect or server repos
File "/src/util/prefect.py", line 575, in register_flow
flow_id = client.register(flow, project_name, idempotency_key=flow.serialized_hash())
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1227, in register
self.graphql(
File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 569, in graphql
raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['register_edges'], 'message': '[{\'extensions\': {\'path\': \'$.variableValues\', \'code\': \'validation-failed\'}, \'message\': "no such type exists in the schema: \'edge_on_conflict\'"}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
Lukas N.
11/03/2021, 1:23 PM
Thomas Nyegaard-Signori
11/03/2021, 1:51 PM
scale-down-delay-after-add to 1m and similarly the scale-down-unneeded-time to 1m.
The issue we are facing is that sometimes the task pods fail, seemingly without reason, and the logs are quite unhelpful. My hunch is that it has something to do with scaling of the cluster, potentially destroying pods or losing networking between the flow and task pods in the process. We are already setting cluster-autoscaler.kubernetes.io/safe-to-evict: false
on all pods, so eviction shouldn't be the issue. Has anyone else had any experience with k8s autoscaler settings leading to weird, intermittent task failures?
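For reference, a minimal sketch of attaching that annotation to the flow-run jobs themselves via a custom job template (assuming Prefect 1.x KubernetesRun; the template is deliberately minimal, and the agent still injects image, env, and labels at run time):
from prefect.run_configs import KubernetesRun

flow.run_config = KubernetesRun(
    job_template={
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {
                        # keep the cluster autoscaler from evicting flow-run pods
                        "cluster-autoscaler.kubernetes.io/safe-to-evict": "false"
                    }
                },
                "spec": {
                    "containers": [{"name": "flow"}],
                    "restartPolicy": "Never",
                },
            }
        },
    }
)
Maxwell Varlack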
11/03/2021, 3:16 PM
Steve s
11/03/2021, 3:24 PM
create_flow_run. Today it started giving me the traceback pasted below. I commented out everything in the flow except for the very first create_flow_run, and it's still giving me this error (see thread):
Any ideas?
Jake Place
11/03/2021, 4:21 PM
StartFlowRun storage information? Or is there a better way to store flows that need to operate in multiple environments?
Will
11/03/2021, 4:42 PM
saml
11/03/2021, 5:56 PM
Harish
11/03/2021, 7:04 PM
Khuyen Tran
11/03/2021, 9:17 PM
flow.run(parameters={"x": 8, "y": 9})
I wonder if there is a way we can do that with flow.register as well? Something similar to this:
flow.register(project_name="My Project", parameters={"x": 8, "y": 9})
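For reference, one way default parameter values can travel with a registered flow is to set them on the Parameter tasks themselves (a sketch, Prefect 1.x):
from prefect import Flow, Parameter, task

@task
def add(x, y):
    return x + y

with Flow("params-flow") as flow:
    # defaults are stored with the flow, so registered runs use them
    # unless overridden at run time
    x = Parameter("x", default=8)
    y = Parameter("y", default=9)
    add(x, y)

flow.register(project_name="My Project")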
m
11/03/2021, 11:49 PM
from prefect import task
import mlflow

@task(log_stdout=True)
def run_mlflow(project_path, experiment):
    mlflow.projects.run(
        project_path, experiment_name=experiment,
    )
and here is my mlflow experiment script where I would put Prefect logging:
import mlflow
from hydra import initialize, compose
from omegaconf import OmegaConf

initialize(config_path="conf", job_name="gojob")
cfg = compose(config_name="config")
print(OmegaConf.to_yaml(cfg))
tracking = cfg["var"]["MLFLOW_TRACKING_URI"]
params = {}
project_path = cfg["project_path"]
experiment = cfg["experiment"]
mlflow.set_tracking_uri(tracking)
mlflow.set_experiment(experiment)
# print(subprocess.run(["ls"]))
with mlflow.start_run(nested=True):
    set_env(cfg)
    get_data = mlflow.run(project_path, "process_data", experiment_name=experiment)
    train = mlflow.run(project_path, "train", experiment_name=experiment)
The logging from print() appears only in the task logs and not in the mlflow experiment.
Thank you in advance
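As an aside, a minimal sketch of emitting logs through Prefect's own logger instead of print() (Prefect 1.x; this covers only the Prefect side, and getting lines into the MLflow experiment would still go through MLflow's own tracking APIs):
from prefect import context, task

@task
def run_mlflow_logged(project_path, experiment):
    # the task-run logger; messages show up in the Prefect UI and agent logs
    # (function name here is illustrative)
    logger = context.get("logger")
    logger.info("Starting MLflow run for experiment %s", experiment)
Karol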
11/04/2021, 4:33 AM
script and pickle based storage as described here:
https://docs.prefect.io/orchestration/flow_config/storage.html#pickle-vs-script-based-storage
AsH
11/04/2021, 8:18 AM
Aqib Fayyaz
11/04/2021, 9:30 AM
Martin T
11/04/2021, 1:27 PM
host_config to our DockerRun config:
import docker
from prefect.storage import Docker
from prefect.run_configs import DockerRun
from prefect.executors import LocalDaskExecutor

# flow is defined earlier in the script
flow.storage = Docker(
    registry_url=...,
    image_name=...,
    files={...},
    env_vars={...},
    python_dependencies=[...]
)
client = docker.APIClient()
host_config = client.create_host_config(mem_limit=12345,
                                        mem_reservation=1234)
flow.run_config = DockerRun(host_config=host_config)
flow.executor = LocalDaskExecutor()
When registered to Cloud, this seems to be OK, since starting a run shows the following default in Host Config:
{
  "Memory": 12345,
  "MemoryReservation": 1234
}
However, this seems to have no effect on the newly created flow containers.
docker stats shows that MEM USAGE keeps growing and that LIMIT equals the total server memory.
docker inspect <CONTAINER> | grep \"Memory[\"R]
gives
"Memory": 0,
"MemoryReservation": 0,
What are we missing here?
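As a sanity check, the same host config can be exercised with the Docker SDK directly, outside Prefect (a sketch; the image, command, and memory values are illustrative):
import docker

client = docker.APIClient()
host_config = client.create_host_config(mem_limit="1g", mem_reservation="512m")
container = client.create_container(
    image="busybox", command="sleep 5", host_config=host_config
)
# If the limit survives here, the host_config itself is fine, and the
# question becomes whether the agent applies it to flow containers.
print(client.inspect_container(container)["HostConfig"]["Memory"])
Louis Auneau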
11/04/2021, 1:31 PM
DeploymentSpec with the location and name and register it; the output seems OK.
• prefect deployment ls does list my deployment.
But:
• The UI does not show my deployment.
• The command prefect deployment inspect my_deployment does nothing.
Let me know if you need more details or logs to investigate 🙂 !
Thank you in advance and have an excellent day!
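For reference, a minimal sketch of the kind of spec being described (assuming an early Orion/Prefect 2.0 alpha; the path and name are placeholders):
from prefect.deployments import DeploymentSpec

DeploymentSpec(
    flow_location="./my_flow.py",  # placeholder path to the flow script
    name="my_deployment",
)
Chris Arderne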
11/04/2021, 2:14 PM
Agent or Executor (or both). I read this SO answer already, along with this blog post on using an ECS Agent, which doesn't mention Dask.
Background: our workflows are heterogeneous between tasks (some just moving stuff around, some very parallel, some requiring GPU) and between runs (sometimes they will be much more parallel), and very bursty. Some tasks are very long-lived and will spawn cloud containers of their own, but I assume Prefect doesn't need to care about that (unless we wanted those to be able to log back to Prefect).
VertexAgent/VertexRun
Following the blog post, we could set up a Vertex Agent (just merged on GitHub, not on PyPI yet) as the agent. We could then set up a flow-of-flows (as one can only specify VertexRun parameters at the flow level), specifying the image, CPU etc. for each flow. This would by default use a LocalExecutor, so any parallelism within that flow would simply run sequentially? But each separate run of that flow would automatically spin up the needed instances.
DaskExecutor
If we wanted task-level parallelism, we'd need to use the DaskExecutor? So then we'd have a Vertex instance running the Agent, a bunch of Vertex instances running `Flow`s, and a Dask cluster running parallelised `Task`s?
Have I understood that correctly? Would there be a way to avoid Dask by e.g. parallelising at the flow-to-flow interface?
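A minimal sketch of the executor piece being discussed (Prefect 1.x):
from prefect.executors import DaskExecutor

# Task-level parallelism within a single flow run. With no arguments this
# spins up a temporary local Dask cluster inside the flow-run environment;
# cluster_class/cluster_kwargs or an address can point it at a real cluster.
flow.executor = DaskExecutor()
Barbara Abi Khoriati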
11/04/2021, 2:24 PM
Martim Lobao
11/04/2021, 4:24 PM
`and` (the output of each task is a bool), but it looks like Prefect started the child task when the first parent finished instead of waiting for both:
FIRST_TASK_DONE = first_task()
SECOND_TASK_DONE = second_task()
THIRD_TASK_READY = FIRST_TASK_DONE and SECOND_TASK_DONE
third_task(third_task_ready=THIRD_TASK_READY)
third_task was supposed to start only after both first_task and second_task had finished, but instead it only waited for the first one to complete.
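For context, Python's `and` is evaluated eagerly while the flow graph is being built, so it cannot register a dependency on both parents. A sketch of two common workarounds, reusing the tasks from the snippet above (Prefect 1.x; `both` is a hypothetical helper task):
from prefect import Flow, task

@task
def both(a, b):
    # combining the results inside a task makes the graph depend on both parents
    return a and b

with Flow("wait-for-both") as flow:
    first = first_task()
    second = second_task()
    third_task(third_task_ready=both(first, second))
    # or declare the ordering explicitly:
    # third_task(third_task_ready=True, upstream_tasks=[first, second])
Bret Haueter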
11/04/2021, 5:02 PM
E Li
11/04/2021, 5:11 PM
Philip MacMenamin
11/04/2021, 5:31 PM
prefect agent docker start
results in
requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff5ba1a23a0>: Failed to establish a new connection: [Errno 111] Connection refused'))