Tim Enders
01/21/2022, 9:04 PM
with Flow() as flow:
pattern?
Michael Bell
01/21/2022, 9:26 PM
prefect and dask-cloudprovider right now. It seems dask-cloudprovider[aws] relies on aiobotocore, which pins to a very specific botocore version, and that's causing conflicts when trying to set up my environment. Anyone have any experience with this?
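(One hedged workaround, not from the thread: install both packages in a single pip invocation so the resolver can pick a botocore release that satisfies aiobotocore's exact pin; installing them separately tends to produce exactly this kind of conflict.)
pip install prefect "dask-cloudprovider[aws]"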
Madison Schott
01/21/2022, 9:33 PM
Alex To
01/21/2022, 11:10 PM
bareflow)
Our use case is slightly different: we will be using the tool for container-execution orchestration, in which each task simply invokes a container (within our k8s cluster or ECS) or a job (Databricks job). Each container is an atomic unit of work written in any language. This architecture decouples orchestration from the actual functional task (container) and avoids recoding hundreds of existing tasks/containers. This has been working well for us with our internal tool.
Our flow would simply be: task1: call container-A; task2: call Databricks-job-B; task3: call container-C after task1 and task2 have completed.
My questions are:
1. Based on the documentation, my best option is to run a local agent on some EC2 instances with a LocalDaskExecutor. Using any other agent type would require additional resources and add more latency; e.g., with the Kubernetes agent there is one pod for the flow run and another for the actual container, so latency = double the spin-up time. The downside is the scaling problem with a local agent. Do I understand this correctly? Is there any other approach?
2. Any plan to add an ECSRunTask to the AWS tasks? This would run any arbitrary task defined outside the Prefect context in ECS, similar to Airflow's ECS Operator. I am surprised it's not already on the list.
Thanks
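(Editor's sketch for question 2, not from the thread: one way to approximate an ECSRunTask today is a small Prefect 1.x task that fires an existing ECS task definition via boto3; the cluster/task-definition names and the Fargate launch type below are assumptions.)
import boto3
from prefect import task

@task
def run_ecs_task(cluster: str, task_definition: str) -> str:
    """Fire a pre-registered ECS task definition and block until it stops."""
    ecs = boto3.client("ecs")
    response = ecs.run_task(
        cluster=cluster,                 # assumed: an existing ECS/Fargate cluster
        taskDefinition=task_definition,  # assumed: a task def registered outside Prefect
        launchType="FARGATE",
    )
    task_arn = response["tasks"][0]["taskArn"]
    ecs.get_waiter("tasks_stopped").wait(cluster=cluster, tasks=[task_arn])
    return task_arn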
Tim Enders
01/22/2022, 3:09 PM
Chris K.
01/22/2022, 4:09 PM
Chris K.
01/22/2022, 4:11 PM
Chris K.
01/22/2022, 4:11 PM
Chris K.
01/22/2022, 4:14 PM
Chris K.
01/22/2022, 4:26 PM
An executor that runs all functions synchronously and immediately in the main thread. To be used mainly for debugging purposes.
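(For reference, a minimal sketch of selecting that executor in Prefect 1.x:)
from prefect.executors import LocalExecutor

flow.executor = LocalExecutor()  # tasks run one at a time in the main thread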
Guy Thompson
01/23/2022, 4:13 PM
Ashton
01/23/2022, 8:29 PM
Yusuf Khan
01/23/2022, 9:42 PM
Chris L.
01/24/2022, 3:52 AM
ResourceManager().setup()
to my tasks and also have cleanup. Thanks!
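(Editor's sketch, not from the thread: the Prefect 1.x resource_manager pattern pairs setup() with a cleanup() that runs even when tasks in the block fail; open_connection and use_resource are hypothetical names.)
from prefect import Flow, resource_manager, task

@resource_manager
class MyResource:
    def __init__(self, config):
        self.config = config

    def setup(self):
        # build and return the resource (open_connection is a made-up helper)
        return open_connection(self.config)

    def cleanup(self, connection):
        # runs even if tasks inside the block fail
        connection.close()

@task
def use_resource(connection):
    ...

with Flow("resource-example") as flow:
    with MyResource(config="...") as connection:
        use_resource(connection)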
Adi Gandra
01/24/2022, 6:21 AM
Message: Failed to pull image "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": rpc error: code = NotFound desc = failed to pull and unpack image "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": failed to resolve reference "z.dkr.ecr.us-east-1.amazonaws.com/r:latest": z.dkr.ecr.us-east-1.amazonaws.com/r:latest: not found
I have followed the tutorial pretty closely, although some of it was slightly outdated, so I used the newer paradigms that were introduced. Any insight on how I could configure my EKS cluster to pull images from ECR would be appreciated. I also did all of this on my master admin account, so I would think that permissions shouldn't be an issue.
Thomas Hoeck
01/24/2022, 9:48 AM
Clovis
01/24/2022, 10:58 AM
JOB XXX failed
• Final state : Success
All my tasks were set as reference tasks, so I really don't know how to explain why this flow showed no trace of error (the task bar was green, etc.). Do you have any clues to explain this behaviour?
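(Context, not from the thread: in Prefect 1.x only a flow's reference tasks determine its final state, so a failed non-reference task can still leave the run green. A minimal sketch with made-up task names:)
with Flow("example") as flow:
    extracted = extract()
    loaded = load(extracted)

# the final state follows only these tasks; other failures won't fail the flow
flow.set_reference_tasks([loaded])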
Dekel R
01/24/2022, 12:59 PM
. 23 January 2022 11:00pm]: 500 Server Error for http+docker://localhost/v1.41/images/create?tag=2022-01-19t11-58-16-139521-00-00&fromImage=uxxx-flow: Internal Server Error ("Get "https://xxx": context deadline exceeded")
Seems like the docker registry is unavailable for some reason for a short period of time - exactly when Prefect is trying to pull a flow image and run it.
I have 2 questions that I couldn’t find any answers to:
1. Is there any configuration option I can use to make Prefect retry the image pull on failure?
2. Can I configure Prefect to pull images only when they change (a new flow version) and keep them cached locally? Right now, every run triggers an image pull.
Thanks!
Muddassir Shaikh
01/24/2022, 1:16 PM
Xavier Babu
01/24/2022, 2:51 PM
David Yang
01/24/2022, 3:02 PM
dammy arinde
01/24/2022, 3:13 PM
Leon Kozlowski
01/24/2022, 4:10 PM
Pedro Machado
01/24/2022, 4:38 PM
wait_for_flow_run
task when trying to implement a flow of flows:
Unexpected error: TypeError('Object of type FlowRunView is not JSON serializable')
Any ideas? Code in thread.
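(Editor's sketch of the flow-of-flows pattern in Prefect 1.x; the flow and project names are made up. Note that wait_for_flow_run returns a FlowRunView, which is not JSON serializable, so it should not be fed into anything that serializes task results.)
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent") as flow:
    child_run_id = create_flow_run(flow_name="child", project_name="my-project")
    # returns a FlowRunView; keep it out of JSON-serialized results or caches
    child_run_view = wait_for_flow_run(child_run_id, stream_logs=True)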
Adi Gandra
01/24/2022, 4:52 PM
flow.storage = Docker(registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
                      image_name="y",
                      image_tag="latest")
flow.executor = DaskExecutor()
flow.run_config = KubernetesRun(env={})
# flow.run()
flow.register(project_name="SA-Test", build=False)
I want to build this manually as part of our CI/CD and push it to ECR. That part is done. Now when I run the Prefect flow, the Kubernetes pod pulls the image, but then I get an error:
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage',)
Is there anything special I need to do in my Dockerfile, or when I build this image, to allow it to be used successfully without being built inside Prefect?
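(One hedged approach when the image is built outside Prefect: store the flow as a script at a fixed path inside the image and point Docker storage at it, so the agent does not look for a pickled flow that Prefect's own build step would have added. The path below is illustrative and must match a COPY in the Dockerfile.)
flow.storage = Docker(
    registry_url="x.dkr.ecr.us-east-1.amazonaws.com",
    image_name="y",
    image_tag="latest",
    stored_as_script=True,
    path="/opt/prefect/flow.py",  # e.g. COPY flow.py /opt/prefect/flow.py
)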
brian
01/24/2022, 5:01 PM
Mehdi Nazari
01/24/2022, 5:12 PM
with Flow("Reporting-Data-Import",
          storage=Local(path="/app/flow.py", stored_as_script=True),
          run_config=DockerRun(image="reporting_data_import:prod-prefect")) as flow:
    # load parameters
    with open("parameter.json", "r") as f:
        default_param = f.read()
    #
    json_param = Parameter("data_sources", default=default_param if default_param else "").default
    flow_parameter = extract_parameters(json_param)
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, False):
        logger.info(f"Invalid parameter! {flow_parameter}!")
    # copy table & content
    copy_table_content.map(param=flow_parameter.sources, upstream_tasks=[is_valid])
Please share your thoughts, as I'm not sure where my issue is.
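(Editor's sketch, not a confirmed fix: Parameter is itself a task, so the idiomatic shape passes it downstream instead of reading .default at build time, and it keeps logging inside a task; this assumes extract_parameters returns a mapping with a "sources" key.)
with Flow("Reporting-Data-Import") as flow:
    json_param = Parameter("data_sources", default="")
    flow_parameter = extract_parameters(json_param)
    is_valid = is_parameter_valid(flow_parameter)
    with case(is_valid, True):
        # indexing a task result creates a GetItem task resolved at runtime
        copy_table_content.map(param=flow_parameter["sources"])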
Suresh R
01/24/2022, 6:10 PM
'Running' object has no attribute 'is_success'
, can someone help?
def post_to_cw(obj, old_state, new_state):
    if new_state.is_failed():
        cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=1)
    if new_state.is_success():
        cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=0)
    return new_state
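(Side note on the Prefect 1.x state API: states expose is_successful(), not is_success(), which matches the quoted error. A corrected sketch of the handler:)
def post_to_cw(obj, old_state, new_state):
    if new_state.is_failed():
        cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=1)
    if new_state.is_successful():  # is_success() does not exist on State objects
        cloudwatch.put_metrics(cid=CID, flow_name=FLOW_NAME, status_code=0)
    return new_state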
Jason Motley
01/24/2022, 6:11 PM
Pedro Machado
01/24/2022, 6:11 PM
LocalExecutor
and it will run them in order but was wondering if there is a way to explicitly define the dependencies.
For example, in this case, I'd like to make sure that the mapped task for 2 runs after 1 and 3 runs after 2.
items = [1, 2, 3]
ny_task.map(items)
thanks
Kevin Kho
01/24/2022, 6:14 PM
If items is defined beforehand and it's not a task output, you can do this
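(Kevin's snippet is not captured in this export; a hedged guess at the pattern, unrolling the map so each item depends on the previous call. This assumes the Prefect 1.x upstream_tasks keyword accepted by task calls.)
# inside the with Flow(...) block
items = [1, 2, 3]
previous = None
for item in items:
    previous = ny_task(item, upstream_tasks=[previous] if previous else None)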
Pedro Machado
01/25/2022, 1:15 PM