Marwan Sarieddine

10/13/2020, 9:51 PM
Hi folks, I am trying to schedule a flow to run every 4th business day. I wrote my own filter function, following the same format as Prefect's built-in schedule filter functions, but I am getting a marshmallow.exceptions.ValidationError when I register the flow.
Here is my implementation:
from datetime import datetime

import pandas as pd


def is_fourth_business_day(dt: datetime) -> bool:
    """
    Filter that allows events occurring on the fourth business day of a month,
    for dates between 2020 and 2050.

    Args:
        - dt (datetime): the datetime to match

    Returns:
        - bool: True if `dt` falls on the fourth business day of its month
    """
    # Weekdays (Mon-Fri) in the supported range; holidays are not excluded
    dates = pd.Series(pd.bdate_range("2020-01-01", "2050-01-01"))
    # nth() is zero-indexed, so nth(3) selects the fourth business day per month
    valid_dates = (
        dates.groupby([dates.dt.year, dates.dt.month]).nth(3).dt.date.to_numpy()
    )

    return dt.date() in valid_dates
Here is the error I am getting:
(etl-embs)  ~/infima/etl/mirror_embs   adjust_mirror_flow_frequency ●  python register.py
Traceback (most recent call last):
  File "register.py", line 299, in <module>
    register_flow(
  File "register.py", line 292, in register_flow
    flow.register(**params)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/prefect/core/flow.py", line 1608, in register
    registered_flow = client.register(
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
    serialized_flow = flow.serialize(build=build)  # type: Any
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/prefect/core/flow.py", line 1445, in serialize
    serialized = schema(exclude=["storage"]).dump(flow_copy)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/schema.py", line 557, in dump
    result = self._serialize(processed_obj, many=many)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/schema.py", line 521, in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/fields.py", line 316, in serialize
    return self._serialize(value, attr, obj, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/fields.py", line 571, in _serialize
    return schema.dump(nested_obj, many=many)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 72, in dump
    result = result_data = self._dump(obj, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow_oneofschema/one_of_schema.py", line 107, in _dump
    result = schema.dump(obj, many=False, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/schema.py", line 557, in dump
    result = self._serialize(processed_obj, many=many)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/schema.py", line 521, in _serialize
    value = field_obj.serialize(attr_name, obj, accessor=self.get_attribute)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/fields.py", line 316, in serialize
    return self._serialize(value, attr, obj, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/fields.py", line 704, in _serialize
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/marshmallow/fields.py", line 704, in <listcomp>
    return [self.inner._serialize(each, attr, obj, **kwargs) for each in value]
  File "/Users/marwansarieddine/.pyenv/versions/3.8.5/envs/etl-embs/lib/python3.8/site-packages/prefect/utilities/serialization.py", line 397, in _serialize
    raise ValidationError("Invalid function reference: {}".format(value))
marshmallow.exceptions.ValidationError: Invalid function reference: <function is_fourth_business_day at 0x120a0e790>

Jim Crist-Harif

10/13/2020, 9:52 PM
Sorry, Cloud doesn't accept custom schedule filters (since the filters run on our infrastructure).
We should be able to provide a better error message than that. I'll open an issue.
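To illustrate why a custom filter is rejected at registration time, here is a toy model of serializing functions by reference against an allow-list. This is a sketch, not Prefect's actual code: `ALLOWED_FILTERS`, `serialize_filter`, and `deserialize_filter` are hypothetical names, and the `is_weekday` defined here is only a stand-in for a built-in filter. The idea is that only a name is sent over the wire, so the receiving side can only run functions it already has.

```python
from datetime import datetime


def is_weekday(dt: datetime) -> bool:
    """Stand-in for a built-in filter that the server already knows about."""
    return dt.weekday() < 5


def qualified_name(fn) -> str:
    """Dotted name used to refer to a function without shipping its code."""
    return f"{fn.__module__}.{fn.__qualname__}"


# Hypothetical allow-list: the only functions the remote side can look up by name.
ALLOWED_FILTERS = {qualified_name(is_weekday): is_weekday}


def serialize_filter(fn) -> str:
    """Return the function's name, rejecting anything not on the allow-list."""
    name = qualified_name(fn)
    if ALLOWED_FILTERS.get(name) is not fn:
        raise ValueError(f"Invalid function reference: {fn}")
    return name


def deserialize_filter(name: str):
    """Resolve a name back to a known function on the receiving side."""
    return ALLOWED_FILTERS[name]
```

Under this model, `serialize_filter(is_weekday)` succeeds, while any user-defined function raises, which matches the shape of the error in the traceback above.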

Marwan Sarieddine

10/13/2020, 9:53 PM
I see, thank you for your quick response.
So there is no way I can schedule a flow for every 4th business day using Cloud?

Jim Crist-Harif

10/13/2020, 9:54 PM
Every 4th business day is an interesting schedule, can you open an issue describing your use case so we can figure out what other builtins we'd need to add to make this possible?
Right now I see a way to:
• run a flow every 4th day
• run a flow every 4th day, excluding non-business days
but not every 4th business day.
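The difference between the last two cases is easy to miss, so here is a small standard-library sketch comparing them. The function names are made up for illustration, holidays are ignored, and "every 4th business day" is read here as "count weekdays only and keep every 4th one". None of this uses Prefect's schedule API.

```python
from datetime import date, timedelta
from typing import List


def is_weekday(d: date) -> bool:
    """True for Monday through Friday (holidays are ignored in this sketch)."""
    return d.weekday() < 5


def every_4th_day_weekdays_only(start: date, end: date) -> List[date]:
    """Step 4 calendar days at a time, then drop dates that land on a weekend."""
    result = []
    current = start
    while current <= end:
        if is_weekday(current):
            result.append(current)
        current += timedelta(days=4)
    return result


def every_4th_business_day(start: date, end: date) -> List[date]:
    """Count weekdays only and keep every 4th one, starting with the first."""
    result = []
    weekdays_seen = 0
    current = start
    while current <= end:
        if is_weekday(current):
            if weekdays_seen % 4 == 0:
                result.append(current)
            weekdays_seen += 1
        current += timedelta(days=1)
    return result


if __name__ == "__main__":
    start, end = date(2020, 10, 1), date(2020, 10, 31)
    print([d.day for d in every_4th_day_weekdays_only(start, end)])
    print([d.day for d in every_4th_business_day(start, end)])
```

For October 2020 the first approach skips the runs that fall on the 17th and 25th, leaving uneven gaps, while the second keeps exactly three business days between consecutive runs.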

Marwan Sarieddine

10/13/2020, 9:56 PM
I see. OK, I will try to open an issue soon describing our use case.