Eli Treuherz
01/27/2022, 5:14 PM
Kevin Mullins
01/27/2022, 5:20 PM
function a in a downstream task or function b in a downstream task (essentially deciding on a strategy function or factory function). Could the downstream task properly get which function to execute from the other? Hopefully this makes sense.
Matthew Webster
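One way to picture the strategy-function question above, as a plain-Python sketch rather than anything Prefect-specific: the upstream step returns the name of the function to run, and the downstream step looks it up in a registry. Passing a name instead of the function object keeps the value that travels between steps trivially serializable. All names here (function_a, function_b, STRATEGIES, choose_strategy, apply_strategy) are hypothetical, and wrapping these in tasks is left out on purpose.

```python
from typing import Callable, Dict


def function_a(x: int) -> int:
    """Hypothetical strategy A: add one."""
    return x + 1


def function_b(x: int) -> int:
    """Hypothetical strategy B: double."""
    return x * 2


# Registry mapping a serializable name to the callable it stands for.
STRATEGIES: Dict[str, Callable[[int], int]] = {
    "a": function_a,
    "b": function_b,
}


def choose_strategy(value: int) -> str:
    """Upstream step: decide which strategy applies and return its name."""
    return "a" if value < 10 else "b"


def apply_strategy(name: str, value: int) -> int:
    """Downstream step: look the strategy up by name and run it."""
    return STRATEGIES[name](value)
```

For example, apply_strategy(choose_strategy(3), 3) runs function_a, while a value of 20 selects function_b.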
01/27/2022, 6:10 PM
Suresh R
01/27/2022, 6:17 PM
Vamsi Reddy
01/27/2022, 9:03 PM
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
Parth Patel
01/28/2022, 12:36 AM
Miroslav Rác
01/28/2022, 1:35 PM
Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/Users/miro/'")
… Obviously, when I created my flow, my local path was probably pickled, so it cannot run on another machine.
Is this expected behavior? How can I run flows created on a different machine?
Bruno Murino
01/28/2022, 1:58 PM
Andrea Nerla
01/28/2022, 3:19 PM
David Yang
01/28/2022, 4:13 PM
Pedro Machado
01/28/2022, 4:28 PM
create_flow_run and wait_for_flow_run and I am using this task to start the child flow(s) from the parent. This task is set to retry. Today, I noticed that if the child flow fails, this task won't start a new flow run when it retries. It seems to check for the same flow_run_id. I suspect it's somehow related to an idempotency key that is set by default.
Would it be better to use an idempotency key I explicitly set that will be different across retries? Is there a better way to set this up?
The goals are: 1) ability to retry the child flow when it fails and 2) ability to restart the parent flow from failed if needed. Thanks!
Suresh R
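On the idea of an idempotency key that differs across retries, here is a minimal sketch of how such a key could be built. It assumes the parent task can read its own task run id and current attempt number (in Prefect 1.x these are believed to be exposed through prefect.context as task_run_id and task_run_count, but treat that as an assumption to verify), and that the resulting string is then passed as the idempotency key when the child flow run is created. The helper name retry_aware_idempotency_key is hypothetical.

```python
def retry_aware_idempotency_key(task_run_id: str, run_count: int) -> str:
    """Build a key that is stable within one attempt but changes on each retry.

    task_run_id: identifier of the parent task run (assumed to be available).
    run_count:   1-based attempt number of the parent task (assumed to be available).
    """
    if run_count < 1:
        raise ValueError("run_count is expected to start at 1")
    return f"{task_run_id}-attempt-{run_count}"
```

Because the key only changes with the attempt number, repeated calls inside the same attempt would still map to the same child run. How this interacts with restarting the parent from failed depends on whether a restart increments the attempt counter, which is worth checking before relying on it.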
01/28/2022, 6:25 PM
Jovan Sakovic
01/28/2022, 6:41 PM
logger = prefect.context.get("logger")
)
• pass this logger object from function to function to finally get to the one that is running on threads
◦ extract_thread = Thread(target=extract_messages, name=f"Extractor #1", args=(extract_queue, load_queue, logger))
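The approach described above (fetch the logger once, then hand it to the thread target as an argument) can be sketched with the standard library alone. Here logging.getLogger stands in for the logger obtained from prefect.context, and extract_messages and run_pipeline are hypothetical implementations written only to make the example runnable; the real extractor is not shown in the thread.

```python
import logging
import queue
import threading
from typing import List


def extract_messages(extract_queue: queue.Queue, load_queue: queue.Queue,
                     logger: logging.Logger) -> None:
    """Thread target: consume items until a None sentinel arrives."""
    while True:
        item = extract_queue.get()
        if item is None:
            load_queue.put(None)  # forward the sentinel downstream
            break
        logger.info("extracted %s", item)
        load_queue.put(item.upper())


def run_pipeline(messages: List[str], logger: logging.Logger) -> List[str]:
    """Start one extractor thread and collect what it produces."""
    extract_queue: queue.Queue = queue.Queue()
    load_queue: queue.Queue = queue.Queue()
    # The logger is created in the main thread and passed in explicitly.
    worker = threading.Thread(
        target=extract_messages,
        name="Extractor #1",
        args=(extract_queue, load_queue, logger),
    )
    worker.start()
    for message in messages:
        extract_queue.put(message)
    extract_queue.put(None)  # tell the worker to stop
    worker.join()

    results = []
    while True:
        item = load_queue.get()
        if item is None:
            break
        results.append(item)
    return results
```

Passing the logger explicitly avoids having the worker thread look anything up in thread-local state, which is presumably why the lookup has to happen before the thread starts.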
Michael Bell
01/28/2022, 10:02 PM
Error uploading to S3: An error occurred (AccessDenied) when calling the PutObject operation: Access Denied. If I run the flow without the DaskExecutor, it works just fine. I have previously tested using dask_cloudprovider.aws.FargateCluster directly within a task to create a cluster, send a dask array calculation to the cluster, and tear it down, all within one Prefect task, and that worked fine as well. Any thoughts as to where the permissions error might be coming from? The execution role I specify in the cluster_kwargs has s3:* permissions.
import os
import sys
import time
from typing import List

sys.path.append('.')

import prefect
from prefect import task, Flow
from prefect.storage.s3 import S3
from prefect.run_configs.ecs import ECSRun
from prefect.executors.dask import DaskExecutor
from dask_cloudprovider.aws import FargateCluster
from config.build import ECR_IMAGE


@task(log_stdout=True)
def prefect_task(n: int) -> int:
    print(f'Running task w/ input {n}')
    time.sleep(2)  # simulate some work
    return n * 10


@task(log_stdout=True)
def reduce_task(ns: List[int]) -> int:
    print('Running reduction task')
    return sum(ns)


schedule = None
account = os.environ.get("AWSENV")

# Flow code is stored in S3; mapped tasks fan out over the inputs.
with Flow("dask_poc", storage=S3(bucket=f'prefect-{account}-us-east-1')) as flow:
    results = prefect_task.map([1,2,3,4,5])
    reduced_result = reduce_task(results)

# The flow run itself executes as an ECS task.
flow.run_config = ECSRun(
    image=ECR_IMAGE,
    execution_role_arn=...,
    task_role_arn=...,
    run_task_kwargs={
        "cluster": "PrefectCluster",
    },
    labels=[account]
)

# Task runs are sent to a temporary Dask cluster on Fargate.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": ECR_IMAGE,
        "n_workers": 5,
        "cluster_arn": ...,
        "execution_role_arn": ...,
        "vpc": ...,
        "subnets": [..., ],
        "security_groups": [..., ],
    },
)
Jovan Sakovic
01/29/2022, 11:30 AM
Failed state.
But the DbtShellTask raises the FAIL signal with the first message that occurs, Command failed with exit code 2, and the notifier doesn’t get to the error that I actually want, which is dbt’s error.
Is there a way to push these other error messages to the notifier?
Tara
01/31/2022, 6:11 AM
Bruno Murino
01/31/2022, 11:26 AM
Mary Clair Thompson
01/31/2022, 1:23 PM
ps ef yields ~7k zombie processes on the machine; on closer inspection with ps -auxwwf we're seeing graph-ql as the culprit for all of those hanging processes. This seems to be an issue elsewhere (e.g. this report), and I'm wondering if there's anything that may have changed in 0.15.12 to cause this? And what mitigation would you all suggest?
Tim Wright
01/31/2022, 1:50 PM
Tim Wright
01/31/2022, 3:12 PM
shijas km
01/31/2022, 4:11 PM
Kevin Mullins
01/31/2022, 5:02 PM
Talmaj Marinč
01/31/2022, 5:10 PM
task.map.
What am I doing wrong?
Dev Dasgupta
01/31/2022, 5:13 PM
prefect register --project my_project -p bar
)
foo_root_folder
├── README.md
├── bar
│ └── flow.py
├── pyproject.toml
├── requirements.txt
├── setup.cfg
Since the file flow.py was cluttered with all the utility functions required for the flow execution, I refactored them into a separate Python package and called those functions from flow.py:
from bar.utils import get_some_useful_function
The resultant folder looked like the following:
foo_root_folder
├── README.md
├── bar
│ ├── __init__.py
│ ├── utils.py
│ └── flow.py
├── pyproject.toml
├── requirements.txt
├── setup.cfg
When I try to register now, I get the error ModuleNotFoundError: No module named 'bar'
Can anyone please let me know whether they have faced a similar issue before and how they resolved it?
Thanks
Andrew Lawlor
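A likely explanation for the ModuleNotFoundError above, offered as a guess rather than a diagnosis: when flow.py is loaded by file path, the project root (the directory that contains bar) may not be on sys.path, so "from bar.utils import ..." cannot be resolved. The sketch below reproduces only that Python import behavior in a temporary directory; it does not involve Prefect's loader, and build_example_project, can_import and demo are hypothetical helpers.

```python
import importlib
import sys
import tempfile
from pathlib import Path
from typing import Tuple


def build_example_project(root: Path) -> None:
    """Create root/bar/__init__.py and root/bar/utils.py, mirroring the layout above."""
    package = root / "bar"
    package.mkdir()
    (package / "__init__.py").write_text("")
    (package / "utils.py").write_text(
        "def get_some_useful_function():\n    return 'useful'\n"
    )


def can_import(module_name: str) -> bool:
    """Try a fresh import of module_name, ignoring anything cached earlier."""
    top_level = module_name.split(".")[0]
    for name in list(sys.modules):
        if name == top_level or name.startswith(top_level + "."):
            del sys.modules[name]
    importlib.invalidate_caches()
    try:
        importlib.import_module(module_name)
        return True
    except ModuleNotFoundError:
        return False


def demo() -> Tuple[bool, bool]:
    """Return (importable before, importable after) adding the root to sys.path."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        build_example_project(root)
        before = can_import("bar.utils")
        sys.path.insert(0, str(root))
        try:
            after = can_import("bar.utils")
        finally:
            sys.path.remove(str(root))
    return before, after
```

If this is indeed the cause, the usual general-Python remedies are to install the project into the environment (for example as an editable install) or to make sure the root folder is on PYTHONPATH when registering. Whether either applies to this particular setup is not confirmed in the thread.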
01/31/2022, 5:38 PM
Keith Veleba
01/31/2022, 5:53 PM
David Yang
01/31/2022, 6:46 PM
Keith Veleba
01/31/2022, 7:16 PM
Chris Hemphill
01/31/2022, 8:05 PM
Dave
01/31/2022, 8:21 PM
Couldn't connect to Prefect Server at http://34.82.119.23:4200/graphql, can anyone help?