  • Emma Willemsma · 1 year ago
    Hey friends, I'm looking for some guidance on versioning flows. I like the way flows are up-versioned when they get registered to Prefect Cloud and older versions are automatically archived. However, I want to be able to trace a flow version back to its code in GitHub, and I'm not sure of the best way to do that. We tag our commits with version strings, so I just need to be able to attach that information to the flow somehow. Options I've thought of:
      • Can I add some kind of tag when registering my flow? I know there are labels, but as far as I understand, labels are just for linking flows with agents; I haven't seen anything about adding other types of tags.
      • Can I override the auto-incremented version number so it matches our version string? Again, I haven't seen how to do this in the docs.
      • Can/should I add our version string to the flow name? It sounds like I can set a common version group ID for each new version of my flow, so that even though they're named differently, Prefect Cloud will still consider them versions of each other. However, the docs also say that version group IDs are deprecated, so I'm not sure that's the right thing to use.
    How are other people handling this?
    2 replies (Emma Willemsma, Jim Crist-Harif)
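One pragmatic option, sketched below, is to embed the git tag in the flow name at registration time. This is not an official Prefect feature: the helpers `current_git_tag` and `versioned_flow_name` and the use of `git describe` are assumptions of this sketch; only the commented-out `flow.register(project_name=...)` call is the real Prefect 0.x API.

```python
import subprocess

def current_git_tag() -> str:
    """Return the most recent tag reachable from HEAD, e.g. 'v1.4.2'.
    Assumes commits are tagged with version strings, as described above."""
    return subprocess.check_output(
        ["git", "describe", "--tags", "--abbrev=0"], text=True
    ).strip()

def versioned_flow_name(base_name: str, tag: str) -> str:
    """Embed the release tag in the flow name so each Cloud version
    is traceable back to a commit."""
    return f"{base_name} ({tag})"

# Sketched registration (Prefect 0.x):
#   flow.name = versioned_flow_name("my-etl", current_git_tag())
#   flow.register(project_name="my-project")
```

Because the base name stays the same apart from the parenthesized tag, it is easy to scan versions in the UI, and the tag travels with the flow without relying on deprecated version group IDs.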
  • Hui Zheng · 1 year ago
    Hello, I am running into a strange failure at the Cloudpickle serialization check step of the health check during flow.register(). Could you help? The error message is below:
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 147, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 40, in cloudpickle_deserialization_check
        flows.append(cloudpickle.load(f))
      File "/usr/local/lib/python3.8/site-packages/environs/__init__.py", line 334, in __getattr__
        return functools.partial(self.__custom_parsers__[name], self)
      File "/usr/local/lib/python3.8/site-packages/environs/__init__.py", line 334, in __getattr__
        return functools.partial(self.__custom_parsers__[name], self)
      File "/usr/local/lib/python3.8/site-packages/environs/__init__.py", line 334, in __getattr__
        return functools.partial(self.__custom_parsers__[name], self)
      [Previous line repeated 994 more times]
    RecursionError: maximum recursion depth exceeded
    
    Removing intermediate container 98613182a8f2
    The command '/bin/sh -c python /opt/prefect/healthcheck.py '["/opt/prefect/flows/20-06-0-standard-ag-stage.prefect"]' '(3, 8)'' returned a non-zero code: 1
    Traceback (most recent call last):
      File "build_and_register.py", line 174, in <module>
        flow.register(
      File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 1588, in register
        registered_flow = client.register(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 734, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/usr/local/lib/python3.8/site-packages/prefect/core/flow.py", line 1439, in serialize
        storage = self.storage.build()  # type: Optional[Storage]
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 346, in build
        self._build_image(push=push)
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/docker.py", line 412, in _build_image
        raise ValueError(
    ValueError: Your docker image failed to build!  Your flow might have failed one of its deployment health checks - please ensure that all necessary files and dependencies have been included.
    Please see more context and details in the thread.
    17 replies (Hui Zheng, Jim Crist-Harif)
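The repeated environs `__getattr__` frames in that traceback are the classic unpickling pitfall: `__getattr__` reads an instance attribute, but during deserialization the instance `__dict__` has not been restored yet, so the lookup re-enters `__getattr__` forever. A minimal, self-contained reproduction with toy classes (not environs itself):

```python
import pickle

class Broken:
    """Mimics the environs pattern: __getattr__ consults an instance attribute.
    The deserializer recreates the object with __new__ and only restores
    __dict__ afterwards; any attribute lookup in between recurses forever."""
    def __init__(self):
        self._parsers = {}

    def __getattr__(self, name):
        return self._parsers[name]  # `_parsers` missing -> __getattr__ again

# What the deserializer effectively does before restoring state:
bare = Broken.__new__(Broken)
try:
    bare.some_parser
except RecursionError:
    print("maximum recursion depth exceeded")  # same failure as the healthcheck

class Fixed(Broken):
    """Raise AttributeError for internal/bootstrap lookups so pickle's
    machinery gets a clean miss instead of infinite recursion."""
    def __getattr__(self, name):
        if name.startswith("_") or "_parsers" not in self.__dict__:
            raise AttributeError(name)
        return self._parsers[name]

roundtripped = pickle.loads(pickle.dumps(Fixed()))
print(type(roundtripped).__name__)  # Fixed
```

A common workaround on the flow side is to construct the environs/config object inside the task rather than at module scope, so it never has to be cloudpickled at all.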
  • Marwan Sarieddine · 1 year ago
    Hi folks, I am trying to schedule a flow to run every 4th business day, so I created my own filter function following a similar format to how Prefect's schedule filter functions are implemented, but I am getting a marshmallow.exceptions.ValidationError.
    10 replies (Marwan Sarieddine, Jim Crist-Harif)
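In Prefect 0.x, schedules are serialized to JSON at registration, and as far as I understand only the built-in callables from `prefect.schedules.filters` survive that round trip, which is a likely source of a marshmallow `ValidationError` for a hand-rolled filter. The filter logic itself is plain Python, though; here is a sketch of a 4th-business-day predicate (weekdays only, no holiday calendar):

```python
from datetime import datetime, timedelta

def is_fourth_business_day(dt: datetime) -> bool:
    """True when dt falls on the 4th weekday (Mon-Fri) of its month.
    Holidays would require a calendar library on top of this."""
    count = 0
    day = dt.replace(day=1)
    while day <= dt:
        if day.weekday() < 5:  # Mon=0 .. Fri=4
            count += 1
        day += timedelta(days=1)
    return dt.weekday() < 5 and count == 4

# October 2021 starts on a Friday, so the 4th business day is Wed the 6th
print(is_fourth_business_day(datetime(2021, 10, 6)))  # True
print(is_fourth_business_day(datetime(2021, 10, 5)))  # False
```

One workaround people use with 0.x is a broader built-in schedule (e.g. every weekday) plus a first task that checks a predicate like this and raises a SKIP signal otherwise, keeping the custom logic out of the serialized schedule entirely.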
  • Pedro Machado · 1 year ago
    Hi there. Is it possible to use Prefect Cloud to send a notification from a state handler? I need to send an email conditionally, depending on the result of a task.
    9 replies (Pedro Machado, Chris White)
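A Prefect 0.x state handler receives `(task, old_state, new_state)` and can branch on the new state. Below is a hedged sketch: `send_email` is a stub standing in for a real call (`prefect.tasks.notifications.EmailTask`, SMTP, etc.), and `FakeState` is a duck-typed stand-in for Prefect's state objects so the sketch runs anywhere.

```python
sent = []

def send_email(subject: str, body: str) -> None:
    """Stand-in for a real email call (EmailTask, SMTP, SES, ...)."""
    sent.append(subject)

def notify_on_failure(task, old_state, new_state):
    """Prefect 0.x-style state handler: fire an email only on failure."""
    if new_state.is_failed():
        send_email(
            subject=f"Task {task} failed",
            body=f"Message: {getattr(new_state, 'message', None)}",
        )
    return new_state  # a handler must return the state to continue with

# Duck-typed stand-in for prefect.engine.state.State, just for this sketch
class FakeState:
    def __init__(self, failed, message=None):
        self._failed, self.message = failed, message
    def is_failed(self):
        return self._failed

notify_on_failure("my-task", FakeState(False), FakeState(True, "boom"))
print(sent)  # ['Task my-task failed']
```

With real Prefect you would attach it via `@task(state_handlers=[notify_on_failure])`; to branch on the task's result rather than just failure, inspect `new_state.result` inside the same handler.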
  • Krzysztof Nawara · 1 year ago
    Hi, I need all of the tasks in the flow to share some common configuration, so I decided to follow the Prefect example and create a custom decorator. However, I'm having trouble correctly forwarding the arguments to the task. EDIT: details (code and stack traces) moved to the thread. Any suggestions on how to make it work?
    13 replies (Krzysztof Nawara, Jenny, +1)
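A frequent cause of argument-forwarding trouble in this pattern is a decorator that does not handle both the bare `@deco` and parametrized `@deco(...)` forms, or that swallows `*args`/`**kwargs`. A self-contained sketch follows; `task` here is a stand-in for `prefect.task` (whose real keyword arguments such as `max_retries` and `timeout` it mimics), and `shared_task`/`SHARED_TASK_KWARGS` are names invented for this example.

```python
import functools

SHARED_TASK_KWARGS = {"max_retries": 3, "timeout": 60}  # example shared config

def task(fn=None, **task_kwargs):
    """Stand-in for prefect.task so the sketch is runnable anywhere."""
    if fn is None:
        return lambda f: task(f, **task_kwargs)
    fn.task_kwargs = task_kwargs
    return fn

def shared_task(fn=None, **overrides):
    """Custom decorator: apply shared config, let call sites override it,
    and forward the wrapped function's own arguments untouched."""
    if fn is None:  # parametrized use: @shared_task(timeout=10)
        return lambda f: shared_task(f, **overrides)
    merged = {**SHARED_TASK_KWARGS, **overrides}

    @functools.wraps(fn)  # preserve __name__ etc. for the real prefect.task
    def wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return task(wrapper, **merged)

@shared_task(timeout=10)
def add(x, y=1):
    return x + y

print(add(2, y=3), add.task_kwargs)  # 5 {'max_retries': 3, 'timeout': 10}
```

The `fn is None` branch is what lets one decorator serve both `@shared_task` and `@shared_task(...)`, and `functools.wraps` keeps the task's name and docstring intact for registration.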
  • Alberto de Santos · 1 year ago
    Hi guys, can anyone give me some advice on how to run a flow from Python without manually inserting the flow_id?
    21 replies (Alberto de Santos, Jenny, +1)
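One common 0.x pattern is to look the flow up by name first and then create the run. `Client.graphql` and `Client.create_flow_run` are the real Prefect 0.x client methods; the GraphQL field names below follow Prefect Cloud's Hasura-style schema as I remember it, so treat the query shape as an assumption to verify in the Interactive API tab.

```python
def flow_id_query(flow_name: str, project_name: str) -> str:
    """Build a query for the newest unarchived version of a flow, by name."""
    return """
    query {
      flow(
        where: {
          name: { _eq: "%s" }
          project: { name: { _eq: "%s" } }
          archived: { _eq: false }
        }
        order_by: { version: desc }
        limit: 1
      ) {
        id
      }
    }
    """ % (flow_name, project_name)

# Sketched usage (requires prefect and Cloud credentials):
#   client = prefect.Client()
#   flow_id = client.graphql(flow_id_query("my-etl", "prod")).data.flow[0].id
#   client.create_flow_run(flow_id=flow_id)
```

Filtering on `archived: false` and sorting by version descending means the lookup keeps working after each re-registration bumps the version.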
  • as · 1 year ago
    Hi, I was wondering what is supposed to happen when the target parameter is set but not the checkpoint parameter. Here the target is set to the path of a file that is produced by the task (an ONNX file in this case), and the result of the task is a string with the file path to that object. Since checkpointing is not defined, the result of the task is not saved, but the task is not run because the target exists. So is there a conflict here? If the target exists, how is the result passed on to the next task in the flow? This is a bit confusing to me. Can somebody explain what happens in this situation? Thanks.
    EDIT: I just figured out that the object generated by the task (whose path is specified by the target parameter) is overwritten with the result of the task. Ideally I would expect to be able to somehow define an artifact generated by the task as a target: if the object exists, do not redo the task, and pass the path to the object on to the next step in the flow. Is this something that would make sense? Or does this already exist and am I doing it wrong?
    6 replies
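The behaviour described in the EDIT matches how I understand targets in 0.x: setting `target` effectively turns checkpointing on, and the target file is both the cache key and the location the task's return value is written to and later read back from, which is why the ONNX file gets overwritten by the task's string result. A plain-Python sketch of those semantics (not Prefect internals; `run_with_target` is a name invented here):

```python
import os
import tempfile

def run_with_target(target: str, task_fn):
    """Sketch of target semantics: if the target file exists, skip the task
    and load the cached result from it; otherwise run the task and
    checkpoint its return value at the target location."""
    if os.path.exists(target):
        with open(target) as f:
            return f.read()          # task skipped, cached result returned
    value = task_fn()
    with open(target, "w") as f:     # the target doubles as the checkpoint
        f.write(value)
    return value

calls = []
def produce():
    calls.append(1)
    return "model.onnx contents"

target = os.path.join(tempfile.mkdtemp(), "result.txt")
print(run_with_target(target, produce))  # runs the task, writes the target
print(run_with_target(target, produce))  # cache hit: task not run again
print(len(calls))  # 1
```

To keep a task-produced artifact itself as the cache, one approach is to have the task return the artifact's bytes so the result and the artifact coincide; another is to drop `target` and use `cache_for`/`cache_validators` so the cache check is separate from the result location.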
  • Raphaël Riel · 1 year ago
    Hi all! 👋 I'm facing some trouble executing parallel/concurrent tasks on my agent. I have a task that I start in my flow using the_task.map(list_of_ints). The task will have to map over >10 items. When A) running the flow directly from within the .py file using flow.run() AND B) setting executor=LocalDaskExecutor(), I'm able to have it use more than one thread. But as soon as I try to run this flow on an agent, OR if I remove the Dask executor (while executing the .py file directly), I can't make it run in parallel! Recap:
      1. Execute the flow from the .py file WITH executor=LocalDaskExecutor() = works
      2. Execute the flow from the .py file with the "default" executor = nope
      3. Any combination of executors running on an agent = nope
    Any suggestion will be welcome! Thanks.
    13 replies
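As far as I know, this is expected with 0.x: `flow.run()` defaults to the serial LocalExecutor, and an agent-launched run never sees an executor passed only at `flow.run(...)` time. The executor has to be stored on the flow object before registration (`flow.executor = LocalDaskExecutor(scheduler="threads")` on recent 0.x releases, or via the flow's Environment on older ones) for parallel mapping to survive the agent path. The serial-vs-threaded difference itself can be illustrated without Prefect:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

def the_task(n: int) -> str:
    """Pretend mapped task: records which thread ran it."""
    time.sleep(0.1)
    return threading.current_thread().name

# Default-executor behaviour: every mapped item runs serially on one thread
serial = {the_task(i) for i in range(4)}

# LocalDaskExecutor(scheduler="threads")-style behaviour: a thread pool
# maps the items concurrently across workers
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = set(pool.map(the_task, range(4)))

print(len(serial), len(parallel) > 1)  # 1 True
```

The practical takeaway is that attaching the executor to the flow (and re-registering) is what changes the agent-side behaviour; passing it to `flow.run()` only affects local runs.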
  • Jeff Brainerd · 1 year ago
    Hi Prefect team, I am seeing consistent timeout issues trying to run GraphQL queries, either programmatically or in the UI. Other parts of the UI seem fine. Is this just me?
    {
      "graphQLErrors": [
        {
          "path": [
            "flow_run"
          ],
          "message": "Operation timed out",
          "extensions": {
            "code": "API_ERROR"
          }
        }
      ],
      "networkError": null,
      "message": "GraphQL error: Operation timed out"
    }
    24 replies