Andreas Tsangarides
01/25/2022, 10:55 AM
… LocalResult and S3Result?

Vincent Chéry
01/25/2022, 11:07 AM
with Flow("my-flow") as f:
    a = make_a()
    b = make_b()
    c = a + b
• Do make_a and make_b need to be tasks, or can they be regular functions?
• In particular, when using built-in functions (for instance, datetime.utcnow()), can we use them directly or must we wrap them in a task?
• Is c = a + b allowed?
Thanks a lot!
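
A minimal sketch of the semantics being asked about, assuming Prefect 1.x's functional API; the make_a/make_b names come from the question, and the comments state the general build-time vs. run-time rule:

from datetime import datetime
from prefect import task, Flow

@task
def make_a():
    return 1

@task
def make_b():
    return 2

with Flow("my-flow") as f:
    a = make_a()                  # task call inside a Flow block: deferred to flow runtime
    b = make_b()
    c = a + b                     # allowed: Task overloads "+", producing a new downstream task
    built_at = datetime.utcnow()  # plain Python call: evaluated once, at flow *build* time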

Jons Cyriac
01/25/2022, 1:23 PM
I set PREFECT__CONTEXT__SECRETS__DB_URI
on the agent. Is this supposed to be available as prefect.context.secrets["DB_URI"] when the flow is run on a dask worker, without any other setup?
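
For reference, a minimal sketch of the access pattern in question, assuming Prefect 1.x, where PREFECT__CONTEXT__SECRETS__<NAME> environment variables populate prefect.context.secrets in the process that has them set; whether that environment reaches a remote dask worker is exactly the open question:

import prefect
from prefect import task

@task
def connect_to_db():
    # resolves only if PREFECT__CONTEXT__SECRETS__DB_URI is set in the
    # environment of the process that executes this task
    db_uri = prefect.context.secrets["DB_URI"]
    return db_uri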

Jean-Baptiste Six
01/25/2022, 2:25 PM
…
from dotenv import load_dotenv
from prefect import task, Flow
from prefect.engine.results import GCSResult

load_dotenv()

@task()
def add(x, y=1):
    return x + y

with Flow("Test GCSResult", result=GCSResult(bucket='thread-prefect-flows')) as GCSResult_flow:
    first_result = add(1, y=2)
    second_result = add(x=first_result, y=100)
Without success. However, this:
from dotenv import load_dotenv

load_dotenv()

def implicit():
    from google.cloud import storage

    # If you don't specify credentials when constructing the client, the
    # client library will look for credentials in the environment.
    storage_client = storage.Client()

    # Make an authenticated API request
    buckets = list(storage_client.list_buckets())
    print(buckets)

implicit()
works well, so I'm not sure I understand: "Make sure your Prefect installation can authenticate to Google's Cloud API" (https://docs.prefect.io/core/advanced_tutorials/using-results.html#example-running-a-flow-with-gcsresult)
Then I tried to use a local secret (https://docs.prefect.io/core/advanced_tutorials/using-results.html#example-running-a-flow-with-gcsresult):
[context.secrets]
GOOGLE_APPLICATION_CREDENTIALS="auth/token.json"
GCP_CREDENTIALS="auth/token.json"
(I don't know which to use between GCP_CREDENTIALS (prefect.utilities.gcp) and GOOGLE_APPLICATION_CREDENTIALS (from the GCP documentation); anyway, I tried both and neither worked.)
What did I do wrong?
Please help 🙏
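
One thing worth double-checking, offered as an assumption rather than a confirmed diagnosis: Prefect's GCP_CREDENTIALS secret is generally expected to hold the service-account JSON itself, while a file path is the convention only for Google's own GOOGLE_APPLICATION_CREDENTIALS variable. A sketch of what that would look like:

[context.secrets]
# hypothetical: inline the service-account JSON itself rather than a file path
GCP_CREDENTIALS = '{"type": "service_account", "project_id": "...", "private_key": "...", "client_email": "..."}'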

Sam Werbalowsky
01/25/2022, 4:57 PM
The run name comes out as My Flow - today - today rather than creating the date range:
start_date = Parameter("start_date", default="today")
end_date = Parameter("end_date", default="today")

# this function handles parameter typing and converts today to YYYY-MM-DD
start_date_str = get_date_param_as_string.run(start_date)
end_date_str = get_date_param_as_string.run(end_date)

create_flow_run(
    run_name=f"My Flow - {start_date_str.run()}-{end_date_str.run()}",
    flow_name="My Flow Name",
    project_name=prefect_project,
    parameters={"start_date": start_date, "end_date": end_date, "file_type": "file_type"},
)
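
One possible rework, sketched under the assumption of Prefect 1.x: calling .run() while the flow is being defined executes the conversion at build time against the Parameter defaults ("today"), so the run name is fixed before runtime. Moving the formatting into a task defers it until the parameter values are resolved; make_run_name and the project name are hypothetical, and the date-conversion step is elided:

from prefect import task, Flow, Parameter
from prefect.tasks.prefect import create_flow_run

@task
def make_run_name(start, end):
    # runs at flow runtime, so start/end hold the resolved parameter values
    return f"My Flow - {start}-{end}"

with Flow("parent-flow") as flow:
    start_date = Parameter("start_date", default="today")
    end_date = Parameter("end_date", default="today")
    run_name = make_run_name(start_date, end_date)
    create_flow_run(
        run_name=run_name,
        flow_name="My Flow Name",
        project_name="my-project",  # hypothetical project name
        parameters={"start_date": start_date, "end_date": end_date},
    )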

Adi Gandra
01/25/2022, 6:26 PM
With prefect agent kubernetes install,
what are the default mem and cpu limits? I have my tasks running on Kubernetes spun up on EKS, and I set up the agent using that command. However, the task is an FTP file download and it just stalls. No progress for hours. I'm wondering if this could be a possible mem and cpu limit issue, and if I should basically run the install command again. Will that overwrite my old manifest?

Mathijs Miermans
01/25/2022, 10:33 PM
Is it expected that logs show Starting task run... for tasks that are skipped because an upstream task fails? (Example code in thread.)

Henning Holgersen
01/26/2022, 11:52 AM
… 'Connection aborted.', OSError(0, 'Error'). I have added the RBAC stuff to the deployment, and opened outbound traffic to api.prefect.io on port 443, but I have not been able to get anything to run on the AKS agent. Any ideas?

Sidney Fox
01/26/2022, 2:48 PM
… ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
I'm not explicitly providing where or how results of the child flow should be stored, and I believe that is the root cause of the error thrown. Is that correct? If so, what is the preferred method for storing results? E.g., is PrefectResult preferred over S3Result, or is it a matter of opinion?
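
For illustration, a sketch of giving the child flow an explicit result so its output has a location to load from; this assumes Prefect 1.x result classes, and the bucket name is hypothetical. The PrefectResult-vs-S3Result choice mostly comes down to data size and sensitivity: PrefectResult stores the value in the Prefect backend, S3Result in your own bucket.

from prefect import Flow
from prefect.engine.results import S3Result

# an explicit flow-level result gives every task's output a storable,
# loadable location ("my-results-bucket" is hypothetical)
with Flow("child-flow", result=S3Result(bucket="my-results-bucket")) as child_flow:
    ...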

Sophie Courtemanche-Martel
01/26/2022, 4:09 PM
…
stream = io.BytesIO(data.encode())
AttributeError: 'tuple' object has no attribute 'encode'
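
For illustration only, a guess at the failure mode: data is arriving as a tuple rather than a single string, and tuples have no .encode(). Joining to one string first avoids the error (the sample data is hypothetical):

import io

data = ("col_a,col_b", "1,2")  # hypothetical: a tuple of strings, not one string
stream = io.BytesIO("\n".join(data).encode())  # join to a single str, then encode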

Eli Treuherz
01/26/2022, 5:20 PM
When I pass two lists to Task.map(), it iterates through both of them together like zip(). Is there a way to loop over every combination instead, like itertools.product()?
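
A workaround sketch, assuming Prefect 1.x mapping semantics: since .map() zips its iterables, one option is to materialize the cross product in an upstream task and map over the pairs (task names are hypothetical):

from itertools import product
from prefect import task, Flow

@task
def make_pairs(xs, ys):
    # build every (x, y) combination up front, since .map() itself zips
    return list(product(xs, ys))

@task
def process(pair):
    x, y = pair
    print(x, y)

with Flow("cross-product") as flow:
    pairs = make_pairs([1, 2, 3], ["a", "b"])
    process.map(pairs)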

Kevin Kho
01/26/2022, 7:17 PM
… name or slug. We also have tags if that helps.
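
For concreteness, a sketch of attaching that metadata to a task in Prefect 1.x (the specific name, slug, and tags are hypothetical):

from prefect import task

@task(name="transform", slug="transform-1", tags=["billing", "daily"])
def transform(records):
    # name/slug/tags persist as task metadata that queries can filter on
    return records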

Anna Geller
01/26/2022, 7:52 PM
I would use tags for such a use case - imo it's probably the cleanest approach, and it will work nicely in Orion. But even the task name or slug can work for your use case. It's all a matter of combining data from multiple queries. For example, this is how you can get all the versions of a specific flow:
query {
  flow(where: { name: { _ilike: "%dask%" } }) {
    id
    name
    version
    tasks {
      name
      id
      slug
      tags
    }
  }
}
You can replace the "dask" text with some part of your flow name. And then, to get the task run history for those tasks, you would need another query.
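
A guess at the shape of that follow-up query, filtering task runs by one of the task IDs returned above (the _eq value is a placeholder):

query {
  task_run(where: { task_id: { _eq: "<task-id>" } }) {
    id
    state
    start_time
    end_time
  }
}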
But note that storing logs and the history of your flow and task runs over such a long period of time (you want to analyze potentially two years of run history?) would be difficult, or at least expensive. For example, Prefect Cloud's standard plan has two weeks of log retention.