Hawkar Mahmod
03/04/2021, 3:00 PM
Docker storage), or use the default configured on the agent.
and I am using Docker Storage, which has worked in the past and does get stored in my image repository (ECR). It seems as though specifying an explicit task definition (either through the task_definition_path or task_definition arguments) and using Docker Storage does not work, or there is a bug in the image being picked up from Storage.
Any guidance would be welcome.
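(For reference, a minimal sketch of the combination being described, assuming Prefect 0.14-style APIs; the registry URL, image name, and task-definition path are all illustrative, not taken from the message above:)
from prefect.storage import Docker
from prefect.run_configs import ECSRun

# Docker storage builds and pushes the flow's image to the registry (e.g. ECR)
flow.storage = Docker(
    registry_url="123456789.dkr.ecr.us-east-1.amazonaws.com",  # illustrative ECR registry
    image_name="my-flow",                                      # illustrative image name
)
# ECSRun can point at an explicit task definition instead of the default
flow.run_config = ECSRun(
    task_definition_path="s3://my-bucket/flow-task-definition.yaml",  # illustrative path
)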
Jack Sundberg
03/04/2021, 4:01 PM
Marwan Sarieddine
03/04/2021, 4:28 PM
Marwan Sarieddine
03/04/2021, 4:53 PM
It seems 0.14.11 made the Flow object no longer pickleable using pickle - I am wondering if others have bumped into this.
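(A minimal repro sketch of the pickling issue being described; the flow body and name are illustrative:)
import pickle
from prefect import task, Flow

@task
def say_hello():
    print("hello")

with Flow("pickle-check") as flow:  # illustrative flow
    say_hello()

pickle.dumps(flow)  # reportedly raises on 0.14.11 per the message above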
03/04/2021, 5:52 PMFelipe Saldana
03/04/2021, 7:40 PM
from prefect import Flow
from prefect.tasks.secrets import EnvVarSecret

aurora_user_val = EnvVarSecret("AURORA_USERNAME", raise_if_missing=True).run()
aurora_pass_val = EnvVarSecret("AURORA_PASSWORD", raise_if_missing=True).run()
aurora_host_val = EnvVarSecret("AURORA_HOST", raise_if_missing=True).run()

with Flow("test_vars") as flow:
    ...
Samuel Hinton
03/04/2021, 8:18 PM
@task(timeout=20), but they just completed successfully with a normal @task annotation. Will update Prefect now and check to see if that helps.
Scott Moreland
03/04/2021, 9:16 PM
@cache(subsample=100, sdf_key='sdf_large')
@task
def some_large_spark_dataframe():
    "intensive ETL process here"
    ...
    return sdf

@task
def downstream_task(sdf_large):
    "some intensive computation on sdf_large"
    ...
    return sdf

with Flow("my_flow") as flow:
    sdf_large_sample = read_from_cache('sdf_large')
    downstream_task(sdf_large_sample)
...but I've had difficulty stacking decorators with the task decorator. Moreover, there are the usual challenges of the task result not being available until evaluation time. Any recipes you'd recommend?
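(One possible recipe, a hedged sketch using Prefect 0.x result targets instead of a custom @cache decorator: when a result already exists at the target, the task body is skipped and the cached value is loaded. The cache directory and target name are illustrative:)
from prefect import task
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,
    result=LocalResult(dir="/tmp/prefect-cache"),  # illustrative cache location
    target="sdf_large",  # if this file already exists, the task body is skipped
)
def some_large_spark_dataframe():
    ...
Whether a Spark DataFrame itself can be serialized this way is a separate question; a subsampled or materialized form may be what actually gets written.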
Daniel Ahn
03/04/2021, 10:38 PM
upstream_tasks, it would cause the parent flow to run multiple times.
I think what I need is like the factory pattern in OO paradigm. In Airflow, I think I can use ExternalTaskSensor to achieve this.
- Does my question/use case make sense?
- Is there a preferred "Prefect" way to solve this?
- Or should I keep the state of the parent job externally?
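(A hedged sketch of one Prefect-native option: the StartFlowRun task lets a parent flow trigger registered child flows, which is close to the flow-of-flows / factory pattern described above. Flow and project names are illustrative:)
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

# wait=True blocks until the child flow run reaches a finished state
start_child = StartFlowRun(flow_name="child-flow", project_name="my-project", wait=True)

with Flow("parent-flow") as parent_flow:
    start_child()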
Trevor Kramer
03/05/2021, 1:39 AM
Jack Sundberg
03/05/2021, 2:22 AM
future = CustomExecutor(flow.run, *my_args, **my_kwargs) working just fine. The issue comes when I try flow.executor = CustomExecutor(); state = flow.run(). When I do this, I receive...
Unexpected error occured in FlowRunner: AttributeError("'CustomFuture' object has no attribute 'is_mapped'")
I read through the FlowRunner code and have a good guess as to what's happening, but I could use some insight from a dev. Has anyone seen this before?
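(A guess at the contract involved, sketched against the executor base class as an assumption: wait() has to resolve futures all the way down to Prefect State objects, since FlowRunner later calls State methods such as is_mapped() on whatever comes back:)
from prefect.executors.base import Executor

class CustomExecutor(Executor):
    def submit(self, fn, *args, **kwargs):
        # return some future-like handle for fn(*args, **kwargs)
        ...

    def wait(self, futures):
        # must return concrete results (prefect States), not CustomFuture objects,
        # or FlowRunner will hit AttributeError on e.g. state.is_mapped()
        ...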
03/05/2021, 10:05 AMfrom prefect import task, Flow, unmapped
import requests
def task_that_goes_stale():
token = code_to_get_token_at_some_cost()
return token
def task_using_token(user,token):
r = requests.get('<https://api.github.com/user/{user}>', headers = {'TOKEN': token})
with Flow("my_flow") as my_flow:
all_users = [i for i in range(1000)]
token = task_that_goes_stale()
task_using_token.map(all_users,unmapped(token))
I know i could just get a new token for each call but this seems like the wrong way to do it. Is there a Prefect way to do it? I can see that there is output caching but as I read it this is meant for bringing results between flows https://docs.prefect.io/core/concepts/persistence.html#output-cachingMoritz Rocholl
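(One possibility, a hedged sketch: give the token task a cache window matching the token's lifetime via cache_for, so repeated flow runs reuse it while it is valid and re-fetch it once it expires. This helps across runs, though not for a token expiring mid-run. The 30-minute window is illustrative:)
from datetime import timedelta
from prefect import task

@task(cache_for=timedelta(minutes=30))  # illustrative lifetime; match the token's real TTL
def task_that_goes_stale():
    return code_to_get_token_at_some_cost()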
Moritz Rocholl
03/05/2021, 12:34 PM
with Flow("A Flow") as my_flow:
    task_res = my_task()
my_flow.run()
followed by:
latest_flow_state = my_flow.last_successful_run
latest_flow_state.tasks[task_res]
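(As far as I know there is no last_successful_run attribute on Flow; a hedged sketch of something comparable against a Prefect backend, querying the most recent successful run over GraphQL. The flow name is illustrative:)
from prefect import Client

client = Client()
result = client.graphql(
    """
    query {
      flow_run(
        where: {flow: {name: {_eq: "A Flow"}}, state: {_eq: "Success"}}
        order_by: {end_time: desc}
        limit: 1
      ) {
        id
        state
      }
    }
    """
)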
Kieran
03/05/2021, 2:45 PM
0.13.18. Has anyone had this before -- all of a sudden a large row of cancelled flows appears in the UI? (These flows run using the CronSchedule.)
Berty
03/05/2021, 3:24 PM$ prefect server start
ERROR: The Compose file './docker-compose.yml' is invalid because:
services.ui.depends_on contains an invalid type, it should be an array
services.apollo.depends_on contains an invalid type, it should be an array
services.towel.depends_on contains an invalid type, it should be an array
services.graphql.depends_on contains an invalid type, it should be an array
services.hasura.depends_on contains an invalid type, it should be an array
Exception caught; killing services (press ctrl-C to force)
ERROR: The Compose file './docker-compose.yml' is invalid because:
services.ui.depends_on contains an invalid type, it should be an array
services.apollo.depends_on contains an invalid type, it should be an array
services.towel.depends_on contains an invalid type, it should be an array
services.graphql.depends_on contains an invalid type, it should be an array
services.hasura.depends_on contains an invalid type, it should be an array
Kao Phetchareun
03/05/2021, 5:48 PMemail_to_cc
only allow for sending one email for cc?Daniel Ahn
03/05/2021, 6:16 PMShellTask
, but it needs the location of the file i'm submitting with. Right now, I've hardcoded the location in my file, and switching to that location with helper_script
option on ShellTask
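(A hedged sketch of that approach: helper_script runs in the same shell before the command, so the cd carries over. The path and command are illustrative:)
from prefect.tasks.shell import ShellTask

submit = ShellTask(
    helper_script="cd /path/to/project",  # illustrative location of the file being submitted
    command="spark-submit job.py",        # illustrative command
)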
Alex Welch
03/05/2021, 8:20 PMGITHUB Storage
if you are referencing an outside file that is in the same repository, is the flow able to find and run it?CA Lee
03/06/2021, 9:18 AM
prefect agent kubernetes install -t token_value -l label -e aws_access_key_id=XXX -e aws_secret_access_key=YYY --rbac | kubectl apply -f -
The agent is installed and running in the kube cluster, on inspection using kubectl get pods:
NAME READY STATUS RESTARTS AGE
prefect-agent-58cf5b46d5-84hh6 1/1 Running 0 26m
Running the flow in Prefect Server, the flow gets picked up by the agent but encounters the error: Failed to load and execute Flow's environment: NoCredentialsError('Unable to locate credentials')
This is despite the agent having knowledge of the S3 creds via the -e flags. I have also tried passing the S3 creds as a dict to KubernetesRun inside my flow:
KubernetesRun(
    image="my_image",
    labels=["my_labels"],
    env={
        "aws_access_key_id": "xxx",
        "aws_secret_access_key": "yyy"
    }
)
Am I missing something for my kube cluster to authenticate to S3?
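(One thing worth checking, offered as an assumption about the cause: boto3 only reads the uppercase variable names from the environment, so the env keys may need to be AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY rather than lowercase:)
from prefect.run_configs import KubernetesRun

run_config = KubernetesRun(
    image="my_image",
    labels=["my_labels"],
    env={
        # boto3 looks for these exact uppercase names
        "AWS_ACCESS_KEY_ID": "xxx",
        "AWS_SECRET_ACCESS_KEY": "yyy",
    },
)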
CA Lee
03/06/2021, 10:14 AM
Pod prefect-job-7b9f3bf6-z4p27 failed.
Container 'flow' state: terminated
Exit Code:: 1
Reason: Error
Alex Welch
03/07/2021, 6:40 AMDocker Storage
with a ECSRun Config
. What I am looking to make happen is to have the github repo cloned to the docker container so that my flow has access to various files (jupyter notebooks primarily).
I have been trying solutions for a number of days and I am currently stuck on
Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or direct
This would indicate that my prefect image does not have access to the docker deamon. But I can’t figure out what I am doing wrong.
I have prefect backend cloud
set. And below are my files.Jay Vercellone
03/07/2021, 11:40 PMset_schedule_active
parameter that can be specified to the .register()
call in order to automatically enable or disable the scheduling of the new flow version.
These values can be either True
(enable the scheduling) or False
(disable the scheduling), but there's no option to "leave it as it is right now", meaning that if the flow is active/inactive, the new registration should leave the new workflow as active/inactive as well.
What's the best approach in this scenario? I want to avoid 2 potentially dangerous situations:
- Accidentally enabling a flow that should remain disabled
- Accidentally disabling a flow that should remain enabled
Avoiding any specification in the code would be ideal, since we don't want to enable or disable flows using the code, but rather the UI/API.
Thanks!
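(A hedged workaround sketch: query the flow's current is_schedule_active over GraphQL and pass it back into register(), so registration preserves whatever the UI/API last set. Flow and project names are illustrative:)
from prefect import Client

client = Client()
result = client.graphql(
    """
    query {
      flow(where: {name: {_eq: "my-flow"}, archived: {_eq: false}}) {
        is_schedule_active
      }
    }
    """
)
flows = result["data"]["flow"]
current = flows[0]["is_schedule_active"] if flows else False
flow.register(project_name="my-project", set_schedule_active=current)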
John
03/08/2021, 1:25 AM
(something like Airflow's prev_execution_date_success variable).
In reading the docs, caching doesn't quite seem like the right fit. Persisting output makes sense, but it's not clear to me the right way to re-consume that output on a future flow run. So at this point my inclination is to simply read/write the last run timestamp to a local file.
Anyone have a recipe, docs, or otherwise on a better approach?
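(A minimal sketch of the local-file approach mentioned above; the path is illustrative:)
import datetime
from pathlib import Path
from prefect import task

STAMP = Path("/tmp/my_flow_last_success.txt")  # illustrative location

@task
def read_last_success():
    # returns None on the first ever run
    return STAMP.read_text() if STAMP.exists() else None

@task
def write_last_success():
    STAMP.write_text(datetime.datetime.utcnow().isoformat())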
Alfie
03/08/2021, 8:33 AM
Alfie
03/08/2021, 8:33 AM
Avi A
03/08/2021, 10:28 AM
Hawkar Mahmod
03/08/2021, 4:31 PM
/opt/prefect/healthcheck.py
Trevor Kramer
03/08/2021, 4:47 PM
Andor Tóth
03/08/2021, 5:09 PM
Is there a way to override yesterday in context? Like this:
with prefect.context(yesterday="2021-01-01"):
    flow.run(run_on_schedule=False, executor=LocalExecutor())
Andor Tóth
03/08/2021, 5:11 PM