prefect-community
  • j

    Josh Paulin

    07/13/2022, 3:55 PM
    If I have a conditional check in my flow that's something like
    with case(cond, True):
        val1_if_true = action1_if_true()
        val2_if_true = action2_if_true()
    
    
    with case(cond, False):
        val_if_false = action_if_false()
    What (if anything) do I pass to the merge function?
    ✅ 1
    a
    6 replies · 2 participants
  • s

    Sabir Ali

    07/13/2022, 5:21 PM
    I am trying to connect with the Elasticsearch client but am getting an exception:
    Traceback (most recent call last):
      File "/Users/sabirali/PycharmProjects/ETL/ElasticSearchClientTest.py", line 5, in <module>
        print(client.info())
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
        return api(*args, **kwargs)
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/__init__.py", line 2277, in info
        return self.perform_request(  # type: ignore[return-value]
      File "/Users/sabirali/PycharmProjects/ETL/venv/lib/python3.8/site-packages/elasticsearch/_sync/client/_base.py", line 332, in perform_request
        raise UnsupportedProductError(
    elasticsearch.UnsupportedProductError: The client noticed that the server is not Elasticsearch and we do not support this unknown product
    I used the following command to install the Elasticsearch client:
    (venv) sabirali@Sabirs-MacBook-Pro ETL % pip install elasticsearch      
    Collecting elasticsearch
      Using cached elasticsearch-8.3.1-py3-none-any.whl (382 kB)
    Requirement already satisfied: elastic-transport<9,>=8 in ./venv/lib/python3.8/site-packages (from elasticsearch) (8.1.2)
    Requirement already satisfied: urllib3<2,>=1.26.2 in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (1.26.9)
    Requirement already satisfied: certifi in ./venv/lib/python3.8/site-packages (from elastic-transport<9,>=8->elasticsearch) (2022.6.15)
    Installing collected packages: elasticsearch
    Successfully installed elasticsearch-8.3.1
    WARNING: You are using pip version 21.3.1; however, version 22.1.2 is available.
    You should consider upgrading via the '/Users/sabirali/PycharmProjects/ETL/venv/bin/python -m pip install --upgrade pip' command.
    (venv) sabirali@Sabirs-MacBook-Pro ETL %
    ✅ 1
    a
    13 replies · 2 participants
  • d

    Divya

    07/13/2022, 6:33 PM
    Hello all, I am trying to connect to Prefect Server but I am getting the error message below. Can you please help me figure out the issue or the step that I am missing? The steps taken so far:
    1. Installed Prefect using the pip command.
    2. Created a flow and ran it successfully (screenshot attached).
    3. Executed the commands below to start the server:
    prefect backend server
    prefect server start --detach --use-volume --expose
    Thank you! Best, Divya
    ✅ 1
    n
    5 replies · 2 participants
  • f

    Frank Hardisty

    07/13/2022, 8:25 PM
    Hello, I work at Replica; we are enthusiastic Prefect users on a paid plan. Yesterday we had an unfortunate misconfiguration which caused us to send an excessive amount of logs to Prefect, and our Prefect logging was shut off. Fair enough. The misconfiguration has been fixed and we are not spamming logs anymore. However, the lack of Prefect logs is an operational issue for us, including for some Flows that produce customer-facing data. So... can I ask for our logging to be turned on? Or an estimate of when it might be turned back on? Thanks in advance. If this is not the right place to ask this question, apologies; please direct me to the right place to ask about this. Thanks.
    ✅ 1
    d
    3 replies · 2 participants
  • v

    Victoria Alvarez

    07/13/2022, 8:47 PM
    Hello. I run a flow every day using bigquery_insert_stream. I am having some trouble running it today because I get a timeout or a 500 Internal Server Error. Any idea what's wrong? Thank you in advance!
    ✅ 1
    a
    l
    4 replies · 3 participants
  • r

    Ryan

    07/14/2022, 2:05 AM
    Hi all -- are there plans to add a 'Paused' state / signal / other mechanism to 2.0 for flows and/or tasks?
    ✅ 1
    a
    5 replies · 2 participants
  • a

    Amanda Wee

    07/14/2022, 2:46 AM
    Is Prefect angling for an in-road to join Airflow as a managed solution by AWS? I'm asking because I've recently joined a new company, and the current plan is to use AWS managed Airflow to orchestrate dbt. I'm still learning, but my assessment is that it doesn't make sense to propose Prefect as an alternative right now because what the team is doing with Airflow is so simple. But in the future, there may be more, in which case I expect to run into the lack of AWS managed Prefect as an obstacle.
    ✅ 1
    a
    k
    5 replies · 3 participants
  • м

    Максим Пышный

    07/14/2022, 2:50 AM
    Hi, folks! Is there any way to get a failed task's state message (or at least the failed task's name) inside a flow-level state handler? I am trying to build a message for a Slack alert that contains information about the specific reason for the failure. But I want to declare the state handler just once (e.g. at the flow level).
    ✅ 1
    k
    2 replies · 2 participants
  • s

    Shivam Bhatia

    07/14/2022, 7:46 AM
    Hi, I just hosted Prefect 1.2.4 on a GCP VM. I am trying to run a flow using VertexRun and I got this error:
    400 List of found errors:	1.Field: job_spec.worker_pool_specs[0].container_spec.env[12].value; Message: Required field is not set.	2.Field: job_spec.worker_pool_specs[0].container_spec.env[3].value; Message: Required field is not set.	3.Field: job_spec.worker_pool_specs[0].container_spec.env[4].value; Message: Required field is not set.	 [field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[12].value"
      description: "Required field is not set."
    }
    field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[3].value"
      description: "Required field is not set."
    }
    field_violations {
      field: "job_spec.worker_pool_specs[0].container_spec.env[4].value"
      description: "Required field is not set."
    }
    ]
    Do I need to configure Vertex jobs manually?
    ✅ 1
    a
    10 replies · 2 participants
  • f

    Faheem Khan

    07/14/2022, 8:54 AM
    Hi All! Prefect 2.0b7, DockerFlowRunner, DaskTaskRunner. I am getting the following error
    _pickle.PicklingError: Can't pickle <function db_con at 0x7f85c986f880>: it's not the same object as __main__.db_con
    when I run a task that queries a database. The code runs fine without that DB connection task.
    ✅ 1
    a
    16 replies · 2 participants
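One common way to get the `PicklingError` quoted above: `pickle` serializes a function by reference (module name plus qualified name), and if that name has since been rebound to a different object, for example by a decorator that returns a wrapper, the lookup no longer returns the function being pickled. The sketch below reproduces only that mechanism with the standard library, assuming this is what happened here; `demo_flow_module` and `FakeTask` are hypothetical stand-ins, not Prefect APIs.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the module that defines the flow.
module = types.ModuleType("demo_flow_module")
sys.modules[module.__name__] = module


def db_con():
    return "connection"


# Make the function look as if it were defined at the top level of that module.
db_con.__module__ = module.__name__
db_con.__qualname__ = "db_con"
module.db_con = db_con


class FakeTask:
    """Hypothetical stand-in for a decorator result that wraps a function."""

    def __init__(self, fn):
        self.fn = fn


def can_pickle(obj):
    """Return True if pickle can serialize obj, False on PicklingError."""
    try:
        pickle.dumps(obj)
        return True
    except pickle.PicklingError:
        return False


pickles_before = can_pickle(db_con)  # the name still points at the function
module.db_con = FakeTask(db_con)     # the name now points at the wrapper
pickles_after = can_pickle(db_con)   # lookup by name finds a different object
```

If this is the cause, keeping the plain helper under a name that is never rebound, or moving it into an importable module, is one way to avoid the mismatch.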
  • n

    Nikolaus Landgraf

    07/14/2022, 10:01 AM
    Hi there! Ever since we scheduled a flow at a higher frequency, we have seen a lot of "late" flow runs in the Prefect UI. What does this status mean? It feels like late flow runs are not automatically re-running. Also, it feels like all these late flow runs are "blocking" new flow runs from running (without the concurrency limit being reached). Sometimes running a deployment in the Prefect UI immediately results in a flow run in a late state.
    ✅ 1
    a
    p
    6 replies · 3 participants
  • t

    Tom Klein

    07/14/2022, 11:35 AM
    Hello 🙋 let's say I have a mapped task `X` that spawns several "children" tasks `Y1` ... `Yn`. Let's say some of them are stuck on `Running` as a finished state, but I want to guarantee that the next phase in the flow executes correctly and ignores the few "hung" tasks. What's the (default) behavior if I set those tasks (e.g. `Y3`, `Y17` and `Y91`) to `Skipped`? Would the next task that depends on them still get executed (even if it has the default `all_successful` trigger)? The reason I'm asking about `Skipped` is that I want to avoid a `None` response flowing downstream from these tasks.
    ✅ 1
    a
    21 replies · 2 participants
  • r

    Rajeshwar Agrawal

    07/14/2022, 12:15 PM
    Hey Prefect, I have tried disabling heartbeat using the instructions at https://docs.prefect.io/orchestration/concepts/services.html#heartbeat-configuration, however they don't seem to work for Prefect Server. What's the `config.toml` setting for disabling both heartbeat and Lazarus for Prefect flows on Prefect Server?
    ✅ 1
    a
    7 replies · 2 participants
  • a

    Abhishek Mitra

    07/14/2022, 2:59 PM
    For a private DB in AWS, can I connect to it using MysqlFetch in a function without the `@task` decorator? I get
    pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on 'xyz-mysql8-cluster.cluster-fa43asdfd.us-west-2.rds.amazonaws.com' (timed out)")
    every time. Any suggestions on how to approach solving this?
    ✅ 1
    a
    11 replies · 2 participants
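The "(timed out)" part of the error above usually points at network reachability rather than at how the function is decorated. A quick check is to test whether a plain TCP connection can be opened from the machine that runs the code. The helper below is a minimal standard-library sketch; `can_connect` is a hypothetical name, and 3306 in the usage note is only the conventional MySQL port, not something stated in the message.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections and DNS resolution failures.
        return False
```

Running something like `can_connect("<your-db-host>", 3306)` from the agent's host would show whether the database endpoint is reachable from there at all.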
  • v

    Vrinda Kailash

    07/14/2022, 3:10 PM
    Hi all. We are using an Automation for a DAG to create a new flow run after a flow run has “Finished”. We are facing an issue where multiple flow runs are created at the same time.
    ✅ 1
    a
    14 replies · 2 participants
  • f

    FuETL

    07/14/2022, 3:22 PM
    Guys, does Prefect have priority flows? For example, I have 4 flows: a, b, c, d, and I want to prioritize a and c so that the agent runs them first. Is this possible?
    ✅ 1
    k
    1 reply · 2 participants
  • j

    Joshua Massover

    07/14/2022, 4:22 PM
    I'm having trouble getting Prefect agent logs at the debug level to show up. I've set `PREFECT__LOGGING__LEVEL=DEBUG` as an environment variable, confirmed it's in the environment, and my agent logs are not showing anything but INFO. Is there anything else that I need to set?
    ✅ 1
    a
    2 replies · 2 participants
  • t

    Thomas Opsomer

    07/14/2022, 4:23 PM
    Hello 🙂 I'm trying to build a GraphQL query to compute the number of running task runs for a particular task. I tried with `task_run_state_aggregate` but I don't know how to filter on a specific task 😕 If anyone can help 🙏
    ✅ 2
    a
    2 replies · 2 participants
  • j

    Joshua Massover

    07/14/2022, 5:51 PM
    I'm running into a case where the Prefect Cloud UI shows jobs as submitted. I have queried the GraphQL API by the flow ID to confirm it is submitted (I presume that's where the UI gets its info from anyway). The agent, however, finds no ready flows. I am running in debug mode and can see that it's polling and never finding anything:
    [2022-07-14 17:49:44,859] DEBUG - agent | Querying for ready flow runs...
    DEBUG:agent:Querying for ready flow runs...
    [2022-07-14 17:49:44,959] DEBUG - agent | No ready flow runs found.
    DEBUG:agent:No ready flow runs found.
    DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
    [2022-07-14 17:49:44,960] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
    [2022-07-14 17:49:54,960] DEBUG - agent | Querying for ready flow runs...
    DEBUG:agent:Querying for ready flow runs...
    [2022-07-14 17:49:55,046] DEBUG - agent | No ready flow runs found.
    DEBUG:agent:No ready flow runs found.
    [2022-07-14 17:49:55,046] DEBUG - agent | Sleeping flow run poller for 10.0 seconds...
    DEBUG:agent:Sleeping flow run poller for 10.0 seconds...
    Does anyone have any suggestions on how to continue debugging?
    ✅ 1
    a
    3 replies · 2 participants
  • m

    Matan Drory

    07/14/2022, 5:55 PM
    Does anyone have any tips for working with a random number generator in Prefect? I want my tasks to be able to recreate the result and also make sure they don't get the same value when concurrency is involved. One of the things I thought about is generating a set of random numbers based on a seed and passing them along to specific tasks. That way concurrency isn't an issue. I just feel it's not a very elegant solution. I thought about putting the RNG in the context, but then concurrency might be an issue.
    ✅ 1
    a
    3 replies · 2 participants
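A variation on the idea in the message above is to pass only a flow-level seed and let each task derive its own seed from that value plus its own identity, so no generator object is shared between concurrent tasks. This is a minimal standard-library sketch; `task_seed`, `task_rng` and the task name `sample_rows` are hypothetical names for illustration.

```python
import hashlib
import random


def task_seed(base_seed, task_name, index=0):
    """Derive a stable 64-bit seed from the flow-level seed and the task's identity."""
    key = f"{base_seed}:{task_name}:{index}".encode()
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big")


def task_rng(base_seed, task_name, index=0):
    """Return an independent generator for one task (or one mapped child)."""
    return random.Random(task_seed(base_seed, task_name, index))


first = task_rng(42, "sample_rows", 0).random()
again = task_rng(42, "sample_rows", 0).random()  # same inputs, same stream
other = task_rng(42, "sample_rows", 1).random()  # different child, different stream
```

`hashlib` is used instead of the built-in `hash()` because string hashes are randomized per process by default, which would break reproducibility across workers.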
  • m

    Minh Mai

    07/14/2022, 6:08 PM
    Does anyone know off the top of their head why a script with `flow.run()` will fail once I register it to the UI? I'm currently using the local server:
    prefect backend server
    prefect server start
    prefect agent local start
    The error I'm getting is
    Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'utils\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
    ✅ 1
    a
    2 replies · 2 participants
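The error above says the failure happens while unpickling the flow. A pickle stores functions and classes from importable modules by name only, so the process that loads it must be able to import the same module. The sketch below shows that behavior with the standard library; `utils_demo` is a hypothetical stand-in for the local `utils` module, and the assumption is that the agent process cannot import `utils` from where it runs.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a local helper module next to the flow script.
module = types.ModuleType("utils_demo")
sys.modules[module.__name__] = module


def helper():
    return 42


# Make the function look as if it were defined at the top level of that module.
helper.__module__ = module.__name__
helper.__qualname__ = "helper"
module.helper = helper

payload = pickle.dumps(helper)    # stores only the names "utils_demo" and "helper"
del sys.modules[module.__name__]  # simulate a process that cannot import the module

try:
    pickle.loads(payload)
    error = None
except ModuleNotFoundError as exc:
    error = str(exc)
```

Under that assumption, making the module importable in the agent's environment (for example by installing it as a package or adjusting the import path where the agent runs) addresses the mismatch.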
  • s

    Sergey Goncharov

    07/14/2022, 8:21 PM
    Hi, I'm trying to prepare a parametrized flow for Prefect v1. It has almost the same code as https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py , but I changed some functions to make them work with parameters. I cannot figure out why it does not work now: the function `CreateAndRunJob` is marked as succeeded, but Prefect does not wait for the end of the execution, and then the function `get_our_pod_name` fails because it cannot find the Job, which is not yet created. I thought about using threading or queue from Python here, but I think it's not the right way with Prefect. Could you please advise? I believe I am doing something wrong here.
    ✅ 1
    a
    8 replies · 2 participants
  • c

    Cab Maddux

    07/14/2022, 10:04 PM
    Hi! I'm trying to use the Orion Python API `prefect.packaging.docker.DockerPackager` class and have it push to Google Container Registry. It looks like the current `package()` method is unable to push to GCR because the `prefect.docker.push_image` here ignores any path provided in the registry URL. So it looks to me like if I provide a `registry_url` like `https://gcr.io/my-project-name` (where `my-project-name` is a GCP project) for a flow named `my-flow`, rather than pushing an image to `gcr.io/my-project-name/my-flow`, the current implementation will try to push to `gcr.io/my-flow` (which fails because I don't have a project named `my-flow`). Wondering if I'm missing something here or maybe this should go into an issue?
    ✅ 1
    a
    3 replies · 2 participants
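The behavior expected in the message above, where the path component of the registry URL is kept when building the image name, can be sketched as follows. This is not Prefect's implementation; `image_reference` is a hypothetical helper that assumes the registry URL includes a scheme such as `https://`.

```python
from urllib.parse import urlparse


def image_reference(registry_url, image_name, tag="latest"):
    """Build `host[/path]/name:tag`, keeping any path component of the registry URL."""
    parsed = urlparse(registry_url)
    prefix = parsed.netloc + parsed.path.rstrip("/")
    return f"{prefix}/{image_name}:{tag}"


with_project = image_reference("https://gcr.io/my-project-name", "my-flow")
without_path = image_reference("https://gcr.io", "my-flow")
```

With the URL from the message, `with_project` keeps the `my-project-name` segment, while a URL without a path falls back to `host/name`.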
  • a

    Alex Fok

    07/14/2022, 10:24 PM
    Hello everyone, I have been trying to set up the Prefect server. When I run prefect server start, I encounter this error. I have already installed docker-compose.
    ✅ 1
    a
    5 replies · 2 participants
  • j

    Joshua Massover

    07/14/2022, 10:38 PM
    Are there any suggestions for where to put code for instrumentation? I'd like to wrap my flows and tasks. I've found two spots that seem ok:
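    # Prefect 1 runner classes being patched.
    from prefect.engine.cloud import CloudFlowRunner, CloudTaskRunner

    # NOTE: `tracer` is assumed to be an already-configured tracing client;
    # its import was not shown in the original message.


    def patch_task():
        method = getattr(CloudTaskRunner, "run")

        def patched(self, *args, **kwargs):
            # Wrap every task run in a span named after the task.
            name = self.task.name
            with tracer.trace("task", resource=name):
                result = method(self, *args, **kwargs)
            tracer.flush()
            return result

        setattr(CloudTaskRunner, "run", patched)


    def patch_flow():
        method = getattr(CloudFlowRunner, "run")

        def patched(self, *args, **kwargs):
            # Wrap every flow run in a span named after the flow.
            name = self.flow.name
            with tracer.trace("flow", resource=name):
                result = method(self, *args, **kwargs)
            tracer.flush()
            return result

        setattr(CloudFlowRunner, "run", patched)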
    ✅ 1
    a
    4 replies · 2 participants
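The two patch functions in the message above share the same shape, so the wrapping can be factored into one helper. The sketch below uses only the standard library; `span`, `instrument` and `DemoRunner` are hypothetical stand-ins for a tracer span, the patching helper and a runner class, so this illustrates the pattern and is not a drop-in for any tracing library.

```python
import functools
from contextlib import contextmanager

events = []


@contextmanager
def span(kind, resource):
    """Hypothetical stand-in for a tracer span; records start and end events."""
    events.append(("start", kind, resource))
    try:
        yield
    finally:
        events.append(("end", kind, resource))


def instrument(cls, method_name, kind, resource_of):
    """Replace cls.method_name with a wrapper that runs each call inside a span."""
    original = getattr(cls, method_name)

    @functools.wraps(original)
    def patched(self, *args, **kwargs):
        with span(kind, resource_of(self)):
            return original(self, *args, **kwargs)

    setattr(cls, method_name, patched)


class DemoRunner:
    """Hypothetical stand-in for a runner class with a `run` method."""

    name = "demo"

    def run(self, x):
        return x * 2


instrument(DemoRunner, "run", "task", lambda runner: runner.name)
result = DemoRunner().run(21)
```

`functools.wraps` keeps the original method's name and docstring on the wrapper, and the `try`/`finally` in `span` closes the span even when the wrapped call raises.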
  • m

    Mohamed Hatem Diabi

    07/15/2022, 12:13 AM
    Hi everyone, a question about Prefect 1. I am trying to run a subflow using a task. My code:
    @task(log_stdout=True)
    def create_subflows(list_of_elements):
        list_param = []
        for element in list_of_elements:
            list_param.append(
                {
                    "pr": element,
                }
            )
        mapped_flows = create_flow_run.map(
            flow_name=unmapped("Subflow"),
            parameters=list_param,
        )

    with Flow("Parent Flow") as flow:
        list_of_elements = [1, 2, 3]
        create_subflows(list_of_elements)
    I am getting this error: `ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)`
    ✅ 1
    a
    3 replies · 2 participants
  • i

    Ievgenii Martynenko

    07/15/2022, 6:19 AM
    Just an interesting observation. When using clocks.RRuleClock for creating built-in scheduler and setting start time to 2000-01-01, when deployed via "prefect register" it takes 10 minutes for deployment. When doing the same with 2022-01-01, it takes a minute. Wonder what is happening under the hood 🙂
    a
    1 reply · 2 participants
  • w

    wonsun

    07/15/2022, 6:56 AM
    Hi all~ I want to know how to change Prefect's default server database (using Prefect 1.0). There's a method, "Running Prefect with a pre-existing postgres database", in the Prefect docs. But can I change `server.database.connection_url` or `server.database.host` to the connection information for a MySQL database, not Postgres?
    ✅ 1
    a
    11 replies · 2 participants
  • m

    Mickael ANDRIEU

    07/15/2022, 8:11 AM
    Hi everybody, do we have examples of Prefect Orion used to manage Airbyte and dbt tasks? I have this simple need and I'm wondering if I should use Prefect 1 or 2.
    ✅ 1
    a
    a
    54 replies · 3 participants
  • e

    Eric So

    07/15/2022, 8:38 AM
    In Prefect 1, I can’t seem to see any logs in the UI when using `LocalDaskExecutor(scheduler="processes")`, while with `threads` logs can be seen in the UI as normal. Any ideas?
    ✅ 1
    a
    2 replies · 2 participants
e

Eric So

07/15/2022, 8:38 AM
Ah nevermind found this: https://github.com/PrefectHQ/prefect/issues/5895
a

Anna Geller

07/15/2022, 9:42 AM
Yes exactly, glad you found it. There is a linked issue there where I also reproduced it so I know what you mean