Milton
03/25/2022, 6:25 PM
…--name, it will assume the default name Kubernetes. This is okay when you only run one replica, but when you increase the replica number to 2, both will take the default name, and the Prefect UI treats the two agents as the same agent. So what is the recommended way to deploy multiple agents for HA purposes in Kubernetes?
Wei Mei
03/25/2022, 7:12 PM
Henry
03/25/2022, 7:24 PM
Patrick Tan
03/25/2022, 7:28 PM
Danny Vilela
03/25/2022, 7:29 PM
…ValueError) is it possible to recover that traceback/error from a state handler? Right now I can say “this task failed and will be retrying in X minutes”, but not “this task failed for reason Y and will retry in X minutes”. Or can we pass keyword arguments to a state handler with a signature like one of these:
from prefect import Task
from prefect.engine.state import State, Retrying

# What I have now.
def notify_on_retry(task: Task, old_state: State, new_state: Retrying) -> State: ...
# Maybe what I want?
def notify_on_retry(task: Task, old_state: State, new_state: Retrying, message: str) -> State: ...
# Alternative?
def notify_on_retry(task: Task, old_state: State, new_state: Retrying, **kwargs) -> State: ...
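Prefect 1.x calls a state handler with exactly three positional arguments (task, old_state, new_state), so extra parameters in the signature are never filled in by Prefect itself. A general Python way around that is to pre-bind the extra arguments with functools.partial and to read the failure reason from the new state. The sketch below uses a hypothetical FakeState stand-in (not Prefect's State class) that only has a message attribute; whether Prefect's Retrying state carries the original error text in its message is an assumption worth checking.

```python
from dataclasses import dataclass
from functools import partial
from typing import Callable


@dataclass
class FakeState:
    """Hypothetical stand-in for a framework state; only carries a message."""
    message: str = ""


def notify_on_retry(task, old_state, new_state, *, channel: str, notify: Callable[[str], None]):
    # Extra configuration (channel, notify) is keyword-only, so the caller's
    # three positional arguments still line up with task/old_state/new_state.
    reason = new_state.message or "unknown reason"
    notify(f"[{channel}] {task} failed ({reason}) and will be retried")
    return new_state  # a handler returns the (possibly modified) new state


sent = []
# partial() pre-binds the extra keyword arguments; the result accepts exactly
# (task, old_state, new_state), which is all the caller supplies.
handler = partial(notify_on_retry, channel="#data-alerts", notify=sent.append)
result = handler("extract_task", FakeState(), FakeState("ValueError: bad input"))
```

In a Prefect 1.x flow, the bound handler would then go into the task's state_handlers list in place of the plain function.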
Harry Baker
03/25/2022, 8:45 PM
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task()
def flow_run_helper(flow_name, project_name):
    cfr = create_flow_run(flow_name=flow_name, project_name=project_name)
    wfr = wait_for_flow_run(cfr, stream_logs=True, raise_final_state=True)
    return wfr
but it's yelling at me about "ValueError: Could not infer an active Flow context while creating edge". My app does a lot of chaining of flows, so I wanted to streamline this.
Lee Briggs
03/25/2022, 9:27 PM
Dominic Pham
03/26/2022, 12:22 AM
Chu Lục Ninh
03/27/2022, 11:31 AM
himanshu pandey
03/27/2022, 2:06 PM
LI LIU
03/28/2022, 1:02 AM
Jeff Kehler
03/28/2022, 8:31 AM
…Parameter into the class constructor __init__ of a task that has been subclassed from prefect.Task, I am receiving a prefect.Task object instead of the value from the Parameter itself. I can't seem to figure out what I'm doing wrong.
Michael Smith
03/28/2022, 8:36 AM
Jons Cyriac
03/28/2022, 9:14 AM
Bennett Lambert
03/28/2022, 10:55 AM
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1091)')))
Is there a way to provide proxy credentials when starting the agent? I've tried using the --env flag to supply proxy information. Or is there something similar to a --verify=false that I can set?
Rahul Kadam
03/28/2022, 12:42 PM
Florian Guily
03/28/2022, 1:05 PM
Bennett Lambert
03/28/2022, 2:13 PM
Dekel R
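03/28/2022, 2:44 PM
from prefect import Flow, Parameter, case
from prefect.executors import LocalDaskExecutor
from prefect.storage import Docker
from prefect.tasks.control_flow import merge

# Abbreviated: task_a ... task_y stand for the real task calls; daily_schedule is defined elsewhere.
with Flow('html_data_extraction__dev',
          storage=Docker(registry_url="us-central1-docker.pkg.dev/***/",
                         dockerfile="./Dockerfile"),
          schedule=daily_schedule, executor=LocalDaskExecutor(scheduler="processes")) as flow:
    mode = Parameter(name='mode', default=None)
    with case(mode, None):
        data_a = task_a
    with case(mode, 'onboard'):
        data_b = task_b
    data_c = merge(data_a, data_b)
    task_c
    task_d
    with case(mode, None):
        task_x
    with case(mode, 'onboard'):
        task_y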
Tasks a and b retrieve data (each one is relevant for a different data source). Tasks c and d are common (not in a “case”) and do X on the data (the data looks the same at this point), and then tasks x and y are different again, each one relevant for a different case.
When running locally (Mac, flow.run()), it all works as expected.
When running on Prefect Cloud, all of the tasks get skipped (exactly the same code and credentials).
Any idea what I’m missing here?
I’m using “upstream_tasks” in order to run the tasks in a specific order when necessary.
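In Prefect 1.x, tasks inside a case block whose condition does not match end up Skipped, a downstream task is skipped by default when any immediate upstream task was skipped (unless skip_on_upstream_skip=False), and merge returns the first real result among its upstreams. So a common task that lists both branch outputs in upstream_tasks, instead of depending on the merge output, would itself be skipped. The toy model below illustrates only those rules in plain Python; SKIPPED, run_if, after and merge are hypothetical names, not Prefect's implementation, and it does not explain why local and Cloud runs behave differently.

```python
SKIPPED = object()  # sentinel standing in for a Skipped state


def after(fn, *upstream):
    """Toy default behaviour: skip if any upstream result was skipped."""
    if any(result is SKIPPED for result in upstream):
        return SKIPPED
    return fn(*upstream)


def run_if(condition_value, expected, fn, *upstream):
    """Toy `case`: run fn only when the condition matches, else skip."""
    if condition_value != expected:
        return SKIPPED
    return after(fn, *upstream)


def merge(*upstream):
    """Toy `merge`: return the first upstream result that was not skipped."""
    for result in upstream:
        if result is not SKIPPED:
            return result
    return SKIPPED


def simulate(mode):
    data_a = run_if(mode, None, lambda: "rows from source A")
    data_b = run_if(mode, "onboard", lambda: "rows from source B")
    data_c = merge(data_a, data_b)
    common = after(str.upper, data_c)  # depends only on the merged result
    # Depends on both branches directly; one branch is always skipped.
    wired_to_both = after(lambda a, b: "never runs", data_a, data_b)
    return common, wired_to_both
```

With mode=None or mode="onboard", the task fed by merge runs while the one wired to both branches is skipped; with any other mode value, everything downstream is skipped.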
Thanks
kevin
03/28/2022, 3:54 PM
…{'_schema': 'Invalid data type: None'} after it is scheduled but before the first task gets executed. When I try to run this flow in a local environment, it executes as expected. Any idea what could be causing this issue?
Michael Smith
03/28/2022, 4:46 PM
Myles Steinhauser
03/28/2022, 4:48 PM
Ken Nguyen
03/28/2022, 5:17 PM
{'errors': [{'path': ['flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}], 'data': None}
When re-running a flow at a later time, it runs successfully. Are there any docs that can provide info on API limits?
Harry Baker
03/28/2022, 7:05 PM
Michael Smith
03/28/2022, 7:44 PM
Myles Steinhauser
03/28/2022, 8:36 PM
…Flows? Specifically, I’m trying to work around some delayed scaling issues with ECS using EC2 instances (not ECS with Fargate tasks).
Until Capacity Provider scaling has caught up again, this failure is often reported back to Prefect as the following error:
FAIL signal raised: FAIL('a4f09101-0577-41ce-b8b0-31b84f26d855 finished in state <Failed: "Failed to start task for flow run a4f09101-0577-41ce-b8b0-31b84f26d855. Failures: [{\'arn\': \'arn:aws:ecs:us-east-1:<redacted>:container-instance/a8bc98b7c6864874bc6d1138f758e8ea\', \'reason\': \'RESOURCE:CPU\'}]">')
I’m using the following calls to launch the sub-flows (as part of a larger script):
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

flow_a = create_flow_run(flow_name="A", project_name="myles")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
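If the goal is to retry only when ECS could not place the task for lack of capacity, one option is to inspect the failure text for the reason codes ECS reports (RESOURCE:CPU in the error above; RESOURCE:MEMORY is another such code). The plain-Python sketch below is built on that assumption: ecs_failure_reasons, is_capacity_failure and CAPACITY_REASONS are hypothetical helpers, and how you obtain the message (for example from the FAIL signal raised by wait_for_flow_run with raise_final_state=True) and how you re-launch the sub-flow are left open.

```python
import re
from typing import List

# Assumption: capacity-related placement failures are the ones worth retrying.
CAPACITY_REASONS = {"RESOURCE:CPU", "RESOURCE:MEMORY"}

# Matches 'reason': 'VALUE' with or without backslash-escaped quotes, since the
# failure list appears inside the repr of a state message.
_REASON = re.compile(r"\\?'reason\\?':\s*\\?'([^'\\]+)")


def ecs_failure_reasons(message: str) -> List[str]:
    """Extract every ECS 'reason' value embedded in a failure message."""
    return _REASON.findall(message)


def is_capacity_failure(message: str) -> bool:
    """True only if at least one reason was found and all are capacity shortfalls."""
    reasons = ecs_failure_reasons(message)
    return bool(reasons) and all(r in CAPACITY_REASONS for r in reasons)
```

A wrapper could then re-create the sub-flow run only when is_capacity_failure(...) is true, ideally after a delay so that Capacity Provider scaling has time to catch up; any other failure would still be raised immediately.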
Alex Prokop
03/28/2022, 8:39 PM
Eric Mauser
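03/28/2022, 8:50 PM
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask

connections = ['conn1', 'conn2', 'conn3']
with Flow("flow_name", run_config=RUN_CONFIG, storage=STORAGE, schedule=SCHEDULE) as flow:
    for conn_id in connections:
        # One independent sync task per Airbyte connection
        flow.add_task(AirbyteConnectionTask(
            airbyte_server_host=<Airbyte host>,
            airbyte_server_port=<airbyte port>,
            airbyte_api_version="v1",
            connection_id=conn_id,
        ))
# flow.run expects an executor instance, not the class
flow.run(executor=LocalDaskExecutor())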
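Each loop iteration above adds a task with no data dependency on the others, so a parallel executor is free to run the syncs at the same time. As a rough stdlib analogue of that fan-out (a swapped-in technique, not how Prefect schedules tasks), here is the same shape with concurrent.futures; trigger_sync is a hypothetical placeholder for one connection's sync.

```python
from concurrent.futures import ThreadPoolExecutor


def trigger_sync(connection_id: str) -> str:
    """Hypothetical placeholder for triggering one Airbyte connection sync."""
    return f"synced {connection_id}"


def sync_all(connection_ids, max_workers=4):
    # Independent jobs are submitted together; map() yields results in input
    # order even though the calls may run concurrently on the pool's threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(trigger_sync, connection_ids))


results = sync_all(["conn1", "conn2", "conn3"])
```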
Leo Kacenjar
03/28/2022, 9:47 PM
…CMD in my Dockerfile, and it seems not to be executing. That makes me think it is being overwritten. Maybe I have to provide an ENTRYPOINT instead?
Sacha Ventura
03/29/2022, 5:36 AM
Sacha Ventura
03/29/2022, 5:36 AM
import prefect
from prefect import task, Flow
from prefect.storage import S3

@task
def print_task():
    logger = prefect.context.get("logger")
    logger.info("hello world")

with Flow("hello-flow") as flow:
    print_task()

flow.storage = S3(bucket="****")

if __name__ == '__main__':
    flow.run()
Error downloading Flow from S3: Unable to locate credentials
by providing all the possible combinations of AWS env vars in the task definition container.
15:58:12 INFO agent Submitted for execution: Task ****
15:58:47 INFO S3 Downloading flow from s3://***/hello-flow/2022-03-29t04-10-25-834594-00-00
15:58:47 ERROR S3 Error downloading Flow from S3: 'str' object has no attribute 'get'
15:58:48 ERROR execute flow-run Failed to load and execute flow run: AttributeError("'str' object has no attribute 'get'")
…prefecthq/prefect:1.1.0-python3.8 as agent image)
Anna Geller
03/29/2022, 9:48 AM
…ecs in its name:
https://github.com/anna-geller/packaging-prefect-flows/tree/master/flows
Sacha Ventura
04/06/2022, 2:20 AM