Jake
01/26/2022, 8:02 PM
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'cfmeta\'",)\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - python: (flow built with \'3.6.15\', currently running with \'3.6.8\')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.',)
I see that this is common and I read this thread. However, our flow specifically needs Python 3.6.15 because some of our dependencies require it, and I do not see an agent that matches this version. I saw that script-based storage was suggested, but we generate our flows dynamically, so that is not an option. Any suggestions on how to resolve this? Thanks in advance!
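One hedged possibility, assuming a Docker-capable agent: run the flow in an image pinned to the same interpreter it was built with, so unpickling happens under Python 3.6.15 regardless of the agent host's Python. The image tag below is a placeholder for one you build FROM python:3.6.15 with cfmeta and the other flow dependencies installed.
from prefect import Flow
from prefect.run_configs import DockerRun

with Flow("my-flow") as flow:
    ...  # tasks elided

# placeholder image, built on python:3.6.15 with all flow dependencies
flow.run_config = DockerRun(image="my-registry/my-flow:py3.6.15")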
Saurabh Indoria
01/27/2022, 5:01 AM
Sergi de Pablos
01/27/2022, 6:43 AM
massumo
01/27/2022, 2:00 PM
Andreas
01/27/2022, 3:10 PM
shijas km
01/27/2022, 3:37 PM
Suresh R
01/27/2022, 4:26 PM
Eli Treuherz
01/27/2022, 5:14 PM
Kevin Mullins
01/27/2022, 5:20 PM
…function a in a downstream task or function b in a downstream task (essentially deciding on a strategy function or factory function). Could the downstream task properly get which function to execute from the other? Hopefully this makes sense.
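One hedged way to set this up, assuming Prefect 1.x and hypothetical strategies a and b: have the upstream task return a key rather than the callable itself, and let the downstream task resolve the key at execution time (raw callables can be awkward to pass between tasks when flows are pickled).
from prefect import task, Flow

def a(x: int) -> int:
    return x + 1

def b(x: int) -> int:
    return x * 2

STRATEGIES = {"a": a, "b": b}

@task
def choose_strategy(use_a: bool) -> str:
    # decide which strategy the downstream task should run
    return "a" if use_a else "b"

@task
def run_strategy(key: str, x: int) -> int:
    # resolve the key to a concrete function only at execution time
    return STRATEGIES[key](x)

with Flow("strategy-example") as flow:
    key = choose_strategy(True)
    result = run_strategy(key, 10)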
Matthew Webster
01/27/2022, 6:10 PM
Suresh R
01/27/2022, 6:17 PM
Vamsi Reddy
01/27/2022, 9:03 PM
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
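A minimal sketch of both fixes the error message suggests, assuming Prefect 1.x:
from typing import Tuple
from prefect import task, Flow

@task(nout=2)
def split_with_nout(x: int):
    # nout=2 tells Prefect this task yields two unpackable results
    return x, x * 2

@task
def split_with_annotation(x: int) -> Tuple[int, int]:
    # a Tuple return annotation achieves the same thing
    return x, x * 2

with Flow("nout-example") as flow:
    a, b = split_with_nout(1)
    c, d = split_with_annotation(2)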
Parth Patel
01/28/2022, 12:36 AM
Miroslav Rác
01/28/2022, 1:35 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/miro/'")
… Obviously, when I created my flow, my local path was probably pickled along with it, so it cannot run on another machine.
Is this expected behavior? How can I run flows created on a different machine?
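A hedged sketch of one common remedy, assuming Prefect 1.x storage options: storing the flow as a script instead of a pickle avoids baking machine-specific import state into it, which is one typical cause of path-shaped ModuleNotFoundErrors like the one above. Bucket and path are placeholders.
from prefect import Flow
from prefect.storage import S3

with Flow("portable-flow") as flow:
    ...  # tasks elided

flow.storage = S3(
    bucket="my-flows-bucket",
    stored_as_script=True,
    # the script uploaded at registration time and re-imported at run time
    local_script_path="flows/portable_flow.py",
)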
Bruno Murino
01/28/2022, 1:58 PM
Andrea Nerla
01/28/2022, 3:19 PM
David Yang
01/28/2022, 4:13 PM
Pedro Machado
01/28/2022, 4:28 PM
…create_flow_run and wait_for_flow_run, and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today, I noticed that if the child flow fails, this task won't start a new flow run when it retries; it seems to check for the same flow_run_id. I suspect it's somehow related to an idempotency key that is set by default.
Would it be better to use an idempotency key I explicitly set that will be different across retries? Is there a better way to set this up?
The goals are: 1) the ability to retry the child flow when it fails, and 2) the ability to restart the parent flow from failed if needed. Thanks!
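A hedged sketch of the explicit-key idea, assuming Prefect 1.x and the create_flow_run task's idempotency_key parameter. Wrapping creation and waiting in one retrying task whose key is derived from task_run_count (which increments on each retry) is an assumption about a workable scheme, not a documented pattern.
import datetime
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=5))
def run_child_flow():
    ctx = prefect.context
    # task_run_count increments on every retry of this wrapper task, so the
    # explicit key differs across retries and a brand-new child run is
    # created instead of re-attaching to the failed one
    key = f"{ctx.get('task_run_id')}-{ctx.get('task_run_count', 1)}"
    run_id = create_flow_run.run(
        flow_name="child",          # hypothetical child flow name
        project_name="my-project",  # hypothetical project
        idempotency_key=key,
    )
    # raise_final_state makes a failed child fail this task, triggering a retry
    return wait_for_flow_run.run(run_id, raise_final_state=True)

with Flow("parent") as flow:
    run_child_flow()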
Suresh R
01/28/2022, 6:25 PM
Jovan Sakovic
01/28/2022, 6:41 PM
…(logger = prefect.context.get("logger"))
• pass this logger object from function to function to finally get to the one that is running on threads
◦ extract_thread = Thread(target=extract_messages, name=f"Extractor #1", args=(extract_queue, load_queue, logger))
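A self-contained sketch of the pattern being described, assuming the extract_messages worker is roughly queue-to-queue: fetching the context logger inside the task and handing it to the thread lets the thread's log lines flow through Prefect's logging.
import queue
import threading
import prefect
from prefect import task

def extract_messages(in_q, out_q, logger):
    # hypothetical worker: drain one queue into another, logging as it goes
    while not in_q.empty():
        item = in_q.get()
        logger.info("extracted %s", item)
        out_q.put(item)

@task
def run_extractor():
    logger = prefect.context.get("logger")
    extract_queue, load_queue = queue.Queue(), queue.Queue()
    for i in range(3):
        extract_queue.put(i)
    extract_thread = threading.Thread(
        target=extract_messages,
        name="Extractor #1",
        args=(extract_queue, load_queue, logger),
    )
    extract_thread.start()
    extract_thread.join()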
Michael Bell
01/28/2022, 10:02 PM
…Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied. If I run the flow without the DaskExecutor, it works just fine. I have previously tested using dask_cloudprovider.aws.FargateCluster directly within a task to create a cluster, send a dask array calculation to the cluster, and tear it all down within one Prefect task, and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the cluster_kwargs has s3:* permissions.
import os
import sys
import time  # needed for time.sleep in prefect_task below
from typing import List
sys.path.append('.')
import prefect
from prefect import task, Flow
from prefect.storage.s3 import S3
from prefect.run_configs.ecs import ECSRun
from prefect.executors.dask import DaskExecutor
from dask_cloudprovider.aws import FargateCluster
from config.build import ECR_IMAGE
@task(log_stdout=True)
def prefect_task(n: int) -> int:
print(f'Running task w/ input {n}')
time.sleep(2)
return n * 10
@task(log_stdout=True)
def reduce_task(ns: List[int]) -> int:
print(f'Running reduction task')
return sum(ns)
schedule = None
account = os.environ.get("AWSENV")
with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
results = prefect_task.map([1,2,3,4,5])
reduced_result = reduce_task(results)
flow.run_config = ECSRun(
image=ECR_IMAGE,
execution_role_arn=...,
task_role_arn=...,
run_task_kwargs={
"cluster": "PrefectCluster",
},
labels=[account]
)
flow.executor = DaskExecutor(
cluster_class="dask_cloudprovider.aws.FargateCluster",
cluster_kwargs={
"image": ECR_IMAGE,
"n_workers": 5,
"cluster_arn": ...,
"execution_role_arn": ...,
"vpc": ...,
"subnets": [..., ],
"security_groups": [..., ],
},
)
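One hedged direction for the AccessDenied: with the DaskExecutor, the S3 writes happen inside the Fargate worker containers, which assume the cluster's task role at runtime rather than the execution role (the execution role only pulls images and writes logs). dask_cloudprovider's FargateCluster also accepts task_role_arn, so granting that role the S3 permissions may be what's missing; the ARNs below are placeholders.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": ECR_IMAGE,
        "n_workers": 5,
        # role the worker containers assume at runtime; this is the one
        # that needs s3:PutObject, not the execution role
        "task_role_arn": "arn:aws:iam::123456789012:role/dask-worker-role",
        "execution_role_arn": "arn:aws:iam::123456789012:role/dask-exec-role",
        # networking kwargs (cluster_arn, vpc, subnets, security_groups) as before
    },
)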
Jovan Sakovic
01/29/2022, 11:30 AM
…Failed state. But the DbtShellTask raises the FAIL signal with the message that occurs first, Command failed with exit code 2, and the notifier doesn't get to the error that I actually want: dbt's error.
Is there a way to push these other error messages to the notifier?
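A hedged sketch, assuming Prefect 1.x ShellTask options: DbtShellTask inherits return_all and log_stderr from ShellTask, which surface dbt's own output (including its error lines on stderr) in the task logs rather than only the generic exit-code message. Whether the notifier picks these up depends on how the notification hook reads the state.
from prefect import Flow
from prefect.tasks.dbt import DbtShellTask

dbt_run = DbtShellTask(
    command="dbt run",
    return_all=True,   # return every line of output, not just the last one
    log_stderr=True,   # mirror stderr, where dbt writes its errors, to the logger
)

with Flow("dbt-flow") as flow:
    dbt_run()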
Tara
01/31/2022, 6:11 AM
Bruno Murino
01/31/2022, 11:26 AM
Mary Clair Thompson
01/31/2022, 1:23 PM
…ps ef yields ~7k zombie processes on the machine; on closer inspection with ps -auxwwf we're seeing graphql as the culprit for all of those hanging processes. This seems to be an issue elsewhere (e.g. this report), and I'm wondering if there's anything that may have changed in 0.15.12 to cause this? And what mitigation would you all suggest?
Tim Wright
01/31/2022, 1:50 PM
Tim Wright
01/31/2022, 3:12 PM
shijas km
01/31/2022, 4:11 PM
Kevin Mullins
01/31/2022, 5:02 PM
Talmaj Marinč
01/31/2022, 5:10 PM
…task.map. What am I doing wrong?
Kevin Kho
01/31/2022, 5:10 PM
flow.executor = LocalDaskExecutor()
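A minimal sketch of that suggestion, assuming Prefect 1.x: attaching a LocalDaskExecutor makes task.map run its children in parallel instead of sequentially under the default LocalExecutor.
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def double(x: int) -> int:
    return x * 2

with Flow("parallel-map") as flow:
    doubled = double.map([1, 2, 3, 4])

# threads by default; LocalDaskExecutor(scheduler="processes") is the alternative
flow.executor = LocalDaskExecutor()

if __name__ == "__main__":
    flow.run()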
Talmaj Marinč
01/31/2022, 5:11 PM
…DaskExecutor?
Kevin Kho
01/31/2022, 5:12 PM
Talmaj Marinč
01/31/2022, 5:28 PM
Kevin Kho
01/31/2022, 5:29 PM
prefect register -f ..
Talmaj Marinč
01/31/2022, 5:50 PM