prefect-community
  • a

    Alex Welch

    02/26/2021, 8:06 PM
    Is there a way to import a custom module when using GitHub storage? The reason for this is two-fold. First, since we would be using the same block of code for just about every flow, it would help keep things semi-DRY. Second, it would allow us to pass different configs depending on environment variables (dev vs prod).
    from prefect_utils import (
      RUN_CONFIG,
      STORAGE
    )
    flow.storage=STORAGE
    flow.run_config=RUN_CONFIG
    It works with S3 storage, but when trying to use GitHub I get the error below.
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'prefect_utils'")
    The module is in the same folder as the flow (flows/). Is this an issue because the flow is being run from the parent directory when it runs, and thus an issue with relative paths?
    n
    • 2
    • 8
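A minimal sketch of the environment-dependent part of the question above, in plain Python. The contents of prefect_utils are not shown in the thread, so the variable name FLOW_ENV, the config keys, and the values here are all hypothetical. It only illustrates picking settings per environment and does not address the ModuleNotFoundError itself.

```python
import os

# Hypothetical per-environment settings; names and values are illustrative only.
CONFIGS = {
    "dev": {"labels": ["dev"], "bucket": "flows-dev"},
    "prod": {"labels": ["prod"], "bucket": "flows-prod"},
}


def select_config(environ=None):
    """Return the settings for the environment named in FLOW_ENV (default: dev)."""
    environ = os.environ if environ is None else environ
    env = environ.get("FLOW_ENV", "dev")
    if env not in CONFIGS:
        raise ValueError(f"unknown environment: {env!r}")
    return CONFIGS[env]


if __name__ == "__main__":
    # Passing a mapping explicitly keeps the helper easy to test.
    print(select_config({"FLOW_ENV": "prod"})["labels"])
```

A shared module like this could then build the storage and run config objects from the selected settings, so each flow file only imports the results.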
  • c

    Carlo

    02/26/2021, 8:13 PM
    Hi. In this example, if all the sub flows had the same parameters, how do I create a parameter at the parent flow to thread thru to the dependencies?
    flow_a = StartFlowRun(flow_name="A", project_name="examples", wait=True)
    flow_b = StartFlowRun(flow_name="B", project_name="examples", wait=True)
    flow_c = StartFlowRun(flow_name="C", project_name="examples", wait=True)
    flow_d = StartFlowRun(flow_name="D", project_name="examples", wait=True)
    
    with Flow("parent-flow", schedule=weekday_schedule) as flow:
        b = flow_b(upstream_tasks=[flow_a])
        c = flow_c(upstream_tasks=[flow_a])
        d = flow_d(upstream_tasks=[b, c])
    n
    • 2
    • 2
  • j

    Jack Sundberg

    02/28/2021, 4:12 AM
    On Prefect Cloud, I had 1000 flow runs and after roughly 150 of them completed, the remaining all failed with "Failed after exceeding scheduled work SLA". Is there some unwritten limit to Prefect Cloud that I'm missing? My best guess is a limit on task duration or maybe a limit on how far "behind schedule" a flow run is allowed to be.
    c
    • 2
    • 13
  • h

    Hiodo

    03/01/2021, 7:19 AM
    Hi! We are developing an OCR service. I would like to know how well suited Prefect is for this task.
    j
    • 2
    • 1
  • s

    Sara Iris Valverde Bernuz

    03/01/2021, 9:52 AM
    Hey guys, I'm having this error while trying to deploy a script:
    Beginning health checks...
    System Version check: OK
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 151, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
      File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle.py", line 562, in subimport
        __import__(name)
    ModuleNotFoundError: No module named 'gcloud'
    a
    • 2
    • 12
  • s

    Sara Iris Valverde Bernuz

    03/01/2021, 9:58 AM
    has anyone had it and managed to fix it somehow?
  • m

    Matt Liszewski

    03/01/2021, 5:06 PM
    Hi! We are shipping our Kubernetes Run logs to Datadog. Datadog (as well as most log aggregators) prefers logs in JSON format. What is the best way to get Prefect logs formatted in JSON?
    d
    m
    a
    • 4
    • 12
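One generic approach to the JSON logging question above is a custom formatter from Python's standard logging module. The sketch below uses only the standard library; the chosen field names are illustrative, and how best to attach the formatter to Prefect's own loggers (or whether existing handlers would need to be replaced to avoid duplicate lines) is an assumption not confirmed in this thread.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Include the formatted traceback when the record carries one.
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def add_json_handler(logger_name):
    """Attach a stream handler that uses JsonFormatter to the named logger."""
    logger = logging.getLogger(logger_name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger
```

Calling add_json_handler("prefect") would target the logger name that appears in the log lines elsewhere in this channel (for example prefect.FlowRunner), but that wiring is untested here.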
  • l

    Laura Vaida

    03/01/2021, 5:27 PM
    Hi, I always get this error with the package, even though I have installed it several times. Thanks for helping me!
    [2021-03-01 18:24:40+0100] ERROR - prefect.FlowRunner | Unexpected error: ModuleNotFoundError("No module named 'oauthlib'")
    Traceback (most recent call last):
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\prefect\engine\runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\prefect\engine\flow_runner.py", line 619, in get_flow_run_state
        final_states = executor.wait(
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\prefect\executors\dask.py", line 397, in wait
        return self.client.gather(futures)
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\distributed\client.py", line 1993, in gather
        return self.sync(
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\distributed\client.py", line 839, in sync
        return sync(
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\distributed\utils.py", line 340, in sync
        raise exc.with_traceback(tb)
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\distributed\utils.py", line 324, in f
        result[0] = yield future
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\tornado\gen.py", line 762, in run
        value = future.result()
      File "C:\Users\laura.vaida.000\anaconda3\envs\prefect\lib\site-packages\distributed\client.py", line 1858, in _gather
        raise exception.with_traceback(traceback)
      File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    ModuleNotFoundError: No module named 'oauthlib'
    [2021-03-01 18:24:40+0100] ERROR - prefect.billwerk-contracts | Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'oauthlib'")
    m
    • 2
    • 39
  • m

    Marwan Sarieddine

    03/01/2021, 10:41 PM
    Hi folks, is there a way to trigger a flow B to run if flow A fails, without having a “meta” flow C that waits on flow A to finish ?
    k
    • 2
    • 5
  • c

    Carl

    03/02/2021, 11:25 AM
    Probably a basic question, but I'm wondering how people organise their flow code? I'm processing a whole heap of text files. Some use the exact same flow and tasks, whilst others have their own peculiarities. Do you generally have a tasks.py file with all your task functions? A .py with a Flow for each file you process? Interested to see how others work this.
    s
    m
    • 3
    • 2
  • s

    Sami Niemelä

    03/02/2021, 12:13 PM
    Hi! Not sure what the best/right channel for support requests is, so I'll try here.
    • Problem: Flow that has worked previously suddenly stopped working yesterday
    • Setup: Orchestration with Prefect Cloud, agent running on AWS EC2 instance (agent process hosted with forever)
    • Done so far:
      ◦ Restarted the agent, last agent log line is "[2021-03-02 11:37:18,922] INFO - agent | Waiting for flow runs..."
      ◦ Restarted the flow run in Prefect Cloud, but no logs are being generated (picture attached)
    This is production stuff and related to the month change process, so any pointers are highly appreciated...
    EDIT: After agent restart, the agent status was green for about 15 minutes, but has now turned red.
    j
    • 2
    • 11
  • h

    Harshal Rane

    03/02/2021, 3:07 PM
    Hi Prefect experts, how many local agents can run in parallel on one machine? Is there any limit on this in Prefect?
    j
    • 2
    • 4
  • r

    Robert Bastian

    03/02/2021, 3:27 PM
    Hello Prefect! I have a question about variables defined in the config.toml file. Can you confirm that when registering a flow, any variables defined in config.toml are "uploaded" to Prefect Cloud as part of the registration process? For example, when instantiating classes with config.toml configuration:
    refresh_mv = PostgresExecute(name="Refresh MV", host=prefect.config.postgres.host, db_name=prefect.config.postgres.db_name, user=prefect.config.postgres.user)
    I expected to need to define those configuration variables as environment variables in the execution runtime but did not. Thanks in advance! rb
    j
    j
    • 3
    • 9
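One possible explanation for the behaviour described above, sketched in plain Python under the assumption that the flow is stored in serialized (pickled) form: values read from configuration when a task object is constructed are copied into that object, so they travel with the serialized flow and are not looked up again at run time. The dictionary, function name, and host names below are hypothetical stand-ins, not Prefect code.

```python
import pickle

# Hypothetical stand-in for values read from config.toml at build time.
config = {"postgres": {"host": "db.build-machine.internal", "user": "etl"}}


def build_task_spec(cfg):
    """Copy config values into the task definition at construction time."""
    return {
        "name": "Refresh MV",
        "host": cfg["postgres"]["host"],
        "user": cfg["postgres"]["user"],
    }


# "Registration": the spec is built and serialized with the values it has now.
spec = build_task_spec(config)
stored = pickle.dumps(spec)

# Simulate a runtime environment where the config value differs.
config["postgres"]["host"] = "db.runtime.internal"

# "Execution": the restored spec still carries the build-time value.
restored = pickle.loads(stored)
print(restored["host"])  # db.build-machine.internal
```

If this is what is happening, it also means secrets placed in config.toml would end up inside the stored flow, which may be worth checking.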
  • r

    rafaqat ali

    03/02/2021, 3:45 PM
    Hi team, I have registered flow using the below code
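    from prefect import Flow, task
    from prefect.run_configs import DockerRun

    # Define the task before the flow that uses it
    @task
    def say_hello():
        print("Hello, world!")

    with Flow("Test run", run_config=DockerRun(labels=["dev"])) as flow:
        flow.add_task(say_hello())
    flow.register("Test")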
    When I run the flow from the UI, I get the error below:
    Failed to load and execute Flow's environment: AttributeError("'NoneType' object has no attribute 'rstrip'")
    Has anyone faced this issue, or can anyone point me in the right direction?
    j
    • 2
    • 5
  • l

    Laura Vaida

    03/02/2021, 4:52 PM
    Hi folks, I'm getting this error when kicking off a registered flow from the UI, even though I have declared it as a dependency:
    #define executor as dask executor with tcp from load balancer
    flow.executor = DaskExecutor(address="35.198.104.230:8786") # tcp:port address of dask load balancer
    flow.storage = Docker(registry_url="gcr.io/keller-steering-enabling/flows", image_name="billwerk-contracts", python_dependencies=["pandas", "oauthlib ", "requests", "requests_oauthlib", "datetime", "oauth2client", "snowflake"])
    flow.run_config = KubernetesRun()
    flow.register('Billwerk')
    m
    m
    • 3
    • 5
  • j

    Julie Sturgeon

    03/02/2021, 4:53 PM
    Hi all! I have some flows that continue to run in Prefect Cloud even though I have the flow schedule turned off on the UI. Has anyone encountered this before?
    c
    • 2
    • 5
  • m

    mithalee mohapatra

    03/02/2021, 9:13 PM
    Does flow.run() need a local Prefect agent?
    j
    • 2
    • 4
  • h

    Hui Zheng

    03/02/2021, 11:50 PM
    Hello everyone. Is there a way to customize the final message of the flow ? Currently it only says
    ====== final results: <Success: "All reference tasks succeeded.">
    or 
    ====== final results: <Failed: "Some reference tasks failed.">
    We have different types of successes/failures. Currently we cannot differentiate them from the flow hook notification message. We would like to set customized messages for different scenarios and react differently, for example:
    ====== final results: <Failed: "Task_A failed due to timeout.">
    m
    j
    • 3
    • 12
  • l

    Laura Vaida

    03/03/2021, 11:44 AM
    hi folks, what is the best way to pass results from one task to another? making a cache?
    ✅ 1
    a
    s
    • 3
    • 30
  • j

    John Ramirez

    03/03/2021, 2:58 PM
    hey everyone - my cloud account is broken (see pic below). Who should I contact to get the account restored?
    j
    d
    • 3
    • 16
  • l

    Laura Vaida

    03/03/2021, 3:20 PM
    How should I store the GCP credentials in the UI? The whole JSON or only certain parts of it?
    ✅ 1
    m
    • 2
    • 4
  • b

    Brian Mesick

    03/03/2021, 4:16 PM
    I'm curious if other folks are running into this. Using a KubernetesRun run config and Docker storage for our flows, we almost always need to manually bump the Docker image_tag to a new version in order for K8s to pick up the new image. Does anyone have a more automated way of doing this? It's not uncommon for someone to forget to make that change, which leads to confusion about what's running in prod.
    m
    j
    +2
    • 5
    • 8
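One way to avoid bumping the tag by hand, sketched under assumptions: generate a tag that changes on every build, for example from the build time. The helper name make_image_tag and the "flow" prefix are hypothetical, and this is a generic illustration, not a Prefect feature.

```python
from datetime import datetime, timezone


def make_image_tag(now=None, prefix="flow"):
    """Build a tag such as 'flow-20210303T161600Z' that changes on every build."""
    # Use the current UTC time unless a specific moment is supplied (useful in tests).
    now = datetime.now(timezone.utc) if now is None else now.astimezone(timezone.utc)
    return f"{prefix}-{now.strftime('%Y%m%dT%H%M%SZ')}"


if __name__ == "__main__":
    print(make_image_tag())
```

The result could be passed as the image_tag mentioned above when building storage during registration. A git commit hash would serve the same purpose and makes it easier to trace what is running in prod.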
  • i

    itay livni

    03/03/2021, 4:22 PM
    Hi - I have a flow that runs successfully with a LocalExecutor. However, when the flow is run using flow.executor = DaskExecutor() the flow returns a Failed to deserialize error on the final task.
    [2021-03-03 08:27:24-0600] INFO - prefect.TaskRunner | Task 'new-terms': Finished task run for task with final state: 'Success'
    INFO:prefect.TaskRunner:Task 'new-terms': Finished task run for task with final state: 'Success'
    distributed.protocol.core - CRITICAL - Failed to deserialize
    Traceback (most recent call last):
      File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/core.py", line 151, in loads
        value = _deserialize(head, fs, deserializers=deserializers)
      File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
        return loads(header, frames)
      File "/home/ilivni/miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
        return pickle.loads(x, buffers=buffers)
      File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
        return pickle.loads(x)
      File "miniconda3/envs/py38lmap/lib/python3.8/site-packages/tblib/pickling_support.py", line 26, in unpickle_exception
        inst = func(*args)
    TypeError: __init__() missing 1 required positional argument: 'request'
    The last task is a FilterTask:
    filter_res = FilterTask()
    res = filter_res(
        keywords,
        task_args={"name": "new-terms"},
    )
    Any suggestions?
    j
    • 2
    • 7
  • l

    Laura Vaida

    03/03/2021, 4:31 PM
    hi, what is that pointing at?
    Unexpected error: TypeError("'NoneType' object is not subscriptable")
    m
    • 2
    • 62
  • s

    Sean Talia

    03/03/2021, 4:49 PM
    Hi all – I'm trying to get an ECS Agent to launch a task that I've preregistered with ECS (and I'm just passing in the task_definition_arn to the run configs), but I'm currently getting this error:
    An error occurred (InvalidParameterException) when calling the RunTask operation: Override for container named flow is not a container in the TaskDefinition.
    Does this mean that my pre-registered task needs to have a container named flow?
    j
    • 2
    • 3
  • d

    Daniel Ahn

    03/03/2021, 6:05 PM
    Hi, here's yet another license question:
    Prefect Core is licensed under the Apache Software License Version 2.0. Please note that Prefect Core includes utilities for running Prefect Server and the Prefect UI, which are themselves licensed under the Prefect Community License.
    • Is there clearer documentation on what's under PCL vs Apache 2.0?
    Prefect Core includes utilities for running Prefect Server and the Prefect UI
    • Are these referring to the CLI wrapper around docker-compose (https://github.com/PrefectHQ/prefect/tree/master/src/prefect/cli)?
    • In other words, if I write my own docker-compose to deploy Prefect Core in my environment, the entire architecture should be under Apache 2.0?
    c
    • 2
    • 5
  • j

    Jack Sundberg

    03/03/2021, 7:49 PM
    Is there a way to set the max_workers and max_deployed_flows for a single Agent? I can see the max_workers is hardcoded here. Is it possible to add a max_workers argument to the base Agent class? I believe we would only need to change this one line. In a number of cases, I would like to limit the number of threads (workers) the Agent is using. And for max_deployed_flows to an Agent, it looks like Agents grab all flows that are ready by default, shown here. It would be useful to add a graphql "limit" tag to this mutation to ensure one Agent isn't "hogging" flow runs when another (with equivalent Labels) is open and ready for them. I'm not sure if this is possible in a mutation though and may be trickier to change.
    j
    • 2
    • 14
  • b

    Berty

    03/03/2021, 8:25 PM
    How does one increase flow concurrency in prefect cloud? I am currently limited to 2.
    j
    c
    • 3
    • 7
  • s

    Sean Talia

    03/03/2021, 9:43 PM
    I feel like I am so close to getting my ECS Task to run w/ the ECS Agent, but I'm having an odd issue where my flow that's calling this task is just hanging forever in a scheduled/pending state – I can see in the AWS console that the ECS task is being run, and Prefect is indeed passing a lot of ENV variables to the container that's getting spun up, but nonetheless my flow is always stuck in a "Submitted" state and I'm not seeing any logs in the Cloud UI.
    m
    d
    +2
    • 5
    • 64
  • d

    Daniel Caldeweyher

    03/03/2021, 10:30 PM
    Hi, new here but have been using prefect in anger for a couple of weeks and loving it
    j
    • 2
    • 1

Jim Crist-Harif

03/03/2021, 10:35 PM
Glad to hear it Daniel!