Powered by Linen
prefect-community
  • r

    Raviraja Ganta

    05/05/2022, 11:04 AM
While registering the flow, it is not storing the configs file, which is present in a different directory. But some modules need them. How do I solve this?
    main-project
      | 
      src
         |
         | __init__.py
         | module1.py
         | module2.py
      |
      configs
         | 
         | config.yaml
      |
      flows
         |
         | sample_flow.py
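Script-based flow storage generally uploads only the flow file itself, so sibling directories like configs/ are not shipped automatically. A minimal sketch of one common workaround, assuming the configs/ directory is packaged alongside the code (e.g. baked into a Docker image) — `CONFIG_DIR` and `load_config` are hypothetical names:

```python
from pathlib import Path

# Resolve the config path relative to this module's file instead of the
# process working directory, so it is found wherever the flow starts.
# Assumes this file lives in flows/ under main-project, as in the tree above.
CONFIG_DIR = Path(__file__).resolve().parent.parent / "configs"

def load_config(name: str = "config.yaml") -> str:
    # Read the raw config text; parsing (e.g. YAML) is up to the caller.
    return (CONFIG_DIR / name).read_text()
```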
    a
    • 2
    • 12
  • r

    Rasmus Lindqvist

    05/05/2022, 1:23 PM
    Hi all, we are using
    prefect server
    for local development and
    prefect cloud
for production which is quite neat in terms of speed. When doing this we set the --external-postgres flag in order to use our development database, which works great! However we are using Hasura for our other backend service, which creates some git-pain as Prefect overwrites our local Hasura metadata files. Has anyone run into the same problem?
    k
    • 2
    • 4
  • i

    Ievgenii Martynenko

    05/05/2022, 1:43 PM
Hi again, theoretical question. If most people use Docker/K8s to run dataflows (each dataflow has its own image, or each flow uses a common image), why do we need an Agent at all?
    k
    a
    • 3
    • 3
  • a

    Adi Gandra

    05/05/2022, 2:24 PM
    Hey, i’m running a prefect agent on EKS / Fargate. It’s been working great for a while now, spinning up a new pod, running and deleting it. I think for some reason something got changed on the AWS account and the spinning up of pods failed. However, it kept trying to retry and spin up a new pod, and the old pod didn’t get cleaned up. Now whenever I manually try to delete the pod (kubectl delete pods) it just recreates instantly. So now i have like 6 pods that are stuck in Pending status that I can’t delete. Any thoughts on how to clean everything up?
    k
    e
    • 3
    • 18
  • e

    Edmondo Porcu

    05/05/2022, 3:57 PM
How would you describe Prefect in a compact way that highlights its features? It's not a workflow manager in the sense that it is not Camunda, it is not Temporal, it is not AWS Step Functions. It's not Conductor by Netflix.
    a
    • 2
    • 5
  • g

    Geoffrey Keating

    05/05/2022, 4:08 PM
    While
    prefect_test_harness
    provides an environment for flows to be tested, how might I go about testing a task in Prefect 2.0? It seems like calling
    .run()
    on tasks has not made it over from 1.0 yet.
    a
    n
    • 3
    • 6
  • e

    Edmondo Porcu

    05/05/2022, 5:46 PM
    I am struggling a little with Prefect programming patterns. Take this example from this doc: https://docs.prefect.io/core/advanced_tutorials/task-looping.html
    with Flow("mapped-fibonacci") as mapped_flow:
        ms = Parameter("ms")
        fib_nums = compute_large_fibonacci.map(ms)
    
    flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
nice but what about non-array parameters? Let's say I have two parameters, one that's an array and one that's a constant.
    with Flow("mapped-fibonacci") as mapped_flow:
        ms = Parameter("ms")
        ms2 = Parameter("ms2")
        fib_nums = compute_large_fibonacci.map(ms, ms2)
    
    flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
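In Prefect 1.0 the usual answer is `unmapped`: wrap the constant argument so `.map` broadcasts it to every call instead of iterating over it (e.g. `compute_large_fibonacci.map(ms, unmapped(ms2))`). The broadcast semantics can be sketched in plain Python — `fake_map` is a hypothetical stand-in for `Task.map`, not Prefect's implementation:

```python
from itertools import repeat

class unmapped:
    """Marker: broadcast this value to every mapped call (mirrors prefect.unmapped)."""
    def __init__(self, value):
        self.value = value

def fake_map(fn, *args):
    # Mapped args are iterated in lockstep; unmapped args repeat for each call.
    iters = [repeat(a.value) if isinstance(a, unmapped) else iter(a) for a in args]
    return [fn(*vals) for vals in zip(*iters)]

# An array parameter mapped alongside a constant second parameter:
results = fake_map(lambda m, c: (m, c), [10, 100, 1000], unmapped(7))
```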
    k
    • 2
    • 17
  • a

    André Dias

    05/05/2022, 5:47 PM
    Hi guys ! I’m trying to setup a
    Prefect Agent
    and a
    prefect server
    on my local computer, everything residing inside of
    containers
    . My
    Prefect Server
`container(s)` is based on the
    docker-compose
    file retrieved from
    prefect server config > server-docker-compose.yaml
    . When spinning up a container for an Agent and running
prefect agent local start --api http://host.docker.internal:4200
    , it says that I must be authenticated to communicate with the containerized server living in my local machine. My first question is: Is
    authentication
mandatory in this kind of situation, for local runs? When I authenticate by creating a new tenant using
    prefect server create-tenant -n default
    outside the Agent container, I’m able to spin up the
    Agent Container
    and run everything. The problem is that I wanted to run the
    create-tenant
    inside the container too, but it’s not possible as I’m getting connection refused between containers.
    k
    • 2
    • 12
  • j

    Jonathan Mathews

    05/05/2022, 5:57 PM
    Hi, what’s the best way to switch certain flow variables depending on execution environment? e.g. a database schema of dev vs prod? (I am using set_run_config method at the moment to switch execution between local and ECS)
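One common pattern (independent of Prefect's run config) is to key such settings off an environment variable that the run config sets per environment. A minimal sketch — `PREFECT_ENV` and the schema names are assumptions, not Prefect built-ins:

```python
import os

# Hypothetical settings map; you would set PREFECT_ENV=prod on the ECS
# run config and leave it unset for local runs.
SCHEMAS = {"dev": "analytics_dev", "prod": "analytics"}

def db_schema() -> str:
    # Defaults to the dev schema when the variable is absent.
    env = os.environ.get("PREFECT_ENV", "dev")
    return SCHEMAS[env]
```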
    k
    n
    • 3
    • 3
  • p

    Patrick Tan

    05/05/2022, 6:09 PM
How do I terminate an unknown local agent?
    k
    a
    • 3
    • 4
  • j

    Jonathan Mathews

    05/05/2022, 9:14 PM
Hi! Regarding templating, I copied and pasted the code here into a new flow and it still seems to name the task “compute” rather than “hello”. Am I doing something wrong?
    k
    • 2
    • 2
  • l

    Leon Kozlowski

    05/05/2022, 9:32 PM
    Hi - I just happened to inspect my agent pod logs on k8s - I’m seeing a ton of errors with the following traceback:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 413, in heartbeat
        self.manage_jobs()
      File "/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 193, in manage_jobs
        f"Job {job.name!r} is for flow run {flow_run_id!r} "
    AttributeError: 'V1Job' object has no attribute 'name'
    I found a thread discussing this - but I wanted to know if there was another way to alleviate this other than upgrading to prefect 1.0+ - If that is the only solution, are there any gotchas/things I should look out for when upgrading? And will flows as old as 0.14.X still run as expected?
    k
    • 2
    • 6
  • s

    Seth Just

    05/05/2022, 10:09 PM
    We are running into an issue with an ECS agent where the logs from individual flow tasks aren't appearing in Prefect Cloud. Any advice for how to troubleshoot this issue? We're at a total loss.
    k
    a
    • 3
    • 40
  • l

    Leon Kozlowski

    05/06/2022, 12:18 AM
Hi all - I have some odd behavior happening for a flow deployed to Prefect Cloud. This might be hard to explain, but I will put most of the details in a thread.
    a
    • 2
    • 19
  • a

    Alex Shea

    05/06/2022, 12:36 AM
I just started at a company using Prefect, but it's a version from before service accounts. I want to bring it up to match the docs and guides, but I am failing to understand the need for more than one service account with agent runs. What is the benefit to having individual service accounts for each agent instead of one service account with one key per agent?
    a
    k
    • 3
    • 7
  • a

    Artem Vysotsky

    05/06/2022, 1:02 AM
Hi all, can you tell me which version of Hasura Prefect Cloud is running on? 2.1.1? I’m asking because I’d like to put the Hasura GraphQL API behind my own Hasura instance as a remote schema, and I’m experiencing schema mismatches on 2.6.1
    k
    • 2
    • 1
  • g

    Gustavo Frederico

    05/06/2022, 1:39 AM
    Hi, the page about v2 mentions “Event-Based Workflows” and “Trigger flows from external events…“. I read the API Reference but I couldn’t find how to do it. My use case is precisely this, but I don’t know if Prefect supports it. I need external systems to send events to persisted workflows and for these to continue after that. Can someone please point to the API Reference?
    k
    • 2
    • 1
  • g

    Gustavo Frederico

    05/06/2022, 1:55 AM
Airflow, for instance, has Sensors. Sensors wait for something external to happen.
    k
    a
    • 3
    • 4
  • e

    Eddie Atkinson

    05/06/2022, 2:48 AM
    Something I am struggling to get my head around is security in regards to
    FargateCluster
    . It seems like the cluster is being assigned a public IP address. Ideally I wouldn’t want that to be the case as I don’t want people snooping on my cluster / submitting jobs. However, when I pass
    "fargate_private_ip": True
    to
    cluster_kwargs
    my cluster fails to start with the error:
Cluster failed to start: Timed out trying to connect to tcp://10.0.1.111:8786 after 30 s
    That makes sense. Someone somewhere failed to connect to a local IP address, presumably from outside the subnet. What I don’t understand is how I can prevent people from arbitrarily accessing my cluster from the internet whilst allowing all the ‘right’ traffic through
    k
    a
    • 3
    • 15
  • j

    Jeff Kehler

    05/06/2022, 6:40 AM
    I have a question with the
    IntervalClock
    scheduler. If i were to pass
    interval=timedelta(hours=1)
    to schedule a run for every hour, would this be scheduled to occur at the beginning of every hour? Or would it be relative to when I register the Flow? eg if I register the flow at 1:13pm then the
    IntervalClock
    would trigger at 13 minutes past the hour every hour? Or would it just kick off at the first minute of every hour?
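To the best of my understanding, Prefect 1.0 interval schedules are anchored to the clock's `start_date` (which, if unset, is effectively the moment the schedule is created), not aligned to the top of the hour — so registering at 1:13pm with a one-hour interval would give runs at 13 past each hour. The anchoring arithmetic can be sketched in plain datetime math (this is not Prefect's code):

```python
from datetime import datetime, timedelta

def next_run(start: datetime, interval: timedelta, now: datetime) -> datetime:
    """Next fire time for an interval schedule anchored at `start`:
    runs land at start + k * interval for integer k >= 0."""
    if now <= start:
        return start
    elapsed = now - start
    k = -(-elapsed // interval)  # ceiling division on timedeltas
    return start + k * interval
```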
    a
    • 2
    • 2
  • t

    Thomas Huijskens

    05/06/2022, 7:57 AM
    I'm curious how people tend to document their Prefect flows. Do people primarily fall back on Python module-level documentation (and build a static website using something like Sphinx), or is there a way to display documentation of a flow in the Prefect UI?
    e
    i
    a
    • 4
    • 6
  • i

    Ievgenii Martynenko

    05/06/2022, 8:16 AM
    Investigating S3 storage a bit deeper. If we register Flow with S3 and 'local_script_path' set, Prefect copies dataflow to S3 named with specific timestamp; if we register new version, then new file is created. In this way we achieve dataflow versioning. If we use only 'key' option of S3 storage, then we can keep our own names of files, but have to copy dataflow manually. Registering new version of dataflow still increases version in UI, but in fact you have the same dataflow file on S3. So versioning doesn't actually work, right?
    a
    • 2
    • 2
  • j

    Jonathan Mathews

    05/06/2022, 8:43 AM
    Hi, I’ve switched to the git storage class from gitlab and I’m getting an error when using the attached setup. The error is: {‘repo’: [‘Field may not be null.’]} According to the docs, repo is optional when git_clone_url_secret_name is set
    a
    • 2
    • 8
  • i

    Ievgenii Martynenko

    05/06/2022, 12:20 PM
Hi, is there any specific usage when working with a dict() returned by a task? The part below is failing with a message that the 'get_connections' task class has no method get().
    token = EnvVarSecret("TOKEN")
    connections = get_connections(token=token)  # this is a task that returns dict(name, connection_string)
    start_task_result = start_task(connections.get('some_key'))  # i'd expect to get value from dict, not attribute error
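The likely cause: inside a Flow block in Prefect 1.0, a task call returns a Task placeholder rather than the dict it will eventually produce, so `.get(...)` is looked up on the placeholder at build time. The usual fix is to do the extraction inside another small task. A plain-Python sketch of that build-time/run-time split — `TaskPlaceholder` and `get_item` are hypothetical stand-ins, not Prefect APIs:

```python
class TaskPlaceholder:
    """Stand-in for what a Prefect 1.0 task call returns at flow-build time."""
    def __init__(self, fn):
        self.fn = fn  # deferred computation; runs only at "run time"

def get_item(upstream: TaskPlaceholder, key):
    # The fix pattern: defer the dict lookup until the upstream value
    # exists, mirroring a tiny @task that does d[key] at run time.
    return TaskPlaceholder(lambda: upstream.fn()[key])

connections = TaskPlaceholder(lambda: {"some_key": "conn-string"})
conn = get_item(connections, "some_key")
```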
    a
    • 2
    • 10
  • n

    Nikhil Joseph

    05/06/2022, 12:54 PM
    Is anyone else having issues logging onto prefect cloud? I keep getting “unable to log in”
    a
    • 2
    • 8
  • j

    Jovan Sakovic

    05/06/2022, 1:06 PM
Hi all 👋 :prefect-duck: Working on having our Flows automatically re-registered when there are changes with `1.1.0`'s
--watch
flag. However, as we’re spinning it all up with docker-compose and we have multiple Prefect Projects, I want to run a few of these
    prefect register
    commands in the container’s entrypoint shell script (code example in 🧵) Is there an easy way to: • either run the
    prefect register
    command in background, so it lets the rest run • or, run
    prefect register
    with specifying the project in the Flow script, so we’d need to run the command only once for all projects
    👋 1
    :marvin-duck: 1
    a
    • 2
    • 9
  • m

    Maria

    05/06/2022, 1:30 PM
Hi, I’m getting a 404 not found nginx/1.21.0 error from time to time when I try to access the Prefect Server UI. My setup is EKS with Helm. It comes and goes, but I noticed that it usually happens after a helm update. Any idea what’s causing this?
    k
    • 2
    • 4
  • h

    Henning Holgersen

    05/06/2022, 2:05 PM
Does anyone have a good explanation of the permissions the Prefect Slack app needs? Our security-conscious admins were not happy that the app needed the "identify" permission; other apps that post to Slack are OK with only the "incoming-webhook" permission. Any help would be appreciated, as I need to document the decision to approve the Prefect app.
    k
    a
    • 3
    • 5
  • a

    Alvaro Durán Tovar

    05/06/2022, 2:16 PM
    Is it possible to set the final state of the flow to the state of one of the intermediate tasks?
    k
    a
    • 3
    • 4
  • j

    Jai P

    05/06/2022, 2:25 PM
    👋 hi! have a question about a particular case where a flow gets stuck running forever in prefect 2.0, example in thread
    👋 3
    a
    m
    k
    • 4
    • 33
j

Jai P

05/06/2022, 2:25 PM
👋 hi! have a question about a particular case where a flow gets stuck running forever in prefect 2.0, example in thread
👋 3
from prefect import flow, task, get_run_logger

@task
def task_one():
    return 1


@task
def task_two(results):
    logger = get_run_logger()
    logger.info(results)
    return 2


@flow
def test_flow():
    results = {}

    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    results["t2_result"] = t2_result
appears to cause the flow to run indefinitely and i'm unclear why..
a

Anna Geller

05/06/2022, 2:26 PM
to retrieve the results of a task, you would need to use .result()
j

Jai P

05/06/2022, 2:27 PM
i would've expected that it would just print out something like:
{"t1_result": PrefectFuture}
in
task_two
though?
a

Anna Geller

05/06/2022, 2:29 PM
check this section: "Using results from tasks" https://orion-docs.prefect.io/concepts/tasks/
j

Jai P

05/06/2022, 2:31 PM
so that makes sense, but it looks like you need to pass the result future directly in as an arg? if you wrap it in a dictionary should it cause the flow to run indefinitely until
.result()
is called?
since i'm not actually using
t1_result
in
task_two
in the above example, i wouldn't expect it to be stuck. fully agree if i planned to use it, i would have to do something like:
def task_two(results):
    do_something_with(results["t1_result"].result())
raising it up to a more generic level, the case i'm trying to solve for is:
given a list of prefect tasks, execute each one while passing the results of all previously completed tasks to the next
so given that, an example would be:
@task def t1
@task def t2
@task def t3
...

@flow
def my_flow():
    tasks = [t1, t2, t3, t4]

    results = {}
    for task in tasks:
        task_result = task(results)
        results[task.name] = task_result
or, is the expectation that you can't do that, and you have to instead do
task_result = task(**results)
because prior task results (futures) need to be passed in as args/kwargs
m

Michael Adkins

05/06/2022, 2:38 PM
You should be able to pass futures wrapped in Python collections, we’ll autoresolve them still.
k

Kevin Kho

05/06/2022, 2:38 PM
I can replicate that the first code snippet does hang. I think something is wrong?
m

Michael Adkins

05/06/2022, 2:38 PM
Yeah this is definitely a bug, a flow should never hang like this. I’ve also reproduced.
Oh gosh
😓 1
😨 2
You might have made t2 depend on itself by adding its future to the dictionary you passed to it
:thank-you: 1
Since the dictionary is mutable
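The aliasing described here can be shown in plain Python — `submit` is a hypothetical stand-in for task submission, which keeps a reference to its inputs rather than a copy:

```python
# The flow passes `results` by reference, so mutating the dict after
# "submitting" task_two also changes what task_two already received:
# t2's own future ends up among t2's inputs.
captured_inputs = {}

def submit(inputs):
    # Stand-in for task submission: the engine holds a reference, not a copy.
    return inputs

seen_by_t2 = submit(captured_inputs)
captured_inputs["t2_result"] = "future-of-task-two"  # mutation after submission
```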
j

Jai P

05/06/2022, 2:41 PM
YIKES
m

Michael Adkins

05/06/2022, 2:41 PM
I’m not 100% sure but I’ll poke around
j

Jai P

05/06/2022, 2:46 PM
i think that's what is happening:
@flow
def test_flow():
    results = {}

    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    new_results = {**results}
    new_results["t2_result"] = t2_result
successfully runs
m

Michael Adkins

05/06/2022, 2:48 PM
task_two(results.copy(), …
also works yeah
I’m not entirely sure we can guard against this
You could nest a mutable object arbitrarily deeply in there
We can exclude the simple cycle of self-dependency though
j

Jai P

05/06/2022, 2:53 PM
yeah definitely seems either (a) impossible or (b) very difficult without possibly significantly affecting performance to guard against this
unless there's a point where you ask the question "have all futures that need to resolve to run this task resolved?" in which case...you could validate that the future isn't related to the currently in progress task, and if it is then throw an error and exit? would need you to pre-process and pull out all futures arbitrarily deep though i'd guess
m

Michael Adkins

05/06/2022, 2:57 PM
We do pull out futures arbitrarily deep
🔥 2
We can easily exclude the case where a task requires its own future, but we’d have to set the value in your object to
None
or something. I’d probably just throw an exception if we find that.
The issue is that the cycle could be deeper, e.g. task C relies on task B, which relies on task A, which relies on C.
j

Jai P

05/06/2022, 2:58 PM
ahh yes yes
m

Michael Adkins

05/06/2022, 2:58 PM
Futures don’t store what they depend on right now, so we can’t detect that even if we wanted to.
We could add that to futures though it starts getting complicated. All very interesting 🙂
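If futures did record their upstreams, deeper cycles could be caught with a standard graph check. A sketch, assuming a hypothetical `deps` mapping from task name to the names of its upstream dependencies (not anything Prefect exposes today):

```python
def has_cycle(deps: dict) -> bool:
    """Detect a dependency cycle via depth-first search with three-color marking."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {name: WHITE for name in deps}

    def visit(n):
        color[n] = GRAY  # on the current DFS path
        for m in deps.get(n, ()):
            if color.get(m, WHITE) == GRAY:
                return True  # back-edge: m is an ancestor on this path
            if color.get(m, WHITE) == WHITE and m in deps and visit(m):
                return True
        color[n] = BLACK  # fully explored, no cycle through n
        return False

    return any(color[n] == WHITE and visit(n) for n in deps)
```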
j

Jai P

05/06/2022, 3:01 PM
definitely very interesting! in regards to our problem, i think the
.copy()
is a totally reasonable workaround. it won't affect execution time significantly because we don't expect the list of tasks to be extremely long
m

Michael Adkins

05/06/2022, 3:02 PM
Are you going to pass the results of every upstream task to every downstream task like that?
You might as well switch to the
SequentialTaskRunner
if you’re not using concurrency.
:upvote: 1
j

Jai P

05/06/2022, 3:34 PM
hmm, that's a good point. i don't know if we necessarily can know at execution time which tasks rely on the results of which other upstream tasks, which is why we were going to use the
dict
approach. i guess if the consequence of that is that each subsequent task relies on the completion of every task before it, it is essentially sequential
👍 1