prefect-community

    Vince

    12/20/2021, 9:35 PM
    Hi, is there a way to iteratively run a flow for batch processing over a large dataset? Here’s the flow I’m trying to achieve:
    1. Start flow with 3 parameters: start timestamp (ts_start), batch size (n), end timestamp (ts_end)
    2. Task extracts batch of n documents starting from ts_start
    3. Cache nth document’s timestamp (ts_last)
    4. Go back to (1) passing in ts_last as ts_start if ts_last < ts_end
    In other words, I want to iteratively process batches until we get a document with timestamp ts_end
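    One rough sketch of a way to express this loop with the (non-Orion) Prefect API: have the flow process a single batch, then conditionally kick off a new run of itself with ts_last as the next ts_start. The flow name, project name, and the extract/condition task bodies below are placeholders, not anything from the question.
    from prefect import Flow, Parameter, task, case
    from prefect.tasks.prefect import StartFlowRun

    @task
    def extract_batch(ts_start, n):
        # placeholder: pull n documents starting at ts_start and
        # return the timestamp of the last document processed
        ts_last = ...
        return ts_last

    @task
    def not_done(ts_last, ts_end):
        return ts_last < ts_end

    with Flow("batch-flow") as flow:
        ts_start = Parameter("ts_start")
        n = Parameter("n", default=1000)
        ts_end = Parameter("ts_end")

        ts_last = extract_batch(ts_start, n)
        # only schedule the next iteration while ts_last < ts_end
        with case(not_done(ts_last, ts_end), True):
            StartFlowRun(flow_name="batch-flow", project_name="my-project")(
                parameters={"ts_start": ts_last, "n": n, "ts_end": ts_end}
            )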

    Ben Muller

    12/20/2021, 11:00 PM
    Is there currently an issue with the prefect dockerhub image? I am getting an issue pulling it
    Head https://registry-1.docker.io/v2/prefecthq/prefect/manifests/0.15.5-python3.8: received unexpected HTTP status: 503 Service Unavailable

    Yusuf Khan

    12/21/2021, 2:21 AM
    I have a use case where I have some equipment in a lab and connected to a desktop. I wanted to use Prefect to schedule a shell task to run on the machine that backs up its data to blob storage and then other downstream tasks are fired off. I walked through and did examples from a decent chunk of the documentation yesterday. I'm a little bit unclear on best practices for agents. Are they supposed to be on all the time? So in the given example the desktop will sometimes be on, sometimes be off. Do I just need to set the machine to start the agent on bootup or can that behavior be controlled from within prefect? Additionally, for best practice, if I have several of these machines like this would it be better to put an agent on each machine, or to setup one 'staging' cloud vm, and then run the agent there and execute tasks that live on the remote machines, but via ssh commands from the cloud vm?

    Yehor Sikachov

    12/21/2021, 11:41 AM
    Hi folks. As part of working on https://github.com/PrefectHQ/prefect/issues/5171 I created draft PR https://github.com/PrefectHQ/prefect/pull/5258 Please take a look at the concept and if you agree I will add tests and proper formatting

    Liri Rozenthal

    12/21/2021, 2:56 PM
    Hello there! 🙂 I'd appreciate any help, as I'm truly stuck. I've locally developed a flow in a Jupyter Notebook (Python). Agent: Vertex; Storage: GCP; run_config: VertexRun; Executor: LocalExecutor. In VertexRun I've defined an image, which I created locally from my CMD (based on a simple Dockerfile, built on prefect:latest plus the pip installs that are relevant for the specific flow). I've pushed this image to my repository on Docker Hub. Following what I've read here, I understood that in my script I should log in to my Docker account before activating the agent, and so I've done. One of the packages I've set to install in my image is "pandas". Yet, when trying to run the flow through the UI I get this error: "Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'pandas\'")\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n - cloudpickle: (flow built with \'1.3.0\', currently running with \'2.0.0\')\n - prefect: (flow built with \'0.15.9\', currently running with \'0.15.10\')\n - python: (flow built with \'3.7.6\', currently running with \'3.7.12\')\nThis also may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')" What am I missing? Thanks in advance! 🍪

    John-Craig Borman

    12/21/2021, 4:03 PM
    Hi all, for testing purposes is there any way to configure `.map`ped tasks to run sequentially instead of in parallel?
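    For later readers, a minimal sketch (assuming the non-Orion Prefect API): mapped children only run in parallel under a Dask-based executor, so running the flow under the plain LocalExecutor makes them execute one at a time. `my_flow` is a placeholder.
    from prefect.executors import LocalExecutor

    # force sequential execution of mapped children for the test run
    my_flow.executor = LocalExecutor()
    state = my_flow.run()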

    Jason Motley

    12/21/2021, 4:27 PM
    If a scheduled flow did not run and the errors I see say a lazarus process was attempted, what does that mean?

    Alejandro Sanchez Losa

    12/21/2021, 5:27 PM
    Hi everybody!!! Is there any way to separate @task definitions into different files when using GitHub storage? When I try that (a new folder, importing this file containing a def with the @task decorator and an import of the prefect task …) the output tells me: ModuleNotFoundError: No module named ‘tasks’

    Max Watermolen

    12/21/2021, 5:35 PM
    Howdy, running into some Django weirdness, anyone seen this?
    Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  AppRegistryNotReady("Apps aren\'t loaded yet.")')
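    A guess at the usual fix for `AppRegistryNotReady` when using the ORM outside manage.py: make sure `django.setup()` runs (with `DJANGO_SETTINGS_MODULE` set) before any model import, e.g. inside the task rather than at module import time, so that unpickling the flow never touches Django models. `myproject.settings` and `myapp.models` are placeholders.
    import os

    import django
    from prefect import task

    @task
    def load_records():
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
        django.setup()  # populate the app registry before touching models
        from myapp.models import Record  # deferred import, only after setup()
        return list(Record.objects.values())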

    Trevor Campbell

    12/21/2021, 5:41 PM
    Hi folks! Quick Q: what would be the idiomatic way in Prefect Orion to create "make-like" tasks? Say, e.g., I have a task that uses a file `a.log` as an input, and produces another file `b.log` as an output, and I want the task to run only when the timestamp of `a.log` is newer than `b.log` (in addition to the usual waiting for predecessor Prefect tasks). I could of course just do this manually inside the task, but was wondering if there was a better way to go about it
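    For what it's worth, the "manual" version mentioned above is only a few lines with the Orion-style decorators; a sketch, where the task body is a placeholder:
    import os

    from prefect import flow, task

    @task
    def rebuild(src: str, dst: str):
        ...  # placeholder: expensive work that reads src and writes dst

    @flow
    def make_like(src: str = "a.log", dst: str = "b.log"):
        # only run the task when the input is newer than the (possibly missing) output
        if not os.path.exists(dst) or os.path.getmtime(src) > os.path.getmtime(dst):
            rebuild(src, dst)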

    Vipul

    12/21/2021, 6:45 PM
    Hi, quick question on Orion. We are on Prefect Server and doing an initial PoC of Orion, and would like to know if there is a way to limit the number of flows or tasks to one based on a parameter or context provided during the run. Our flows are very compute intensive and run overnight, and we want to avoid multiple flows or tasks running at the same time if they have the same parameter or context as input.

    Jason Motley

    12/21/2021, 7:12 PM
    Is this the correct way to specify the use of multiple additional packages?
    flow.run_config = ECSRun(
    env={"EXTRA_PIP_PACKAGES": "requests" "numpy"})

    Jason Motley

    12/21/2021, 10:12 PM
    Quick question that I may have asked before, if I need to extract a series of rows/columns from a data warehouse and then "push" them into an SFTP server, is there a good way in Prefect to do that?
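    One possible shape for this, sketched with an outside library rather than a Prefect integration: query the warehouse with whatever DB task/library you already use, then a small task that uploads the serialized rows over SFTP via paramiko. Host, credentials, and the payload below are placeholders.
    import io

    import paramiko
    from prefect import task

    @task
    def upload_to_sftp(payload: bytes, host: str, username: str, password: str, remote_path: str):
        transport = paramiko.Transport((host, 22))
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            # stream the already-serialized rows (e.g. CSV bytes) to the server
            sftp.putfo(io.BytesIO(payload), remote_path)
        finally:
            sftp.close()
            transport.close()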

    Daniel Komisar

    12/21/2021, 10:13 PM
    Hello everyone, I’m trying to see if it’s possible to query for flow runs with a specific parameter value. I’ve been able to query for runs where the parameter has a certain key using `_has_key`. I’ve tried using `_contains` with no luck, although I’m not sure if that’s the right one either, or if this is even possible. Thanks!

    Danny Vilela

    12/21/2021, 11:10 PM
    Hi all! A co-worker is trying to schedule a Flow to run on the 2nd of every month, at 8:00 AM PT. I pointed him to the `IntervalSchedule` with `pendulum` (since that’s what I’ve used for daily/weekly tasks) but he noticed that the results don’t quite line up with what he was expecting:
    import pendulum
    from prefect.schedules.schedules import IntervalSchedule
    from prefect.schedules.clocks import CronClock
    
    # Set our start date.
    next_start_date: pendulum.DateTime = (
        pendulum.now(tz="America/Los_Angeles")
        .start_of(unit="month")
        .set(day=2, hour=8, minute=0, second=0)
    )
    
    # Set our monthly interval.
    monthly: pendulum.Duration = pendulum.duration(months=1)
    
    # Inspect the next few clock emissions.
    schedule: IntervalSchedule = IntervalSchedule(start_date=next_start_date, interval=monthly)
    print(schedule.next(n=3))
    # [
    #   DateTime(2022, 1, 1, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')), 
    #   DateTime(2022, 1, 31, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles')), 
    #   DateTime(2022, 3, 2, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles'))
    # ]
    Why does the `IntervalSchedule` not fire on 2022-01-02, 2022-02-02, 2022-03-02, etc.? It appears to just be incrementing by 30 days, but that’s not quite what I’d expect. Is this a `pendulum` thing? (Edit: it’s maybe worth noting that in the example above, just doing `next_start_date + monthly` does give you the correct `DateTime(2022, 1, 2, 8, 0, 0, tzinfo=Timezone('America/Los_Angeles'))`. So I think it may actually be a Prefect thing?)
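    (For "the 2nd of every month at 8 AM" specifically, a cron clock sidesteps the fixed-interval behaviour entirely; a sketch using the `CronClock` already imported above, with a timezone-aware start date so the cron is evaluated in that zone:)
    import pendulum
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock

    # 08:00 on the 2nd of every month, America/Los_Angeles
    schedule = Schedule(
        clocks=[CronClock("0 8 2 * *", start_date=pendulum.now(tz="America/Los_Angeles"))]
    )
    print(schedule.next(n=3))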

    Brian S

    12/22/2021, 1:41 AM
    Hi All, hope all is well. I'm here for another (likely noob) question. I've been working with a Prefect parameter that contained a json object. This parameter used to pass a dict but all of a sudden it's coming in as a tuple. Is there a reason this would happen? JSON seems to be valid.

    Ryan Sattler

    12/22/2021, 5:32 AM
    Is there a way to increase the polling frequency of Agents? The (apparent) default of 10s feels a little slow to pick up jobs at times.

    rilshok

    12/22/2021, 8:27 AM
    :thor: Hello everyone, I'm taking my first steps with :prefect:. Everything will work out!

    Alfredo Prada Giorgi

    12/22/2021, 8:56 AM
    👋

    rilshok

    12/22/2021, 8:56 AM
    To get to know the community better, I'll use a small example to tell you what kind of jungle a newbie can get into. Prefect has a native way of filtering result items, but it only works at the level of one list. :piggy: Like a fool, I figured I just needed a super flexible filter to keep as many argument lists as I want in sync. So, let's say we have a goal: to process the list of arguments of one task depending on how another task handled them.
    from typing import Any, List
    from pathlib import Path

    from prefect import task, Flow

    @task
    def get_paths() -> List[Path]:
        ...

    @task
    def dosmth(path: Path) -> Any:
        ...

    @task
    def finalize(path: Path, smth: Any):
        ...

    with Flow('fucking-prefect') as flow:
        paths = get_paths()
        smth = dosmth.map(paths)  # maybe there are exceptions
        # TODO: need to synchronize paths and smth
        finalize.map(paths, smth)
    :dusty_stick: I implemented my filter like `prefect.tasks.control_flow.FilterTask`:
    from typing import List, Any, Tuple, Union
    
    from prefect import Task
    from prefect.triggers import all_finished
    
    
    class CrossSkip(Task):
        def __init__(self, *skip, **kwargs) -> None:
            kwargs.setdefault("skip_on_upstream_skip", False)
            kwargs.setdefault("trigger", all_finished)
            self._types = tuple([s for s in skip if isinstance(s, type)])
            self._values = [s for s in skip if not isinstance(s, type)]
            if not skip:
                self._types = (type(None), )
            super().__init__(**kwargs)
    
        def _filter(self, value) -> bool:
            return not isinstance(value, self._types) and not any([value == v for v in self._values])
    
        def run(self, *task_results: List[Any]) -> Union[List[Any], Tuple[List[Any], ...]]:
            """Task run method."""
            assert task_results
            assert len({*map(len, task_results)}) == 1
            if len(task_results) == 1:
                return [r for r in task_results[0] if self._filter(r)]
            return tuple([*map(list, zip(*[
                r for r in zip(*task_results)
                if all([self._filter(v) for v in r])
            ]))])
    The flow should have turned into something like this, and everything would have worked fine:
    with Flow('best-prefect-flow') as flow:
        paths = get_paths()
        smth = dosmth.map(paths)  # maybe there are exceptions
        # >>>
        paths, smth = CrossSkip(Exception, None)(paths, smth)
        # <<<
        finalize.map(paths, smth)
    BUT
    ValueError: Tasks with variable positional arguments (*args) are not supported, because all Prefect arguments are stored as keywords. As a workaround, consider modifying the run() method to accept **kwargs and feeding the values to *args.
    In general, I know how to use python magic to solve this problem, but I refuse to conjure further 🙂 TLDR: Prefect's tasks can't unpack arguments:
    @task
    def todosmth(*arg) -> Any:
        ...

    Martim Lobao

    12/22/2021, 10:23 AM
    not to rant, but Prefect’s web app is by far the most frustrating part about using prefect, and is one of the most frustrating tools i’ve ever worked with. trying to restart a flow and the restart pop-up just hangs indefinitely. no error anywhere, including in the console. I’ve tried going through incognito and using a different browser but i get the same thing. a basic REST API that just worked reliably would be such a better alternative to a GUI that doesn’t work half the time.

    Paul Gierz

    12/22/2021, 10:59 AM
    I hope this is the right channel to ask for general help. I was curious why this happens:
    lat_size = Parameter("Latitude Size (e.g 1 for a 1x1 degree grid)", default=1.0)
    lon_size = Parameter("Longitude Size (e.g 1 for a 1x1 degree grid)", default=1.0)
    lats = np.arange(-90, 90, lat_size)
    lons = np.arange(-180, 180, lon_size)
    but then:
    $ prefect register --project tutorial -p simulation_workflows/workflows
    Collecting flows...
    osgeo is not installed, conversion to Geo formats like Geotiff (fesom2GeoFormat) will not work.
    Error loading 'simulation_workflows/workflows/fesom_2d_variable.py':
      Traceback (most recent call last):
        File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/site-packages/prefect/cli/build_register.py", line 134, in load_flows_from_script
        namespace = runpy.run_path(abs_path, run_name="<flow>")
        File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 268, in run_path
        return _run_module_code(code, init_globals, run_name,
        File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 97, in _run_module_code
        _run_code(code, mod_globals, init_globals,
        File "/Users/pgierz/.local/opt/miniconda3/envs/scicomp_esm_sim_prefect_workflows/lib/python3.9/runpy.py", line 87, in _run_code
        exec(code, run_globals)
        File "/Users/pgierz/Documents/SciComp/Projects/Workflows/simulation_workflows/simulation_workflows/workflows/fesom_2d_variable.py", line 27, in <module>
        lons = np.arange(-180, 180, lon_size.value)
      AttributeError: 'Parameter' object has no attribute 'value'
    Is the stupid solution just to make a mini task instead of directly using numpy?
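    (A `Parameter` is itself a task, so it has no value at registration time; moving the numpy call into a small task that receives the resolved values at run time is the usual pattern. A sketch, with illustrative flow and parameter names:)
    import numpy as np
    from prefect import Flow, Parameter, task

    @task
    def make_grid(lat_size: float, lon_size: float):
        # lat_size/lon_size are the resolved parameter values here, not Parameter objects
        lats = np.arange(-90, 90, lat_size)
        lons = np.arange(-180, 180, lon_size)
        return lats, lons

    with Flow("fesom-2d-variable") as flow:
        lat_size = Parameter("lat_size", default=1.0)
        lon_size = Parameter("lon_size", default=1.0)
        grid = make_grid(lat_size, lon_size)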

    Tom Klein

    12/22/2021, 11:51 AM
    Hey again - yesterday I presented the results of my Prefect PoC to my team, and my team lead said they think we should wrap all DS code in Docker containers and use those as "blackbox" steps instead of directly invoking Python code from the flow -- am I right in my understanding that if we do that, we lose some of the advantages of Prefect, like being able to easily map the output of Docker runs to the input of the next tasks, or caching/persistence of results, etc., and we'll need to do all these things manually ourselves?

    Eduardo Fernández León

    12/22/2021, 12:09 PM
    Hi all! At my company we are using the Prefect platform integrated with Google Cloud Kubernetes, and now we are testing Prefect Cloud using Kubernetes as agents/workers. Is there any doc/tutorial on how to do that? I found this article and it talks about a `runner token` that I wasn't able to find in the Cloud UI. Thanks in advance.

    Robert Kowalski

    12/22/2021, 2:46 PM
    Hi, I have a problem with a flow: sometimes the flow executes correctly without any errors in the logs, but the same flow execution on the next day/time never ends. I stop this flow after e.g. 2 days of execution time (correct execution time ~3h), and if I rerun the same flow once again, every task in the flow finishes correctly. I use a docker agent and the gitlab registry. In the agent logs I found this error:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 188, in interrupt_if_cancelling
        flow_run_info = self.client.get_flow_run_info(flow_run_id)
      File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1564, in get_flow_run_info
        raise ClientError('Flow run ID not found: "{}"'.format(flow_run_id))
    prefect.exceptions.ClientError: Flow run ID not found: "0695cb92-7995-43b1-abf7-6500eb7e9fc0"
    The flow freezes on one task; this task inserts data into influxdb. I have two instances of this task with two different database configs, and these two tasks execute at the same time. One instance executes correctly; the second task never ends. Does anybody have an idea what might be causing this log error, or why the task is not ending?

    Philip MacMenamin

    12/22/2021, 3:19 PM
    Hi, is there a standard way to test a Task raises an Exception?
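    A common pattern (plain pytest, not a Prefect-specific testing API): call the task's `.run()` method directly, which executes the underlying function outside any flow, and assert with `pytest.raises`. The `divide` task is just an illustrative example.
    import pytest
    from prefect import task

    @task
    def divide(a, b):
        return a / b

    def test_divide_raises():
        # .run() executes the task function directly, outside the flow engine
        with pytest.raises(ZeroDivisionError):
            divide.run(1, 0)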

    Dimosthenis Schizas

    12/22/2021, 3:31 PM
    Hello dear community. I'm trying to deploy a k8s agent inside a k8s cluster and I'm struggling with the manifest. First, the manifest asks for `PREFECT__CLOUD__AGENT__AUTH_TOKEN`, which is deprecated if I understand correctly. Is it correct to assume that I can remove that arg and keep only `PREFECT__CLOUD__API_KEY`?

    Paul Gierz

    12/22/2021, 4:34 PM
    Hello, small and likely stupid question. I have something like this:
    import os
    import pathlib
    import re

    import prefect
    from prefect import task

    @task
    def get_n_newest_files_for_pattern(pattern: str, path: str, n: int) -> list:
        """
        Task to get the n newest files for a given pattern.
        """
        logger = prefect.context.get("logger")
        logger.info(f"Getting {n} newest files in {path} for pattern {pattern}")
        path_files = os.listdir(path)
        files_with_path = [os.path.join(path, f) for f in path_files]
        files = [pathlib.Path(f) for f in files_with_path if re.search(pattern, f)]
        logger.info(f"Found {len(files)} files for pattern {pattern}")
        logger.debug(f"Files: {files}")
        logger.info("Sorting files by modification time")
        files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
        logger.info(f"Returning the {n} newest files")
        return files[:n]
    I am getting file not found errors with:
    FileNotFoundError: [Errno 2] No such file or directory: '<Parameter: Path to the top level of the experiment tree>/outdata/fesom'
    I thought that once it was loaded, any `Parameter` would behave as whatever type it is supposed to be? It is defined like this:
    path = Parameter(name="Path to the top level of the experiment tree")
    Earlier on I do an f-string conversion:
    outdata_path = f"{path}/outdata/fesom"
    Not having f-strings would be possible, but a bit annoying
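    (This is the same root cause as the register-time numpy question above: a `Parameter` is a task, so the f-string evaluated at flow-build time formats the Parameter object's repr rather than its run-time value. One workaround sketch is to build the path inside a task:)
    from prefect import task

    @task
    def build_outdata_path(path: str) -> str:
        # `path` is the resolved Parameter value here, not the Parameter object
        return f"{path}/outdata/fesom"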

    Pedro Machado

    12/22/2021, 7:46 PM
    Hi I am having trouble registering a flow that used to work and can't figure out what is going on. I am using a different laptop today. Can you tell what may be going on from the output in the thread?

    Leon Kozlowski

    12/22/2021, 7:56 PM
    If I want to upgrade my agent prefect version to use some new features for a new flow (by bumping docker image tag version) will flows already deployed running with an older version of prefect continue to run? Storage: Docker Agent: Kubernetes (EKS)

Michael Adkins

12/22/2021, 8:02 PM
Generally, yes. We try not to break compatibility when the agent is on a newer version than the flow runs.

Leon Kozlowski

12/22/2021, 9:34 PM
Ok thanks @Michael Adkins I’m looking to go from 0.14.19 -> 0.14.22 to use the KV store

Michael Adkins

12/22/2021, 10:13 PM
Should be fine 🙂