Vincent
01/11/2021, 11:47 PM
Philip MacMenamin
01/12/2021, 1:10 AM
import prefect
from prefect import task

@task(trigger=prefect.triggers.always_run)  # one new line
def tear_down_cluster(cluster):
    # all_successful is a placeholder here (e.g. derived from upstream task states)
    if all_successful:
        print(":)")
    else:
        print(":-/")
Amanda Wee
01/12/2021, 4:29 AM
"…s3-flow/<slugified-current-timestamp>" over here:
https://docs.prefect.io/orchestration/flow_config/storage.html#aws-s3
and also: "If this key is not provided the Flow upload name will take the form slugified-flow-name/slugified-current-timestamp." over here:
https://docs.prefect.io/api/latest/storage.html#s3
It turns out that the latter is correct (which was not what I expected, grr)
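For reference, a hedged sketch of pinning the S3 upload key explicitly so neither default naming scheme applies; the bucket and key are hypothetical, and the import path assumes Prefect >= 0.14:
from prefect.storage import S3

flow.storage = S3(
    bucket="my-prefect-flows",   # hypothetical bucket
    key="flows/my-flow.pkl",     # explicit key, so no slugified default is used
)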
Klemen Strojan
01/12/2021, 8:55 AM
Sven Teresniak
01/12/2021, 9:14 AM
Sven Teresniak
01/12/2021, 9:15 AM
Tim Pörtner
01/12/2021, 1:18 PM
with Flow("test-flow") as flow:
    tasks_to_run = {
        # information on which tasks should be added to the flow
    }
    parameter_tasks_to_run = Parameter("tasks_to_run", default=tasks_to_run)
Can I force this Parameter to show up on the web frontend without actually using it in a task within the flow?
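A possible approach (untested sketch, Prefect 0.x API): explicitly attach the Parameter to the flow with flow.add_task, so it is registered and shown in the UI even though no task consumes it; the default value below is hypothetical.
from prefect import Flow, Parameter

tasks_to_run = {"task_a": True, "task_b": False}  # hypothetical default

with Flow("test-flow") as flow:
    parameter_tasks_to_run = Parameter("tasks_to_run", default=tasks_to_run)
    # Merely creating the Parameter does not add it to the flow; attach it
    # explicitly so it appears on the web frontend.
    flow.add_task(parameter_tasks_to_run)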
Adam
01/12/2021, 1:21 PM
We're getting the error "This tenant already has the maximum number of flows". We have 25 flows in 1 project - I can't find any reference to an upper limit on flows on the pricing page. Any help would be much appreciated.
01/12/2021, 2:08 PM
I'm using Docker flow storage with base_image, and I need to add a file to the image which can only be determined at registration time (the flow.storage assignment). I've tried various combinations of build_kwargs and extra_dockerfile_commands but am not having any luck. Would appreciate any feedback or pointers!
my_new_file = "/valid/path/to/file.json"
flow.storage = Docker(
    base_image=valid_image_name,
    local_image=True,
    ignore_healthchecks=True,
    build_kwargs={'my_new_file': my_new_file},
    extra_dockerfile_commands=['ARG my_new_file',
                               'COPY $my_new_file .'])
Once it gets to the COPY command I get the error:
COPY failed: stat /data/docker/tmp/docker-builder092360870/valid/path/to/file.json: no such file or directory
I've also tried passing just the basename of the file into build_kwargs, to no avail. How do I get a file into that temporary docker builder dir? Cheers
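One possible workaround (untested sketch): the Docker storage class also accepts a files mapping of local paths to paths inside the image, which avoids the build-context problem with ARG/COPY entirely; the in-image destination path is hypothetical, and the import path assumes Prefect >= 0.14.
from prefect.storage import Docker

my_new_file = "/valid/path/to/file.json"

flow.storage = Docker(
    base_image=valid_image_name,
    local_image=True,
    ignore_healthchecks=True,
    # files={<local path>: <absolute path inside the image>} copies the file
    # into the build context for you, so no ARG/COPY commands are needed.
    files={my_new_file: "/app/file.json"},
)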
Sean Talia
01/12/2021, 5:33 PM
with Flow("test-flow") as flow:
    if "label1" in flow.labels:
        function1()
    elif "label2" in flow.labels:
        function2()
Zach
01/12/2021, 6:13 PM
Alex Rud
01/12/2021, 6:21 PM
Alex Rud
01/12/2021, 6:26 PM
How would I use a manual_only trigger? I'm trying to do the following: read a file into a dataframe -> run GreatExpectations validation on the dataframe -> if validation passes, continue with the next task; if not, allow the next task to run only when manually triggered.
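One hedged alternative to a manual_only trigger (sketch; relies on prefect.engine.signals.PAUSE, which holds a task run until it is resumed from the UI; gate_on_validation is a name introduced here): gate the downstream work on the validation outcome inside a task.
from prefect import task
from prefect.engine import signals

@task
def gate_on_validation(validation_passed: bool):
    # Pause here when validation failed, so downstream tasks only run once
    # someone resumes this task run from the UI.
    if not validation_passed:
        raise signals.PAUSE("Validation failed - resume manually to continue.")
    return True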
Vincent
01/12/2021, 6:42 PM
Arnulfo Soriano
01/12/2021, 7:50 PM
DaskKubernetesEnvironment(metadata={'image': 'someImage'},
                          worker_spec_file=os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                                        f'./{worker_spec}'),
                          min_workers=min_workers,
                          max_workers=max_workers,
                          labels=['DASK'])
However, when using DaskExecutor:
cluster = KubeCluster(make_pod_spec(image='someImage')).from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)), f'{worker_spec}'))
# some things I've tried
# cluster = KubeCluster.from_yaml(os.path.join(os.path.dirname(os.path.abspath(__file__)), f'{worker_spec}'))
DaskExecutor(cluster_class=lambda: cluster, adapt_kwargs={"minimum": min_workers, "maximum": max_workers})
I keep getting the following at the registration step:
File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/core.py", line 512, in _start
await ClusterAuth.load_first(self.auth)
File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 85, in load_first
raise auth_exc
File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 71, in load_first
await auth_instance.load()
File "/usr/local/lib/python3.8/site-packages/dask_kubernetes/auth.py", line 125, in load
kubernetes.config.load_kube_config(
File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 661, in load_kube_config
loader = _get_kube_config_loader_for_yaml_file(
File "/usr/local/lib/python3.8/site-packages/kubernetes/config/kube_config.py", line 624, in _get_kube_config_loader_for_yaml_file
raise ConfigException(
kubernetes.config.config_exception.ConfigException: Invalid kube-config file. No configuration found.
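A hedged guess based on that traceback: with DaskExecutor the KubeCluster above is built eagerly on the machine that registers the flow (where no kube config exists), whereas DaskKubernetesEnvironment only creates its cluster at run time. A sketch that defers cluster creation to run time (worker_spec_path and make_cluster are names introduced here; import paths assume Prefect >= 0.14 and classic dask-kubernetes):
import os
from dask_kubernetes import KubeCluster
from prefect.executors import DaskExecutor

# min_workers, max_workers and worker_spec come from the surrounding script,
# as in the snippet above.
worker_spec_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), worker_spec)

def make_cluster():
    # Called only when the flow actually runs, so kube credentials are needed
    # in the execution environment rather than at registration time.
    return KubeCluster.from_yaml(worker_spec_path)

executor = DaskExecutor(
    cluster_class=make_cluster,
    adapt_kwargs={"minimum": min_workers, "maximum": max_workers},
)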
john Pedersen
01/12/2021, 7:54 PM
Akshay Deodhar
01/12/2021, 10:23 PM
matta
01/13/2021, 1:23 AM
M Taufik
01/13/2021, 4:22 AM
Krishna Devenda
01/13/2021, 9:47 AM
Is it possible to overwrite context values defined in config.toml with the context provided in the Prefect UI's context section while running a flow?
I am facing a problem with updating/overwriting context values from the Prefect UI. I have defined a user configuration file for the agent at ~/.prefect/config.toml with the following syntax:
[context]
postgresdb = "production"
Now, I want to update this attribute postgresdb from the UI by passing a context with the same name and a different value, like the following:
{
    "postgresdb": "test"
}
When I read this context key in a task, it gives me the value from config.toml, not the updated value from the context provided in the UI:
print(prefect.context.get('postgresdb'))
>>> production
Can someone guide me through this? Please.
I have read the documentation multiple times and have also gone through the Prefect source code in an attempt to understand how contexts get merged, but I could not understand it. From the code, it appears that whatever context is passed through **kwargs overwrites the context in config.toml. I believe the Prefect UI is passing context via **kwargs only.
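A minimal local experiment (sketch, standard prefect.context usage; it does not change how the UI passes context) to see how run-time keyword arguments interact with config-supplied context values:
import prefect

# Values passed as keyword arguments become part of prefect.context for the
# duration of the block - roughly what a context supplied for a run does.
# Whether this wins over a [context] entry in config.toml is exactly the
# behaviour in question here.
with prefect.context(postgresdb="test"):
    print(prefect.context.get("postgresdb"))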
Eric
01/13/2021, 10:16 AM
Arsenii
01/13/2021, 11:57 AM
The "pass flow.serialized_hash to flow.register as the `idempotency_key`" way of registering flows is very useful for CI pipelines; however, it's not really viable if you have to spend a lot of time building a Docker flow storage object before calling flow.register().
Can anyone please share a quick way of getting the idempotency key for a specific flow name through GraphQL, if there is one?
I am thinking of looping through all the available flows and using the serialized hash to automatically filter out changed flows. Right now we are using a clunky gitpython solution that basically does git diff on flow source files between tagged releases on a GitHub repo.
Thank you.
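This sidesteps GraphQL entirely, but one hedged idea (sketch) is to compute flow.serialized_hash() before building storage and skip registration when nothing has changed; the .last_registered_hash cache file and the project name below are hypothetical:
from pathlib import Path

hash_file = Path(".last_registered_hash")  # hypothetical cache written by the previous CI run
current_hash = flow.serialized_hash()

previous_hash = hash_file.read_text().strip() if hash_file.exists() else None
if current_hash != previous_hash:
    # Only pay for building the Docker storage and registering when the flow changed.
    flow.register(project_name="my-project", idempotency_key=current_hash)
    hash_file.write_text(current_hash)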
Slackbot
01/13/2021, 4:50 PM
Dane Gardner
01/13/2021, 4:51 PM
The Storage classes seem to only be able to reference a single file, not entire modules/directories. Are there any examples of a somewhat complicated suite of flows with code shared between them?
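One hedged pattern (not from this thread): package the shared code as an installable library and let Docker storage install it, so each flow file only imports it; my-shared-lib and the base image below are hypothetical, and the import path assumes Prefect >= 0.14.
from prefect.storage import Docker

flow.storage = Docker(
    base_image="python:3.8-slim",           # hypothetical base image
    python_dependencies=["my-shared-lib"],  # hypothetical package holding the shared code
)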
Felix Schran
01/13/2021, 6:02 PM
@task(
    name="Extract",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def extract(x):
    return 1

@task(
    name="Transform",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}"
)
def transform(x):
    return x * 2
Caching works with this example. With each run of the flow, Prefect looks for a file "transform--2021-01-13" and, if it exists, uses the cached result.
I want to add the following features:
1. Whenever I change the source code of the extract task (say, to return 20 instead of return 1), I obviously don't want to use the cached result (i.e. 1) as an input for the next task. Instead, I want extract to be recomputed whenever I change its contents. How can I do that?
2. When the result of an upstream task changes, I want to execute the DAG which follows from that point downwards (although the downstream tasks might also be cached). For instance, in this example, I want transform to take the new input of 20 and compute again with that input (although its result is already cached).
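For point 1, a hedged sketch (not a built-in Prefect feature): bake a hash of the task's source code into the target, so editing the function produces a new cache location and forces recomputation. source_hash and _extract are names introduced here, cache_dir is assumed from the snippet above, and point 2 (cascading invalidation downstream) is not covered by targets alone.
import hashlib
import inspect
from prefect import task
from prefect.engine.results import LocalResult

def source_hash(fn):
    # Short fingerprint of the function's source; changes whenever the code changes.
    return hashlib.md5(inspect.getsource(fn).encode()).hexdigest()[:8]

def _extract(x):
    return 1

extract = task(
    _extract,
    name="Extract",
    checkpoint=True,
    result=LocalResult(cache_dir),
    target="{task_name}--{today}--" + source_hash(_extract),
)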
Marc Lipoff
01/13/2021, 8:22 PM
For the RunGreatExpectationsValidation tasks, what's the best way to grab the validation results and log them? It doesn't appear I can do it with a state handler.
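A hedged sketch (assumes the validation task returns its results on success; the checkpoint name and flow name are hypothetical): pass the task's return value into a small downstream task that logs it.
import prefect
from prefect import task, Flow
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

validation_task = RunGreatExpectationsValidation()

@task
def log_validation_results(results):
    # Log whatever the validation task returned; pick out the fields you care about.
    prefect.context.get("logger").info("GE validation results: %s", results)

with Flow("ge-validation-logging") as flow:
    results = validation_task(checkpoint_name="my_checkpoint")  # hypothetical checkpoint
    log_validation_results(results)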
Joseph
01/13/2021, 8:24 PM
Venkatesh Tadinada
01/13/2021, 10:13 PM
matta
01/14/2021, 12:41 AM
Riley Hun
01/14/2021, 2:46 AM
AttributeError: 'FunctionTask' object has no attribute 'task_run_name'. Did you call this object within a function that should have been decorated with @prefect.task?
I've encountered this error before and usually it's due to version conflicts. This time, I can confirm that all my prefect versions are consistent between my local machine and docker images. I also submitted the flow using the same prefect version, 0.13.18.
My prefect server is hosted on kubernetes, and I'm submitting my flow using docker storage from my local machine. When I bash into my docker container, I can see that the flow is indeed there.
I'm able to run the flow against the Dask Cluster no problem.