prefect-community
  • j

    John Marx

    08/30/2021, 11:43 PM
    I would be interested in a feature comparison of Prefect vs. Airflow.
  • j

    John Marx

    08/30/2021, 11:43 PM
    Also, whether there are any migration tools.
  • y

    YD

    08/31/2021, 12:52 AM
    How do I do a clean pip install of Prefect? I had Python 3.7 but removed it and installed Python 3.6.8 due to some other compatibility issues. When I run
    prefect agent local start
    it still tries to find the Python 3.7 interpreter. I pip uninstalled Prefect and reinstalled it several times, but it does not help. How can I do a clean install so that it runs using Python 3.6?
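    A quick diagnostic (sketch, not from the thread): check which interpreter the prefect CLI on your PATH is actually bound to.
    import shutil
    import subprocess
    import sys
    
    print("current interpreter:", sys.executable)
    
    cli_path = shutil.which("prefect")
    print("prefect CLI on PATH:", cli_path)
    if cli_path:
        with open(cli_path) as f:
            # the console script's shebang shows which Python it will run under
            print("CLI shebang:", f.readline().strip())
    
    subprocess.run(["prefect", "version"], check=True)
    If the shebang still points at Python 3.7, reinstalling with python3.6 -m pip install prefect regenerates the console script against the 3.6 interpreter.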
  • p

    Pinakpani Mukherjee

    08/31/2021, 1:44 AM
    Can anybody help me with a GraphQL problem? I need to get the schedules of all flows, and since some were set manually, using GraphQL seems like the best solution. Can anybody help me or point me to a source?
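    A starting point (sketch): query the GraphQL API through the Python client. The field names are assumptions about the Server/Cloud schema (schedules set in the UI live on the flow group), so verify them in the interactive API's schema explorer.
    from prefect import Client
    
    # pull the registered schedule and any UI-set flow-group schedule
    # for all non-archived flows
    query = """
    query {
      flow(where: {archived: {_eq: false}}) {
        name
        schedule
        flow_group {
          schedule
        }
      }
    }
    """
    
    client = Client()
    result = client.graphql(query)
    print(result)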
  • a

    Aiden Price

    08/31/2021, 3:17 AM
    Hi everyone! I have a GraphQL question as well. I'm trying to get and set the default parameters for a flow_group, can anyone help me?
  • w

    Wilson Bilkovich

    08/31/2021, 3:36 AM
    I can simply use
    Local
    storage when I'm building my own docker image that contains the flow. Do I understand that correctly?
  • b

    Bastian Röhrig

    08/31/2021, 6:55 AM
    Hi everyone, I found some weird behaviour when I use a timeout with a LocalDaskExecutor and manually raise a SUCCESS signal. The task always fails with the timeout error message, even if it takes less time than the specified timeout. Minimal example:
    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor
    
    from prefect.engine.signals import SUCCESS
    
    
    @task(timeout=60)
    def succeed():
        raise SUCCESS()
    
    
    with Flow("timeout-test") as flow:
        succeed()
    
    flow.executor = LocalDaskExecutor()
    flow.register(project_name="tutorial")
    Is this the intended behaviour? Can anyone reproduce it?
  • t

    Tomoyuki NAKAMURA

    08/31/2021, 9:08 AM
    Hi, we are thinking of migrating from Airflow to Prefect Cloud and would like to use Datadog to monitor the number of SUCCESS and FAILURE states of tasks, task execution time, etc. in Prefect Cloud. Here are the metrics we can get from Airflow's Datadog integration: https://docs.datadoghq.com/integrations/airflow/?tab=host Is it possible to get similar metrics from Prefect Cloud and integrate them with Datadog?
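    One way to get comparable metrics is to poll the GraphQL API and forward the results to Datadog yourself. A minimal sketch, assuming a local Datadog agent (dogstatsd) and the flow_run fields shown below; verify the field names against the interactive API.
    from datetime import datetime, timedelta, timezone
    
    from datadog import initialize, statsd
    from prefect import Client
    
    initialize()  # picks up Datadog agent settings from the environment
    
    since = (datetime.now(timezone.utc) - timedelta(hours=1)).isoformat()
    query = f"""
    query {{
      flow_run(where: {{start_time: {{_gte: "{since}"}}}}) {{
        state
        start_time
        end_time
      }}
    }}
    """
    
    runs = Client().graphql(query)["data"]["flow_run"]
    for run in runs:
        # one count per run, tagged by state; durations could be derived from
        # start_time/end_time and sent as a histogram in the same loop
        statsd.increment("prefect.flow_run.count", tags=[f"state:{run['state']}"])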
  • s

    Seonghwan Hong

    08/31/2021, 10:06 AM
    I set the flow schedule to every 5 minutes with a cron string. However, the upcoming runs are not visible in the UI, and the flow is not running; Prefect doesn't actually run it at the scheduled time.
  • s

    Simon Gasse

    08/31/2021, 12:59 PM
    Hi! Can I somehow ignore skipped tasks of a
    map
    in downstream tasks? Let's say a mapped task skips for some inputs. In this case, I would still like the downstream task that runs on the return values of the mapped task to continue with the results of all successful task runs. An example is in the thread 🙂
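    One workaround (sketch): let the downstream task run despite skipped upstreams and filter out the children that produced no result. This assumes skipped mapped children contribute None to the reduced list; the task bodies are illustrative.
    from prefect import Flow, task
    from prefect.engine.signals import SKIP
    
    @task
    def maybe_process(x):
        if x % 2:  # illustrative skip condition
            raise SKIP(f"skipping {x}")
        return x * 10
    
    # skip_on_upstream_skip=False keeps this reduce step running even though
    # some of the mapped children skipped
    @task(skip_on_upstream_skip=False)
    def collect(results):
        return [r for r in results if r is not None]
    
    with Flow("skip-map-example") as flow:
        collect(maybe_process.map([1, 2, 3, 4]))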
  • w

    Wilson Bilkovich

    08/31/2021, 1:19 PM
    I'd like to use a container that defines its own `ENTRYPOINT` as the image that Prefect/Dask workers and schedulers run, but when I try it, Prefect seems to expect
    /bin/sh
    as an entrypoint. Is there any way to override that?
  • p

    Paulo Maia

    08/31/2021, 2:45 PM
    Hey everyone, I'm trying to run two Prefect flows with different schedules within the same file. Is this possible? I saw in the documentation that there can be different clocks for the same schedule, but didn't see anything about my use case. Example: a machine learning application where you want to train a model every week, generate predictions every day, and run a reporting script every 3 days. I could separate this into three Prefect files/flows, but I don't want to. I'm using the open source version, offline.
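    This should be possible: each Flow carries its own Schedule, and several flows can be registered from the same file. A minimal sketch with CronClock schedules (task bodies, cron strings, and the project name are illustrative).
    from prefect import Flow, task
    from prefect.schedules import Schedule
    from prefect.schedules.clocks import CronClock
    
    @task
    def train():
        print("training")
    
    @task
    def predict():
        print("predicting")
    
    @task
    def report():
        print("reporting")
    
    # weekly training run
    with Flow("train-model", schedule=Schedule(clocks=[CronClock("0 3 * * 0")])) as train_flow:
        train()
    
    # daily predictions
    with Flow("daily-predictions", schedule=Schedule(clocks=[CronClock("0 4 * * *")])) as predict_flow:
        predict()
    
    # report every third day of the month
    with Flow("reporting", schedule=Schedule(clocks=[CronClock("0 5 */3 * *")])) as report_flow:
        report()
    
    for f in (train_flow, predict_flow, report_flow):
        f.register(project_name="ml")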
  • n

    Nicholas Chammas

    08/31/2021, 3:56 PM
    I think Prefect currently lacks an organized way to quiesce agents in preparation for an upgrade. i.e. I want a way to tell agents “finish what you’re doing but don’t accept any new work” so I can shut them down and replace them in an organized manner without interrupting any flows. Should I file an issue about this, or is there an accepted workflow for accomplishing this?
  • d

    Didier Marin

    08/31/2021, 4:06 PM
    Hey everyone, we are trying to add some fine-grained authorization to our Prefect Server (such as which user can perform which action on which flow). I understand the Cloud version has roles and custom roles, but I don't think that would be detailed enough for our needs (for example, the read permission on flows can be granted, but I don't think it can be limited to a particular flow group id). It seems like the easiest way to tackle this is at the GraphQL level, using resolvers to customize exactly which requests can be performed by whom. By any chance, does anyone have experience doing this?
  • m

    Mark McDonald

    08/31/2021, 5:07 PM
    Hi - we have a system in place wherein we collect all state change events via webhooks triggered by automations. We're collecting this data to identify anomalous flow run durations and alert on them. It would be really helpful to have the flow run's parameters in the webhook payload, because they are the best indicator of how long a flow should run. They are not currently part of the default payload. Should I open an issue in the GitHub repo?
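    Until the payload includes them, one workaround (sketch) is to look the parameters up when the webhook arrives, using the flow run id from the payload (assuming it is present). parameters_for is a hypothetical helper and the field names are assumptions about the GraphQL schema.
    from prefect import Client
    
    def parameters_for(flow_run_id: str) -> dict:
        # hypothetical helper, called from the webhook handler
        query = f"""
        query {{
          flow_run(where: {{id: {{_eq: "{flow_run_id}"}}}}) {{
            parameters
          }}
        }}
        """
        runs = Client().graphql(query)["data"]["flow_run"]
        return runs[0]["parameters"] if runs else {}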
  • m

    Mark McDonald

    08/31/2021, 7:48 PM
    Hi again - we're having trouble logging into Prefect Cloud. We've been seeing this screen for a while. I've confirmed the issue with multiple people at my company.
    👀 1
  • t

    Tony Yun

    08/31/2021, 8:35 PM
    Hi, I'm having some trouble setting up the Docker container correctly for the flow to register. Could someone take a look? The Dockerfile builds correctly and works in local testing (with flow.run()). I'm using Docker storage like this, and the Dockerfile correctly makes my modules available:
    storage = Docker(
        dockerfile="./Dockerfile"
    )
    But registering always reports that the module does not exist:
    ModuleNotFoundError: No module named 'criteo'
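    A common cause (sketch of one fix): the criteo package is importable while testing locally but not inside the image Prefect builds. Either COPY the package and set PYTHONPATH in the Dockerfile itself, or use Docker storage's files/env_vars arguments; the paths below are illustrative, and whether these arguments combine with a custom dockerfile may depend on your Prefect version.
    from prefect.storage import Docker
    
    storage = Docker(
        dockerfile="./Dockerfile",
        # copy the local package into the image...
        files={"/local/path/to/criteo": "/modules/criteo"},
        # ...and make it importable at flow runtime
        env_vars={"PYTHONPATH": "/modules"},
    )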
  • w

    Wilson Bilkovich

    08/31/2021, 8:41 PM
    I'm struggling to get local+docker 'as script' flows to work, and I can't find a working value for 'script path'
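    If "as script" here means storage with stored_as_script=True, the path has to be the flow file's location inside the built image, not on the machine doing the registering. A sketch with Docker storage; the paths and flow are illustrative.
    from prefect import Flow, task
    from prefect.storage import Docker
    
    @task
    def hello():
        print("hello")
    
    with Flow("script-storage-example") as flow:
        hello()
    
    flow.storage = Docker(
        dockerfile="./Dockerfile",
        stored_as_script=True,
        # where this .py file ends up *inside* the image
        path="/app/flows/my_flow.py",
    )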
  • w

    Wilson Bilkovich

    08/31/2021, 11:36 PM
    Can anyone explain the correspondence between KubernetesRun and the pods that get spun up? Is KubernetesRun what the agent uses to create a scheduler pod? Does it have anything to do with the worker environment?
  • h

    Hamza Ahmed

    09/01/2021, 12:37 AM
    After upgrading Prefect (flows + Kubernetes deployment) from 0.14.6 to 0.15.4, I am running into an issue where a flow gets scheduled but errors out after a couple of hours with
    A Lazarus process attempted to reschedule this run 3 times without success. Marking as failed.
    The only other errors are in the prefect towel logs [see comments]. Any clue as to what could be wrong?
  • a

    Aric Huang

    09/01/2021, 12:41 AM
    Hi, I'm trying to run some flows that use
    ProcessPoolExecutor
    and am having trouble getting print statements and stack traces from subprocesses to show up in the Prefect Cloud UI. When running the flow locally the logs show up in stdout/stderr, and having the task defined with
    @task(log_stdout=True)
    makes print statements in subprocesses show up with a
    └── <time> | INFO
    prefix (which I thought means it's getting picked up by the Prefect logger). However the subprocess print statements don't show up at all in the Prefect Cloud UI logs. Stack traces from subprocesses that crash also print locally but don't show up in the UI, which only shows a stack trace for the main process with
    concurrent.futures.process.BrokenProcessPool
    . Is there a way to have a flow capture all subprocess output so it's visible in the UI?
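    One workaround (sketch): capture each subprocess's stdout inside the worker function, hand it back to the task, and emit it through the Prefect logger in the main process, which is what ships logs to Cloud. Function names are illustrative.
    import io
    from concurrent.futures import ProcessPoolExecutor
    from contextlib import redirect_stdout
    
    import prefect
    from prefect import Flow, task
    
    def work(x):
        # runs in a subprocess: capture its own stdout and return it with the result
        buf = io.StringIO()
        with redirect_stdout(buf):
            print(f"processing {x}")
            result = x * 2
        return result, buf.getvalue()
    
    @task
    def run_pool(items):
        logger = prefect.context.get("logger")
        results = []
        with ProcessPoolExecutor() as pool:
            for result, captured in pool.map(work, items):
                logger.info(captured.strip())  # forwarded to the Prefect/Cloud logs
                results.append(result)
        return results
    
    with Flow("pool-logging-example") as flow:
        run_pool([1, 2, 3])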
  • j

    Jason Kim

    09/01/2021, 2:21 AM
    Is it possible to have local agents for multiple conda environments? Should I run 'prefect agent local start' in each of my conda envs and then add a label for each agent?
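    That approach should work (sketch): start one local agent per conda environment with a distinguishing label, e.g. prefect agent local start --label py38-ml, and put the matching label on each flow's run config so its runs are routed to the right agent. The label and flow names are illustrative.
    from prefect import Flow, task
    from prefect.run_configs import LocalRun
    
    @task
    def hello():
        print("hello from the py38-ml environment")
    
    with Flow("conda-routing-example") as flow:
        hello()
    
    # only agents carrying the "py38-ml" label will pick up this flow's runs
    flow.run_config = LocalRun(labels=["py38-ml"])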
  • c

    CA Lee

    09/01/2021, 4:27 AM
    Hello, has anyone run into the below errors while using DbtShellTask?
  • h

    haf

    09/01/2021, 8:42 AM
    Is Python 3.9 supported?
  • h

    haf

    09/01/2021, 8:45 AM
    And do you have a sample of using Python modules with Docker and Prefect?
  • m

    Michael

    09/01/2021, 9:34 AM
    Hey there! I'm trying to implement some sort of rudimentary sensor (like the one given here: https://github.com/PrefectHQ/prefect/discussions/4156), but I would really like to implement a timeout that spans retries. In other words, I want to enforce max_retries even though I am raising the RETRY signal manually (which ignores max_retries). Any ideas how to tackle this? I could store a counter in the context and increment it inside each of my task's run() calls, I suppose.
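    A sketch of the counter idea, using the run count Prefect already tracks in context (task_run_count starts at 1 and increments on each retry). The polling check is a hypothetical placeholder.
    from datetime import timedelta
    
    import prefect
    from prefect import Flow, task
    from prefect.engine.signals import FAIL, RETRY
    
    MAX_POLLS = 10  # illustrative cap on sensor attempts
    
    def condition_met() -> bool:
        # placeholder for the real sensor check, e.g. "has the file landed yet?"
        return False
    
    @task(max_retries=MAX_POLLS, retry_delay=timedelta(minutes=1))
    def sensor():
        run_count = prefect.context.get("task_run_count", 1)
        if condition_met():
            return True
        if run_count >= MAX_POLLS:
            raise FAIL("Sensor timed out: condition never became true")
        raise RETRY(f"Condition not met on attempt {run_count}, retrying")
    
    with Flow("sensor-example") as flow:
        sensor()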
  • k

    Konstantin

    09/01/2021, 11:08 AM
    Hey! I have registered my flow on my Prefect Server (Core). The task failed at the start with an error. In the flow I used parameters and map. The error state message is: Error during execution of task: ValueError("Could not infer an active Flow context while creating edge to <Task: request_huntflow['items']>. This often means you called a task outside a
    with Flow(...)
    block. If you're trying to run this task outside of a Flow context, you need to call `GetItem(...).run(...)`")
    ✅ 1
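    As the message says, this usually means a task result was indexed (the ['items'] part) outside the with Flow(...) block. A sketch of the fix; the task body is an illustrative stand-in for request_huntflow.
    from prefect import Flow, Parameter, task
    
    @task
    def request_huntflow(url):
        # illustrative stand-in for the real task
        return {"items": [1, 2, 3]}
    
    with Flow("huntflow-example") as flow:
        url = Parameter("url", default="https://example.com")
        response = request_huntflow(url)
        items = response["items"]  # indexing must happen *inside* the Flow block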
  • k

    Konstantin

    09/01/2021, 12:08 PM
    Hey there! How do I add a token file to the flow's storage in Prefect? For example, the error is this:
    Task 'request_web[0]': Calling task.run() method...
    15:05:12 INFO CloudTaskRunner
    [Errno 2] No such file or directory: './Tokens/access_token.xml'
  • e

    Evan Brown

    09/01/2021, 2:23 PM
    Hey there! I am having trouble logging standard out with shelltasks like you would with the task decorator. Does anyone know how you would do this?

Kevin Kho

09/01/2021, 2:24 PM
Hey @Evan Brown, could you show me how you set up the shell task?

Evan Brown

09/01/2021, 2:36 PM
This is what I currently have.
And in my test.py I just have some print statements.

Kevin Kho

09/01/2021, 2:41 PM
Can you try adding
log_stdout=True
to
ShellTask
? It inherits from Task so it can take all of the kwargs.
👀 1

Evan Brown

09/01/2021, 2:55 PM
It didn't work 🙁. Any other ideas?

Kevin Kho

09/01/2021, 2:57 PM
Do you get no logs from this whatsoever?
Add
stream_output=True
like this:
from prefect import Flow
from prefect.tasks.shell import ShellTask

shelltask = ShellTask(
    name="shell_task",
    log_stderr=True,
    return_all=True,
    stream_output=True
)

with Flow("shelltest") as flow:
    shelltask(command="echo 'test'")

flow.run()
👀 1

Evan Brown

09/01/2021, 3:15 PM
That works!

Kevin Kho

09/01/2021, 3:15 PM
Awesome!

Evan Brown

09/01/2021, 3:15 PM
Do you know if the output can be viewed in prefect cloud?
Ideally I would like to see it in the logs if possible

Kevin Kho

09/01/2021, 3:16 PM
I think it should be. I’ve done it before.
Just tested and yes I see it
🙌 1

Evan Brown

09/01/2021, 3:18 PM
Yes I did as well. Thanks so much for the help!