Powered by Linen
prefect-community
  •

    Brian Mesick

    11/02/2020, 10:05 PM
    Is there a way of seeing the json blob of parameters that a flow run was executed with in the Cloud UI?
  •

    Michelle Wu

    11/03/2020, 5:24 AM
    Hi! This is Michelle from Hong Kong. Lately, I've been trying to migrate services from Airflow to Prefect. One Airflow feature we use is Admin > Connections, where we store the parameters of different connections (e.g. databases). I wonder if there is a similar feature in Prefect? I've been going through the official documentation and reading about concepts such as "Secrets", but haven't been able to find a concrete answer...
  •

    Max

    11/03/2020, 1:57 PM
    Hi everyone! Does the Dask executor support versioning? Let's say I have two versions of a codebase for my tasks, each living in a separate Docker container. I'd like to be able to run both versions, e.g. flow1 runs inside container1, and flows 2 and 3 run inside container2. This could be achieved without Dask AFAIK, but Dask is really handy for resource management, such as GPU memory.
  •

    Chris White

    11/03/2020, 3:52 PM
    Hello folks! The Prefect team has the day off in recognition of Election Day in the US, and so might not get to all questions today. We appreciate your patience and understanding and we’ll be back tomorrow!
  •

    Priyan Chandrapala

    11/03/2020, 5:14 PM
    Hello everyone! We are using Prefect Cloud. We have dev and prod environments on separate EC2 instances. Each environment has an agent configured, and we can see 2 agents / 2 flows for dev and prod in Prefect Cloud. My question is: how do we know for sure that the prod flow will always run its job on the prod EC2 instance? Is it possible for the prod flow to run its job on the dev EC2 instance, or vice versa? cc @Adrien Boutreau
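As I understand Prefect 0.13's label matching, an agent only picks up a flow run when every label on the flow is also one of the agent's labels. A minimal pure-Python model of that subset rule, assuming the dev and prod agents are started with distinct labels such as "dev" and "prod" (the label names and the `can_pick_up` helper are hypothetical, not part of Prefect's API):

```python
from typing import Iterable


def can_pick_up(agent_labels: Iterable[str], flow_labels: Iterable[str]) -> bool:
    """Model of label matching: every flow label must also be an agent label."""
    return set(flow_labels) <= set(agent_labels)


# Hypothetical setup: one agent per EC2 instance, each with its own label.
prod_agent = {"prod"}
dev_agent = {"dev"}
prod_flow = {"prod"}

print(can_pick_up(prod_agent, prod_flow))  # True
print(can_pick_up(dev_agent, prod_flow))   # False
```

Note that in this model a flow with no labels matches every agent (the empty set is a subset of any set), which is one reason to give each flow an explicit environment label.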
  •

    Lukas N.

    11/03/2020, 5:15 PM
    Hello Prefect team 👋, I'm adding S3Result to my flows and have encountered a weird error (template issue?), which I tried to reproduce in a minimal example. I'm running the flow through Prefect Server, which I set up using prefect server start, with a local agent started via prefect agent start. More info in thread.
  •

    matta

    11/03/2020, 9:21 PM
    Hey, what's the standard way of storing Secrets for Prefect Core?
  •

    matta

    11/03/2020, 9:22 PM
    We've got our own cluster that we're running it on. Is there an easy way for it to talk to EKS or something? Or what are the best practices here?
  •

    matta

    11/03/2020, 9:56 PM
    Also, just wanted to say I'm super happy that I got a new gig where we're using Prefect! 🙂
  •

    Ben Fogelson

    11/03/2020, 10:19 PM
    I’m wondering about using apply_map with functions that create parameters. Here’s a minimal example:
    from prefect import Flow, Parameter, unmapped, apply_map
    
    def func(x, y):
        z = Parameter('z')
        w = x + y + z
        return w
    
    with Flow('flow') as flow:
        x = Parameter('x')
        y = Parameter('y')
        w = apply_map(func, x, y)
    
    ---------------------------------------------------------------------------
    ValueError: Parameters must be root tasks and can not have upstream dependencies.
    The error comes when apply_map tries to add edges from the root tasks of the subgraph to the argument tasks passed to apply_map. This fails when it tries to make an edge from x to z, since z is a parameter. This error seems to preclude ever using apply_map with a function that creates a parameter. I’m wondering if that constraint is necessary. My intuition is that if apply_map sees a parameter that is in the subflow but not in the parent flow, it should assume that any edges from that parameter are unmapped edges that should be added to the parent flow. Does this seem right?
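One way around the error (an untested assumption, not something confirmed in this thread) would be to create z in the parent flow and pass it to apply_map as an unmapped argument, so func no longer creates a Parameter itself. To illustrate the mapped/unmapped distinction that relies on, here is a toy pure-Python helper; `toy_apply_map` and `Unmapped` are hypothetical stand-ins, not Prefect's implementation:

```python
from typing import Any, Callable, List


class Unmapped:
    """Marks an argument that is passed unchanged to every call (hypothetical)."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_apply_map(func: Callable[..., Any], *args: Any, **kwargs: Any) -> List[Any]:
    """Call func once per element of the mapped arguments.

    Plain sequences are mapped (element i goes to call i); Unmapped values
    are broadcast to every call. Mapped arguments must share one length.
    """
    mapped = [a for a in list(args) + list(kwargs.values()) if not isinstance(a, Unmapped)]
    lengths = {len(a) for a in mapped}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of the same length")
    n = lengths.pop()

    def pick(arg: Any, i: int) -> Any:
        return arg.value if isinstance(arg, Unmapped) else arg[i]

    return [
        func(*[pick(a, i) for a in args], **{k: pick(v, i) for k, v in kwargs.items()})
        for i in range(n)
    ]


def func(x, y, z):
    # z arrives as an argument instead of being created inside func
    return x + y + z


print(toy_apply_map(func, [1, 2, 3], [10, 20, 30], z=Unmapped(100)))  # [111, 122, 133]
```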
  •

    Marwan Sarieddine

    11/04/2020, 12:58 AM
    Hi folks, I know flow cancellation was recently introduced in Prefect, but I am curious why prefect.engine.state.State.is_cancelled() wasn't also introduced?
  •

    Jasono

    11/04/2020, 1:04 AM
    Hi, a newbie question… If the trigger for all tasks in a flow is “all successful” and one of the upstream tasks fails, all of the downstream tasks end up “trigger failed” (even if the failed upstream task is restarted and succeeds). This forces me to restart the trigger-failed tasks one by one. Is this the intended behavior, or am I missing something? Should I change the trigger to “all finished” to avoid manually restarting the downstream tasks whenever an upstream task fails at least once?
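Roughly, “all successful” (the default trigger) lets a task run only if every upstream task ended in a Success state, while “all finished” lets it run once every upstream task has finished, whatever the outcome. A simplified pure-Python model of that difference, using plain strings for states (the state names and these helper functions are illustrative only, not Prefect's trigger signatures, and the model ignores Prefect's many other states):

```python
from typing import Iterable

# Simplified state names; real Prefect states are classes, not strings.
SUCCESSFUL = {"Success"}
FINISHED = {"Success", "Failed", "TriggerFailed"}


def all_successful(upstream_states: Iterable[str]) -> bool:
    """Run only if every upstream task succeeded."""
    return all(state in SUCCESSFUL for state in upstream_states)


def all_finished(upstream_states: Iterable[str]) -> bool:
    """Run once every upstream task has finished, whatever the outcome."""
    return all(state in FINISHED for state in upstream_states)


upstream = ["Success", "Failed"]
print(all_successful(upstream))  # False: downstream would be marked TriggerFailed
print(all_finished(upstream))    # True: downstream runs despite the failure
```

Switching to “all finished” trades one problem for another: downstream tasks would then run on top of a failed upstream result, so they need to be able to cope with that.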
  •

    abhilash.kr

    11/04/2020, 6:56 AM
    Hi folks, did anyone receive the Prefect 5k-stars goodies? - https://prefect-community.slack.com/archives/CKNSX5WG3/p1601480661016400
  •

    psimakis

    11/04/2020, 7:51 AM
    Hello all, sorry in advance for this vague issue, but I couldn't be more specific 😕 In my case, flows are split into different groups (different sub-folders) based on their similarity, and for convenience I have created a shell script to register specific groups of flows at a time. So far, so good. When I register a flow, I have noticed that the scheduling of another flow gets disrupted. Let's say I have flow A and flow B. The cron for flow B is 0 11 * * 3 (one event every Wednesday at 11:00am). The last time I registered flow A, I noticed that two runs had been scheduled for flow B for the upcoming Wednesday (both at 11:00am). When I re-registered flow B, the scheduling was fixed. Do you have any idea why this is happening? Thanks.
    python: 3.7.3
    prefect: 0.13.12
    OS: Debian GNU/Linux 10 (buster)
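For reference, the cron expression 0 11 * * 3 reads as minute 0, hour 11, any day of month, any month, day-of-week 3. Cron numbers weekdays from Sunday = 0, so 3 is Wednesday, whereas Python's `datetime.weekday()` numbers from Monday = 0. A small stdlib sketch that lists the next few matching times, handy for comparing the expected schedule with what the UI shows (`next_weekly_runs` is a hypothetical helper that handles only this fixed minute/hour/weekday shape, not general cron syntax):

```python
from datetime import datetime, timedelta
from typing import List


def next_weekly_runs(start: datetime, cron_weekday: int, hour: int, minute: int, count: int) -> List[datetime]:
    """Return the next `count` datetimes strictly after `start` matching
    'minute hour * * cron_weekday' (cron weekday: Sunday=0 ... Saturday=6)."""
    py_weekday = (cron_weekday - 1) % 7  # convert cron (Sun=0) to Python (Mon=0)
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    candidate += timedelta(days=(py_weekday - candidate.weekday()) % 7)
    if candidate <= start:
        candidate += timedelta(days=7)  # this week's slot has already passed
    return [candidate + timedelta(weeks=i) for i in range(count)]


# '0 11 * * 3' starting from Wednesday 4 Nov 2020, 07:51
for run in next_weekly_runs(datetime(2020, 11, 4, 7, 51), cron_weekday=3, hour=11, minute=0, count=3):
    print(run.strftime("%a %Y-%m-%d %H:%M"))
# Wed 2020-11-04 11:00
# Wed 2020-11-11 11:00
# Wed 2020-11-18 11:00
```

Each Wednesday appears exactly once, so two runs listed for the same Wednesday at 11:00 would not come from the cron expression itself.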
  •

    Radek Tomsej

    11/04/2020, 11:07 AM
    Has anybody tried passing results/states between tasks that are in different flows? Thank you! https://prefect-community.slack.com/archives/CL09KU1K7/p1604173648410700
  •

    uttam K

    11/04/2020, 11:42 AM
    I have written a MySQL connection function in a separate file and imported it in my flow. It works well when I use flow.run(), but when I run the flow from the Prefect Server UI it says module not found. I have attached the code. Please help.
    233.zip
  •

    Roey Brecher

    11/04/2020, 1:18 PM
    Hi, any pointers on how to use map with StartFlowRun?
  •

    Philip MacMenamin

    11/04/2020, 6:28 PM
    Problems connecting a Prefect agent: Hey, I have a Prefect Server running, but I'm having problems getting an agent connected. The agent is running on the machine and starts up with the message:
    Agent connecting to the Prefect API at http://localhost:4200
    but I'm unable to see or use the agent. (If I run curl on localhost:4200, I can connect without issue.)
  •

    Hui Zheng

    11/04/2020, 7:15 PM
    Hello, I wonder how a task could process the result of a failed upstream task. Please see the example below: I have a task_a and then a task_b, which takes task_a() as an upstream task. task_b always runs and processes the task_a result, even when task_a fails.
    from random import random
    
    from prefect import Flow, task, triggers
    from prefect.engine import signals
    
    
    @task
    def task_a(threshold: float):
        result = random()
        if result > threshold:
            return result
        else:
            # question 1: how do I get task_a to return the result of random() even when the FAIL signal is raised?
            raise signals.FAIL("failed, result is less than the threshold")
    
    
    @task(trigger=triggers.always_run)
    def task_b(result_a: float):
        # question 2: how could task_b check whether task_a succeeded or failed?
        # (note: result_a holds task_a's data, not its State)
        if result_a.is_failed():  # ???
            print('task_a failed with result {}'.format(result_a))
        else:
            print('task_a succeeded with result {}'.format(result_a))
    
    
    with Flow('task_b always handles task_a failure') as flow:
        result_a = task_a(0.3)
        result_b = task_b(result_a)
    I have two questions:
    1. How do I get task_a to return the result of random() even when the FAIL signal is raised?
    2. How can task_b check whether task_a is a success or failure?
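One way to sidestep both questions (a design alternative, not something confirmed in this thread) is to not raise FAIL at all: have task_a return the value together with a pass/fail flag, and let task_b branch on the flag. A plain-Python sketch of that shape, reusing the threshold logic from the example above; `CheckedResult` is a hypothetical name, and in a real flow both functions would be decorated with @task:

```python
import random
from typing import NamedTuple


class CheckedResult(NamedTuple):
    """Hypothetical container: the raw value plus whether it passed the check."""
    value: float
    passed: bool


def task_a(threshold: float) -> CheckedResult:
    result = random.random()
    # Instead of raising FAIL (which discards the value), report the outcome.
    return CheckedResult(value=result, passed=result > threshold)


def task_b(result_a: CheckedResult) -> str:
    # task_b sees both the value and whether task_a's check passed.
    if result_a.passed:
        return "task_a succeeded with result {}".format(result_a.value)
    return "task_a failed with result {}".format(result_a.value)


print(task_b(task_a(0.3)))
```

The trade-off is that task_a now finishes as Success even when the check fails, so if the Failed state itself matters (for example for notifications), this pattern does not replace it.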
  •

    Hui Zheng

    11/04/2020, 7:38 PM
    Hello, I would like to raise a feature request to allow ShellTask to return stdout when it fails.
    https://github.com/PrefectHQ/prefect/issues/3614
    please see this link or the thread for the details
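In the meantime, a possible workaround is to run the command yourself inside a regular task and attach the captured output to the exception. A minimal stdlib sketch; the `CommandFailed` exception and `run_command` helper are hypothetical names, not part of Prefect's ShellTask:

```python
import subprocess
import sys
from typing import List


class CommandFailed(Exception):
    """Hypothetical error type that keeps the command's output for inspection."""

    def __init__(self, returncode: int, stdout: str, stderr: str) -> None:
        super().__init__("command exited with code {}".format(returncode))
        self.returncode = returncode
        self.stdout = stdout
        self.stderr = stderr


def run_command(args: List[str]) -> str:
    """Run args; return stdout on success, raise CommandFailed (carrying stdout) otherwise."""
    proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    if proc.returncode != 0:
        raise CommandFailed(proc.returncode, proc.stdout, proc.stderr)
    return proc.stdout


try:
    run_command([sys.executable, "-c", "print('partial output'); raise SystemExit(2)"])
except CommandFailed as exc:
    print(exc.returncode, exc.stdout.strip())  # 2 partial output
```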
  •

    Saatvik Ramisetty

    11/04/2020, 9:05 PM
    Hey guys, new to Prefect here. The DevOps team at my company created a server for us, but I can't seem to connect.
    curl -I http://myserver
    returns
    HTTP/1.1 200 OK
    Date: Wed, 04 Nov 2020 20:57:57 GMT
    Content-Type: text/html
    Content-Length: 12382
    Connection: keep-alive
    Server: nginx/1.18.0
    Last-Modified: Tue, 13 Oct 2020 23:49:16 GMT
    ETag: "5f863cfc-305e"
    Accept-Ranges: bytes
    But I can't seem to connect via the UI.
  •

    Jakub Hettler

    11/04/2020, 10:52 PM
    Hi everyone, first of all thanks for all the work on Prefect, good job! 👏 We are testing Prefect as a potential replacement for Airflow, but I am not sure I understand the Agent concept correctly. In Airflow we have workers, and there are many of them across our infrastructure (servers); we can scale just by adding workers to run more and more tasks. Each worker asks whether there are tasks it can run. As I understand it, workers are called Agents in Prefect and have a similar function. Am I right? If so, how can I scale the Agents (distribute them across servers), how is scaling done, and is it possible to scale the community version? Thanks for any explanation! cc @Radek Tomsej
  •

    Matt Drago

    11/05/2020, 1:01 AM
    Hey folks, I'm hoping someone can give me some tips on what I am doing wrong with the BigQueryLoadGoogleCloudStorage task. I have built a flow that exports data from a MySQL database and then uploads it to GCS (all works fine). When the flow attempts to run the BigQueryLoadGoogleCloudStorage task, I get the following error:
    Unexpected error: PicklingError('Pickling client objects is explicitly not supported.\nClients have non-trivial state that is local and unpickleable.',)
    I'm configuring the GCP security credentials in .prefect/config.toml under the [context.secrets] section using the GCP_CREDENTIALS field. With flow.run(), the flow completes successfully. I then call flow.register(...), start the agent with flow.run_agent(), and manually kick off a run in the UI. The following is an example of how I am adding the BigQueryLoadGoogleCloudStorage task to the flow:
    from prefect import Flow
    from prefect.tasks.gcp.bigquery import BigQueryLoadGoogleCloudStorage
    from google.cloud.bigquery import SchemaField
    
    load_to_bigquery = BigQueryLoadGoogleCloudStorage(
        dataset_id="data_lake_dev",
        project= "data-platform"
    )
    
    with Flow("Move Data From DB to BQ") as flow:
        load_to_bigquery(
            uri="https://storage.cloud.google.com/data-platform-dev/a-csv-file.csv",
            table="website_data",
            schema=[
                SchemaField(name="id", field_type="INT64", mode="REQUIRED"),
                SchemaField(name="uuid", field_type="STRING"),
                SchemaField(name="site", field_type="STRING"),
                SchemaField(name="path", field_type="STRING"),
                SchemaField(name="type", field_type="STRING", mode="REQUIRED"),
                SchemaField(name="reported_at", field_type="TIMESTAMP", mode="REQUIRED"),
                SchemaField(name="helpful", field_type="BOOL", mode="REQUIRED"),
                SchemaField(name="message", field_type="STRING")
            ]
        )
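The error text appears to come from Google Cloud client objects, which refuse to be pickled. One general way to avoid this class of error (a sketch of the pattern, not a confirmed diagnosis of this flow) is to keep only plain configuration on anything that may be serialized, for example a task's return value if results are being checkpointed, and to create the client inside the code that actually runs. A stdlib illustration, where `FakeClient` and `LoadTask` are hypothetical stand-ins:

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for a cloud client that refuses to be pickled."""

    def __init__(self, project: str) -> None:
        self.project = project

    def __getstate__(self):
        raise pickle.PicklingError("Pickling client objects is explicitly not supported.")

    def load(self, uri: str, table: str) -> str:
        return "loaded {} into {}.{}".format(uri, self.project, table)


class LoadTask:
    """Stores only picklable config; the client is created at run time."""

    def __init__(self, project: str) -> None:
        self.project = project  # plain string, safe to pickle

    def run(self, uri: str, table: str) -> str:
        client = FakeClient(self.project)  # created fresh, never stored on self
        return client.load(uri, table)  # return plain data, not the client


task = LoadTask(project="data-platform")
restored = pickle.loads(pickle.dumps(task))  # works: no client attribute
print(restored.run("some-uri", "website_data"))  # loaded some-uri into data-platform.website_data
```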
  •

    Neeraj Sharma

    11/05/2020, 5:54 AM
    Hello, I want to create a Dask cluster on Azure. Are there any APIs available in Prefect that can create a Dask cluster in Azure?
  •

    Joël Luijmes

    11/05/2020, 7:14 AM
    Hey, I’m trying to use LOOP and write the results of each iteration as documented here. However, it doesn’t seem to work: only the last iteration is written.
    import prefect
    from prefect import Flow, task
    from prefect.engine.results import LocalResult
    from prefect.engine.signals import LOOP
    
    
    @task()
    def log_output(result):
        logger = prefect.context.get('logger')
        logger.info(result)
    
    
    @task(result=LocalResult(dir='./results', location='test-{task_loop_count}.prefect'))
    def loop_test():
        loop_payload = prefect.context.get("task_loop_result", {})
    
        n = loop_payload.get("n", 1)
        print(n)
    
        if n > 5:
            return n
    
        raise LOOP(f'Iteration {n}', result=dict(n=n+1))
    
    
    with Flow("Postgres -> BigQuery") as flow:
        x = loop_test()
        log_output(x)
    See output logging and diagnostics in thread
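For comparison, here is a plain-Python simulation of what the per-iteration files would look like if each iteration's result were written to its own location. It assumes the loop counter starts at 1 and that {task_loop_count} in the location template is filled with the current counter via `str.format`; both are assumptions made for illustration, not statements about how Prefect's result templating behaves, and `simulate_loop` is a hypothetical helper:

```python
import os
import tempfile
from typing import List

LOCATION_TEMPLATE = "test-{task_loop_count}.prefect"  # template from the task above


def simulate_loop(results_dir: str) -> List[str]:
    """Hypothetical re-enactment of loop_test() that writes one file per iteration."""
    written: List[str] = []
    payload = {}  # plays the role of task_loop_result
    loop_count = 1  # assumption: the loop counter starts at 1
    while True:
        n = payload.get("n", 1)
        # The last iteration returns n; earlier ones "raise LOOP" with the next payload.
        value = n if n > 5 else {"n": n + 1}
        name = LOCATION_TEMPLATE.format(task_loop_count=loop_count)
        with open(os.path.join(results_dir, name), "w") as fh:
            fh.write(repr(value))
        written.append(name)
        if n > 5:
            return written
        payload = value
        loop_count += 1


with tempfile.TemporaryDirectory() as tmp:
    print(simulate_loop(tmp))
```

Under these assumptions six distinct files (test-1.prefect through test-6.prefect) would be expected, one per iteration.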
  •

    ale

    11/05/2020, 8:58 AM
    Hi folks, does @task support both name and task_run_name at the same time? It seems that name takes precedence over task_run_name, am I right?
  •

    emre

    11/05/2020, 9:47 AM
    Hey everyone, I’ve got some secrets that I need to use in most of the tasks in a flow. I hate the visual of an upstream dependency on the same secret task from almost all my tasks. Is there a way for custom secrets to behave like, say, AWS_CREDENTIALS (see https://docs.prefect.io/core/concepts/secrets.html#default-secrets)? Or a Prefect-backend-compatible way of having my secrets reside in prefect.context, or a similar context-like construct?
  •

    Robert Sokolowski

    11/05/2020, 3:39 PM
    Hey folks, I'm thinking about using Prefect at a medium-sized company. I'm looking for guidelines on how to set up a flow that spans multiple teams / code repos. For example, node A in the dag may be owned by team A, runs in container A, outputs a large dataset to be consumed by the node B, owned by team B, in container B. Does anyone know of best practices for such a situation?
  •

    simone

    11/05/2020, 3:44 PM
    Hi, I am running Prefect on an on-premises HPC without Docker (it can be installed), and my flow is not really serialisable (I need to figure out which step). I was just wondering what the best SOP is for tracking down the source of issues that can break the flow, for debugging. In the current setup, the terminal output shows only the log messages for start and end, and none of the logs I put in the code. When I test the flows locally, all the log output is visible in the terminal. Thanks a lot!
  •

    Hui Zheng

    11/05/2020, 6:06 PM
    Hello community, our Prefect flow runs get a no-heartbeat error from time to time. The run seems to be retrying, but nothing actually happens: a run that normally takes 20 minutes gets stuck for hours. Any idea how to fix that?

nicholas

11/05/2020, 6:08 PM
Hi @Hui Zheng - task runs usually fail to send heartbeats when they're in resource-starved environments or when the process has exited prematurely (for example by calling sys.exit()). Does either of those sound like a possible scenario for you?

Hui Zheng

11/06/2020, 7:08 PM
@nicholas thank you. I think it could be a resource-starved environment. I am fine with runs failing to send heartbeats; my concern is that the retrying doesn't seem to work.
How can I ensure that retrying works properly? How do I fix the retrying?
@Dylan @Kyle Moon-Wright Thank you again for chatting today. Please see below the flow run that had this unresponsive, stuck-in-retrying issue: https://cloud.prefect.io/semios/flow-run/ee39d492-3c56-4f69-8721-b9eb9f32dd21?logId=

Dylan

11/11/2020, 6:33 PM
Thank you Hui!

Hui Zheng

11/16/2020, 10:04 PM
@Dylan @Kyle Moon-Wright Hello, hope all is well. This failed-retry-when-no-heartbeat issue keeps coming up. I would like to work with you to get to the bottom of it.
https://cloud.prefect.io/semios/flow-run/72082fcb-213b-4cc2-8d6e-d2a0b345cffe
https://cloud.prefect.io/semios/flow-run/8bf9505f-5c0f-47a1-8cc4-cd9393d17bc5
https://cloud.prefect.io/semios/flow-run/c85d6ad6-0074-4cdb-9a12-5b02ab100a11

Kyle Moon-Wright

11/16/2020, 10:14 PM
Hey @Hui Zheng, I see that this flow is registered with Core version 0.13.6. Would it be possible for you to upgrade to the current version (0.13.15) and see if this behavior still occurs?
Just want to be sure this issue hasn't already been addressed in a more recent version. From there, we can definitely dig into the potential culprits I have in mind.

Hui Zheng

11/16/2020, 10:15 PM
for sure, I can do that.
We will have another production release next week, where I can upgrade the flow to Prefect 0.13.15. If the issue persists, I will let you know.

Kyle Moon-Wright

11/16/2020, 10:16 PM
ok great, please do 👍
also if possible, please upgrade your Agent while you’re there! (to cross that culprit off the list 😉 )