Rob McInerney
05/23/2022, 4:36 PM
with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config()) as flow:
    dataset = Parameter("dataset", default={})
    dataset_is_valid = check_dataset_is_valid(dataset)
    with case(dataset_is_valid, True):
        access_token = get_access_token()
        response = refresh_dataset(dataset, access_token)
        result = check_result(dataset, response, access_token)
        flow.set_reference_tasks([response, result])
    with case(dataset_is_valid, False):
        flow.set_reference_tasks([dataset_is_valid])
Ilhom Hayot o'g'li
05/23/2022, 5:58 PM
Josh
05/23/2022, 8:59 PM
This may be due to one of the following version mismatches between the flow build and execution environments:
  - python: (flow built with '3.7.13', currently running with '3.7.12')
The CI/CD system registering my flow is, I guess, using Python 3.7.13. But how do I know what the execution environment's Python is, if it's executing from a Docker image on a Prefect Docker agent?
Josh
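[Editor's note] One way to check the execution environment's interpreter, assuming Docker is available and substituting the agent's actual image name, is `docker run --rm <image> python --version`. Equivalently, the same information can be printed from inside a task, since whatever runs the flow runs this code; a minimal sketch:

```python
import platform

# Prints the version of whichever interpreter executes this code; dropped
# into a task (or run inside the Docker image), it reveals the execution
# environment's Python version for comparison with the build environment.
print(platform.python_version())
```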
05/23/2022, 10:47 PM
HaoboGu
05/24/2022, 2:27 AM
Dylan Lim
05/24/2022, 2:52 AM
Guillaume Latour
05/24/2022, 6:57 AM
/home/<user>/.prefect/results
folder, with <user> being the user launching the prefect server & agent on the other server (and who is not present in the docker).
I've added this line into the configuration: flows.checkpointing = false, then relaunched the server and agent, but nothing has changed: the results folder is still being filled with intermediate task results.
Am I doing something wrong? Is it possible to prevent these intermediate backups?
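[Editor's note] A sketch of the relevant knob, assuming Prefect 1.x: checkpointing is read from the environment of the process that executes the flow (the agent / flow-run container), so setting flows.checkpointing in the server's config has no effect. Using Prefect 1.x's `PREFECT__SECTION__KEY` environment-variable convention:

```shell
# Set in the environment of the process executing the flow (e.g. passed to
# the agent or baked into the run container), NOT in the server's config:
export PREFECT__FLOWS__CHECKPOINTING=false
```

Alternatively, checkpointing can be turned off per task in flow code with `@task(checkpoint=False)`.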
Ty in advance
Emma Rizzi
05/24/2022, 7:17 AM
Ilhom Hayot o'g'li
05/24/2022, 8:38 AM
Kayvan Shah
05/24/2022, 8:49 AM
Todd de Quincey
05/24/2022, 9:45 AM
Ed Burroughes
05/24/2022, 11:59 AM
with Flow("tax_assessor_inc") as flow:
    raw_s3_keys = Parameter("raw_s3_keys")
    flow_tasks = []
    for s3_key in raw_s3_keys:
        _clean = clean([s3_key])
        _load_data = load_data(_clean)
        _test_raw_data = test_raw_data()
        _transform_task = transform_task()
        flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])
    reduced_flow_tasks = sum(flow_tasks, [])
    for i in range(len(reduced_flow_tasks) - 1):
        reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
The issue arises when trying to iterate over the raw_s3_keys parameter, as it's not iterable. It works when hardcoding the s3 paths, but obviously that's not what we want; it would be great to define that list dynamically. Also, annoyingly, each task has to occur synchronously, otherwise we'd use something like prefect map.
Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem?
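[Editor's note] In Prefect 1.x a Parameter is itself a task, so its value does not exist at flow-build time, which is why the loop fails; `.map()` defers the fan-out to runtime. A rough sketch under that assumption (the task bodies here are hypothetical placeholders for the real `clean`/`load_data` tasks above); note that with the default LocalExecutor, mapped children run one at a time, which may satisfy the synchronous requirement:

```python
from prefect import Flow, Parameter, task

@task
def clean(s3_key):
    # hypothetical placeholder for the real cleaning logic
    return s3_key

@task
def load_data(cleaned_key):
    # hypothetical placeholder for the real load logic
    return cleaned_key

with Flow("tax_assessor_inc") as flow:
    raw_s3_keys = Parameter("raw_s3_keys")
    cleaned = clean.map(raw_s3_keys)   # one task per key, resolved at runtime
    loaded = load_data.map(cleaned)    # downstream of clean, per key
```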
Sorry for the long-winded message. Thank you 🙂
Frank Embleton
05/24/2022, 1:21 PM
Rainer Schülke
05/24/2022, 1:41 PM
Vamsi Reddy
05/24/2022, 2:33 PM
RUN_CONFIG = ECSRun(
    labels=["dev"],
    task_role_arn="arn:aws:iam::xxxxx:role/prefectTaskRole",
    execution_role_arn="arn:aws:iam::xxxxx:role/prefectECSAgentTaskExecutionRole",
    image="xxxxxx.dkr.ecr.us-east-1.amazonaws.com/prefect-orchestration:latest",  # this is our custom image that also has our flows
    run_task_kwargs=dict(
        cluster="prefectEcsClusterDev",
        launchType="FARGATE",
        overrides=dict(
            containerOverrides=[
                dict(
                    name="flow",
                    cpu=4096,
                    memory=8192,
                )
            ]
        ),
    ),
    cpu=4096,  # tried specifying here but still no luck
    memory=8192,
)
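[Editor's note] One thing worth checking, assuming the ECS RunTask API semantics: on Fargate, CPU and memory are sized at the task level, and in `run_task` those task-level overrides sit at the top of `overrides` as strings, beside `containerOverrides`. A sketch of just that fragment, keeping the names from the config above:

```python
# Sketch of the overrides shape the ECS RunTask API expects: task-level
# cpu/memory are top-level string fields of `overrides`, while the
# per-container values inside containerOverrides are integers.
run_task_kwargs = dict(
    cluster="prefectEcsClusterDev",
    launchType="FARGATE",
    overrides=dict(
        cpu="4096",       # task-level size (string, per the ECS API)
        memory="8192",
        containerOverrides=[
            dict(name="flow", cpu=4096, memory=8192),
        ],
    ),
)
```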
Todd de Quincey
05/24/2022, 2:36 PM
JK
05/24/2022, 2:42 PM
Madison Schott
05/24/2022, 4:50 PM
Constantino Schillebeeckx
05/24/2022, 6:02 PM
"FOO": "BAR"
in the run config, and then do something like os.environ["FOO"] = "BAZ" in the flow.py - what will FOO be set to when the flow runs?
Patrick Tan
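[Editor's note] Prefect aside, plain process-environment semantics suggest the answer: the run config seeds the variable before flow code executes, and an assignment in flow.py then overwrites it for that process. A minimal stand-in sketch with no Prefect involved (it assumes `=` assignment was meant, since `os.environ["FOO"]: "BAZ"` is only an annotation and changes nothing):

```python
import os

# 1. The run config injects FOO=BAR into the environment before flow code runs.
os.environ["FOO"] = "BAR"

# 2. flow.py later assigns over it; within this process the later value wins.
os.environ["FOO"] = "BAZ"

print(os.environ["FOO"])  # prints BAZ
```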
05/24/2022, 6:39 PM
Tom Manterfield
05/24/2022, 7:07 PM
Christian Vogel
05/24/2022, 7:18 PM
Kevin Kho
05/24/2022, 7:30 PM
Sander
05/24/2022, 8:45 PM
PRASHANT GARG
05/25/2022, 5:39 AM
Thomas Opsomer
05/25/2022, 10:16 AM
Naga Sravika Bodapati
05/25/2022, 12:08 PM
Kayvan Shah
05/25/2022, 12:14 PM
Daniel Sääf
05/25/2022, 12:29 PM
_pickle.PicklingError: Pickling client objects is explicitly not supported.
Clients have non-trivial state that is local and unpickleable.
After reading up on this, it sounds like it's due to some other exception that results in an unpickleable object. Is that right? Is there any way I can reach the underlying exception?
Traceback and code in thread..
Jonathan Mathews
05/25/2022, 12:44 PM
Jonathan Mathews
05/25/2022, 12:44 PM
Anna Geller
05/25/2022, 12:55 PM
Jonathan Mathews
05/25/2022, 12:59 PM
Kevin Kho
05/25/2022, 2:35 PM