Jai Deo
08/05/2021, 1:52 PM
haven
08/05/2021, 3:48 PM
@task removes all type signatures that the function was given and a bare FunctionTask is returned. Any thoughts on improving the @task decorator such that all type signatures persist? That would help tremendously with type-checking for flows. Happy to contribute too, if such a proposal is somewhat sensible.
Constantino Schillebeeckx
08/05/2021, 5:28 PM
Github storage. I'm configuring it like
GitHub(
    repo="some_org/repo_name",
    path="path/to/flow.py",
    access_token_secret="secret_name",
    ref="stg"
)
When I execute the flow I get the following failure:
Failed to load and execute Flow's environment: GithubException(422, {'message': 'No commit found for SHA: stg', 'documentation_url': 'https://docs.github.com/rest/reference/repos#get-a-commit'}).
That repo definitely has the branch, do I need to reference it in a special way?
Mehdi Nazari
08/05/2021, 6:08 PM
Lauri Makinen
08/05/2021, 6:09 PM
extract_flow_task = StartFlowRun(flow_name=extract_flow.name, wait=True, project_name=PROJECT_NAME)
transform_flow_task = StartFlowRun(flow_name=transform_flow.name, wait=True, project_name=PROJECT_NAME)
#
# Parent flow, executed on Kubernetes agent
#
with Flow("parent_flow", storage=storage, run_config=KubernetesRun(labels=["kubernetes"])) as parent_flow:
    date_YYYYmmdd = Parameter("date_YYYYmmdd")
    extract_task = extract_flow_task()  # How to pass date_YYYYmmdd to this flow/task?
    transform_flow_task(upstream_tasks=[extract_task])  # How to pass date_YYYYmmdd to this flow/task?
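A minimal sketch of the idea being asked about, forwarding the parent's parameter to each child run as a parameters dict. The `start_flow_run` function and the `calls` list are hypothetical stand-ins so that the sketch runs without a Prefect backend; they are not Prefect's API.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for launching a child flow run; it only records the call.
calls: List[Tuple[str, Dict[str, Any]]] = []


def start_flow_run(flow_name: str, parameters: Dict[str, Any]) -> str:
    """Record the child flow name and the parameters it would receive."""
    calls.append((flow_name, dict(parameters)))
    return f"{flow_name}-run"


def parent_flow(date_YYYYmmdd: str) -> None:
    """Forward the parent's parameter to both children under the same key."""
    child_params = {"date_YYYYmmdd": date_YYYYmmdd}
    start_flow_run("extract_flow", parameters=child_params)
    start_flow_run("transform_flow", parameters=child_params)


parent_flow("20210805")
```

As far as I know, the StartFlowRun task in Prefect 1.x accepts a parameters argument when it is called inside the Flow block, so the equivalent there would be something like extract_flow_task(parameters={"date_YYYYmmdd": date_YYYYmmdd}), assuming the child flows declare a Parameter with that name.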
Ben Muller
08/05/2021, 10:26 PM
Automation for notifications for all flows except flow_name as an option? Or would I need to do something custom to do that?
Fina Silva-Santisteban
08/05/2021, 11:02 PM
trigger in the run() method, but these tasks are written using the task decorator only.
@task
def parent():
    ...
@task
def child():
    ...
What are the best next steps?
Luuk
08/06/2021, 6:23 AM
from prefect import task, Flow
from prefect.run_configs import DockerRun

@task
def say_hello(x=1):
    return f"Test {x}"

with Flow("hello-world") as flow:
    first = say_hello()
    second = say_hello(2)

# The Flow attribute is run_config, not config_run
flow.run_config = DockerRun()
flow.register(project_name='test', labels=['prod'])
So question: How do I send the flow created inside my Azure VM to the VM's DockerAgent using the Prefect Cloud orchestration?
flavienbwk
08/06/2021, 8:32 AM
resources parameters to specify resource affinities (RAM, CPU cores, GPU environment) a Flow requires to be run?
Ex (Kubernetes):
resources:
  requests:
    memory: "64Mi"
    cpu: "250m"
  limits:
    memory: "128Mi"
    cpu: "500m"
Thanks!
Adam Everington
08/06/2021, 8:35 AM
Arun Giridharan
08/06/2021, 5:24 PM
Constantino Schillebeeckx
08/06/2021, 5:56 PM
DbtShellTask along with secrets (more details in thread)
Daniel Bast
08/06/2021, 5:58 PM
joshua mclellan
08/06/2021, 6:25 PM
Riley Hun
08/06/2021, 9:04 PM
KilledWorker error. It keeps restarting the first task. Do you know what the issue could be?
Nelson Griffiths
08/07/2021, 8:13 PM
Hugo Shi
08/08/2021, 12:11 AM
Nivi Mukka
08/08/2021, 1:35 AM
google-cloud-secret-manager, google-cloud-logging, etc.). All the Google Python packages are running into a grpc transport error. It may be a package version issue or maybe some conflict.
Kien Nguyen
08/08/2021, 3:27 AM
Scott Vermillion
08/08/2021, 6:16 PM
Nadav
08/09/2021, 12:38 AM
Jelle Vegter
08/09/2021, 9:22 AM
Ben Collier
08/09/2021, 10:22 AM
Lauri Makinen
08/09/2021, 12:27 PM
Samuel Hinton
08/09/2021, 12:39 PM
local_agent slowly leaking memory? Over a day or two the memory usage of our local agent service grows until it runs out, and then we run into exceptions when new flows try to be scheduled. It might not be a memory error, but our 3GB memory cap on the service does not seem to be sufficient.
Samuel Hinton
08/09/2021, 2:30 PM
Nicholas Chammas
08/09/2021, 4:33 PM
parameter_defaults, where the defaults are derived from the date that the schedule is triggered on. In other words, I’m looking for something like this:
schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(days=1),
            parameter_defaults={"version": date.today().isoformat()},
        ),
    ],
    ...
Except I don’t want date.today().isoformat() to be calculated just once when the flow is registered and then fixed for all time. I want it to be calculated at the time the clock fires.
How do I do this? Pass in a lambda as the value in the parameter_defaults dictionary? Construct a dict-like object that calculates the value on key lookup?
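For concreteness, the "dict-like object that calculates the value on key lookup" idea could look roughly like the sketch below. `LazyDefaults` is a hypothetical name, and nothing here establishes that Prefect's clocks would accept or serialize such an object; it only illustrates the lookup-time evaluation itself.

```python
from collections.abc import Mapping
from datetime import date
from typing import Any, Callable, Dict, Iterator


class LazyDefaults(Mapping):
    """Hypothetical mapping whose values are recomputed on every key lookup."""

    def __init__(self, factories: Dict[str, Callable[[], Any]]) -> None:
        # Each value is a zero-argument callable, not a precomputed value.
        self._factories = dict(factories)

    def __getitem__(self, key: str) -> Any:
        # Call the factory at lookup time rather than at construction time.
        return self._factories[key]()

    def __iter__(self) -> Iterator[str]:
        return iter(self._factories)

    def __len__(self) -> int:
        return len(self._factories)


# The date is evaluated when "version" is read, not when the object is built.
defaults = LazyDefaults({"version": lambda: date.today().isoformat()})
```
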
I feel like I’m thinking about this incorrectly.
Mehdi Nazari
08/09/2021, 4:41 PM
Kyle McChesney
08/09/2021, 4:52 PM
run_config and storage attributes for each flow before registration.
Kyle McChesney
08/09/2021, 4:53 PM
prefect build ... CLI command directly within a custom Python script