prefect-community

    Sean Leakehe

    12/13/2021, 10:33 PM
    Hey all, I’m running into an issue with mapped tasks not being executed in parallel. In the example below, if we set `func4` to have an upstream dependency on `func1`, it executes at the same time as `func2`, as expected. However, if we make `func4` a mapped task with the same upstream dependency on `func1`, it doesn’t actually execute until after `func3` has finished. Is there a way to get mapped tasks to run in parallel?
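    For context, mapped children only run concurrently when the flow is given a parallel executor; the default `LocalExecutor` runs everything sequentially. A minimal sketch, assuming Prefect 1.x and illustrative task names rather than Sean’s actual file:

    from prefect import Flow, task
    from prefect.executors import LocalDaskExecutor

    @task
    def func1():
        return [1, 2, 3]

    @task
    def func4(x):
        return x * 2

    with Flow("parallel-mapping") as flow:
        nums = func1()
        func4.map(nums)

    # Without a parallel executor the mapped children run one after another.
    flow.executor = LocalDaskExecutor()

    if __name__ == "__main__":
        flow.run()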

    Sean Leakehe

    12/13/2021, 10:33 PM
    Untitled.py

    John Muehlhausen

    12/14/2021, 12:54 AM
    How do I ensure that a local agent will not attempt to run more than one flow at a time? `prefect agent local start {...}`

    Tilak Maddy

    12/14/2021, 10:30 AM
    How do I set the `state_handlers` for a task if I am using a custom class that inherits from `Task` and I have my own `__init__` and `run` methods?
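    For reference, Prefect 1.x’s `Task.__init__` accepts a `state_handlers` keyword, so a subclass can forward it through `super().__init__`. A rough sketch (the handler and class names are illustrative):

    from prefect import Task

    def notify_on_finish(task, old_state, new_state):
        # A state handler receives the task plus the old and new states and
        # may return a (possibly modified) new state.
        if new_state.is_finished():
            print(f"{task.name} finished in state {new_state}")
        return new_state

    class MyTask(Task):
        def __init__(self, my_arg, **kwargs):
            self.my_arg = my_arg
            # Forward state_handlers (and any other Task kwargs) to the base class.
            super().__init__(state_handlers=[notify_on_finish], **kwargs)

        def run(self):
            return self.my_arg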

    Jeremiah Lethoba

    12/14/2021, 12:23 PM
    Hello people, does anybody know how to display progress bars from a running task in the logs on the Prefect UI?
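    The UI log viewer shows discrete log records rather than a live terminal, so a common workaround is to log progress lines from the task logger instead of rendering a tqdm-style bar. A sketch assuming Prefect 1.x (the task and variable names are illustrative):

    import prefect
    from prefect import task

    @task
    def process_items(items):
        logger = prefect.context.get("logger")
        total = len(items)
        for i, item in enumerate(items, start=1):
            # ... do the real work here ...
            if i % 10 == 0 or i == total:
                # Periodic log records show up in the UI as they are emitted.
                logger.info("Progress: %d/%d (%.0f%%)", i, total, 100 * i / total)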

    Tom Shaffner

    12/14/2021, 3:24 PM
    If tasks are mapped in the complex way like https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines, does something else need to be done to get them to cache?
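    For what it’s worth, caching via targets is configured per task and applies to each mapped child individually. A sketch assuming Prefect 1.x results and target templating (paths and names are illustrative, and a local `flow.run()` additionally needs checkpointing enabled, e.g. `PREFECT__FLOWS__CHECKPOINTING=true`):

    from prefect import Flow, task
    from prefect.engine.results import LocalResult

    @task
    def get_numbers():
        return [1, 2, 3]

    # Each mapped child writes its own target file; if that file already exists,
    # the child is loaded from cache instead of being re-run.
    @task(
        checkpoint=True,
        result=LocalResult(dir="./cache"),
        target="{task_name}-{map_index}.prefect",
    )
    def transform(x):
        return x + 1

    with Flow("cached-mapping") as flow:
        transform.map(get_numbers())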

    Pedro Machado

    12/14/2021, 3:42 PM
    Hi there. I have a module that uses loguru (https://github.com/Delgan/loguru). It's used inside of a task. When I run the flow locally, I see the loguru output, but when it runs remotely (ECS), I don't see the logs in Prefect Cloud. How could I get it to write to the Prefect logs? Also, how should I set up my ECS flow to get it to write the container logs to CloudWatch? Thanks!
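    Loguru writes to its own sinks rather than the standard logging tree, so one approach is to forward it to the Prefect task logger from inside the task (for plain stdlib loggers, Prefect 1.x's `PREFECT__LOGGING__EXTRA_LOGGERS` setting is the usual route). A sketch, where `do_work` stands in for the real loguru-using code:

    import prefect
    from loguru import logger as loguru_logger
    from prefect import task

    def do_work():
        # placeholder for the real code that logs via loguru
        loguru_logger.info("working...")

    @task
    def run_with_loguru():
        prefect_logger = prefect.context.get("logger")
        # Forward loguru records to the Prefect task logger so they reach Cloud.
        sink_id = loguru_logger.add(
            lambda message: prefect_logger.info(str(message).strip()),
            format="{message}",
        )
        try:
            do_work()
        finally:
            loguru_logger.remove(sink_id)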

    Leanna Morinishi

    12/14/2021, 5:39 PM
    Hi, I’m new to Prefect. I’m trying to get a pandas dataframe out of a mapped task. Here `query_list` is a list of str, each of which is a valid query. Let’s say there are 3 queries and each query gives me back 10 rows. This task works fine:
    data = execute_test_query.map(query=query_list)
    Now I want to transform a concatenated dataframe in its entirety.
    @task
    def n_rows(df):
        rows = df.shape[0]
        print(f"There are {rows} rows!")
        return rows
    I was expecting `data_2 = n_rows.map(flatten(data))` to give "There are 30 rows!", but I get key errors. Any idea what I need to do to flatten `data`?
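    One way to read this: `flatten` un-nests lists of lists, while operating on all of the mapped dataframes at once is a reduce step, i.e. passing the mapped output into a regular (non-mapped) task and concatenating there. A sketch assuming Prefect 1.x, with a placeholder query task:

    import pandas as pd
    from prefect import Flow, task

    @task
    def execute_test_query(query):
        # placeholder for the real query task: each query returns 10 rows
        return pd.DataFrame({"query": [query] * 10})

    @task
    def concat_results(dfs):
        # a non-mapped task receives the whole list of mapped outputs
        return pd.concat(dfs, ignore_index=True)

    @task
    def n_rows(df):
        rows = df.shape[0]
        print(f"There are {rows} rows!")
        return rows

    with Flow("reduce-example") as flow:
        query_list = ["q1", "q2", "q3"]
        data = execute_test_query.map(query=query_list)
        n_rows(concat_results(data))  # prints "There are 30 rows!"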

    Jason Motley

    12/14/2021, 6:27 PM
    Does anyone have a dummy task available that would check whether a task above has run?
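    In Prefect 1.x a state-only dependency (no data passed) can usually stand in for such a dummy check, via the `upstream_tasks` keyword. A minimal sketch with illustrative task names:

    from prefect import Flow, task

    @task
    def load_data():
        return "loaded"

    @task
    def downstream_step():
        # Runs only once load_data has finished, even though no data is passed.
        print("load_data has run")

    with Flow("state-dependency") as flow:
        loaded = load_data()
        downstream_step(upstream_tasks=[loaded])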

    Royzac

    12/14/2021, 7:16 PM
    Does Prefect support Vyper?

    David Yang

    12/14/2021, 8:23 PM
    Can I expose flows through an API using Prefect Cloud? A use case: some ETLs run on an SSIS server, which is a Windows environment, and some programs run on other dedicated application servers. I'm imagining that I need to install a Prefect agent on each server plus a dedicated flow to run the programs installed on that server, and then use one flow to orchestrate them together. Each dedicated flow works as a task in this "parent" flow, and in each task I could call the API to trigger these flows on the various servers. Does that make sense? Any better solutions? I'm a newbie here.
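    That flow-of-flows pattern is typically wired up with the `create_flow_run` / `wait_for_flow_run` tasks from Prefect 1.x's task library. A sketch with hypothetical flow and project names:

    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    # Each child flow is registered separately and picked up by the agent on its
    # own server (matched via labels / run configs); the parent only orchestrates.
    with Flow("parent-orchestrator") as flow:
        ssis_run = create_flow_run(flow_name="ssis-etl", project_name="etl")
        ssis_done = wait_for_flow_run(ssis_run, stream_logs=True)

        app_run = create_flow_run(
            flow_name="app-server-etl",
            project_name="etl",
            upstream_tasks=[ssis_done],
        )
        wait_for_flow_run(app_run, stream_logs=True)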

    Amanda Wee

    12/14/2021, 8:51 PM
    @Chris White have you seen this "fact check" of your "Why Not Airflow?" article? https://www.linkedin.com/pulse/airflow-prefect-can-we-fact-check-prefects-claims-muthuvel Would you happen to have another article that addresses the objections listed? Thanks!

    Pedro Machado

    12/14/2021, 10:22 PM
    Hi. I have a flow that does the following:
    • query Snowflake for a list of URLs (there may be up to 100k URLs)
    • create a list of lists (chunks of 120 URLs per sublist)
    • a mapped task processes each chunk/batch of URLs and calls an API for each URL; an `S3Result` with a `target` is configured for this task so the output of the API lands in S3 in JSON format
    • a pipe in Snowflake will detect and load these files
    My question: is relying on the Prefect results an acceptable way to save data to S3, or should I have an explicit task that handles writing to S3? In this approach (S3Results), will the data stay in memory until the end of the process, or will the memory be released once the data is written to S3?
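    To illustrate the results-based approach (only the persistence part, not the memory question), here is a sketch assuming Prefect 1.x's `S3Result` with a JSON serializer; the bucket name, target template, and `call_api` helper are illustrative:

    from prefect import task
    from prefect.engine.results import S3Result
    from prefect.engine.serializers import JSONSerializer

    def call_api(url):
        # placeholder for the real API call
        return {"url": url, "status": "ok"}

    # The task's return value is checkpointed to S3 by the result/target config,
    # so persistence itself doesn't require a separate "write to S3" task.
    @task(
        checkpoint=True,
        result=S3Result(bucket="my-bucket", serializer=JSONSerializer()),
        target="api_output/{flow_run_id}/{map_index}.json",
    )
    def process_batch(urls):
        return [call_api(u) for u in urls]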

    Matt Denno

    12/14/2021, 11:40 PM
    Hi All, I have what I hope is a quick question. We are running Prefect Server and seem to have an issue with the Zombie Killer for a long-running process. I want to set the heartbeat_mode to "thread". Looking here: https://docs.prefect.io/orchestration/concepts/services.html#heartbeat-configuration I see the environment variable `PREFECT__CLOUD__HEARTBEAT_MODE`. Is this appropriate when running Prefect Server, or just for Cloud? Or is there an equivalent env var for Server, like `PREFECT__SERVER__HEARTBEAT_MODE`?
    Thanks, Matt
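    Whichever key turns out to be the right one, it can be injected per flow through the run config's environment rather than on the agent host; a sketch assuming Prefect 1.x's `UniversalRun` (the key shown is the one from the linked docs, not a confirmed Server-specific setting):

    from prefect import Flow, task
    from prefect.run_configs import UniversalRun

    @task
    def long_running_step():
        ...

    with Flow("long-running") as flow:
        long_running_step()

    # Attach the environment variable to the flow run itself so the flow-runner
    # process (not just the agent) picks it up.
    flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})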

    Jacob Blanco

    12/15/2021, 5:48 AM
    Just throwing this out there, if any artist on here wants to design and sell stickers based on Flow task run names, I’d buy a bunch off you.

    Chun Shen Wong

    12/15/2021, 7:48 AM
    I would like to ask if anyone has been through this?

    haf

    12/15/2021, 8:59 AM
    https://prefect-community.slack.com/archives/CL09KU1K7/p1638014722400800 any ideas?

    Thomas Furmston

    12/15/2021, 10:39 AM
    Hi

    Thomas Furmston

    12/15/2021, 10:39 AM
    I am wondering if someone can help me. I am interested in running some distributed model training on Kubernetes using Horovod, and I am wondering how I would go about incorporating that into a Prefect flow. I am assuming I would use something like RunNamespacedJob (i.e., https://docs.prefect.io/api/latest/tasks/kubernetes.html#runnamespacedjob) with the Kubernetes manifest for an `MPIJob` (from the https://github.com/kubeflow/mpi-operator library) given in the body of the task. Is that roughly correct?

    Thomas Furmston

    12/15/2021, 10:39 AM
    Would be interested to know if anyone has tried something like this before?
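    For orientation, here is roughly how `RunNamespacedJob` is usually invoked with a plain batch/v1 Job body (the image, names, and namespace are hypothetical); note that it drives the core Job API, so a custom resource like kubeflow's `MPIJob` may need a custom task or direct Kubernetes client calls instead:

    from prefect import Flow
    from prefect.tasks.kubernetes import RunNamespacedJob

    # A plain batch/v1 Job body as a dict.
    job_body = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": "horovod-launcher"},
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {"name": "trainer", "image": "my-registry/horovod-trainer:latest"}
                    ],
                    "restartPolicy": "Never",
                }
            }
        },
    }

    run_training_job = RunNamespacedJob(body=job_body, namespace="ml")

    with Flow("distributed-training") as flow:
        run_training_job()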

    Jelle Vegter

    12/15/2021, 1:28 PM
    Hi all, I might be missing it, but is there a way to add overwrite=True to the Prefect Azure BlobStorageUpload task?

    Tom Shaffner

    12/15/2021, 1:56 PM
    Is there a way to get the logs to show the line in my code that caused an issue? In several cases I get Prefect-specific errors, and the stack trace runs through the Prefect file chain but doesn't indicate which line in my code I could tweak or remove to avoid the issue. Is there a way to do this? Also, the logs printed to the screen seem to exclude more complex outputs at times, presumably due to formatting; where is the default save location for reviewing them after the fact without these constraints?

    Jason Motley

    12/15/2021, 3:25 PM
    Does prefect default to a certain time zone? Local flow runs show my local time but will that hold in production? This would be for datetime functions
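    For reference, Prefect 1.x stores timestamps in UTC, and schedules default to UTC unless anchored with a timezone-aware datetime. A sketch of the usual pattern (the timezone is chosen arbitrarily):

    from datetime import timedelta

    import pendulum
    from prefect import Flow, task
    from prefect.schedules import IntervalSchedule

    @task
    def report_time():
        # Compute datetimes explicitly in the zone you care about rather than
        # relying on the worker's local clock, which varies between machines.
        print(pendulum.now("America/Chicago"))

    # A timezone-aware start_date keeps scheduled run times stable even though
    # Prefect itself works in UTC.
    schedule = IntervalSchedule(
        start_date=pendulum.datetime(2021, 12, 15, 9, 0, tz="America/Chicago"),
        interval=timedelta(days=1),
    )

    with Flow("tz-aware", schedule=schedule) as flow:
        report_time()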

    Tilak Maddy

    12/15/2021, 3:29 PM
    I am not able to access my prefect cloud (was working fine just 3 hours ago)

    Enda Peng

    12/15/2021, 4:13 PM
    Hi, if I kill an agent in a Docker container, is there a way to enable graceful shutdown? I mean, if I have some long-lasting job running, I'd like to give it a timeout window instead of killing it immediately.

    Adam Roderick

    12/15/2021, 4:45 PM
    Hi, does anyone have recommendations or best practices that they follow around dependencies? My specific example has to do with extraction/load vs transformation steps. It is convenient to put it all in one Flow so that the transformations start after the extraction/loads are complete. But our Flows have become quite monolithic, and we want to be able to test various sections (i.e. extract/load) separately from other steps (i.e. transformations). Any recommendations here?

    Tao Bian

    12/15/2021, 4:56 PM
    Hi, I am trying to execute a flow run through a GraphQL query from Google Cloud Functions. How can I set up the client so that the Cloud Function knows which client is mine? How can I reach my Prefect Cloud externally from anywhere?
    from prefect.client.client import Client
    client = Client()
    client.graphql(query)

Anna Geller

12/15/2021, 4:59 PM
You could create a flow run directly using requests and pass the API key as a Bearer token:
import requests

query = """
 mutation {
  create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
    id
  }
}
"""

url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)

Kevin Kho

12/15/2021, 4:59 PM
Hi @Tao Bian, if you did `prefect auth login --key API_KEY`, then the Client will pull that API key. You can also pass it into the `Client()`. You can use `client.create_flow_run()` instead of `client.graphql()`. You can also use `create_flow_run` from our task library, which is much more flexible; just invoke it by calling `.run()`:
create_flow_run.run(flow_name="name", project_name="project")

Tao Bian

12/15/2021, 6:33 PM
Thank you, that works!