Nicholas Chammas
08/09/2021, 7:51 PM
schedule = Schedule(
    clocks=[IntervalClock(interval=timedelta(days=1))],
    or_filters=[
        between_dates(start_month=1, start_day=1, end_month=1, end_day=1),
        between_dates(start_month=4, start_day=1, end_month=4, end_day=1),
        between_dates(start_month=7, start_day=1, end_month=7, end_day=1),
        between_dates(start_month=10, start_day=1, end_month=10, end_day=1),
    ],
)
Is that idiomatic Prefect? Or is there a more natural way of doing this?
Another potential solution is to use IntervalClock with an interval of 3 months, except you can't say timedelta(months=3), and there's no easy way to get the interval to fire on the first of the month.
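A third option that might be more compact (an untested sketch, assuming Prefect 1.x's CronClock): cron syntax can express "midnight on the first day of each quarter" directly.
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock

# Fire at 00:00 on Jan 1, Apr 1, Jul 1, and Oct 1.
schedule = Schedule(clocks=[CronClock("0 0 1 1,4,7,10 *")])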
Cab Maddux
08/09/2021, 8:52 PM
prefect.core.task understandably does not override the __eq__ / __ne__ methods to support task comparisons, as explained in the is_equal method docstring: https://github.com/PrefectHQ/prefect/blob/1959eecf1bbbb8e3b194b288c197a6108deb8693/src/prefect/core/task.py#L924-L938
Just to confirm, this would be an appropriate way to resolve task result equality within a flow, right?
with Flow('my-flow') as flow:
    task_a = get_task_a_result()
    task_a_is_value = task_a.is_equal('value')
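For what it's worth, a sketch of how that comparison is often consumed downstream, assuming Prefect 1.x's case context manager (downstream_task is a stand-in):
from prefect import Flow, case, task

@task
def get_task_a_result():
    return 'value'

@task
def downstream_task():
    print("task_a matched 'value'")

with Flow('my-flow') as flow:
    task_a = get_task_a_result()
    # tasks inside an un-taken case branch end up Skipped
    with case(task_a.is_equal('value'), True):
        downstream_task()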
YD
08/09/2021, 11:18 PM
def my_state_handler(obj, old_state, new_state):
    if new_state.is_finished() and new_state.is_failed():
        # send notification
but this did not work.
I want to send a notification only if the last retry fails.
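One variation worth trying (a sketch, relying on Prefect 1.x moving a failing task to Retrying, not Failed, while retries remain, so Failed only appears after the last attempt):
from prefect.engine import state

def my_state_handler(task, old_state, new_state):
    # while retries remain, a failing task transitions to Retrying,
    # so this check fires only on the final failure
    if isinstance(new_state, state.Failed):
        pass  # send notification here
    return new_state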
Michael Booth
08/10/2021, 1:23 AM
(prefect) mjboothaus@Michaels-Air tutorial % python 01_etl.py
...TRUNCATED - See thread
As a follow-up, I ran the 2nd part of the tutorial, 02_etl_flow.py, and I guess this illustrates the more helpful messages/info provided by the prefect library when tasks fail:
(prefect) mjboothaus@Michaels-Air tutorial % python 02_etl_flow.py
...TRUNCATED... See thread
SOLVED - seems the API was down - now OK
Brad I
08/10/2021, 1:53 AM
haven
08/10/2021, 7:12 AM
from prefect import Flow, context as prefect_context, task
@task
def enrich_context():
    # set a custom key on the shared Prefect context
    prefect_context["custom_key"] = 1

@task
def main_task():
    # read the key set by the upstream task
    value = prefect_context["custom_key"]
    print(value)

with Flow("test-enrich_context") as flow:
    _enrich_context_task = enrich_context()
    main_task(upstream_tasks=[_enrich_context_task])
Salohy
08/10/2021, 8:13 AM
ModuleNotFoundError: No module named 'utils'
when running python file.py
It seems that my local package utils is not found during the docker build. Here is the structure of my folder:
src/
    utils/
        __init__.py
    file.py
I am using a custom docker image where I already copied everything in src into the image.
Please help me on this. Any help is appreciated 🙏 Many thanks already
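A common cause is that the copied directory isn't on Python's import path inside the image. A minimal Dockerfile sketch (the /opt/src destination is an assumption):
COPY src/ /opt/src/
ENV PYTHONPATH=/opt/src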
Italo Barros
08/10/2021, 11:54 AM
Kien Nguyen
08/10/2021, 12:12 PM
Xyp Jn
08/10/2021, 12:56 PM
rstrip anywhere in my code, and the flow runs smoothly with LocalRun. I wonder if it's possible for me to get a full stack trace on this error...
Thanks for your help!
Tim Enders
08/10/2021, 1:22 PM
prefect run?
Philip MacMenamin
08/10/2021, 1:57 PM
prefect agent local start --api http://<server_IP>:4200
Pedro Machado
08/10/2021, 2:47 PM
get_task_run_result and create_flow_run tasks described here. I'd like to treat the child flow as a single unit that can be retried or restarted if it fails. Currently, these are two separate tasks and I haven't been able to set them up this way. How could I change my flow to support retries/restarts when the child flow fails?
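One pattern to consider (a sketch: 'child-flow' and 'my-project' are placeholders, and raise_final_state availability depends on the Prefect version) is wrapping launch-and-wait in a single retryable task:
from datetime import timedelta
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task(max_retries=3, retry_delay=timedelta(minutes=1))
def run_child_flow():
    # launch the child flow and block until it finishes; raising on a
    # failed final state makes the whole unit retryable
    flow_run_id = create_flow_run.run(flow_name="child-flow", project_name="my-project")
    wait_for_flow_run.run(flow_run_id, raise_final_state=True)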
Joe Hamman
08/10/2021, 3:35 PM
prefect agent kubernetes install -k $KEY --namespace=staging --rbac | kubectl apply --namespace=staging -f -
I then submit a flow that uses the KubernetesRun config and GCS storage, configured as:
run_config = KubernetesRun(cpu_request=2, memory_request="2Gi", image='gcr.io/carbonplan/hub-notebook:c89f7f1', env={'TZ': 'UTC'})
storage = GCS("carbonplan-scratch", project='carbonplan')
This results in an error message like this:
└── 23:15:59 | INFO | Submitted for execution: Job prefect-job-d8a8a648
└── 23:16:05 | INFO | Entered state <Failed>: Failed to load and execute Flow's environment: Forbidden('GET https://storage.googleapis.com/storage/v1/b/carbonplan-scratch?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
So, I gather my agent doesn’t have the correct IAM privileges to read the flow from GCS. Next I tried adding a service account to my agent:
prefect agent kubernetes install -k $KEY --namespace=staging --service-account-name pangeo --rbac | kubectl apply --namespace=staging -f -
Here I'm pointing to my kubernetes service account called pangeo, which has been given storage.objectAdmin permissions. However, this results in the same error as above. So now I'm wondering if I'm missing something more fundamental here. If anyone has suggestions on where to look for more details on setting up prefect on GKE, I'd certainly appreciate it.
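One thing worth checking (a sketch): the flow is loaded from GCS by the flow-run job pod, not by the agent, so the service account may also need to be attached to the run config. KubernetesRun accepts a service_account_name parameter in Prefect 1.x:
run_config = KubernetesRun(
    cpu_request=2,
    memory_request="2Gi",
    image="gcr.io/carbonplan/hub-notebook:c89f7f1",
    env={"TZ": "UTC"},
    service_account_name="pangeo",  # attach the SA to the flow-run pod itself
)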
08/10/2021, 6:02 PMprefect_directory
param in the Docker
storage. I am able read files from that defined path but I cannot write files to that directory - these are files that will be used in the flow execution. Getting [Errno 13] Permission denied: my/path/filename
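One possible workaround (a sketch, assuming Docker storage's extra_dockerfile_commands parameter; /opt/prefect/data is a hypothetical path) is to create a writable directory at image build time:
from prefect.storage import Docker

storage = Docker(
    prefect_directory="/opt/prefect",
    extra_dockerfile_commands=[
        # hypothetical writable directory for files produced during the run
        "RUN mkdir -p /opt/prefect/data && chmod -R 777 /opt/prefect/data",
    ],
)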
Mehdi Nazari
08/10/2021, 6:24 PM
Leon Kozlowski
08/10/2021, 6:28 PM
KubernetesRun at runtime, as an image is not required for the job spec?
YD
08/10/2021, 7:37 PM
Nivi Mukka
08/10/2021, 8:08 PM
pandas.read_gbq() or pandas.to_gbq() defined within my tasks, but the execution of the flow seems to be using only one Dask worker instead of distributing the load, and I'm running into memory/timeout issues. I'm getting errors and warnings like this in the GKE dask worker logs:
INFO - Event loop was unresponsive in Worker for 4.40s. This is often caused by long running GIL holding functions or moving large chunks of data. This can cause timeouts and instability.
WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 6.40 GB -- Worker memory limit: 8.59 GB.
WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 6.89 GB -- Worker memory limit: 8.59 GB
WARNING - Worker exceeded 95% memory budget. Restarting.
/opt/conda/lib/python3.7/multiprocessing/semaphore_tracker.py:144: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown.
distributed.utils_perf - INFO - full garbage collection released 842.28 MB from 584 reference cycles (threshold: 10.00 MB)
distributed.nanny - INFO - Worker process 10 exited with status 1
Do I have to use Prefect's BigQueryTask class if I want the DaskExecutor to utilize all of its workers and cluster options as set? Or do I have to change something in the Dask Gateway config?
How do I tell Prefect to use all the assigned Dask workers when running tasks?
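For example, would splitting the query and mapping spread the load (a sketch; the shard column and query split are hypothetical)? A single pandas.read_gbq call runs inside one task and therefore on one worker, while mapped children can be placed on different workers:
import pandas as pd
from prefect import Flow, task

@task
def partition_queries():
    # hypothetical: one query per shard so work can spread across workers
    return [f"SELECT * FROM dataset.table WHERE shard = {i}" for i in range(8)]

@task
def read_partition(query):
    return pd.read_gbq(query)

with Flow("distributed-read") as flow:
    dfs = read_partition.map(partition_queries())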
Brett Naul
08/10/2021, 8:18 PM
the to_dict() approach that you mentioned in this thread is similar to what we're doing (we return list(df.itertuples()) instead), but it's not ideal, for the same reasons that PandasSerializer is nice in general compared to using cloudpickle. I can't really think of anything off the top of my head, though, that wouldn't require messing with the .map internals (cc @Chris White @Zanie too in case y'all have any clever ideas). Can also make a discussion on GitHub if that's preferable.
Leon Kozlowski
08/10/2021, 8:52 PM
KubernetesRun?
Kathryn Klarich
08/10/2021, 9:13 PM
Cloning https://github.com/PrefectHQ/prefect.git (to revision master) to /tmp/pip-install-2dvduvf6/prefect_9381dd189c7849aa9cd40f1a3fa2afae
at various points today.
Ben Muller
08/10/2021, 10:31 PM
Nivi Mukka
08/11/2021, 1:30 AM
sys.exit(1). I have this line inside of a task; if the execution reaches this line, does it exit with 1 and mark the task as successful or failed? Or does it think the flow failed and try to restart the flow?
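For comparison, a sketch of the usual alternative: raising Prefect's FAIL signal marks just that task Failed instead of killing the interpreter the way sys.exit(1) would.
from prefect import task
from prefect.engine.signals import FAIL

@task
def my_task():
    # instead of sys.exit(1), signal failure to the Prefect engine
    raise FAIL("something went wrong")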
Sumit Kumar Rai
08/11/2021, 4:28 AM
from prefect import task
from prefect.tasks.shell import ShellTask
from prefect.engine import signals

shell_task_invoke = ShellTask()

@task
def skip_if_non_prod(env):
    if env == "PROD":
        # call the shell task's run() directly, since we're inside another task
        return shell_task_invoke.run(command="ls")
    else:
        raise signals.RETRY()
Can I skip subclasses of Task (i.e. ShellTask, DbtShellTask) by passing a parameter, something like below?
is_not_prod = False if env == "PROD" else True
ShellTask(skip=is_not_prod)
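Or is the usual pattern a case block? A sketch (tasks inside the un-taken branch end up Skipped):
from prefect import Flow, Parameter, case
from prefect.tasks.shell import ShellTask

shell_task_invoke = ShellTask(command="ls")

with Flow("conditional-shell") as flow:
    env = Parameter("env", default="DEV")
    # only runs when env == "PROD"; otherwise the task is Skipped
    with case(env, "PROD"):
        shell_task_invoke()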
YD
08/11/2021, 5:18 AM
import os
import sys

PATH = 'Module path'
sys.path.append(PATH)

from <my module> import <my func>
When testing the function without a Prefect flow, I can run it from the command line.
When running the same function from a flow, it works with flow.run(), but when registering it with Prefect Cloud and trying to run it, I get:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'<my module>\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
I am running a local agent on my machine (prefect agent local start). I do not use Docker containers.
Any suggestions?
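One thing that may help (a sketch; the path is a placeholder): the local agent can be started with an import path that is added to sys.path for each flow run, so flows can find modules that aren't installed as packages:
prefect agent local start -p "Module path"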
Abuzar Shaikh
08/11/2021, 8:01 AM
with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call map_fn.run(...)
Below is the replication of what I want to do:
from prefect import Flow, task

numbers = [1, 2, 3]

@task
def map_fn(x):
    return x + 1

@task
def reduce_fn():
    res = map_fn.map(numbers)
    print(res)
    return res + [1]

with Flow('flow1') as flow1:
    print(reduce_fn())

flow1.run()
Any suggestions or workarounds will be appreciated.
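The closest working version I can think of moves the map call to the flow level (a sketch; mapping is resolved when the flow is built, not inside a running task, and the reduce task receives the mapped results as an argument):
from prefect import Flow, task

numbers = [1, 2, 3]

@task
def map_fn(x):
    return x + 1

@task
def reduce_fn(res):
    print(res)
    return res + [1]

with Flow('flow1') as flow1:
    mapped = map_fn.map(numbers)  # mapping happens at the flow level
    reduce_fn(mapped)

flow1.run()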
Michael Law
08/11/2021, 8:05 AM
Andreas Eisenbarth
08/11/2021, 9:42 AM
prefect.config.flows.checkpointing = True
or
os.environ["PREFECT__FLOWS__CHECKPOINTING"] = "True"
Instead, we need to ensure that no prefect import statement (direct or indirect) occurs before setting this environment variable. It seems impractical that prefect reads such configuration in top-level code (executed on import) rather than in functions. Are there any better solutions?
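One alternative (a sketch, assuming Prefect 1.x's file-based configuration): set the value in ~/.prefect/config.toml, which is read at import time and avoids the import-order problem entirely:
# ~/.prefect/config.toml
[flows]
checkpointing = true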
08/11/2021, 10:26 AMfrom prefect import task, Flow, Parameter
from prefect.engine.signals import PAUSE
@task
def dummy_task(x):
confirmed = ???
if x > 100 and not confirmed:
raise PAUSE("Are you sure ?")
return x + 1
with Flow("dummy") as flow:
x = Parameter('x', default=1)
dummy_task(x)
Is there a clean way of getting previous states of a task, maybe from prefect.context, such that I know confirmation happened?
Or do I have to add some branching to handle this?
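One idea, a sketch that would need verifying against your Prefect version: when a paused task run is resumed, Prefect 1.x is supposed to set a resume flag in the task's context, which could stand in for confirmed:
import prefect
from prefect import task
from prefect.engine.signals import PAUSE

@task
def dummy_task(x):
    # "resume" should appear in context when a Paused run is resumed
    confirmed = prefect.context.get("resume", False)
    if x > 100 and not confirmed:
        raise PAUSE("Are you sure ?")
    return x + 1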