Richard Pitre
06/01/2020, 12:04 AM
philip
06/01/2020, 9:09 AM
Exception raised while calling state handlers: KeyError(<Parameter: fisrt_num>,)
how can I solve this problem?
Here is my code:
@task
def add(x, y):
    return x + y

def post_to_slack(task, old_state, new_state):
    if new_state.is_failed():
        msg = "Task {0} finished in state {1}".format(task, new_state)
    if new_state.is_finished():
        p = new_state.result[num].result
        print(p)
    return new_state

with Flow("Tutorial", state_handlers=[post_to_slack]) as flow:
    x = Parameter("fisrt_num")
    y = Parameter("Second_num")
    num = add(x, y)
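For reference, a minimal sketch of a more defensive handler, assuming Prefect 0.x semantics: a handler attached to a Flow receives (flow, old_state, new_state), and new_state.result on a finished flow state is a dict keyed by task objects, so .get() avoids a KeyError when a task (or Parameter) has no entry:

from prefect import Flow, Parameter, task

@task
def add(x, y):
    return x + y

# Guarded flow-level state handler: use .get() instead of indexing,
# since not every task is guaranteed a slot in new_state.result.
def post_to_slack(flow, old_state, new_state):
    if new_state.is_finished():
        task_state = new_state.result.get(num)
        if task_state is not None:
            print(task_state.result)
    return new_state

with Flow("Tutorial", state_handlers=[post_to_slack]) as flow:
    x = Parameter("first_num")  # "fisrt_num" in the original looks like a typo
    y = Parameter("second_num")
    num = add(x, y)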
Arsenii
06/01/2020, 11:01 AM
client.create_flow_run(version_group_id=...) and there's a chance to screw up some important data if any user tries to run it manually. Another use case I can think of is letting certain members of a team access only specific projects. Thanks!
Arsenii
06/01/2020, 11:08 AM
$(aws ecr get-login --region redacted-region --registry-ids 1234567890 --no-include-email)
in order to generate an ECR docker authorization token and log in -- this is standard practice. However! This token is only valid for 12 hours, so the command above is re-run automatically.
Now the weird thing is, the Prefect Agent doesn't actually "refresh" the auth token and gives me a "Your authorization token has expired" error, even though the command above was run. The only thing that helps is restarting the Agent, at which point it starts using the new token again.
This seems to be some kind of bug, since the configuration of the docker server should not concern the Agent -- or am I missing some configuration?
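For context, a sketch of what the 12-hourly refresh amounts to programmatically (boto3 + docker-py; the registry ID and region are placeholders): the fresh token only reaches the Docker credentials of the process that performs the login, which may be why a long-running Agent keeps presenting the stale one until restarted.

import base64
import boto3
import docker

def refresh_ecr_login(registry_id="1234567890", region="redacted-region"):
    # Ask ECR for a fresh authorization token (valid ~12 hours)
    ecr = boto3.client("ecr", region_name=region)
    data = ecr.get_authorization_token(registryIds=[registry_id])["authorizationData"][0]
    username, password = base64.b64decode(data["authorizationToken"]).decode().split(":")
    # Log the local Docker client in with the new credentials
    docker.from_env().login(username=username, password=password,
                            registry=data["proxyEndpoint"])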
Thanks
Sumant Agnihotri
06/01/2020, 12:33 PM
Can I use an if inside a flow, like this:
def flow_func(ip):
    with Flow('flow func') as flow:
        task1()
        if ip == True:
            task2()
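Worth noting: a plain Python if here runs at flow-build time (it decides whether task2 is added to the DAG at all), not at run time. For a runtime branch, Prefect 0.x ships control-flow helpers; a minimal sketch, assuming a boolean Parameter and a version with prefect.case (0.10+):

from prefect import Flow, Parameter, case, task

@task
def task1():
    print("always runs")

@task
def task2():
    print("runs only when ip is True")

with Flow("flow func") as flow:
    ip = Parameter("ip", default=False)
    task1()
    # case() adds a runtime condition: task2 is SKIPPED unless ip == True
    with case(ip, True):
        task2()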
Howard Cornwell
06/01/2020, 2:34 PM
cloud tags throughout the documentation, but I think you’re missing one on the linked header.
kelv
06/01/2020, 5:02 PM
pip install. I am curious what high-availability options there are in case the master/scheduling/coordinator(?) node fails, and where the configurations/flows are stored that one can restore from.
Kesav Kolla
06/01/2020, 6:15 PM
Will Milner
06/01/2020, 6:40 PM
Jared
06/01/2020, 6:52 PM
IntervalSchedule tasks to run 10 times (this is using Core, not Cloud). Am I missing something obvious?
When I initially register the flow, 10 runs get queued and executed, but no more. In the terminal running the server, the scheduler wakes, schedules 0 runs, and sleeps, even after all 10 original runs have passed. On-demand runs in the UI still work at this point. I'll comment with what I'm doing to reproduce.
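For context, a minimal flow of the shape described (not the poster's repro): Prefect's scheduler only materializes about 10 upcoming runs at a time and is expected to top the queue up as runs complete, so a flow like this should keep firing indefinitely.

from datetime import timedelta
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("hello")

# One run per minute; the backend keeps ~10 future runs scheduled
schedule = IntervalSchedule(interval=timedelta(minutes=1))

with Flow("interval-example", schedule=schedule) as flow:
    say_hello()

flow.register(project_name="examples")  # hypothetical project name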
Kesav Kolla
06/01/2020, 7:09 PM
Matthias
06/01/2020, 8:55 PM
Will Milner
06/02/2020, 3:13 PM
jorwoods
06/02/2020, 5:37 PM
from prefect import Flow, task, unmapped, Parameter
from prefect.engine.results import LocalResult
from prefect.engine.executors import LocalDaskExecutor
import prefect

lr = LocalResult(location='{flow_name}-{task_name}-{map_index}.txt')

@task(log_stdout=True, checkpoint=True)
def add(x, y):
    return x + y

with Flow('iterated map', result=lr) as flow:
    y = unmapped(Parameter('y', default=10))
    mapped_result = add.map([1, 2, 3], y=y)

flow.run(executor=LocalDaskExecutor())
jorwoods
06/02/2020, 9:01 PM
target (str, optional): location to check for task Result. If a result
    exists at that location then the task run will enter a cached state.
    `target` strings can be templated formatting strings which will be
    formatted at runtime with values from `prefect.context`
If I have a mapped task and I want it to create separate outputs per map, I see map_index in prefect.context, but that relies on my arguments being in the same order each time, correct? Is there a way to pass Parameter values into this target or the result's location kwarg, such that when I look at the directory containing my checkpoint files, I quickly know which ones have completed?
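Possibly relevant: since target strings are formatted with values from prefect.context, and context carries a parameters dict during a run, a keyed template may do this. A sketch, assuming 0.11-era context formatting; worth verifying on your version:

from prefect import Flow, Parameter, task, unmapped
from prefect.engine.results import LocalResult

# "{parameters[y]}" pulls the Parameter's runtime value out of the
# `parameters` dict in prefect.context, so filenames encode the input
# value rather than only the positional map_index.
@task(target="add-y{parameters[y]}-{map_index}.txt", checkpoint=True)
def add(x, y):
    return x + y

with Flow("keyed targets", result=LocalResult()) as flow:
    y = Parameter("y", default=10)
    add.map([1, 2, 3], y=unmapped(y))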
Crawford Collins
06/02/2020, 11:40 PM
imputed_categorical_dfs returns two objects. I need to merge these with another task which does the same.
My code, which does not work:
transformed_df_map = merge_transformed_data.map(
    df1=[*imputed_categorical_dfs, *encoded_df_map],
    df2=[*imputed_numeric_dfs] + [*yeo_johnson_dfs],
)
Ben Davison
06/03/2020, 10:01 AM
Avi A
06/03/2020, 12:20 PM
[3:18pm]: Exception raised while calling state handlers: HTTPError('400 Client Error: Bad Request for url: <http://localhost:4200/graphql/alpha>')
Any idea how to start debugging?
Rodrigo Neves
06/03/2020, 1:27 PM
def run_flow_step_0_map(self, df):
    cols = list(df.items())
    with Flow("pipeline_map") as flow:
        col = self.get_data.map(unmapped(self), cols)
        col = self.task1.map(unmapped(self), col)
        col = self.task2.map(unmapped(self), col)
        col = self.task3.map(unmapped(self), col)
        col = self.task4.map(unmapped(self), col)
        col = self.task5.map(unmapped(self), col)
        result = self.task5.map(unmapped(self), col)
    return flow
(the explanation is confusing, I know; if you need extra info just say so)
Simone Cittadini
06/03/2020, 2:59 PM
Radu
06/03/2020, 3:09 PM
Mary Clair Thompson
06/03/2020, 3:10 PM
Nelson
06/03/2020, 3:40 PM
config.toml to have different buckets per env:
source_data_bucket = "${environments.${environment}.source_data_bucket}"
transient_data_bucket = "${environments.${environment}.transient_data_bucket}"
[environments]
[environments.dev-nelson]
source_data_bucket = "<s3://REDACTED>"
transient_data_bucket = "<s3://REDACTED>"
[environments.prod]
source_data_bucket = "<s3://REDACTED>"
transient_data_bucket = "<s3://REDACTED>"
I can print prefect.config.transient_data_bucket inside a task, but when used as S3Result(bucket=prefect.config.transient_data_bucket) it fails with Invalid bucket name "". How are others doing this? Note I’m providing this result as a task config:
@task(
    target="{date:%Y-%m-%d}/crunchbase-raw.cloudpickle",
    result=S3Result(bucket=prefect.config.transient_data_bucket),
)
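One thing worth checking (a guess, not a confirmed diagnosis): the S3Result in the decorator is constructed at import time, so the ${environments.${environment}...} interpolation has to resolve in every process that imports the flow, including the one doing the registration; if the environment key is unset there, the bucket silently comes through as an empty string.

import prefect

# Sanity check at flow-definition/registration time: if this prints '',
# the interpolation isn't resolving in this process, and S3Result will
# receive an empty bucket name (surfacing later as `Invalid bucket name ""`).
print(repr(prefect.config.transient_data_bucket))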
John Ramirez
06/03/2020, 3:43 PM
Is there a Postgres task or do I need to make something custom?
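For reference, the task library does ship Postgres tasks (prefect.tasks.postgres, e.g. PostgresFetch / PostgresExecute). If those don't fit, a custom task is only a few lines; a sketch with psycopg2 and placeholder connection details:

import psycopg2
from prefect import Flow, task

@task
def fetch_rows(query: str):
    # Placeholder credentials -- in practice, pull these from a Secret
    conn = psycopg2.connect(host="localhost", dbname="mydb",
                            user="me", password="...")
    try:
        with conn.cursor() as cur:
            cur.execute(query)
            return cur.fetchall()
    finally:
        conn.close()

with Flow("pg-example") as flow:
    rows = fetch_rows("SELECT * FROM my_table LIMIT 10;")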
Darragh
06/03/2020, 3:51 PM
flow.environment = FargateTaskEnvironment(
    launch_type="FARGATE",
    region="eu-west-1",
    cpu="256",
    memory="512",
    networkConfiguration={
        "awsvpcConfiguration": {
            "assignPublicIp": "ENABLED",
            "subnets": ["subnet-X"],
            "securityGroups": ["sg-Y"],
        }
    },
    family="my_flow",
    taskRoleArn="arn:aws:iam::X:role/Role",
    executionRoleArn="arn:aws:iam::X:role/Role",
    containerDefinitions={
        "name": "my-flow",
        "image": "my-flow",
        "command": [],
        "environment": [],
        "essential": True,
    },
)
Fargate Agent:
echo "export REGION_NAME=${CDK_DEPLOY_REGION}" > .env
echo "export AWS_DEFAULT_REGION=${CDK_DEPLOY_REGION}" >> .env
echo "export AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}" >> .env
echo "export AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}" >> .env
source .env
nohup prefect agent start fargate > prefect_fargate.log 2>&1 & disown
Everything seems fine until I try to run it. There’s no update in the agent logs to show it’s been picked up, and the UI just says:
Failed to load and execute Flow's environment: AttributeError("'str' object has no attribute 'get'")
Anything immediate/obvious/stupid stand out?
As an aside, I’m guessing I’m missing a bunch of config for the agent, but the docs are a little unclear on that one; the top level of the docs [https://docs.prefect.io/orchestration/agents/fargate.html#installation] seems to suggest you only need the handful of vars I added.
UPDATE: Some progress, it’s now giving a meaningful error: Parameter validation failed: Missing required parameter in containerDefinitions[0].logConfiguration: "logDriver
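Going by that validation error, the missing piece is presumably a logConfiguration block on the container definition; a sketch with placeholder log group/region (this mirrors the ECS RegisterTaskDefinition shape rather than anything Prefect-specific):

containerDefinitions = {
    "name": "my-flow",
    "image": "my-flow",
    "essential": True,
    # The awslogs driver the validator is asking for; group, region,
    # and stream prefix are placeholders
    "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
            "awslogs-group": "/ecs/my-flow",
            "awslogs-region": "eu-west-1",
            "awslogs-stream-prefix": "my-flow",
        },
    },
}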
Fixing it
Marwan Sarieddine
06/03/2020, 6:33 PM
The flow runs fine with flow.run(), but flow.register() flags an error; mainly, the healthcheck throws this error, which I see referenced in the documentation … I am using prefect v0.11.4:
ValueError: Mapped tasks with custom result locations must include
{filename} as a template in their location.
Please see the code below:
import os
from prefect import task, Flow
from prefect.engine.results import S3Result

@task
def gen_list():
    return [x for x in range(10)]

@task
def add(x, y):
    return x + y

@task
def multiply(x, y):
    return x * y

result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
    location='prefect-testing/{task_name}/{map_index}.prefect'
)

with Flow('Test Flow', result=result) as flow:
    x = gen_list()
    y = gen_list()
    added = add.map(x, y)
    multiplied = multiply.map(added, added)

# flow runs fine if running locally
flow.run()

"""
flow fails to register
ValueError: Mapped tasks with custom result locations must include
{filename} as a template in their location.
"""
flow.register('Test Project')
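Taking the healthcheck message at face value, the fix is presumably to include {filename} in the template; a sketch of the one-line change (how the substituted filename is composed is version-dependent, so worth checking it still yields one object per map index):

# Same S3Result, with the {filename} placeholder the healthcheck demands;
# Prefect fills it in per task run at write time.
result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
    location='prefect-testing/{task_name}/{filename}'
)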
Darragh
06/03/2020, 8:22 PM
flow.environment = FargateTaskEnvironment(
    launch_type="FARGATE",
    region="eu-west-1",
    cpu="256",
    memory="512",
    networkConfiguration={
        "awsvpcConfiguration": {
            "assignPublicIp": "ENABLED",
            "subnets": ["subnet-X"],
            "securityGroups": ["sg-Y"],
        }
    },
    family="my_flow",
    taskRoleArn="arn:aws:iam::X:role/CommonSuperRole",
    executionRoleArn="arn:aws:iam::X:role/CommonSuperRole",
    containerDefinitions={
        "name": "my-flow",
        "image": "my-flow",
        "command": [],
        "environment": [],
        "essential": True,
    },
)
I keep getting this error:
An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Fargate requires task definition to have execution role ARN to support log driver awslogs
So my questions:
• I’m using the same uber-role for both the taskRoleArn and the executionRoleArn; probably not best practice, but it should work?
• I’ve thrown every possible log- and CloudWatch-related policy/permission at it that I can think of, but nothing is taking. I can provide a dump of the permissions if need be.
Any help massively appreciated, fairly stumped on it. Is there any way to get more debug info out of it?
Marwan Sarieddine
06/03/2020, 9:09 PM
target or Result.location when mapped tasks are involved and deploying on a dask cluster - I placed reproducible examples on [github](https://github.com/PrefectHQ/prefect/issues/2716) - would really appreciate it if someone can let me know what I am missing
Alister Lee
06/04/2020, 12:17 AM
Avi A
06/04/2020, 9:18 AM
timeout argument. This causes all tasks of this type to not report their completion! I see on the agent itself that their execution is done, but there’s no call to the state handler to change the state to complete, or to write the task result, as is normally the case with the tasks.
Here’s the gist of my task definition; when I remove the line setting kwargs['timeout'], everything works.
TIMEOUT = 10.  # minutes

class MyTask(Task):
    def __init__(
        self,
        param1=None,
        param2=None,
        cache: bool = False,
        **kwargs
    ):
        self.cache = cache
        if 'max_retries' not in kwargs and 'retry_delay' not in kwargs:
            kwargs['max_retries'] = DEFAULT_MAX_RETRIES
            kwargs['retry_delay'] = timedelta(minutes=DEFAULT_RETRY_DELAY)
        kwargs['timeout'] = TIMEOUT * 60  # If I remove this line, everything works great
        super().__init__(**kwargs)

    def run(self, param1=None, param2=None):
        # Code!
        ...

I fear it’s a bug in prefect but not sure. Would love to get some input before I open an issue
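Two things may be worth ruling out before filing the issue (observations, not a confirmed diagnosis): Prefect validates that timeout is an integer number of seconds, and TIMEOUT * 60 is a float here since TIMEOUT = 10.; separately, timeouts are enforced by running the task in a separate thread or process, which is a known source of odd interactions with Dask-based executors. A minimal sketch of the integer fix:

from prefect import Task

TIMEOUT_MINUTES = 10  # keep integral: Task.__init__ rejects non-int timeouts

class MyTask(Task):
    def __init__(self, **kwargs):
        # int(...) guards against accidental floats from arithmetic
        kwargs.setdefault("timeout", int(TIMEOUT_MINUTES * 60))
        super().__init__(**kwargs)

    def run(self):
        pass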