Anna Gerlich
02/07/2023, 11:37 AM
Tim-Oliver
02/07/2023, 12:42 PM
line 139, in from_flow
    result_storage=flow.result_storage or ctx.result_factory.storage_block,
AttributeError: 'Flow' object has no attribute 'result_storage'
Any help would be appreciated.
jcozar
02/07/2023, 1:19 PM
Eric Coleman
02/07/2023, 2:20 PM
James Gatter
02/07/2023, 2:29 PM
Nikhil Joseph
02/07/2023, 2:39 PM
Hedgar
02/07/2023, 2:39 PM
Dileep Damodaran
02/07/2023, 4:48 PM
ShellTask.run(run_id: str) with a new value of run_id?
Hans Lellelid
02/07/2023, 6:01 PM
Nikhil Jain
02/07/2023, 6:15 PM
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
Here's my setup: prefect==2.7.0, prefect-aws==2.1.0. ECS block definition:
ecs_task_block = ECSTask(
    task_definition_arn=task_def_arn,
    cluster=f'{env}-prefect-cluster',
    image=f'123456789.dkr.ecr.{regions[env]}.amazonaws.com/{image_region[env]}-stride-enrollment-flows-ecr:{ecr_image_tag}',
    vpc_id=vpcs[env],
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration",
            "value": {
                "awsvpcConfiguration": {
                    "subnets": subnets[env],
                    "securityGroups": security_groups[env],
                    "assignPublicIp": "DISABLED"
                }
            }
        }
    ],
    configure_cloudwatch_logs=True,
    stream_output=True,
)
ecs_task_block.save(f'{env}-ecs-block', overwrite=True)
I can see that Prefect is still creating new task definitions; one of the things it overrides on the task definition is aws-logs-prefix, which it sets to the flow-run-id. Is there a different way to access logs in CloudWatch without setting a different name?
Ben Muller
02/07/2023, 6:59 PM
Tomás Emilio Silva Ebensperger
02/07/2023, 8:29 PM
Ben Muller
02/07/2023, 9:15 PM
State message: Submission failed. IndexError: list index out of range
cc: @Will Raphaelson in case this is good info for you.
Nikhil Jain
02/08/2023, 12:16 AM
Submission failed. botocore.exceptions.ParamValidationError: Parameter validation failed: Unknown parameter in input: "deregisteredAt", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
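One possible workaround, sketched as an assumption and not a confirmed fix: the error lists the only keys that RegisterTaskDefinition accepts, so a task definition that was read back from AWS (and therefore carries read-only fields such as deregisteredAt) could be filtered down to those keys before it is registered again. The helper name `strip_unregisterable_fields` and the sample dictionary are hypothetical.

```python
# Keys accepted by RegisterTaskDefinition, copied from the error message above.
ALLOWED_KEYS = {
    "family", "taskRoleArn", "executionRoleArn", "networkMode",
    "containerDefinitions", "volumes", "placementConstraints",
    "requiresCompatibilities", "cpu", "memory", "tags", "pidMode",
    "ipcMode", "proxyConfiguration", "inferenceAccelerators",
    "ephemeralStorage", "runtimePlatform",
}


def strip_unregisterable_fields(task_definition: dict) -> dict:
    """Return a copy with only registerable keys (hypothetical helper)."""
    return {k: v for k, v in task_definition.items() if k in ALLOWED_KEYS}


# Hypothetical example of a task definition as it might be read back from AWS.
described = {
    "family": "example-family",
    "cpu": "256",
    "memory": "512",
    "deregisteredAt": "2023-02-07T00:00:00Z",  # read-only field named in the error
}

cleaned = strip_unregisterable_fields(described)
print(sorted(cleaned))
```

This does not explain why the field only appears when deploying from CI; it only shows how the offending key could be dropped.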
I don't get this error if I deploy the flows locally, but when the flows are deployed from CircleCI, this error is generated.
Dileep Damodaran
02/08/2023, 7:50 AM
FAIL signal?
from datetime import timedelta
from typing import Any

from prefect import Flow, unmapped
from prefect.engine.signals import FAIL
from prefect.tasks.shell import ShellTask


class K8sSparkSubmitTask(ShellTask):
    def __init__(
        self,
        command: str = None,
        max_retries: int = 10,
        retry_delay_seconds: int = 3,
        master_uri: str = None,
        deploy_mode: str = None,
        job_name: str = None,
        conf_kwargs: dict = None,
        env: dict = None,
        helper_script: str = None,
        shell: str = "bash",
        return_all: bool = False,
        log_stderr: bool = False,
        **kwargs: Any,
    ):
        self.command = command
        self.master_uri = master_uri
        self.deploy_mode = deploy_mode
        self.job_name = job_name
        self.conf_kwargs = conf_kwargs or {}
        super().__init__(
            **kwargs,
            command=command,
            max_retries=max_retries,
            retry_delay=timedelta(seconds=retry_delay_seconds),
            env=env,
            helper_script=helper_script,
            shell=shell,
            return_all=return_all,
            log_stderr=log_stderr,
        )

    def run(
        self,
        run_id: str = None,
    ) -> str:
        if some_condition:
            # TODO: Update the value of run_id so that the new value
            # is used from the next retry onwards
            raise FAIL("Retrying with an updated value of run_id")
        else:
            # Do something
            pass


with Flow("Test") as f:
    k = K8sSparkSubmitTask()
    k.map(run_id=unmapped("123"))
f.run()
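A minimal sketch of the idea in plain Python, not Prefect's retry mechanism: an explicit retry loop can carry an updated run_id from one attempt to the next. The names `RetryWithNewId`, `submit`, and `run_with_updated_id` are hypothetical and exist only for illustration.

```python
class RetryWithNewId(Exception):
    """Hypothetical signal that carries the run_id to use on the next attempt."""

    def __init__(self, new_run_id: str):
        super().__init__(f"retry with run_id={new_run_id}")
        self.new_run_id = new_run_id


def submit(run_id: str) -> str:
    """Stand-in for the real work: fails until the run_id has been bumped twice."""
    base, _, suffix = run_id.partition("-")
    attempt = int(suffix) if suffix else 0
    if attempt < 2:
        raise RetryWithNewId(f"{base}-{attempt + 1}")
    return f"submitted {run_id}"


def run_with_updated_id(run_id: str, max_retries: int = 10) -> str:
    """Call submit, feeding each failure's new run_id into the next attempt."""
    for _ in range(max_retries + 1):
        try:
            return submit(run_id)
        except RetryWithNewId as signal:
            # The updated value replaces the old one before the next attempt.
            run_id = signal.new_run_id
    raise RuntimeError("retries exhausted")


result = run_with_updated_id("123")
print(result)
```

The design choice here is that the new value travels on the exception itself, so the retry loop does not need any shared mutable state.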
Muhammad Husnain
02/08/2023, 8:29 AM
Aleksandr Liadov
02/08/2023, 10:15 AM
Parwez Noori
02/08/2023, 10:49 AM
Ankit
02/08/2023, 1:41 PM
Dzmitry Aliashkevich
02/08/2023, 3:24 PM
Ankit
02/08/2023, 4:10 PM
Haotian Li
02/08/2023, 4:19 PM
pbutler
02/08/2023, 5:23 PM
Sami Serbey
02/08/2023, 7:52 PM
prefect orion start command in a docker container connected to a postgres server. My Prefect server is open to the world with HTTP access. Is my architecture susceptible to SQL injection? Can Prefect be attacked by SQL injection? Should I take care of this? Thanks for taking the time to read my post. I wish you a nice day.
Aaron Goebel
02/08/2023, 9:36 PM
Christopher Martin
02/08/2023, 10:58 PM
Simon Rascovsky
02/08/2023, 11:00 PM
flow_run_name parameter in 2.7.12 but I'm having some difficulty and I'm unable to create the deployment. It's a flow-subflow scenario with iteration:
@flow(name="my_subflow", flow_run_name=f"my_subflow run: {iteration}", log_prints=True)
def my_subflow(iteration: str):
    print(f"my_subflow run: {iteration}")

@flow(name="main_flow", log_prints=True)
def main_flow():
    iteration_list = ["iteration_1", "iteration_2", "iteration_3"]
    for iteration in iteration_list:
        my_subflow(iteration)
When I try to deploy either of the flows with something like
prefect deployment build -n main_flow_run_name_test_deployment -q my_queue ./flows/flow_run_name_test.py:main_flow --apply
I get the error: Script at './flows/flow_run_name_test.py' encountered an exception: NameError("name 'iteration' is not defined")
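This NameError can be reproduced without Prefect: an f-string in a decorator argument is evaluated once, when the module is imported, and at that point no `iteration` exists yet. A plain (non-f) template string defers the substitution until call time. The sketch below uses a hypothetical `named_run` decorator in plain Python to show that mechanism; whether `flow_run_name` in a given Prefect version formats such a template with the flow's parameters is an assumption to check against the Prefect documentation.

```python
import functools
import inspect


def named_run(name_template: str):
    """Hypothetical decorator: formats name_template with the call's arguments."""
    def decorator(fn):
        signature = inspect.signature(fn)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            bound = signature.bind(*args, **kwargs)
            bound.apply_defaults()
            # Substitution happens here, at call time, when the values exist.
            run_name = name_template.format(**bound.arguments)
            return run_name, fn(*args, **kwargs)
        return wrapper
    return decorator


# A plain string, not an f-string, so nothing is evaluated at import time.
@named_run("my_subflow run: {iteration}")
def my_subflow(iteration: str) -> str:
    return iteration.upper()


names = [my_subflow(i)[0] for i in ["iteration_1", "iteration_2"]]
print(names)
```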
VS Code is also flagging the iteration variable as not defined. How would I pass the iteration dynamically if the flow decorator is outside the function?
Devin Flake
02/08/2023, 11:22 PM
Dileep Damodaran
02/09/2023, 3:39 AM
Rikimaru Yamaguchi
02/09/2023, 6:50 AM