Powered by Linen
prefect-community
  • a

    Ahmed Ezzat

    04/20/2022, 7:06 AM
    Could anyone from the devs take a look at this? https://github.com/PrefectHQ/prefect/discussions/5440
    a
    1 reply · 2 participants
  • c

    Chris Reuter

    04/20/2022, 2:10 PM
    Hey all! Come hang with @Kevin Kho and myself on PrefectLive 📺 today at 3p Eastern live on Twitch. He'll be showing off Modin, PySpark, Pandas and Fugue. We'll also probably have an invigorating ⌨️ mechanical keyboard debate and would love for you to join us! For event updates and reminders make sure to join the #events channel.
    ⌨️ 1
    👍 2
    :marvin: 1
    🔥 1
    :upvote: 1
  • j

    Jason

    04/20/2022, 2:55 PM
    I'm getting some weird behavior regarding modules. I can correctly import my flow and the Config class from
    projects
    from both Python and Jupyter Notebook:
    from projects.example.flows.hello_world
    from projects import Config
    But when I run the same import from Prefect, I get the following error:
    pipenv run prefect run -m projects.example.flows.hello_world                    
    No module named 'projects'
    a
    k
    5 replies · 3 participants
  • c

    Constantino Schillebeeckx

    04/20/2022, 3:19 PM
    I have a flow that runs 121 mapped tasks. Two of those tasks failed; however, the overall flow was not marked as failed.
    :discourse: 1
    k
    19 replies · 2 participants
  • c

    Chris Reuter

    04/20/2022, 6:55 PM
    https://prefect-community.slack.com/archives/CL09KU1K7/p1650463806923319 We're going live in 5 minutes on Twitch!
    :marvin: 1
    🔥 1
  • l

    Leigh-Ann Friedel

    04/20/2022, 7:31 PM
    Hello all! Still a Prefect newbie, and have a quick question that I wasn't able to find in the docs: Is there a maximum time limit for how long a flow can run? And does the length of a flow run affect cost, or is cost purely based on successful task runs?
    a
    2 replies · 2 participants
  • m

    Mars

    04/20/2022, 8:09 PM
    Hi all, I’m evaluating Prefect as a data pipeline tech upgrade for our company. I’m putting together some pipeline examples to share with the team and I’m wondering what a good dev-test-debug-deploy loop looks like. The example flows I’ve found in the docs and the blog use
    flow.run()
    for local dev, then edit the code and switch to
    flow.register()
    . I don’t like editing the code to switch between local dev and deployment, and it doesn’t feel like your typical Python webapp dev loop. Is there a better way to do this?
    k
    1 reply · 2 participants
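One way to avoid editing the file between local runs and deployment is to keep both calls behind a small command-line switch. The sketch below is only an illustration of that pattern: `StandInFlow` is a hypothetical stand-in that mimics the `run()` and `register()` methods named in the message, and the `--project` option and its default are assumptions, not something from the thread.

```python
import argparse


class StandInFlow:
    """Hypothetical stand-in exposing the two methods named in the message."""

    def __init__(self, name):
        self.name = name

    def run(self):
        return f"ran {self.name} locally"

    def register(self, project_name):
        return f"registered {self.name} in {project_name}"


flow = StandInFlow("example")


def main(argv=None):
    # The action is chosen on the command line, so the flow file never changes.
    parser = argparse.ArgumentParser(description="Run or register the flow.")
    parser.add_argument("action", choices=["run", "register"])
    parser.add_argument("--project", default="dev")  # assumed default
    args = parser.parse_args(argv)
    if args.action == "run":
        return flow.run()
    return flow.register(project_name=args.project)


# Passing the arguments explicitly keeps the demo independent of sys.argv.
print(main(["run"]))
print(main(["register", "--project", "staging"]))
```

With a real flow object in place of the stand-in, the same file could serve both local development and registration, selected by the first argument.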
  • j

    Jason

    04/20/2022, 8:10 PM
    I'm getting a weird behavior - I can correctly fetch the secrets when running a flow locally - but when it's built on Github Actions - after a step correctly logs into prefect with auth, I get
    ValueError: Local Secret "AWS_ACCOUNT_ID" was not found.
    k
    2 replies · 2 participants
  • k

    Kevin Kho

    04/20/2022, 8:23 PM
    My (sloppy) code from Prefect Live if anyone is interested
    🙌 4
    :twitch: 3
    c
    1 reply · 2 participants
  • j

    Jason

    04/20/2022, 8:30 PM
    I have a current organization of:
    • Dockerfile
    • package
      ◦ init.py with DockerStorage tied to ECR
    • projects
      ◦ example
        ▪︎ flow.py
    Given this, I keep running into the fact that package can't find the Dockerfile in the root directory. What's the context for the flows that are registered? I tried using relative pathing (i.e., "../Dockerfile"), but I'm not sure what the base directory is. I could try just moving the Dockerfile into package in the same directory as init.py, but I'm trying to understand what I'm doing wrong here.
    k
    8 replies · 2 participants
  • m

    marque

    04/21/2022, 1:35 AM
    Hi all, I'm trying out Prefect Orion (v2.0b3) and was following the orion tutorials. I am having issues updating an existing deployment, which the docs mentioned can be done by
    prefect deployment create
    . Getting a
    prefect.exceptions.ObjectAlreadyExists
    error. Is this expected?
    k
    a
    14 replies · 3 participants
  • a

    Ahmed Ezzat

    04/21/2022, 5:11 AM
    what's the difference between prefect checkpointing and targets? from my understanding targets could be used to save task output and if the task is called again it will use the cached state, checkpointing does the same so what is the key difference?
    k
    4 replies · 2 participants
  • r

    Rasmus Lindqvist

    04/21/2022, 5:48 AM
    Hi everyone, I’ve just started using Prefect at my company. First of all, thank you for a great product. I am trying to deploy Prefect on GCP and find the documentation very good except for GCP deployment, so I’m struggling a bit. I understand that the VertexAgent is a quite new addition. I have so far successfully registered a Vertex agent on Prefect Cloud but I don’t quite manage to deploy the agent to Vertex. Is the intention to start a Custom Vertex job in addition to
    prefect agent vertex start
    ? Or would you recommend deploying Prefect in another way on GCP instead?
    :discourse: 1
    a
    13 replies · 2 participants
  • b

    Ben Ayers-Glassey

    04/21/2022, 6:04 AM
    Hello all! Is there a way to store arbitrary data on a flow, in such a way that you can get it back from the server (i.e. via
    prefect describe flows ...
    )? What I'm wanting to do is mark each flow with the git repo and commit from which the flow was registered. I'm thinking of situations where a flow run fails, so you want to see the source code; but if you're in an organization with many people making their own flows in different Git repos, you may not know which repo the flow is from, and so you want a way to easily find the source for a given flow. We've been asking people to manually add a link to the Git repo in every flow's "README" section in the Prefect Cloud UI, but as far as I can tell, you can't populate that README section via Python or commandline.
    a
    14 replies · 2 participants
  • m

    Malthe Karbo

    04/21/2022, 6:52 AM
    Hi, when you have a task (ORION) which returns a large output (or is slow to return), it will cause a hiccup with prefect log handlers (see chat for details). I am using the latest beta release, 2.03b.
    a
    m
    6 replies · 3 participants
  • c

    Chris Reuter

    04/21/2022, 12:32 PM
    Anyone thinking about giving Prefect conference talks but you're worried about getting there? We'd be happy to help.
    📣 1
    :marvin: 3
  • k

    Konstantin

    04/21/2022, 1:57 PM
    Hi team Prefect! I don't understand what could be the reason. Help
        secret = VaultSecret("/######/######/###/#####/").run()
      File "/usr/local/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/tasks/secrets/vault_secret.py", line 171, in run
        value = self._get_vault_secret(name)
      File "/usr/local/lib/python3.9/site-packages/prefect/tasks/secrets/vault_secret.py", line 87, in _get_vault_secret
        vault_creds = PrefectSecret(self.vault_credentials_secret).run()
      File "/usr/local/lib/python3.9/site-packages/prefect/tasks/secrets/base.py", line 66, in run
        return _Secret(name).get()
      File "/usr/local/lib/python3.9/site-packages/prefect/client/secrets.py", line 140, in get
        raise ValueError(
    ValueError: Local Secret "VAULT_CREDENTIALS" was not found.
    my code:
    from prefect.tasks.secrets.vault_secret import VaultSecret
    secret = VaultSecret("/######/########/###/#####/").run()
    k
    10 replies · 2 participants
  • k

    Kevin Mullins

    04/21/2022, 2:06 PM
    Just ran into something I wasn’t expecting that I wanted to pass along. I’m in an Azure environment running Prefect on
    AKS
    . I’ve been using
    Docker
    storage for my flows and just recently started using
    AzureResult
    in my flows for result persistence. I’ve been letting the
    AzureResult
    use the
    AZURE_STORAGE_CONNECTION_STRING
    environment variable which is the recommendation from the documentation and plays nicely with
    k8s
    secrets in
    AKS
    . Just found after some infrastructure changes that the environment variable was not being used, and finally found out it was because the
    Docker
    storage was capturing the environment variable during registration and serializing the value via
    cloudpickle
    into the flow. I’ll be looking into using secrets; however, I wanted to suggest maybe re-visiting the documentation or process to prevent accidental credential exposures in the
    Docker
    storage.
    k
    k
    9 replies · 3 participants
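The behavior described above comes down to when the environment variable is read. The following is a minimal standard-library sketch, not Prefect's or cloudpickle's actual code: `EagerResult`, `LazyResult`, and the variable name `DEMO_STORAGE_CONNECTION_STRING` are all hypothetical, and plain `pickle` stands in for the serialization step. It shows that a value read when the object is built ends up inside the serialized payload, while a value looked up on access follows the runtime environment.

```python
import os
import pickle

ENV_VAR = "DEMO_STORAGE_CONNECTION_STRING"  # hypothetical name for this demo


class EagerResult:
    """Reads the environment variable once, when the object is built."""

    def __init__(self):
        self.connection_string = os.environ[ENV_VAR]


class LazyResult:
    """Looks the environment variable up every time it is needed."""

    @property
    def connection_string(self):
        return os.environ[ENV_VAR]


# "Registration" environment: the variable holds the build-time credential.
os.environ[ENV_VAR] = "build-time-credential"
eager = EagerResult()
lazy = LazyResult()

# Serializing the eager object's state writes the credential into the payload.
payload = pickle.dumps(vars(eager))
leaked = b"build-time-credential" in payload

# "Runtime" environment: the variable now holds a different value.
os.environ[ENV_VAR] = "runtime-credential"
restored = pickle.loads(payload)

print(restored["connection_string"])  # value frozen at build time
print(lazy.connection_string)         # value read at run time
print(leaked)                         # whether the credential sits in the payload
```

The eager variant is the one that both ignores later infrastructure changes and carries the credential wherever the serialized payload goes.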
  • j

    Joshua Greenhalgh

    04/21/2022, 2:20 PM
    Hey, any idea how to debug a failed flow on k8s (gke) that just says:
    Pod prefect-job-18e30a61-5tnpd failed.
    	Container 'flow' state: terminated
    		Exit Code:: 1
    		Reason: Error
    In the logs...
    k
    16 replies · 2 participants
  • t

    Tom Manterfield

    04/21/2022, 2:42 PM
    Hello everyone! Is there a way to create a full Prefect 2.0 setup entirely in code/config without CLI interactions? Ideally when creating work queues, agents, deployments etc I can just whack it all in a yaml file or similar and then have the same setup created every single time I apply that config. All I can see in the docs is a series of CLI commands for all of this, which is great for local dev and interacting on an ad-hoc basis, but I couldn’t run like that across multiple envs. I have searched (and searched and searched) but coming up with nothing. Hopefully I’m just missing the obvious! I’m deploying to Kubernetes, if that info makes a difference.
    k
    a
    15 replies · 3 participants
  • k

    Kevin Weiler

    04/21/2022, 3:00 PM
    is it possible to use the python API to get a flow_id given a project name and flow name?
    :discourse: 1
    k
    10 replies · 2 participants
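A rough sketch of how such a lookup could be phrased as a GraphQL query string. The field names (`flow`, `project`, `archived`, `version`) and the idea of sending the string through `prefect.Client().graphql(...)` in Prefect 1.x are assumptions to verify against your own backend schema; the helper `build_flow_id_query` is hypothetical and only builds the text.

```python
import json


def build_flow_id_query(project_name, flow_name):
    """Build a query string that filters flows by project name and flow name."""
    # json.dumps quotes and escapes the names for use as string literals.
    name_literal = json.dumps(flow_name)
    project_literal = json.dumps(project_name)
    return (
        "query { flow(where: {"
        f"name: {{_eq: {name_literal}}}, "
        f"project: {{name: {{_eq: {project_literal}}}}}, "
        "archived: {_eq: false}"
        "}) { id name version } }"
    )


query = build_flow_id_query("demo", "hello")
print(query)

# Assumption: with Prefect 1.x installed and authenticated, the string could be
# sent with prefect.Client().graphql(query) and the id read from the response.
```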
  • a

    Ahmed Ezzat

    04/21/2022, 3:05 PM
    What is the equivalent of
    gather
    in Prefect 2.0? I'm aware that tasks return an instance of
    PrefectFuture
    . What I want is a way to get each finished future as soon as it completes. Keep in mind I'm talking about a list of futures, not just one.
    ✅ 1
    :discourse: 1
    k
    a
    +2
    17 replies · 5 participants
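For readers unfamiliar with the pattern being asked about, here is what "handle each future as soon as it finishes" looks like with the Python standard library. This is only the `concurrent.futures` analogue; it does not use Prefect, and whether Prefect 2.0 futures offer an equivalent is not something this sketch assumes. The `work` function and the delays are made up for the demo.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(delay):
    """Sleep for `delay` seconds and hand the delay back as the result."""
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]
finished = []

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, d) for d in delays]
    # as_completed yields each future when it is done, not in submission order.
    for future in as_completed(futures):
        finished.append(future.result())

print(finished)
```

By contrast, a gather-style call waits for the whole list and returns results in submission order.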
  • j

    Joshua Greenhalgh

    04/21/2022, 3:20 PM
    I promise to stop being annoying today!! After this lol Is it possible to write a single mutation to delete every flow in a given project?
    😂 1
    k
    a
    13 replies · 3 participants
  • p

    Patrick Tan

    04/21/2022, 3:43 PM
    When running child flows from a parent flow, how do I pass the parent flow run name to the child flow? The below does not work.
    a
    5 replies · 2 participants
  • j

    Jake

    04/21/2022, 3:58 PM
    Hi everyone, I’ve been running into an issue today
    AttributeError: 'V1Job' object has no attribute 'name'
    and I’m not sure what this means. Prefect cloud is reporting that
    No heartbeat detected from the remote task; marking the run as failed.
    This is a Kubernetes Agent. I looked at the logs and here is a stack trace:
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 413, in heartbeat
        self.manage_jobs()
      File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 193, in manage_jobs
    ERROR:agent:Error while managing existing k8s jobs
    Traceback (most recent call last):
        f"Job {job.name!r} is for flow run {flow_run_id!r} "
    AttributeError: 'V1Job' object has no attribute 'name'
      File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 190, in manage_jobs
        flow_run_state = self.client.get_flow_run_state(flow_run_id)
      File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1664, in get_flow_run_state
        raise ObjectNotFoundError(f"Flow run {flow_run_id!r} not found.")
    prefect.exceptions.ObjectNotFoundError: Flow run 'af8b8a74-8ed0-4417-812a-566de859ce64' not found.
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 413, in heartbeat
        self.manage_jobs()
      File "/usr/local/lib/python3.6/site-packages/prefect/agent/kubernetes/agent.py", line 193, in manage_jobs
        f"Job {job.name!r} is for flow run {flow_run_id!r} "
    AttributeError: 'V1Job' object has no attribute 'name'
    Deleting the Agent pod did not solve the issue. Any ideas?
    k
    a
    13 replies · 3 participants
  • m

    Matthew Seligson

    04/21/2022, 4:25 PM
    What’s the best practice for starting the run of a subflow X hours after the start of a flow? I have a StartFlowRun task that takes a scheduled_start_time. But will that scheduled start time get fixed at flow registration time? This should be dynamic depending on the start of the main flow.
    k
    10 replies · 2 participants
  • b

    Brian Phillips

    04/21/2022, 4:30 PM
    I'm curious why name templating isn't available for flow run names? Is anyone aware of a workaround for generating more readable names for simple, scheduled flows?
    k
    2 replies · 2 participants
  • m

    Matt Delacour

    04/21/2022, 4:42 PM
    hi all 👋 I am reading the Orion docs and playing with the beta cloud product. I mostly need help regarding the whole deployment process. What I am looking for is a way to isolate dependencies between our use cases (ML, DBT, Dask, etc.). So in my mind, I imagine that our users will need to create Docker images that can be used when a job runs (see a quick diagram in the thread). Looking at the Orion documentation, what's not clear to me:
    • How can I assign a specific Docker image to a job (so that we can spin up the right server)? The use of
    DockerFlowRunner()
    for that is unclear.
    • I am guessing that I would need to use tags to glue any logic around our different environments (prod / adhoc). But I don't know if work-queues need to be created "manually" for each new "Repo user" (see diagram).
    Also feel free to point me to a tutorial explaining all the best practices to deploy Prefect 🙏
    n
    6 replies · 2 participants
  • n

    Niclas Roos

    04/21/2022, 5:36 PM
    Hi, I’ve just started testing out prefect 2.0 but I can’t find secrets, are they not there anymore?
    k
    m
    +1
    9 replies · 4 participants
  • s

    Shuchita Tripathi

    04/21/2022, 6:12 PM
    Hi, I want to capture the return value of one task and pass it in another task. I am trying to do this via Result. But when using result, my flows are not running. they are stuck. Is result the right way to do this? if not, can someone point me to a better way of doing it? also, what can be the reason for flows getting stuck?
    k
    3 replies · 2 participants
s

Shuchita Tripathi

04/21/2022, 6:12 PM
Hi, I want to capture the return value of one task and pass it in another task. I am trying to do this via Result. But when using result, my flows are not running. they are stuck. Is result the right way to do this? if not, can someone point me to a better way of doing it? also, what can be the reason for flows getting stuck?
k

Kevin Kho

04/21/2022, 6:15 PM
You can just pass it.
with Flow(..) as flow:
    a = task_a()
    b = task_b(a)
The result is more about checkpointing
s

Shuchita Tripathi

04/21/2022, 6:18 PM
When I print the value of 'a' above, it is printed as a Task object. Is this expected?
k

Kevin Kho

04/21/2022, 6:23 PM
Yes, if you print inside the Flow, because
print
will execute during flow build time instead of runtime, so the task value isn’t there yet and you print the Task object itself. If you print inside a task, it should be right because the execution will take place during
flow.run()
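The build-time versus run-time distinction can be seen in a tiny model that does not use Prefect at all. `ToyTask` below is a hypothetical placeholder class written for this note, not Prefect's Task; it only shows why printing inside the flow block shows an object, while the value appears once the graph is executed.

```python
class ToyTask:
    """Placeholder created at build time; holds the function, not its result."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def run(self):
        # Resolve upstream placeholders first, then call the function.
        args = [dep.run() for dep in self.upstream]
        return self.fn(*args)

    def __repr__(self):
        return f"<ToyTask: {self.fn.__name__}>"


def task_a():
    return 2


def task_b(x):
    return x * 10


# Build time: nothing has executed yet, so `a` is only a placeholder.
a = ToyTask(task_a)
b = ToyTask(task_b, a)
print(a)  # shows the placeholder object, not the value returned by task_a

# Run time: the functions are finally called and the value flows from a to b.
result = b.run()
print(result)
```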