Ranu Goldan
03/31/2021, 7:58 AM
kumar
03/31/2021, 10:55 AM
Tim Pörtner
03/31/2021, 11:50 AM
Daniel Black
03/31/2021, 2:30 PM
Ria May Dewi
03/31/2021, 3:11 PM
{
  flow {
    id
    name
    schedule
  }
}
And the result is:
{
  "id": "86f5fb6f-26c9-4ffe-9961-f28e5538b53b",
  "name": "spark",
  "schedule": {
    "type": "Schedule",
    "clocks": [
      {
        "cron": "0 1 * * *",
        "type": "CronClock",
        "day_or": true,
        "labels": null,
        "end_date": null,
        "start_date": null,
        "__version__": "0.14.13",
        "parameter_defaults": {}
      }
    ],
    "filters": [],
    "or_filters": [],
    "__version__": "0.14.13",
    "adjustments": [],
    "not_filters": []
  }
}
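If the goal is to read the cron expression back out of a response like this, a few lines of standard-library Python are enough. A minimal sketch, assuming the schedule object keeps exactly the shape shown above (a clocks list whose entries carry type and cron); cron_expressions is a hypothetical helper name, and a flow without a schedule may come back with null here, which the sketch does not handle:

```python
import json

# Trimmed copy of the "schedule" object from the GraphQL response above.
SCHEDULE_JSON = """
{
  "type": "Schedule",
  "clocks": [
    {"cron": "0 1 * * *", "type": "CronClock", "day_or": true,
     "labels": null, "end_date": null, "start_date": null,
     "__version__": "0.14.13", "parameter_defaults": {}}
  ],
  "filters": [], "or_filters": [], "not_filters": [], "adjustments": [],
  "__version__": "0.14.13"
}
"""


def cron_expressions(schedule: dict) -> list:
    """Return the cron string of every CronClock in a serialized schedule."""
    return [
        clock["cron"]
        for clock in schedule.get("clocks", [])
        if clock.get("type") == "CronClock"
    ]


schedule = json.loads(SCHEDULE_JSON)
print(cron_expressions(schedule))
```

Filtering on type keeps the helper from tripping over other clock kinds (for example interval clocks), which would not carry a cron field.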
liren zhang
03/31/2021, 3:27 PM
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun

@task
def get_parameters(date_to_process):
    ...
    return {}

flow_a = StartFlowRun(flow_name="flow_a", project_name="sample", wait=True)
flow_b = StartFlowRun(flow_name="flow_b", project_name="sample", wait=True)

with Flow("parent-flow") as flow:
    date_to_process = Parameter("date_to_process", default=None)
    vp = get_parameters(date_to_process)
    flow_a = flow_a(upstream_tasks=[vp], parameters=vp)
    flow_b = flow_b(upstream_tasks=[flow_a])
I was able to do this successfully, but I am NOT sure how to use the parameters in the dependent flows. In this case, I have a get_parameters() task that composes the parameters I need to pass down to the dependent flow_a. How do I receive/use the parameters I passed in via flow_a = flow_a(upstream_tasks=[vp], parameters=vp)?
Let's just say I want to print out the parameters I pass into flow_a. What do I need to do in flow_a.py to reference the passed-in parameters?
Daniel Caldeweyher
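03/31/2021, 4:10 PM
from prefect import Flow
from prefect.schedules import CronSchedule
# "15 1 * * *" fires at 01:15 every day; DEFAULT_START_DATE is defined elsewhere in the user's code
weekly = CronSchedule("15 1 * * *", start_date=DEFAULT_START_DATE)
with Flow("Daily Extract", schedule=weekly) as flow:
    ...
This flow was happily running at the scheduled time, 1:15 a.m. every day, but now it has just stopped getting scheduled. Curiously, today is the first day of the next month...
Andrew Hannigan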
03/31/2021, 4:19 PM
Ryan Baker
03/31/2021, 4:23 PM
Hui Zheng
03/31/2021, 4:39 PM
Mitchell Bregman
03/31/2021, 4:41 PM
ECSRun run configuration, with ECSAgent and Docker storage. I am running into a similar issue; I'm not entirely sure whether there is a workaround currently, but here is the error I cannot get past:
[2021-03-31 15:09:51,907] ERROR - agent | Error while deploying flow: InvalidParameterException('An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.')
My setup:
# flow.py
import prefect
from prefect import Flow, task

@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Got here!!!!")

with Flow("Test") as flow:
    say_hello()
--
# deploy.py
import os

from prefect.run_configs import ECSRun
from prefect.storage import Docker

# `flow` comes from flow.py; `config` is the project's own settings module
flow.run_config = ECSRun(  # the Flow attribute is run_config, not run_configs
    run_task_kwargs={'cluster': 'my-cluster'},
    execution_role_arn='arn:aws:iam::{ACCOUNT_NUMBER}:role/my-ecs-task-role',
    labels=['ecs'],
)
flow.storage = Docker(
    env_vars=config.ENVIRONMENT_VARIABLES,
    extra_dockerfile_commands=[
        "RUN pip install -e /service",
    ],
    files={os.path.join(os.path.expanduser('~'), 'project'): "/service"},
    image_name=config.DOCKER_IMAGE_NAME,
    image_tag=config.DOCKER_IMAGE_TAG,
    registry_url=config.DOCKER_REGISTRY_URL,
)
Here is how I am running the agent:
prefect agent ecs start --token {PREFECT_RUNNER_TOKEN} --cluster my-cluster --label ecs --launch-type FARGATE
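ECS reports "Task definition does not support launch_type FARGATE" when the task definition being run does not declare Fargate compatibility. A sketch of the fields a Fargate-compatible definition needs, as a plain dict of the kind ECSRun(task_definition=...) accepts; the family, image, cpu, and memory values are hypothetical placeholders, and naming the container "flow" is an assumption about what Prefect's ECS agent looks for:

```python
import json


def fargate_task_definition(family: str, image: str,
                            cpu: str = "256", memory: str = "512") -> dict:
    """Build a minimal ECS task definition intended to run with launchType FARGATE.

    Fargate needs requiresCompatibilities=["FARGATE"], networkMode="awsvpc",
    and task-level cpu/memory given as strings.
    """
    return {
        "family": family,
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",
        "cpu": cpu,
        "memory": memory,
        "containerDefinitions": [
            # Assumption: Prefect's ECS agent expects the flow container to be named "flow".
            {"name": "flow", "image": image, "essential": True},
        ],
    }


def supports_fargate(task_definition: dict) -> bool:
    """Rough local check for the two fields the FARGATE launch type depends on."""
    return (
        "FARGATE" in task_definition.get("requiresCompatibilities", [])
        and task_definition.get("networkMode") == "awsvpc"
    )


# Hypothetical family and image names, for illustration only.
td = fargate_task_definition("prefect-test", "my-registry/my-image:latest")
print(json.dumps(td, indent=2))
print(supports_fargate(td))
```

If the agent registers a task definition for you, comparing that definition's requiresCompatibilities (for example in the ECS console) against a dict like this one is a quick way to tell an AWS-side problem from a Prefect configuration one.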
My issue is that I have no idea whether I am doing something wrong on the AWS side of things; I am trying to wrap my head around the root cause. I’m also noticing there are task_definition and task_role_arn parameters, and I’m unsure whether I am expected to use them, or where to set them up. Any insight would be extremely appreciated!
Matic Lubej
03/31/2021, 4:49 PM
Nicholas Chammas
03/31/2021, 4:56 PM
Konstantinos
03/31/2021, 5:15 PM
Rey Marin
03/31/2021, 5:49 PM
Carter Kwon
03/31/2021, 6:14 PM
test/. I was trying to see if a secret could be named test/this/path, and after adding it, test/ was the only part that stayed. Now it can't be deleted.
matta
03/31/2021, 10:42 PM
RuntimeError: cannot schedule new futures after shutdown
Not sure what to do. I don't want to have to manually set its state to "Success".
Brian Keating
04/01/2021, 2:11 AM
EC2Cluster. I'm new to Dask. The relevant part of my flow script is:
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class='dask_cloudprovider.aws.EC2Cluster',
    cluster_kwargs={'n_workers': 2, 'docker_image': 'prefecthq/prefect', 'debug': True},
)
flow.run()
This fails with FlowRunner: ClientError('An error occurred (InvalidParameterValue) when calling the RunInstances operation: User data is limited to 16384 bytes'). The issue is that the docker run command looks like this: docker run --net=host prefecthq/prefect env DASK_INTERNAL_INHERIT_CONFIG="a_very_very_long_string" python -m distributed.cli.dask_scheduler, so I guess the command winds up being too long. Does anyone know a workaround for this issue?
Ranu Goldan
04/01/2021, 5:55 AM
Varun Joshi
04/01/2021, 5:59 AM
Jeremy Tee
04/01/2021, 6:27 AM
transform task, it is expected to fail for parameter "a", but when I rerun from the failure, what is the expected behavior? Will it rerun all cases [3, 1, "a"], or skip 3 and 1 and only rerun "a"?
import prefect
from prefect import Flow, task
from prefect.executors import LocalExecutor
from prefect.run_configs import LocalRun

@task
def extract():
    return [3, 1, "a"]

@task
def transform(x):
    y = x + 1  # raises TypeError for "a"
    logger = prefect.context.get("logger")
    logger.info(y)
    return y

with Flow("failure-flow-test", executor=LocalExecutor(), run_config=LocalRun()) as flow:
    e = extract()
    t = transform.map(e)
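Plain Python is enough to see which mapped children fail with these inputs: 3 and 1 go through, while "a" + 1 raises TypeError. A sketch without Prefect, where run_mapped is a hypothetical stand-in that applies the function to each element independently and records one outcome per element, much as a mapped task gets one child run per element; it does not model what a restart from the failed state reruns:

```python
def transform(x):
    return x + 1


def run_mapped(fn, items):
    """Apply fn to each item independently, recording ("Success", value)
    or ("Failed", error repr) per item instead of stopping at the first error."""
    outcomes = []
    for item in items:
        try:
            outcomes.append(("Success", fn(item)))
        except Exception as exc:
            outcomes.append(("Failed", repr(exc)))
    return outcomes


inputs = [3, 1, "a"]
outcomes = run_mapped(transform, inputs)
for item, (state, value) in zip(inputs, outcomes):
    print(item, state, value)
```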
Jacob Blanco
04/01/2021, 7:20 AM
xyzz
04/01/2021, 9:31 AM
Vladimir Bolshakov
04/01/2021, 1:17 PM
Resume state passed via the Client.set_task_run_state method to a paused task at task runtime? When I try to use Client.get_task_run_state in the task, I find that the state at task runtime is Running (after Resume and after Submitted). So the result passed to the Resume state is not accessible. Any ideas?
Hawkar Mahmod
04/01/2021, 1:23 PMliren zhang
04/01/2021, 2:43 PMZach Khorozian
04/01/2021, 2:49 PMLuis Gallegos
04/01/2021, 4:58 PM
from prefect import task, Flow, Parameter
from prefect.executors import LocalDaskExecutor, DaskExecutor
from prefect.tasks.prefect import StartFlowRun
import prefect

executor = LocalDaskExecutor(num_workers=1)
flow1 = StartFlowRun("flow1", project_name='test', wait=True)
flow2 = StartFlowRun("flow2", project_name='test', wait=True)

with Flow("example", executor=executor) as flow:
    # One parameter dict per line of parameters.txt (read when the flow is built)
    table_dict_param_list = []
    with open('parameters.txt', 'r') as f:
        lines = f.readlines()
    for cnt, line in enumerate(lines):
        dict_param = {}
        dict_param['param1'] = cnt
        dict_param['param2'] = line
        table_dict_param_list.append(dict_param)

    flow1 = flow1()
    ## I need this execution to be sequential, like in a "for loop"
    flow2.map(parameters=table_dict_param_list)

flow.register(project_name="test")
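The parameter-building part of this flow can be checked on its own before registering. A minimal sketch, assuming each line of parameters.txt holds one value and that the trailing newline should be dropped (the original code keeps it); build_param_list is a hypothetical name, and io.StringIO with made-up table names stands in for the real file:

```python
import io


def build_param_list(lines):
    """One dict per input line: param1 is the line index, param2 the text without its newline."""
    return [
        {"param1": cnt, "param2": line.rstrip("\n")}
        for cnt, line in enumerate(lines)
    ]


# Hypothetical file contents standing in for parameters.txt.
fake_file = io.StringIO("table_a\ntable_b\ntable_c\n")
param_list = build_param_list(fake_file.readlines())

for params in param_list:
    print(params)
```

A plain for loop over param_list gives strictly sequential order; inside the flow, the comparable choice is an executor that runs one mapped child at a time, which the LocalDaskExecutor(num_workers=1) already used here should provide.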
Nicholas Chammas
04/01/2021, 5:17 PM
idempotency_key=flow.serialized_hash() the default?
https://docs.prefect.io/orchestration/concepts/flows.html#core-client
I can’t think of why someone would want the Flow version to change if the Flow definition hasn’t.
Riley Hun
04/01/2021, 5:58 PM
prefect-server-towel seems to have an ErrImagePull error, and prefect-server-hasura and prefect-server-ui can't pull their respective images from the registry.
Here are the deployment commands I ran:
helm repo add prefecthq https://prefecthq.github.io/server/
helm install ${NAME} prefecthq/prefect-server --values=values.yaml