Hey all, how would you write a (complex) schedule ...
# prefect-community
j
Hey all, how would you write a (complex) schedule to trigger a flow run on both the first of the week and the first of the month, but only once if
first_of_the_week == first_of_the_month
? Is this possible using just a schedule and filters, or do I need to check this condition within the flow run and stop one of the scheduled runs? It's crucial that the flow run is not executed twice just because both conditions are met.
j
Hi Julian, this can be done by making use of a
filter
on a schedule. If you combine: • A
clock
to trigger an event every day • A
filter
to keep only days that are the first of the week or the first of the month, you get your desired behavior. Something like:
import prefect

from prefect.schedules import clocks, Schedule

clock = clocks.CronClock("0 0 * * *")

def is_first_of_month_or_week(dt):
    # datetime.weekday() returns 0 for Monday, taken here as the first day of the week
    return dt.day == 1 or dt.weekday() == 0

schedule = Schedule(clocks=[clock], filters=[is_first_of_month_or_week])
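The filter logic can be sanity-checked without a Prefect install by applying the predicate to plain dates. This is only a sketch using the standard library: the helper `days_that_fire` is a made-up name for illustration, and it assumes Monday is the first day of the week, which is `weekday() == 0` in Python. Because the clock emits one event per day and the predicate is a single `or`, a day that is both a Monday and the 1st still passes only once.

```python
from datetime import date, timedelta


def is_first_of_month_or_week(dt):
    # Monday is weekday() == 0 in Python's datetime module.
    return dt.day == 1 or dt.weekday() == 0


def days_that_fire(start, n_days):
    """Hypothetical helper: simulate a daily clock and keep the days that pass the filter."""
    candidates = (start + timedelta(days=i) for i in range(n_days))
    return [d for d in candidates if is_first_of_month_or_week(d)]


# 1 February 2021 was both a Monday and the first of the month.
fired = days_that_fire(date(2021, 2, 1), 28)
print([d.isoformat() for d in fired])
```

Each date appears at most once in the result, since the daily clock only offers each day to the filter a single time.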
j
Ah cool, that's a very nice solution. Thank you!
I tried to implement this schedule, but Prefect throws an error when I try to register the flow with the UI.
Traceback (most recent call last):
  File "stats/scale_cluster.py", line 121, in <module>
    flow.register(project_name='default')
  File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 1588, in register
    registered_flow = client.register(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 1425, in serialize
    serialized = schema(exclude=["storage"]).dump(flow_copy)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 557, in dump
    result = self._serialize(processed_obj, many=many)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 521, in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/fields.py", line 312, in serialize
    return self._serialize(value, attr, obj, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/fields.py", line 567, in _serialize
    return schema.dump(nested_obj, many=many)
  File "/usr/local/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 72, in dump
    result = result_data = self._dump(obj, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 107, in _dump
    result = schema.dump(obj, many=False, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 557, in dump
    result = self._serialize(processed_obj, many=many)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/schema.py", line 521, in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/fields.py", line 312, in serialize
    return self._serialize(value, attr, obj, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/marshmallow/fields.py", line 700, in _serialize
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
  File "/usr/local/lib/python3.8/site-packages/marshmallow/fields.py", line 700, in <listcomp>
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 397, in _serialize
    raise ValidationError("Invalid function reference: {}".format(value))
marshmallow.exceptions.ValidationError: Invalid function reference: <function is_first_of_month_or_week at 0x7f0e290bd160>
Note that
is_serializable
from
prefect.utilities.debug
reports that both the flow and the schedule are serializable. It seems that in
class NewScheduleSchema(ObjectSchema)
in
prefect.serialization.schedule.py
there is a static list named
FILTERS
of all allowed filters since
filters = fields.List(
    StatefulFunctionReference(
        valid_functions=FILTERS, reject_invalid=True, allow_none=True
    )
)
rejects other filter functions. Note that this also applies to or_filters, not_filters and adjustments.
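To illustrate the behaviour described above, here is a simplified stand-alone sketch of an allow-list check. It is not Prefect's actual code: `ALLOWED_FILTERS`, `serialize_filter`, and both example filters are hypothetical names, and the real `StatefulFunctionReference` field does more than this.

```python
def is_weekday(dt):
    # Stand-in for a built-in filter that is on the allow-list.
    return dt.weekday() < 5


def my_custom_filter(dt):
    # Stand-in for a user-defined filter that is not on the allow-list.
    return dt.day == 1


ALLOWED_FILTERS = [is_weekday]


def serialize_filter(fn, valid_functions=ALLOWED_FILTERS, reject_invalid=True):
    """Return a string reference for fn, rejecting functions that are not allow-listed."""
    if fn in valid_functions:
        return f"{fn.__module__}.{fn.__qualname__}"
    if reject_invalid:
        raise ValueError(f"Invalid function reference: {fn}")
    # With reject_invalid=False, fall back to a best-effort name.
    return fn.__qualname__


print(serialize_filter(is_weekday))
try:
    serialize_filter(my_custom_filter)
except ValueError as exc:
    print("rejected:", exc)
```

In this sketch a function can be perfectly picklable and still be rejected, because the check is about membership in the list and not about whether the object can be serialized.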
n
Hi @Julian - just tested this and I think you're right; this seems like a bug on our end but I'll need to check with the Core team tomorrow to be certain. I managed to get the flow registered by either adding the filter above to the allowed list in
serialization.schedule.py
or changing
reject_invalid
to
False
. I don't think either is a good solution, so let us look into this.
j
Thank you very much. It would be very nice to be able to define our own filters, since the given ones seem to be arbitrarily chosen anyway.
n
100% agreed, I was under the impression we were already allowing this so I'm sure there are some small updates we can make to enable it.
j
👍
n
Hi @Julian - just checked into this and realized why custom filters aren't allowed: those are run by our scheduler and therefore aren't something we allow users to control by passing in arbitrary code (since this would be a large security risk for all our users). That said, we welcome contributions to the filters and I've opened a PR with filters that would provide the building blocks to solve your issue with an
_or_filters
here: https://github.com/PrefectHQ/prefect/pull/3330
j
Thank you for opening an issue to support these new filters. I still think this is kind of a design issue that could be resolved in a more elegant way. I'll try to come up with an idea to marry security with the flexibility to design your own filters (without side effects). Finally, if someone else wants to implement these additional filters, I would recommend making them more flexible, e.g. is_day_of_week(dt, day) with some day in range(0, 7).
n
That PR will implement both of the additional filters and
is_day_of_week
will take a day in the range 0-6!
I expect it to be merged shortly and will most likely go out in the next release.
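Until that release is out, the intended combination can be sketched in plain Python. Everything below is a stand-in and not the library's API: `is_day_of_week` mirrors the name mentioned above but is reimplemented locally, `first_of_month` and `passes_any` are hypothetical names, and the sketch assumes day 0 means Monday (Python's convention), which this thread does not confirm for the released filter.

```python
from datetime import datetime


def is_day_of_week(day):
    """Stand-in factory: return a filter that passes on the given weekday (0 = Monday here)."""
    if day not in range(7):
        raise ValueError("day must be in the range 0-6")

    def _filter(dt):
        return dt.weekday() == day

    return _filter


def first_of_month(dt):
    # Hypothetical name for the month-start building block.
    return dt.day == 1


def passes_any(dt, or_filters):
    # An event is kept if at least one of the filters accepts it.
    return any(f(dt) for f in or_filters)


or_filters = [is_day_of_week(0), first_of_month]
for dt in (datetime(2021, 2, 1), datetime(2021, 2, 2), datetime(2021, 3, 8), datetime(2021, 4, 1)):
    print(dt.date(), passes_any(dt, or_filters))
```

Taking the day as a parameter keeps the allow-listed code generic, so a week that starts on Sunday would only need a different argument and no new filter.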
j
I like it 🙂