Ilhom Hayot o'g'li
05/24/2022, 8:38 AM
Kayvan Shah
05/24/2022, 8:49 AM
Todd de Quincey
05/24/2022, 9:45 AM
Ed Burroughes
05/24/2022, 11:59 AM
with Flow("tax_assessor_inc") as flow:
    raw_s3_keys = Parameter("raw_s3_keys")
    flow_tasks = []
    for s3_key in raw_s3_keys:
        _clean = clean([s3_key])
        _load_data = load_data(_clean)
        _test_raw_data = test_raw_data()
        _transform_task = transform_task()
        flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])
    reduced_flow_tasks = sum(flow_tasks, [])
    for i in range(len(reduced_flow_tasks) - 1):
        reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
The issue arises when trying to iterate over the raw_s3_keys parameter, as it's not iterable. It works when hardcoding the S3 paths, but obviously that's not what we want, as it would be great to define that list dynamically. Also, annoyingly, each task has to run synchronously, one after another; otherwise we'd use something like Prefect's map.
Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem?
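One possible way around this, as a minimal sketch rather than a confirmed fix: a Parameter only has a value at run time, so the loop can move inside a single function that receives the resolved list and runs the steps for each key in order. In Prefect 1.x such a function would presumably be wrapped with @task and called with the Parameter as its argument. The names process_keys_sequentially, clean and load_data below are hypothetical stand-ins, written in plain Python so the sketch runs without Prefect.

```python
from typing import Callable, List


def process_keys_sequentially(
    raw_s3_keys: List[str], steps: List[Callable[[str], str]]
) -> List[str]:
    """Run every step for one key before moving on to the next key."""
    results = []
    for s3_key in raw_s3_keys:
        value = s3_key
        for step in steps:
            # Each step receives the output of the previous step.
            value = step(value)
        results.append(value)
    return results


# Hypothetical stand-ins for the real clean / load_data tasks.
def clean(key: str) -> str:
    return key.strip().lower()


def load_data(key: str) -> str:
    return f"loaded:{key}"


results = process_keys_sequentially([" A.csv", "B.csv "], [clean, load_data])
print(results)
```

The trade-off is that the whole loop becomes one unit of work, so per-key task states are no longer visible individually.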
Sorry for the long-winded message, thank you 🙂
Frank Embleton
05/24/2022, 1:21 PM
Rainer Schülke
05/24/2022, 1:41 PM
Vamsi Reddy
05/24/2022, 2:33 PM
from prefect.run_configs import ECSRun

RUN_CONFIG = ECSRun(
    labels=["dev"],
    task_role_arn="arn:aws:iam::xxxxx:role/prefectTaskRole",
    execution_role_arn="arn:aws:iam::xxxxx:role/prefectECSAgentTaskExecutionRole",
    image="xxxxxx.dkr.ecr.us-east-1.amazonaws.com/prefect-orchestration:latest",  # this is our custom image that also has our flows
    run_task_kwargs=dict(
        cluster="prefectEcsClusterDev",
        launchType="FARGATE",
        overrides=dict(
            containerOverrides=[
                dict(
                    name="flow",
                    cpu=4096,
                    memory=8192,
                )
            ]
        ),
    ),
    cpu=4096,  # tried specifying here but still no luck
    memory=8192,
)
Todd de Quincey
05/24/2022, 2:36 PM
JK
05/24/2022, 2:42 PM
Madison Schott
05/24/2022, 4:50 PM
Constantino Schillebeeckx
05/24/2022, 6:02 PM
"FOO": "BAR" in the run config, and then do something like os.environ["FOO"] = "BAZ" in the flow.py - what will FOO be set to when the flow runs?
Patrick Tan
05/24/2022, 6:39 PM
Tom Manterfield
05/24/2022, 7:07 PM
Christian Vogel
05/24/2022, 7:18 PM
Sander
05/24/2022, 8:45 PM
PRASHANT GARG
05/25/2022, 5:39 AM
Thomas Opsomer
05/25/2022, 10:16 AM
Naga Sravika Bodapati
05/25/2022, 12:08 PM
Kayvan Shah
05/25/2022, 12:14 PM
Daniel Sääf
05/25/2022, 12:29 PM
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
After reading up on this, it sounds like this is due to some other exception that results in an unpickleable object. Is that right? Is there any way I can reach the underlying exception?
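For context, a minimal sketch of how this kind of error can arise, assuming it is raised whenever something tries to pickle a client object (for example when a client is passed between tasks or returned as a task result). FakeClient and count_rows are hypothetical stand-ins, not the real SDK; the pattern shown is to create the client inside the function that uses it and return only plain data.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for an SDK client that refuses to be pickled."""

    def __reduce__(self):
        raise pickle.PicklingError(
            "Pickling client objects is explicitly not supported."
        )


def is_picklable(obj) -> bool:
    """Return True if pickle.dumps succeeds on obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False


def count_rows(table_name: str) -> int:
    """Create the client locally and return only plain, picklable data."""
    client = FakeClient()  # never leaves this function
    del client  # a real task would use the client here
    return len(table_name)  # placeholder result for the sketch


client_ok = is_picklable(FakeClient())
result_ok = is_picklable(count_rows("events"))
print(client_ok, result_ok)
```

Checking each task input and output with a helper like is_picklable is one way to narrow down which object triggers the error.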
Traceback and code in thread.
Jonathan Mathews
05/25/2022, 12:44 PM
Lee Mendelowitz
05/25/2022, 12:47 PM
Robin Weiß
05/25/2022, 12:53 PM
Naga Sravika Bodapati
05/25/2022, 1:13 PM
Florian Guily
05/25/2022, 1:14 PM
local dask executor in this cluster or if there is an additional configuration to use dask as an executor in a k8s cluster?
Andrew Lawlor
05/25/2022, 1:47 PM
Constantino Schillebeeckx
05/25/2022, 1:58 PM
success though. Why wouldn't this flow's overall state be set to failed?
Anna Geller
Xavier Babu
05/25/2022, 4:34 PM
will milner
05/25/2022, 5:18 PM