Peter Roelants
01/11/2021, 7:20 PM
With flow.register, the creation and registration need to happen in the same call. Is there an example somewhere of how to decouple these steps?
For example, how to create and store a Docker build artefact that encapsulates a flow, and then run/register the flow stored in that Docker artefact at a later time without access to the original flow file?
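A minimal sketch of one way to decouple the two steps, assuming a Prefect 0.14-style API (the registry URL, image name and project name below are placeholders): build the Docker storage first, then register with build=False so registration does not rebuild the image.

from prefect import Flow, task
from prefect.storage import Docker

@task
def say_hello():
    print("hello")

with Flow("decoupled-example") as flow:
    say_hello()

# Step 1: build and push the image; the flow is baked into the image.
storage = Docker(registry_url="registry.example.com/team", image_name="decoupled-example")
storage.add_flow(flow)
flow.storage = storage.build()

# Step 2 (can run later, e.g. in a separate CI job): register without rebuilding.
flow.register(project_name="my-project", build=False)

Note that this sketch still uses the flow object for the registration call; registering later from only the built image, without the original flow file, is not covered here.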
Joël Luijmes
01/11/2021, 7:33 PM
Jeremy Phelps
01/11/2021, 10:16 PM
Which google.cloud Python library does Prefect depend on?
Jeremy Phelps
01/11/2021, 11:10 PM
Failed to load and execute Flow's environment: DefaultCredentialsError('Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see <https://cloud.google.com/docs/authentication/getting-started>')
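One common remedy, sketched here as an assumption rather than what this thread concluded: make sure the environment where the flow actually runs can see a service-account key, for example by passing the variable through the flow's run config (the key path below is a placeholder).

from prefect.run_configs import UniversalRun

# Point the Google client libraries at a key file that exists inside the
# container/pod/machine where the flow run executes.
flow.run_config = UniversalRun(
    env={"GOOGLE_APPLICATION_CREDENTIALS": "/path/to/service-account.json"}
)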
Vincent
01/11/2021, 11:47 PM
Philip MacMenamin
01/12/2021, 1:10 AM
@task(trigger=prefect.triggers.always_run) # one new line
def tear_down_cluster(cluster):
    if all_successful:  # pseudocode: "did all upstream tasks succeed?"
        print(":)")
    else:
        print(":-/")
Amanda Wee
01/12/2021, 4:29 AM
The docs say the Flow upload name will take the form "s3-flow/<slugified-current-timestamp>." over here:
https://docs.prefect.io/orchestration/flow_config/storage.html#aws-s3
and also: "If this key is not provided the Flow upload name will take the form slugified-flow-name/slugified-current-timestamp
." over here:
https://docs.prefect.io/api/latest/storage.html#s3
It turns out that the latter is correct (which was not what I expected, grr).
Klemen Strojan
01/12/2021, 8:55 AM
Sven Teresniak
01/12/2021, 9:14 AM
Sven Teresniak
01/12/2021, 9:15 AM
Tim Pörtner
01/12/2021, 1:18 PM
with Flow("test-flow") as flow:
    tasks_to_run = {
        # (here is information on which tasks should be added to the flow)
    }
    parameter_tasks_to_run = Parameter("tasks_to_run", default=tasks_to_run)
Can I force it so that this Parameter shows up on the web frontend without actually using the parameter in a task within the flow?
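A hedged sketch of one way this can be done, assuming Prefect's Flow.add_task API: explicitly add the Parameter to the flow so it becomes part of the flow graph, and therefore visible in the UI, even though no task consumes it.

from prefect import Flow, Parameter

with Flow("test-flow") as flow:
    tasks_to_run = Parameter("tasks_to_run", default={"task_a": True})  # placeholder default
    flow.add_task(tasks_to_run)  # register the parameter with the flow even if unused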
Adam
01/12/2021, 1:21 PM
"This tenant already has the maximum number of flows". We have 25 flows in 1 project - I can't find any reference to an upper limit on flows on the pricing page? Any help would be much appreciated.
Dolor Oculus
01/12/2021, 2:08 PM
I'm using Docker flow storage with base_image, and need to add a file to the image which can only be determined at registration time (the flow.storage assignment). I've tried various combinations of build_kwargs and extra_dockerfile_commands but am not having any luck. Would appreciate any feedback or pointers!
my_new_file = "/valid/path/to/file.json"
flow.storage = Docker(
    base_image=valid_image_name,
    local_image=True,
    ignore_healthchecks=True,
    build_kwargs={'my_new_file': my_new_file},
    extra_dockerfile_commands=['ARG my_new_file',
                               'COPY $my_new_file .'])
Once it gets to the COPY command I get the error
COPY failed: stat /data/docker/tmp/docker-builder092360870/valid/path/to/file.json: no such file or directory
I've also tried just the basename of the file as input into the build_kwargs, to no avail. How do I get a file into that temporary docker builder dir? Cheers
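A likely explanation and fix, sketched here rather than quoted from the thread: the build context Prefect creates for Docker storage is a temporary directory, so COPY cannot reach arbitrary host paths. Docker storage also takes a files argument that copies local files into that context and then into the image (the paths below reuse the placeholders from the snippet above).

from prefect.storage import Docker

my_new_file = "/valid/path/to/file.json"

flow.storage = Docker(
    base_image=valid_image_name,
    local_image=True,
    ignore_healthchecks=True,
    files={my_new_file: "/file.json"},  # local path on the build machine -> path inside the image
)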
Sean Talia
01/12/2021, 5:33 PM
with Flow("test-flow") as flow:
    if "label1" in flow.labels:
        function1()
    elif "label2" in flow.labels:
        function2()
Zach
01/12/2021, 6:13 PM
Alex Rud
01/12/2021, 6:21 PM
Alex Rud
01/12/2021, 6:26 PM
How can I use the manual_only trigger here? I'm trying to do the following: read a file into a dataframe -> run a GreatExpectations validation on the dataframe -> if the validation passes, continue with the next task; if not, allow the next task to run only when manually triggered.
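One alternative, sketched as an assumption about Prefect 0.x signals rather than as the answer given in this thread: a gating task can raise PAUSE when validation fails, so the downstream work only continues after a manual resume in the UI (file path and validation logic below are placeholders).

import pandas as pd
from prefect import Flow, task
from prefect.engine.signals import PAUSE

@task
def read_file() -> pd.DataFrame:
    return pd.read_csv("data.csv")  # placeholder path

@task
def validate(df: pd.DataFrame) -> bool:
    return not df.empty  # stand-in for the GreatExpectations validation result

@task
def gate(validation_passed: bool):
    if not validation_passed:
        raise PAUSE("Validation failed; resume this task manually to continue.")

@task
def load(df: pd.DataFrame):
    ...  # the next step

with Flow("validate-then-maybe-pause") as flow:
    df = read_file()
    ok = validate(df)
    approved = gate(ok)
    load(df, upstream_tasks=[approved])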
Vincent
01/12/2021, 6:42 PM
Arnulfo Soriano
01/12/2021, 7:50 PM
This works when using DaskKubernetesEnvironment:
DaskKubernetesEnvironment(metadata={'image': 'someImage'},
                          worker_spec_file=os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                        f'./{worker_spec}'),
                          min_workers=min_workers,
                          max_workers=max_workers,
                          labels=['DASK'])
However when using DaskExecutor:
cluster = KubeCluster(make_pod_spec(image='someImage')).from_yaml(
    os.path.join(os.path.dirname(os.path.abspath(__file__)), f'{worker_spec}'))
# some things I've tried:
# cluster = KubeCluster.from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)), f'{worker_spec}'))
DaskExecutor(cluster_class=lambda: cluster, adapt_kwargs={"minimum": min_workers, "maximum": max_workers})
I keep getting the following during the registering step:
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 512, in _start
    await ClusterAuth.load_first(self.auth)
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 85, in load_first
    raise auth_exc
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 71, in load_first
    await auth_instance.load()
  File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 125, in load
    kubernetes.config.load_kube_config(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 661, in load_kube_config
    loader = _get_kube_config_loader_for_yaml_file(
  File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 624, in _get_kube_config_loader_for_yaml_file
    raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
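A likely cause, noted here as an observation rather than the thread's answer: KubeCluster(...) is constructed eagerly when the module is imported, so dask-kubernetes tries to load a kube config on the machine doing the registering. A hedged sketch that defers cluster creation to flow run time, reusing the worker_spec, min_workers and max_workers variables from the snippet above:

import os
from dask_kubernetes import KubeCluster
from prefect.executors import DaskExecutor

worker_spec_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), worker_spec)

flow.executor = DaskExecutor(
    # The lambda is only called when the flow actually runs, not at registration.
    cluster_class=lambda: KubeCluster.from_yaml(worker_spec_path),
    adapt_kwargs={"minimum": min_workers, "maximum": max_workers},
)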
john Pedersen
01/12/2021, 7:54 PM
Akshay Deodhar
01/12/2021, 10:23 PM
matta
01/13/2021, 1:23 AM
M Taufik
01/13/2021, 4:22 AM
Krishna Devenda
01/13/2021, 9:47 AM
Can context values defined in config.toml be overridden by context provided in the Prefect UI's context section while running a flow?
I am facing a problem with updating/overwriting context values from the Prefect UI. I have defined a user configuration file for the agent at ~/.prefect/config.toml with the following syntax:
[context]
postgresdb = "production"
Now, I want to update this attribute postgresdb from the UI by passing context with the same name and a different value, like the following:
{
    "postgresdb": "test"
}
When I read this context value in a task, it gives me the value from config.toml, not the updated value from the context provided in the UI:
print(prefect.context.get('postgresdb'))
>>>production
Can someone guide me through this? Please.
I have read the documentation multiple times and have also gone through the Prefect source code in an attempt to understand how contexts get merged, but I could not understand it. From the code, it appears that whatever context is passed through **kwargs overwrites the context in config.toml. I believe the Prefect UI is passing context via **kwargs only.
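A possible workaround, sketched here and not taken from this thread: values that must be switchable per run from the UI can be passed as a Parameter instead of context, since Parameter overrides set for a run reliably reach the task (names below are illustrative).

from prefect import Flow, Parameter, task

@task
def connect(postgresdb: str):
    print(f"connecting to the {postgresdb} database")

with Flow("db-flow") as flow:
    postgresdb = Parameter("postgresdb", default="production")
    connect(postgresdb)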
Eric
01/13/2021, 10:16 AM
Arsenii
01/13/2021, 11:57 AM
The "pass flow.serialized_hash to flow.register as the `idempotency_key`" way of registering flows is very useful for CI pipelines, however it's not really viable if you have to spend a lot of time building the Docker flow storage object before calling flow.register().
Can anyone please share a quick way of getting the idempotency key for a specific flow name through GraphQL, if there is one?
I am thinking of looping through all the available flows and using the serialized hash to automatically filter out changed flows. Right now we are using a clunky gitpython solution that basically does a git diff on flow source files between tagged releases on a GitHub repo.
Thank you.
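One possible approach, labelled as an assumption rather than a confirmed API: flow.serialized_hash() can be computed before the Docker storage is built, so a CI job can compare it against a hash it stored for the previous release and skip the expensive build/register entirely when nothing changed. The read_previous_hash and store_hash helpers below are hypothetical and would be provided by your own CI.

new_hash = flow.serialized_hash()

previous_hash = read_previous_hash(flow.name)   # hypothetical: e.g. a file or CI cache
if new_hash != previous_hash:
    flow.storage.build()                        # only now pay for the Docker build
    flow.register(
        project_name="my-project",              # placeholder project
        build=False,
        idempotency_key=new_hash,
    )
    store_hash(flow.name, new_hash)             # hypothetical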
Slackbot
01/13/2021, 4:50 PM
Dane Gardner
01/13/2021, 4:51 PM
The Storage classes seem to only be able to reference a single file, not entire modules/directories. Are there any examples of a somewhat complicated suite of flows with code shared between them?
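A hedged sketch of one common pattern, assuming Docker storage and that the files argument accepts directories in the Prefect version in use (paths below are placeholders): copy the shared package into the image and put its parent directory on PYTHONPATH so every flow stored in that image can import it. If directories are not supported, pip-installing the shared code into the base image works as well.

from prefect.storage import Docker

flow.storage = Docker(
    files={"/repo/src/shared_lib": "/modules/shared_lib"},  # local dir -> dir inside the image
    env_vars={"PYTHONPATH": "/modules"},                    # make `import shared_lib` work
)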
Felix Schran
01/13/2021, 6:02 PM
@task(
    name="Extract",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def extract(x):
    return 1

@task(
    name="Transform",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def transform(x):
    return x * 2
Caching works with this example. With each run of the flow, Prefect looks for a file "Transform--2021-01-13" and if it exists, it uses the cached result.
I want to add the following features:
1. Whenever I change the source code of the extract task (say to return 20 instead of return 1) I obviously don't want to use the cached result (i.e. 1) as an input for the next task. Instead I want extract to recompute whenever I change the content of extract. How can I do that?
2. When the result of an upstream DAG changes, I want to execute the DAG which follows from that point downwards (although the downstream tasks might also be tasks with a cache). For instance, in this example, I want transform to take the new input of 20 and compute again with that input (although the result is already cached).
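For question 1, a sketch of one workaround, not an official Prefect feature: fold a short hash of the task function's source into the target, so editing the code changes the target path and forces recomputation. The _source_hash helper is hypothetical, and cache_dir is reused from the snippet above.

import hashlib
import inspect

from prefect import task
from prefect.engine.results import LocalResult

def _source_hash(fn) -> str:
    """Short fingerprint of the function body; changes whenever the code changes."""
    return hashlib.md5(inspect.getsource(fn).encode()).hexdigest()[:8]

def extract_fn(x):
    return 1

extract = task(
    extract_fn,
    name="Extract",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}--" + _source_hash(extract_fn),
)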
Marc Lipoff
01/13/2021, 8:22 PM
When using RunGreatExpectationsValidation tasks, what's the best way to grab the validation results and log them? It doesn't appear I can do it with a state handler.
Michael Adkins
01/13/2021, 9:12 PM
Have you looked at case? https://docs.prefect.io/api/latest/tasks/control_flow.html#case
Marc Lipoff
01/13/2021, 9:13 PM
RunGreatExpectationsValidation raises a signal.FAIL. How do I deal with that gracefully and branch (via case or something else)?
Michael Adkins
01/13/2021, 9:18 PM
Marc Lipoff
01/13/2021, 9:18 PM
Michael Adkins
01/13/2021, 9:21 PM
import prefect

PRETEND_VALIDATION_PASSED = True

def handle_failed_validation(task, old_state, new_state):
    if isinstance(new_state, prefect.engine.state.ValidationFailed):
        return prefect.engine.state.Success(result=new_state.result)
    return new_state

@prefect.task(state_handlers=[handle_failed_validation])
def fail_task():
    class FakeResult:
        success: bool = PRETEND_VALIDATION_PASSED
    if PRETEND_VALIDATION_PASSED:
        return FakeResult()
    raise prefect.engine.signals.VALIDATIONFAIL(result=FakeResult())

@prefect.task()
def check_ge_result(result):
    return result.success

@prefect.task(log_stdout=True)
def log(message):
    print(message)

with prefect.Flow("test") as flow:
    result = fail_task()
    validation_passed = check_ge_result(result)
    with prefect.case(validation_passed, True):
        log("Handle validation success using result")
    with prefect.case(validation_passed, False):
        log("Validation failed, log it or whatever")

flow.run()
Marc Lipoff
01/13/2021, 9:30 PM