Adam Brusselback
03/09/2021, 11:12 PMTrevor Kramer
03/10/2021, 12:15 AMn_estimators = Parameter('n_estimators', default=500)
BatchSubmit()(..., batch_kwargs={
'parameters': {'blah', n_estimators}})
The issue is that batch requires the values for parameters be strings but I want the input Parameters to be integers. Is there a way to convert a Parameter from one type to another? Should I have a task that takes the parameter and returns the string version?Hui Zheng
03/10/2021, 12:29 AMretries=3
for a task_A, we want to retry task_A for up to 3 times when it fails due to some network connection issue or intermittent failures. Meanwhile, however, we have the logic in the task_A
that explicitly and intentionally raise FAIL_A
signal for downstream tasks. That FAIL_A
is not meant to be re-tried by task_A
. When task_A
encounters this intentional FAIL_A
, it shall skip retries and return it directly. How could I do that?Carl
03/10/2021, 4:20 AMwith Flow('MyFlow') as flow:
do_thing = Parameter('do_think', default=False, required=False)
data = get_data()
with case(do_thing, True):
data = do_thing_task(data)
data = do_another_thing(data) # This gets skipped when do_thing = False
Soren Daugaard
03/10/2021, 1:03 PMdelete_artifact
method here: https://docs.prefect.io/api/latest/artifacts/artifacts.html#functions but it is not clear to me how I would obtain the task_run_artifact_id
to use it?Vitalik
03/10/2021, 1:40 PMliren zhang
03/10/2021, 1:44 PMFailed to load and execute Flow's environment: NameError("name 'Local' is not defined")
I am not entirely sure where the name normal is from
Here is my sample code for reference:
from prefect import task, Flow
from prefect.run_configs import DockerRun
from prefect.storage.github import GitHub
@task
def say_hello():
print("Hello, world!")
with Flow(name="My first flow with Docker agent", storage=GitHub(repo="bbbb/aaaa-bdp", path="/PREFECT/hello_world_github.py", access_token_secret="GIT_ACCESS_TOKEN")) as flow:
say_hello=say_hello()
flow.run_config=DockerRun(labels=['<http://prefect.aaaa.com|prefect.aaaa.com>'])
#flow.run()
flow.register("first_prefect_project")
Adam Brusselback
03/10/2021, 3:12 PMSamuel Hinton
03/10/2021, 3:34 PMSteve Aby
03/10/2021, 3:57 PMAaron Goodrich
03/10/2021, 8:35 PMflow.storage = Docker(
path="my_flow.py",
files={"/source/of/my_flow.py": "my_flow.py"},
stored_as_script=True
)
but I don't see how I can get, say, my dynamically generated files back out. Any suggestions?ale
03/10/2021, 10:16 PMCharles Liu
03/10/2021, 11:22 PMflow.run_config = KubernetesRun(image="example/image-name:with-tag")
be a remote image in a private repo?Matthew Blau
03/11/2021, 12:45 AMMaria
03/11/2021, 4:53 AMCA Lee
03/11/2021, 9:11 AM0.14.12
Running into this error when attempting to run a flow using ECS agent and ECSRun:
botocore.errorfactory.InvalidParameterException: An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
I have a working config for prefect agent that executes the flow without errors. However, this involves creating a `task-definitions.yaml`:
prefect agent ecs start -t token \
-n aws-ecs-agent \
-l label \
--task-definition /path/to/task-definition.yaml \
--cluster cluster_arn
task-definitions.yaml
networkMode: awsvpc
cpu: 1024
memory: 2048
taskRoleArn: task_role_arn
executionRoleArn: execution_role_arn
The flow runs without errors, so the error is not due to IAM permissions.
However, when running the ECS Agent using the --task-role-arn
and --execution-role-arn
CLI args, I run into the above-mentioned error. I have also tried running Prefect agent using --launch-type FARGATE
, which I believe is the default and does not need to be specified, but this does not work too.
prefect agent ecs start -t token \
-n aws-ecs-agent \
-l ecs \
--task-role-arn task_role_arn \
--execution-role-arn execution_role_arn \
--cluster cluster_arn
I have also tried to pass in task_role_arn
and execution_role_arn
into the ECSRun() function within my flow, and ran into the same error.
Is there any way to run ECS Agent using CLI args without using the task-definition file?Chris Smith
03/11/2021, 12:45 PMMatthew Blau
03/11/2021, 2:32 PMJustin
03/11/2021, 2:59 PMhaf
03/11/2021, 3:58 PMUnexpected error while running flow: KeyError('Task slug resolve_profiles_dir-1 not found in the current Flow; this is usually caused by changing the Flow without reregistering it with the Prefect API.')
— I set up all agents/cli:s two days ago at their latest versions, and the agent is a k8s one. I'm using Prefect cloud. This happens when I register the flow as such and then run it from the UI (I verified the UI archives the old version and I'm triggering a run of the updated version)
#!/usr/bin/env python
from logging import getLogger
from datetime import timedelta
from os import getenv
from pathlib import Path
from pendulum import today
from prefect.engine.state import Failed
from prefect.schedules import IntervalSchedule
from prefect.storage import Docker
from prefect.utilities.notifications import slack_notifier
from prefect.utilities.storage import extract_flow_from_file
logger = getLogger("dbt.deploy")
with open('requirements.txt') as file:
packages = list(line.strip() for line in file.readlines())
docker = Docker(
registry_url="europe-docker.pkg.dev/projecthere/cd",
# dockerfile='Dockerfile', # Uncomment to use own Dockerfile with e.g. dependencies installed
image_name="analytics-dbt",
image_tag="0.14.11",
python_dependencies=packages
)
slack = slack_notifier(
only_states=[Failed],
webhook_secret='SLACK_WEBHOOK_URL')
every_hour = IntervalSchedule(
start_date=today('utc'),
interval=timedelta(hours=1))
flows = sorted(Path('flows').glob('*.py'))
# Add flows
for file in flows:
flow = extract_flow_from_file(file_path=file)
<http://logger.info|logger.info>('Extracted flow from file before build')
docker.add_flow(flow)
# Build storage with all flows
docker = docker.build()
# Update storage in flows and register
for file in flows:
flow = extract_flow_from_file(file_path=file)
<http://logger.info|logger.info>('Extracted flow from file after build')
flow.storage = docker
flow.state_handlers.append(slack)
flow.schedule = every_hour
<http://logger.info|logger.info>('Registering...')
flow.register(
project_name='dbt',
build=False,
labels=['prod'],
idempotency_key=flow.serialized_hash(),
)
Jack Sundberg
03/11/2021, 4:17 PMJustin Chavez
03/11/2021, 5:21 PMhaf
03/11/2021, 5:54 PMJohn Grubb
03/11/2021, 6:27 PMCharles Liu
03/11/2021, 7:57 PMFrederick Thomas
03/11/2021, 10:31 PMAlex Welch
03/12/2021, 4:03 AMECS Agent
. I get the below error on my flow run. I have checked, and found, the 3.8.5
version (it was the docker container I built the flow in), but I can not determine where the 3.7.10
version is. I looked in the EC2
instance that I set up and launched the agent from and it is at 3.8.7
. Where else is Prefect running the flow that may have the 3.7
Python version and is there any way around this other than upgrading that to 3.8
?Akash Rai
03/12/2021, 4:40 AMJacob Blanco
03/12/2021, 7:23 AMFlorian Kühnlenz
03/12/2021, 10:31 AMFlorian Kühnlenz
03/12/2021, 10:31 AMMichael Adkins
03/12/2021, 2:57 PMDockerRun
config and then store your flow on S3
or Github
Florian Kühnlenz
03/12/2021, 3:01 PMMichael Adkins
03/12/2021, 3:02 PMDockerStorage
and prefect will build the image at registration time and package your flow into itFlorian Kühnlenz
03/12/2021, 3:03 PMMichael Adkins
03/12/2021, 3:04 PMFlorian Kühnlenz
03/12/2021, 3:05 PMMichael Adkins
03/12/2021, 3:36 PMFlorian Kühnlenz
03/12/2021, 5:11 PMMichael Adkins
03/12/2021, 5:22 PMFlorian Kühnlenz
03/12/2021, 5:26 PMFailed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
Michael Adkins
03/12/2021, 8:31 PMfrom prefect import Flow, task
from prefect.storage import Local
from prefect.run_configs import DockerRun
@task(log_stdout=True)
def say_hello():
print("Hello world")
with Flow("docker-storage-example") as flow:
say_hello()
flow.storage = Local(stored_as_script=True, path="/flow.py", add_default_labels=False)
flow.run_config = DockerRun(image="image-with-flow:latest")
if __name__ == "__main__":
flow.register("default")
Here's the minimal Dockerfile
FROM prefecthq/prefect:latest
ADD test-docker-storage.py /flow.py
--tag image-with-flow:latest
Florian Kühnlenz
03/12/2021, 8:53 PMMichael Adkins
03/12/2021, 8:58 PMDocker
without building DockerStorage
at registration time"Marvin
03/12/2021, 8:58 PM