Nicholas Chammas
08/09/2021, 4:33 PM
parameter_defaults, where the defaults are derived from the date that the schedule is triggered on. In other words, I’m looking for something like this:
schedule = Schedule(
    clocks=[
        IntervalClock(
            interval=timedelta(days=1),
            parameter_defaults={"version": date.today().isoformat()},
        ),
    ],
    ...
)
Except I don’t want date.today().isoformat() to be calculated just once when the flow is registered and then fixed for all time. I want it to be calculated at the time the clock fires.
How do I do this? Pass in a lambda as the value in the parameter_defaults dictionary? Construct a dict-like object that calculates the value on key lookup? I feel like I’m thinking about this incorrectly.
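One possible workaround, sketched on the assumption that computing the value inside the flow is acceptable: prefect.context carries scheduled_start_time at run time, so a task can derive the version when the clock actually fires rather than at registration:

import prefect
from prefect import task

@task
def default_version():
    # scheduled_start_time is populated by Prefect when the run starts,
    # so this evaluates when the clock fires, not when the flow is registered
    return prefect.context.scheduled_start_time.date().isoformat()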
Mehdi Nazari
08/09/2021, 4:41 PM

Kyle McChesney
08/09/2021, 4:52 PM
run_config and storage attributes for each flow before registration.

Kyle McChesney
08/09/2021, 4:53 PM
prefect build ... CLI command directly within a custom python script

Nicholas Chammas
08/09/2021, 7:51 PM
schedule = Schedule(
    clocks=[IntervalClock(interval=timedelta(days=1))],
    or_filters=[
        between_dates(start_month=1, start_day=1, end_month=1, end_day=1),
        between_dates(start_month=4, start_day=1, end_month=4, end_day=1),
        between_dates(start_month=7, start_day=1, end_month=7, end_day=1),
        between_dates(start_month=10, start_day=1, end_month=10, end_day=1),
    ],
)
Is that idiomatic Prefect? Or is there a more natural way of doing this?
Another potential solution is to use IntervalClock with an interval of 3 months, except you can’t say timedelta(months=3) and get the interval to fire on the first of the month easily.
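One alternative worth considering, sketched under the assumption that cron syntax fits the use case: CronClock fires on the first of every third month without any date filters:

from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# fires at midnight on Jan 1, Apr 1, Jul 1, and Oct 1
schedule = Schedule(clocks=[CronClock("0 0 1 */3 *")])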
Cab Maddux
08/09/2021, 8:52 PM
prefect.core.task understandably does not override the __eq__/__ne__ methods to support task comparisons, as explained in the is_equal method docstring: https://github.com/PrefectHQ/prefect/blob/1959eecf1bbbb8e3b194b288c197a6108deb8693/src/prefect/core/task.py#L924-L938
Just to confirm, this would be an appropriate way to resolve task result equality within a flow, right?
with Flow('my-flow') as flow:
    task_a = get_task_a_result()
    task_a_is_value = task_a.is_equal('value')
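For what it’s worth, the resulting boolean task pairs naturally with prefect.case for branching; a sketch where get_task_a_result and downstream_task are stand-ins from the snippet above:

from prefect import Flow, case

with Flow('my-flow') as flow:
    task_a = get_task_a_result()
    task_a_is_value = task_a.is_equal('value')
    # tasks created inside the case block are skipped when the condition is False
    with case(task_a_is_value, True):
        downstream_task()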
YD
08/09/2021, 11:18 PM
def my_state_handler(obj, old_state, new_state):
    if new_state.is_finished() and new_state.is_failed():
        # send notification
but this did not work.
I want to send a notification only if the last retry fails.
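A sketch of one commonly suggested pattern, hedged since it assumes Prefect 1.x exposes task_run_count in context while handlers run: on attempts that will still be retried, the handler can see a Failed state before it becomes Retrying, so the run count has to be checked as well:

import prefect
from prefect.engine.state import Failed

def my_state_handler(obj, old_state, new_state):
    # task_run_count starts at 1; once it exceeds max_retries,
    # no retries remain and this Failed state is final
    if isinstance(new_state, Failed):
        if prefect.context.get("task_run_count", 1) > obj.max_retries:
            pass  # send notification here
    return new_state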
Michael Booth
08/10/2021, 1:23 AM
(prefect) mjboothaus@Michaels-Air tutorial % python 01_etl.py
...TRUNCATED - See thread
As a follow-up -- I ran the 2nd part of the tutorial, 02_etl_flow.py, and I guess this illustrates the more helpful messages/info provided by the prefect library when tasks fail:
(prefect) mjboothaus@Michaels-Air tutorial % python 02_etl_flow.py
...TRUNCATED... See thread
SOLVED - seems API was down - now OK

Brad I
08/10/2021, 1:53 AM

haven
08/10/2021, 7:12 AM
from prefect import Flow, context as prefect_context, task

@task
def enrich_context():
    prefect_context["custom_key"] = 1

@task
def main_task():
    value = prefect_context["custom_key"]
    print(value)

with Flow("test-enrich_context") as flow:
    _enrich_context_task = enrich_context()
    main_task(upstream_tasks=[_enrich_context_task])
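A hedged note on this pattern: in Prefect 1.x each task run generally gets its own copy of context (and may execute in a different process under DaskExecutor), so a mutation made in one task tends not to be visible downstream. A sketch of the more idiomatic route, passing the value as a task result:

from prefect import Flow, task

@task
def enrich():
    return 1

@task
def main_task(value):
    print(value)

with Flow("test-enrich-context") as flow:
    main_task(enrich())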
Salohy
08/10/2021, 8:13 AM
ModuleNotFoundError: No module named 'utils' when running python file.py.
It seems that my local package utils is not found during the docker build. Here is the structure of my folder:
src/
    utils/
    file.py
    __init__.py
I am using a custom docker image where I already copied everything in src into the image.
Please help me on this. Any help is appreciated 🙏 Many thanks already
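One common cause, offered as a guess rather than a diagnosis: inside the container, file.py may be executed from a different working directory, so src never lands on the import path. A minimal sketch that pins the path to the file’s own directory (paths illustrative only):

import os
import sys

# make the directory containing this file (i.e. src) importable,
# regardless of the container's working directory
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import utils  # should now resolve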
Italo Barros
08/10/2021, 11:54 AM

Kien Nguyen
08/10/2021, 12:12 PM

Xyp Jn
08/10/2021, 12:56 PM
rstrip anywhere in my code, and the flow runs smoothly with LocalRun. I wonder, is it possible for me to get a full stacktrace on this error...
Thanks for your help!

Tim Enders
08/10/2021, 1:22 PM
prefect run?

Philip MacMenamin
08/10/2021, 1:57 PM
prefect agent local start --api http://<server_IP>:4200
Pedro Machado
08/10/2021, 2:47 PM
get_task_run_result and create_flow_run tasks described here. I'd like to treat the child flow as a single unit that can be retried or restarted if the flow fails. Currently, these are two separate tasks and I haven't been able to set them up this way. How could I change my flow to support retries/restart when the child flow fails?
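One possible shape for this, sketched without confirmation from the thread (the flow and project names are placeholders, and calling another task’s .run() inside a task trades away some per-task visibility): wrap the create/wait pair in a single task with retries so a child-flow failure retries the whole unit:

from datetime import timedelta
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task(max_retries=2, retry_delay=timedelta(minutes=5))
def run_child_flow():
    # .run() invokes the built-in tasks as plain functions inside this task
    flow_run_id = create_flow_run.run(flow_name="child-flow", project_name="my-project")
    flow_run = wait_for_flow_run.run(flow_run_id)
    if not flow_run.state.is_successful():
        raise RuntimeError(f"child flow run {flow_run_id} did not succeed")
    return flow_run_id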
Joe Hamman
08/10/2021, 3:35 PM
prefect agent kubernetes install -k $KEY --namespace=staging --rbac | kubectl apply --namespace=staging -f -
I then submit a flow that uses the KubernetesRun config and GCS storage configured as:
run_config = KubernetesRun(cpu_request=2, memory_request="2Gi", image='gcr.io/carbonplan/hub-notebook:c89f7f1', env={'TZ': 'UTC'})
storage = GCS("carbonplan-scratch", project='carbonplan')
This results in an error message like this:
└── 23:15:59 | INFO | Submitted for execution: Job prefect-job-d8a8a648
└── 23:16:05 | INFO | Entered state <Failed>: Failed to load and execute Flow's environment: Forbidden('GET https://storage.googleapis.com/storage/v1/b/carbonplan-scratch?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
So, I gather my agent doesn’t have the correct IAM privileges to read the flow from GCS. Next I tried adding a service account to my agent:
prefect agent kubernetes install -k $KEY --namespace=staging --service-account-name pangeo --rbac | kubectl apply --namespace=staging -f -
Here I’m pointing to my kubernetes service account called pangeo, which has been given storage.objectAdmin permissions. However this results in the same error as above. So now I’m wondering if I’m missing something more fundamental here. If anyone has suggestions on where to look for more details on setting up prefect on GKE, I’d certainly appreciate it.
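One detail worth checking, offered as a suggestion rather than a confirmed diagnosis: the flow run executes in its own Kubernetes job, separate from the agent pod, so the service account may also need to be attached to the run config itself:

from prefect.run_configs import KubernetesRun

run_config = KubernetesRun(
    cpu_request=2,
    memory_request="2Gi",
    image="gcr.io/carbonplan/hub-notebook:c89f7f1",
    env={"TZ": "UTC"},
    # attaches the KSA to the flow-run job, not just the agent
    service_account_name="pangeo",
)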
Nivi Mukka
08/10/2021, 6:02 PM
prefect_directory param in the Docker storage. I am able to read files from that defined path but I cannot write files to that directory - these are files that will be used in the flow execution. Getting [Errno 13] Permission denied: my/path/filename
Mehdi Nazari
08/10/2021, 6:24 PM

Leon Kozlowski
08/10/2021, 6:28 PM
KubernetesRun at runtime, as an image is not required for the job spec?

YD
08/10/2021, 7:37 PM

Nivi Mukka
08/10/2021, 8:08 PM
pandas.read_gbq() or pandas.to_gbq() defined within my tasks, but the execution of the flow seems to be using only one Dask worker instead of distributing the load, and I’m getting into memory/timeout issues. I’m getting errors and warnings like this in the GKE dask worker logs:
INFO - Event loop was unresponsive in Worker for 4.40s. This is often caused by long running GIL holding functions or moving large chunks of data. This can cause timeouts and instability.
WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.40 GB -- Worker memory limit: 8.59 GB.
WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 6.89 GB -- Worker memory limit: 8.59 GB
WARNING - Worker exceeded 95% memory budget. Restarting.
/opt/conda/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.
distributed.utils_perf - INFO - full garbage collection released 842.28 MB from 584 reference cycles (threshold: 10.00 MB)
distributed.nanny - INFO - Worker process 10 exited with status 1
Do I have to use Prefect’s BigQueryTask class if I want the DaskExecutor to be utilizing all of its workers and cluster options as set? Or do I have to change something in the Dask Gateway config?
How do I tell Prefect to use all the assigned Dask workers when running tasks?
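A general point that may apply here, hedged since nothing in the thread confirms it: the DaskExecutor parallelizes across tasks, so a single task calling pandas.read_gbq() runs on exactly one worker no matter how many are available. Splitting the query into mapped tasks is the usual way to spread the load; a sketch with a hypothetical table and partitioning column:

import pandas as pd
from prefect import Flow, task

@task
def load_partition(partition_date):
    # each mapped task run can be scheduled on a different Dask worker
    query = f"SELECT * FROM dataset.table WHERE dt = '{partition_date}'"
    return pd.read_gbq(query)

with Flow("bq-load") as flow:
    dates = ["2021-08-01", "2021-08-02", "2021-08-03"]
    frames = load_partition.map(dates)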
Brett Naul
08/10/2021, 8:18 PM
The to_dict() approach that you mentioned in this thread is similar to what we're doing (we return list(df.itertuples()) instead), but it's not ideal, for the same reasons that PandasSerializer is nice in general compared to using cloudpickle. I can't really think of anything off the top of my head though that wouldn't require messing with the .map internals (cc @Chris White @Michael Adkins too in case y'all have any clever ideas). can also make a discussion on github if that's preferable
Leon Kozlowski
08/10/2021, 8:52 PM
KubernetesRun?

Kathryn Klarich
08/10/2021, 9:13 PM
Cloning https://github.com/PrefectHQ/prefect.git (to revision master) to /tmp/pip-install-2dvduvf6/prefect_9381dd189c7849aa9cd40f1a3fa2afae
at various points today.

Ben Muller
08/10/2021, 10:31 PM

Nivi Mukka
08/11/2021, 1:30 AM
sys.exit(1). I have this line inside of a task, and if the execution reaches this line, does it exit with 1 and mark the task as successful or failed? Or does it think the flow failed and try to restart the flow?
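A hedged aside on the pattern itself: raising one of Prefect’s own signals inside the task states the intent explicitly, rather than leaving the outcome to however the runner treats SystemExit; a minimal sketch:

from prefect import task
from prefect.engine import signals

@task
def my_task():
    # FAIL marks this task run as Failed with the given message
    raise signals.FAIL("explicit failure instead of sys.exit(1)")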
Sumit Kumar Rai
08/11/2021, 4:28 AM
from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.engine import signals
shell_task_invoke = ShellTask()

@task
def skip_if_non_prod(env):
    if env == "PROD":
        return shell_task_invoke.run(command="ls")
    else:
        raise signals.RETRY()
Can I skip subclasses of class Task, i.e. ShellTask, DbtShellTask, by passing a parameter something like below?
is_not_prod = False if env == "PROD" else True
ShellTask(skip=is_not_prod)
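An idiomatic alternative, sketched on the assumption that Prefect 1.x’s case block fits here: tasks created inside a case context, ShellTask and DbtShellTask instances included, are skipped automatically when the condition doesn’t hold:

from prefect import Flow, Parameter, case
from prefect.tasks.shell import ShellTask

shell_task = ShellTask()

with Flow("conditional-shell") as flow:
    env = Parameter("env", default="DEV")
    # everything in this block is skipped unless env == "PROD"
    with case(env, "PROD"):
        shell_task(command="ls")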
YD
08/11/2021, 5:18 AM
import os
import sys
PATH = 'Module path'
sys.path.append(PATH)
from <my module> import <my func>
when doing a test function, without a Prefect flow, I can run it from the command line.
when running the same function from a flow, I can run it when using flow.run(), but when registering it in Prefect Cloud and trying to run it, I get
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'<my module>\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
I am running a local agent on my machine, prefect agent local start
I do not use Docker containers
any suggestions?

Kevin Kho
08/11/2021, 2:31 PM
cloudpickle works. You need to package your dependencies into a Docker container and use Docker storage so that the agent can pull the container with the dependencies.
If you are using a Local agent and LocalRun, there are two ways to do this. First, you can start the agent in the directory where the imports will resolve correctly, so it has access to those files when it runs the flow. Second, LocalRun takes in a working_dir where you can specify where the agent will run the flow from.
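A minimal sketch of that second option (the path is a placeholder):

from prefect.run_configs import LocalRun

# run the flow from the directory where 'my module' resolves
flow.run_config = LocalRun(working_dir="/path/to/project")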