• l

    Luke Orland

    2 years ago
    When I register a flow, I'd like to save arbitrary metadata with the flow version using the Prefect Python API. Can anyone suggest a way to do so? I would like to look at the flow version in Prefect Cloud and perhaps see my custom key/values in the details.
    l
    Chris White
    3 replies
    Copy to Clipboard
  • n

    nuks

    2 years ago
    Hey guys, I am looking for a solution to abstract computations from supercomputers, mostly for fmri data preprocessing (and, on a lower note, analysis on the underlying output) using already developed binaries within singularity (docker-like) containers on large static dataset. Each fmri session is to be processed parallely. We need to assess the current state of the computing environment (is the inode / lustre file limit close to be exceed, etc) and adapt/restrict the job count to prevent crash + perhaps remotely archive outputs and cleanup space as it is going. This can technically be done beforehand using some heuristic or dynamically. Would you consider prefect (relying on stg like dask_jobqueue.SLURMCluster) to be a good fit to develop such system, or would you have something else to recommend ?
  • n

    nuks

    2 years ago
    (On top of that, we’ll need some versioning system + manual quality control of the data through a web interface.)
  • Brad

    Brad

    2 years ago
    Hey @josh, one more PR to add
    extra_docker_kwargs
    to
    CreateContainer
    https://github.com/PrefectHQ/prefect/pull/2915
    Brad
    1 replies
    Copy to Clipboard
  • n

    nuks

    2 years ago
    I guess we can’t run prefect with singularity instead of docker (cf. https://docs.prefect.io/core/getting_started/installation.html#running-the-local-server-and-ui). Would this be stg easy to change/patch for an external contributor ?
  • k

    Kostas Chalikias

    2 years ago
    Hello everyone, I have a couple of questions regarding deployment. I am considering using Docker or GCS Storage for my flows and trying to understand what makes the most sense. 1. When I use either, do I still run the agent in a virtual environment that includes the imports used by my tasks? Perhaps one is different than the other in that aspect? 2. What is the usual way to register a flow? Do people just use an ad hoc script whenever they have a new flow or perhaps do some kind of diffing & registering what isn't there as part of a release process? When is re-registering a flow ever required? Perhaps when changing the schedule?
    k
    nicholas
    5 replies
    Copy to Clipboard
  • k

    Kai Weber

    2 years ago
    Hi, can anybody help me with the docker-tasks? I want to design a flow to renew an docker image:1. pull the latest MQTT-image (docker pull eclipse-mosquitto), 2. remove the running MQTT-container (docker rm MQTT-NAME) 3. start the new MQTT-container (docker run -tid -p 1883:1883 -p 9001:9001 --name=msg-broker --restart=always --network %x_Network% eclipse-mosquitto) How do I phrase that in Prefect? I can't even pull the latest image?! Thanks a lot!
    k
    nicholas
    13 replies
    Copy to Clipboard
  • b

    Ben Fogelson

    2 years ago
    I’m running into an error using
    flow.replace
    , which seems to have to do with the fact that
    flow.remove(task)
    doesn’t remove
    task
    from flow.slugs:
    from prefect import Parameter, Flow, Task
    
    p1 = Parameter('p')
    p2 = Parameter('p')
    t1 = Task()
    t2 = Task()
    
    flow = Flow('flow')
    flow.add_task(p1)
    flow.add_task(t1)
    
    print(flow.slugs)
    # {<Parameter: p>: 'p', <Task: Task>: 'Task-1'}
    
    flow.replace(t1, t2)
    print(flow.slugs)
    # {<Parameter: p>: 'p', <Task: Task>: 'Task-1', <Task: Task>: 'Task-1'}
    
    flow.replace(p1, p2)
    ---------------------------------------------------------------------------
    ValueError                                Traceback (most recent call last)
    <ipython-input-7-5ec138031eb6> in <module>
    ----> 1 flow.replace(p1, p2)
    
    /opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in replace(self, old, new, validate)
        291         # update tasks
        292         self.tasks.remove(old)
    --> 293         self.add_task(new)
        294 
        295         self._cache.clear()
    
    /opt/conda/envs/drugdiscovery/lib/python3.6/site-packages/prefect/core/flow.py in add_task(self, task)
        482                 raise ValueError(
        483                     'A task with the slug "{}" already exists in this '
    --> 484                     "flow.".format(task.slug)
        485                 )
        486             self.slugs[task] = task.slug or self._generate_task_slug(task)
    
    ValueError: A task with the slug "p" already exists in this flow.
    b
    Chris White
    +1
    4 replies
    Copy to Clipboard
  • m

    Marwan Sarieddine

    2 years ago
    Hi folks, what is the simplest way in prefect to specify that a task only execute based on the value of a boolean parameter, otherwise should get skipped ? nevermind - I just spotted signals
    from prefect.engine import signals
    
    @task
    def signal_task(run_task: bool):
       if not run_task:
           raise signals.SKIP()
    m
    Kyle Moon-Wright
    3 replies
    Copy to Clipboard
  • james.lamb

    james.lamb

    2 years ago
    Hello from Chicago! I've tried searching this Slack and the Prefect documentation but haven't found a good answer for this.
    How should I handle prefect version differences between the environment where a flow is created and registered (
    flow.register()
    ) and the environment where the agent runs?
    So, concretely, let's say that I have an agent running using the image
    prefecthq/prefect:0.12.1-python3.7
    . I have some flows already being run by that agent, and those flows were created with
    prefect
    0.12.1. Imagine that
    prefect
    0.13.0 has just been released on PyPi, and data scientists on my team are going to
    pip install prefect
    , create flows, and register them with
    flow.register()
    What should I do? 1. Add a label to each agent with the prefect major + minor version (e.g. v0.12, v0.13). Make sure flows are registered with such a version label, so 0.12 flows run on the 0.12 agent and 0.13 flows run on the 0.13 agent.
    prefect
    uses semantic versioning, which means that there can be breaking changes between minor releases in the
    0.x
    series. 2. Do nothing. a flow created with 0.13.x should be expected to work with an agent running 0.12.x 3. Upgrade all those 0.12 flows to 0.13. Prefect Cloud is only ever running a single version of
    prefect
    and if your flows + agent are not in sync with that version, bad things will happen. 4. something else Thanks for your time and consideration!
    james.lamb
    Kyle Moon-Wright
    5 replies
    Copy to Clipboard