prefect-community
  •

    Saksham Dixit

    09/02/2022, 5:02 PM
    My Slack webhook in the Prefect UI fails but does not give any failure reason. I'm using Prefect version `1.2.2`:
  •

    Ela

    09/02/2022, 9:38 PM
    hi (: I noticed that my flow runs twice when I import tasks or functions from files placed in the same parent directory as the file that contains the Flow object. I also get a UserWarning that my flow conflicts with 'another flow' and that the tasks of the flow conflict with 'some other tasks', although each task and flow has a unique name (warning example: "_A task named ... and defined at ... conflicts with another task. Consider specifying a unique `name` parameter in the task definition_"). I tried to see what's happening in prefect/tasks.py (which emits the warning) and made it print out all the names in PrefectObjectRegistry.get(). It printed the names of the tasks I'm using multiple times. All of this disappeared when I imported the tasks/functions from a different directory than the flow's: the flow runs only once and the warnings are gone. I'm using Prefect 2.3.1 (the double runs also happened with version 1 until I changed the location of the functions/tasks I was importing). I hope this makes sense; if you have any advice on why this happens, please let me know! thanks^^
  •

    datamongus

    09/02/2022, 9:47 PM
    Is there a known issue with Prefect Cloud? We are noticing several flows showing up as late runs.
  •

    Alix Cook

    09/02/2022, 10:23 PM
    I think I may have seen this asked somewhere before, but I can no longer find the discussion, so apologies if this is duplicative, but... I'm wondering if there's a way to know whether a flow run running in a process (i.e., the Process infrastructure type) has been killed. I'm currently seeing these flow runs stay in the "Running" state when this happens, and I'm writing a cleanup that will reschedule these runs, but I'm having trouble identifying which flow runs have actually had their processes terminated.
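A minimal sketch for the cleanup side, assuming you record each flow run's process ID yourself (for example by logging `os.getpid()` from inside the flow; that is a convention you would add, not something this thread says Prefect provides). On POSIX systems, `os.kill(pid, 0)` only performs the existence and permission checks without delivering a signal, so it can tell whether a PID still exists. The helper name `is_process_alive` is hypothetical.

```python
import os


def is_process_alive(pid: int) -> bool:
    """Return True if a process with this PID currently exists (POSIX only).

    Signal 0 means "check only": nothing is delivered to the process.
    Caveat: the OS can reuse PIDs, so True for an old PID does not prove
    that the original flow-run process is still running.
    """
    if pid <= 0:
        # os.kill treats 0 and negative values as process groups, not PIDs.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        # No process with this PID exists.
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True
```

Because PIDs can be reused, a cleanup job would ideally combine this with a second check (for example the process start time) before rescheduling a run.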
  •

    Matt Fysh

    09/02/2022, 11:42 PM
    Hi all, is there a way to disable flow/task decorators for testing?
  •

    Araik Grigoryan

    09/03/2022, 1:15 AM
    Hi. I would like to use Prefect 2.0 to schedule/run Java programs. These programs, as well as the shell scripts that invoke them, produce stdout/stderr messages. What is the best practice for capturing stdout/stderr from shell and non-Python programs such that they appear in the Orion UI?
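A minimal, stdlib-only sketch of one common approach: start the program with `subprocess.Popen`, merge stderr into stdout, and forward each line to a logger as it arrives. Inside a Prefect 2 task you could pass the logger returned by `prefect.get_run_logger()` so the lines are sent to the Orion UI; that wiring and the function name `run_and_log` are assumptions, not a built-in Prefect feature. A command such as `["java", "-jar", "app.jar"]` would be a hypothetical example.

```python
import logging
import subprocess
from typing import Sequence


def run_and_log(cmd: Sequence[str], logger: logging.Logger) -> int:
    """Run cmd, send each line of its output to logger, return the exit code.

    stderr is merged into stdout, so lines appear in the order the child
    writes them (subject to the child's own output buffering).
    """
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,  # line-buffered on the reading side (text mode)
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            logger.info(line.rstrip("\n"))
    # Leaving the with-block waits for the process, so returncode is set.
    return proc.returncode
```

A non-zero return code could then be turned into a task failure, for example by raising an exception when `run_and_log(...) != 0`.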
  •

    Ghislain Picard

    09/03/2022, 5:24 PM
    I have a function that leaks memory, and I can't fix it because the leak happens in a Fortran library. I would like to run the task (or subflow) in a subprocess. Is there an easy way to do that in Prefect? Or maybe there is another solution to this problem?
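A hedged, stdlib-only sketch of the subprocess idea: call the leaky function in a fresh child process, send the result back through a queue, and let the OS reclaim all of the child's memory (including anything leaked by native code) when it exits. It assumes a POSIX system because it uses the "fork" start method, and the return value must be picklable. The names `run_in_subprocess` and `_child` are hypothetical; inside a Prefect task you might call `run_in_subprocess(your_fortran_wrapper, ...)`, where `your_fortran_wrapper` stands in for your own function.

```python
import multiprocessing as mp
from queue import Empty
from typing import Any, Callable


def _child(result_q, fn, args, kwargs) -> None:
    # Runs in the child process: report ("ok", value) or ("error", description).
    try:
        result_q.put(("ok", fn(*args, **kwargs)))
    except BaseException as exc:
        result_q.put(("error", repr(exc)))


def run_in_subprocess(fn: Callable[..., Any], *args: Any,
                      timeout: float = 60.0, **kwargs: Any) -> Any:
    """Call fn(*args, **kwargs) in a new child process and return its result.

    Note: `timeout` is consumed here, so fn cannot receive a kwarg of that name.
    """
    ctx = mp.get_context("fork")  # POSIX only; fn itself is not pickled
    result_q = ctx.Queue()
    proc = ctx.Process(target=_child, args=(result_q, fn, args, kwargs))
    proc.start()
    try:
        # Read the result before join() so a large payload cannot deadlock.
        status, payload = result_q.get(timeout=timeout)
    except Empty:
        # No result in time (e.g. a hang or a hard crash in native code).
        proc.terminate()
        proc.join()
        raise TimeoutError(f"no result after {timeout}s (exit code {proc.exitcode})")
    proc.join()
    if status != "ok":
        raise RuntimeError(f"child process failed: {payload}")
    return payload
```

The per-call process start adds overhead, so this pays off mainly when each call is long-running or leaks a lot of memory.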
  •

    Yaron Levi

    09/03/2022, 6:17 PM
    Hi 👋 We are getting the error: Block document has schema checksum sha256:xxxxx… which does not match the schema checksum for class ‘S3’
  •

    Yaron Levi

    09/03/2022, 6:17 PM
  •

    Yaron Levi

    09/03/2022, 6:17 PM
  •

    Yaron Levi

    09/03/2022, 6:18 PM
    I’ve tried to delete and re-create the S3 Block from the UI but it didn’t help.
  •

    Yaron Levi

    09/03/2022, 6:18 PM
    Also cleaned the S3 folder.
  •

    Yaron Levi

    09/03/2022, 6:18 PM
    Also tried to delete the deployment from the UI just in case. Nothing works.
  •

    Yaron Levi

    09/03/2022, 6:19 PM
    Any ideas what could be the problem?
  •

    vk

    09/03/2022, 6:49 PM
    Hi all, I'm struggling to create and run a flow in Orion. For some reason Orion Cloud says "This deployment is deprecated" and the "Run" button is inactive. The code I run:
    from prefect import flow
    from prefect.deployments import Deployment
    from prefect.infrastructure import KubernetesJob
    
    
    @flow
    def print_hi():
        print('Hi, 123')
    
    
    # Build a deployment that runs print_hi as a Kubernetes job.
    # skip_upload=True: the flow code is already baked into my-image.
    deployment = Deployment.build_from_flow(
        name="print_hi",
        flow=print_hi,
        skip_upload=True,
        infrastructure=KubernetesJob(
            image='my-image',
        )
    )
    deployment.apply()
    This created a deployment, but why does it say "This deployment is deprecated. With the General Availability release of Prefect 2.0, we modified the approach to creating deployments."? This is the code from the latest test cases in Prefect 2.3.1, so how can it be deprecated? And why is the "Run" button inactive?
    `my-image` is based on `prefect-onion:latest` and has my flow code in `/opt/prefect/flows`. By the way, how does it know which flow to run if there is more than one flow in that folder?
  •

    Yaron Levi

    09/03/2022, 9:24 PM
    We have an hourly job, and we’ve used a scheduled deployment with a cron expression. We are also using Render (a Heroku alternative). In Render we have a background worker (that runs the Prefect agent) with a cron expression similar to that of the deployment. This way the background worker wakes up just in time to process the upcoming scheduled runs, and we don’t need to run the worker 24/7 and pay for it. But sometimes the background worker may fail or not wake up in time, for example when Render is having an outage. In that case, when the background worker comes back online, it runs all the late scheduled runs, all of them immediately. Is there a way to define a “late” run policy? For example, when the worker is back online it would not run the missed runs, or it would run them in a “push” manner, pushing the upcoming runs back to make room for the late ones while keeping the time spans between them equal.
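A framework-agnostic sketch of the two policies described above, operating on plain datetimes; it is not a Prefect API, and wiring it into an agent or deployment is not shown. `"skip"` drops runs that are more than a grace period late, and `"push"` keeps every run but shifts them so the first overdue run starts now and each later run follows at the original interval. The function name, policy names, and default grace period are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List


def apply_late_policy(
    scheduled: List[datetime],
    now: datetime,
    interval: timedelta,
    policy: str = "skip",
    grace: timedelta = timedelta(minutes=5),
) -> List[datetime]:
    """Return the start times to actually use for the given scheduled runs."""
    runs = sorted(scheduled)
    if policy == "skip":
        # Keep future runs and runs that are at most `grace` late.
        return [t for t in runs if now - t <= grace]
    if policy == "push":
        result: List[datetime] = []
        next_free = now
        for t in runs:
            # Never start earlier than the original time or the next free slot.
            start = max(t, next_free)
            result.append(start)
            next_free = start + interval
        return result
    raise ValueError(f"unknown policy: {policy!r}")
```

With hourly runs at 10:00, 11:00, and 12:00 and a worker that wakes at 11:30, `"skip"` keeps only 12:00, while `"push"` yields 11:30, 12:30, and 13:30.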
  •

    Jari Rosti

    09/04/2022, 9:24 AM
    Is there a way to accomplish v1's "Manual trigger"-like behaviour in Prefect 2.0? I mean, flow goes to paused state, and then with some interaction user can resume it?
  •

    Michael Levenson

    09/04/2022, 5:25 PM
    Hi all, for v1.0 I forgot how to set a result location dynamically, i.e., importing the flow and then setting the result template
  •

    Georgi Yanev

    09/04/2022, 9:11 PM
    Hey there, I have been reading about and playing with Prefect v2 for a while. I already have some flows working perfectly locally using local storage and Dask, but now I have to move everything to Docker in preparation for production. We have a MinIO server and I want to use it as storage for my flow code. And here comes my struggle and question: should I have a separate S3 block for every flow's code to use in the deployments? If so, how should I reorganize my project structure? Currently all my flows live in the same folder, and it would be strange to have the same files deployed in multiple blocks.
  •

    Yousef Hosny

    09/05/2022, 2:25 AM
    Hi all, I was wondering if there is a way to get Airbyte logs in the Prefect UI. I am using `prefect-airbyte` to monitor & schedule my Airbyte pipelines, but unfortunately I am only getting these logs
  •

    Saurabh Indoria

    09/05/2022, 3:55 AM
    Hi all, is there a way to get a time breakdown of a Prefect task's execution journey? For example, when a task says it took 5 seconds to run, is that just compute, or is some Prefect functionality also taking time? Also, at the flow level, is there a granular timing breakdown of what is pure compute time and what is Prefect operations time?
  •

    Tan Li

    09/05/2022, 5:15 AM
    Hi community, yet another question that needs Prefect masters’ wisdom. Does anyone know how to specify a time zone using the prefect deployment CLI? I noticed that you can specify, say, a cron schedule via `prefect deployment build --cron "00 1 * * *"`. But how can I specify the time zone in the CLI, the way it can be done through the Python object interface (I am having a weird bug: some folder under /tmp/* cannot be found when a Python deployment gets executed a second time)? I searched a bit on GitHub but only found some Python code, and I also tried to look at the code implementation (on 2.3.0), but it’s not very clear to me where that TimeZone flag gets specified (I already got lost in the *args maze). Any help would be appreciated! 🙏🙏🙏
  •

    Jari Rosti

    09/05/2022, 8:01 AM
    I am trying to use Prefect 2.3.1's REST API to create a flow run (locally installed Prefect server), but irrespective of what I've tried, the flow run is never scheduled. I guess the root cause is that I can't get it assigned to a work_queue. I haven't been able to pass one, and I believe it should come from the deployment. The flow works OK when I run it from the UI. Any ideas what detail I'm missing? I create a request:
    curl -X 'POST' \
      'http://localhost:4200/api/flow_runs/' \
      -H 'accept: application/json' \
      -H 'Content-Type: application/json' \
      -d '{
      "name": "my-flow-run",
      "flow_id": "adc10e6b-62b1-4a5a-ba75-a9d03107c75c",
      "deployment_id": "0a26b313-0134-43ac-b4ab-67bcbeeaebf9",
      "parameters": { "flow_run_id": "d3f49714-3b14-4bbf-a42c-7081149b9fb3" }
    }'
    and get a response:
    {
      "id": "1277b255-4250-45f7-b0bf-c06630dcac86",
      "created": "2022-09-05T07:51:42.448486+00:00",
      "updated": "2022-09-05T07:51:42.450190+00:00",
      "name": "my-flow-run",
      "flow_id": "adc10e6b-62b1-4a5a-ba75-a9d03107c75c",
      "state_id": "aae8e698-9df6-4efa-8692-2d9e97e01112",
      "deployment_id": "0a26b313-0134-43ac-b4ab-67bcbeeaebf9",
      "work_queue_name": null,
    ...
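A hedged sketch of an alternative request: the Prefect 2 (Orion) API also exposes a `POST /deployments/{id}/create_flow_run` route that creates the run from the deployment itself, so the deployment's settings (such as its work queue) should be applied to the new run. Please verify the route against the API reference for your Prefect version. The sketch only builds the request with the standard library so it can be inspected without a server; the base URL, deployment ID, and run name come from the message above, and the helper name `build_create_flow_run_request` is hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict, Optional


def build_create_flow_run_request(
    api_url: str,
    deployment_id: str,
    parameters: Optional[Dict[str, Any]] = None,
    name: Optional[str] = None,
) -> urllib.request.Request:
    """Build a POST request asking the API to create a run of a deployment."""
    body: Dict[str, Any] = {"parameters": parameters or {}}
    if name is not None:
        body["name"] = name
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


req = build_create_flow_run_request(
    "http://localhost:4200/api",
    "0a26b313-0134-43ac-b4ab-67bcbeeaebf9",
    name="my-flow-run",
)
# Sending it requires a running server:
# with urllib.request.urlopen(req) as resp:
#     flow_run = json.load(resp)
```

If the assumption holds, the `work_queue_name` in the response should then be filled in from the deployment instead of `null`.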
  •

    Mohamed Alaa

    09/05/2022, 8:18 AM
    Hello guys, I am hosting Prefect's Orion server on my K8s cluster and I'm having difficulty connecting the Prefect CLI to the remote Orion server. Also, how does the connection work? Can anyone who knows the domain name hosting my Orion API send requests to it? That would be a security concern for us. Any guides, tutorials, or videos are highly appreciated. Thanks in advance.
  •

    Matt Fysh

    09/05/2022, 8:41 AM
    hi all, does Prefect have a way of letting me specify how to write the results of a flow / subflow to a database, so I can keep the ETL logic and database logic separate?
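A framework-agnostic sketch of one way to keep the two concerns apart, using only the standard library (`sqlite3`): the transformation step returns plain rows and knows nothing about storage, and a separate writer function persists them. In a Prefect flow these could be two separate tasks, but this is not Prefect's own result-persistence mechanism; the names `transform` and `write_rows` and the `results` table are hypothetical.

```python
import sqlite3
from typing import Iterable, List, Tuple

Row = Tuple[str, float]


def transform(raw: Iterable[dict]) -> List[Row]:
    """ETL logic only: normalise names and cast values; no database code."""
    return [(str(r["name"]).strip().lower(), float(r["value"])) for r in raw]


def write_rows(conn: sqlite3.Connection, rows: List[Row]) -> int:
    """Database logic only: create the results table if needed, insert rows."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("CREATE TABLE IF NOT EXISTS results (name TEXT, value REAL)")
        conn.executemany("INSERT INTO results (name, value) VALUES (?, ?)", rows)
    return len(rows)


conn = sqlite3.connect(":memory:")
raw_records = [{"name": " A ", "value": "1.5"}, {"name": "b", "value": 2}]
inserted = write_rows(conn, transform(raw_records))
```

Because `transform` returns plain data, it can be unit-tested without a database, and the writer can be swapped (another database, a file) without touching the ETL logic.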
  •

    Nuno Silva

    09/05/2022, 9:50 AM
    Hello, since Friday's "Prefect Cloud will undergo scheduled maintenance for our secret store" https://prefect.status.io/pages/maintenance/5f33ff702715c204c20d6da1/630cf177ad12e9052dd03e42 we have had problems connecting to api.prefect.io from our AKS virtual nodes: we just get a urllib3 "name resolution error" and the flow does not even start. Any ideas? We're using v1.3.0
  •

    Ben Muller

    09/05/2022, 10:48 AM
    Hey community, just reading up about blocks in 2.0. Really interesting construct. I like the idea of the use for secrets and accessing aws, database etc etc. Currently we use 1.0 and have an internal "task library" type of dealio. Are you guys recommending that blocks could be used for something like that, so that you avoid duplicating general tasks across several repositories or is it best to keep tasks with logic away from blocks? It's appealing from a visibility standpoint in the UI but perhaps dangerous from a testing standpoint if someone accidentally changes something in a block that other repos rely on. Keen to hear your recommendations!
  •

    Malavika S Menon

    09/05/2022, 12:02 PM
    I am trying to use Postgres as the database instead of SQLite. While I managed to connect the webserver to the pg db, CLI commands for deployments don't seem to be reflected in Orion. How does the connection between CLI commands and the database work, and how can I change the link from the old SQLite database to the new pg one?
  •

    Youssef Ben Farhat

    09/05/2022, 12:45 PM
    Hello, I have a question about tasks. For example, I have 3 related tasks in my flow. Before starting the second task, I need to be sure that the first one finished without errors, and before starting the third task, I need to be sure that the second one finished without errors too. How can I do this?
  •

    Niels Prins

    09/05/2022, 1:58 PM
    Hello all, could use some advice. I have the following use case: I'm getting a list of deltas from an external API, and these deltas need to be processed in sequence: n + 1 cannot start before n has finished and should not start if n fails. Processing a delta requires multiple tasks (2) that can be processed in parallel, plus a clean-up task. I want the possibility to restart a subtask if it fails. What would be the proper construct? I have explored LOOP; it runs through the tasks in sequence, but a restart results in restarting from the first delta on. I have tried a Flow of Flows; it does not allow for the dynamic tasks. Combining them resulted in a subflow that was run for each delta in sequence, but a failure in the subflow ends the whole looping task, and restarting results in running from the beginning again. Is there a better option for this use case in Prefect 1+?

Anna Geller

09/05/2022, 9:05 PM
can you move the code block to the thread? this helps us to keep the main channel cleaner
to answer your question: how would you approach it in Python without Prefect? This would be a good direction to consider. Not relying on Prefect 1 constructs such as LOOP will also make it easier to migrate to Prefect 2

Niels Prins

09/06/2022, 8:53 AM
Code snippet from the original question. See the snippet below for what I've got right now:
import time
from typing import List
from uuid import uuid4

import prefect
from prefect import Flow, Parameter, Task, task
from prefect.engine.signals import FAIL, LOOP, signal_from_state
from prefect.executors import LocalDaskExecutor
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


@task
def get_deltas() -> List:
    return [1, 2, 3, 4]


class A(Task):
    def run(self, v):
        self.logger.info(f"{v} A")
        if v == 3:
            raise ValueError("This is not the value you were looking for")
        for i in range(4):
            time.sleep(1)
            self.logger.info(f"{v} - {i}")
        return "A"


class B(Task):
    def run(self, v):
        self.logger.info(f"{v} B")
        time.sleep(5)
        return "B"


class C(Task):
    def run(self):
        self.logger.info("C")
        time.sleep(2)
        return "C"


class Sequenced(Task):
    def run(self, ds: List):
        LOG = prefect.context.get("logger")
        loop_payload = prefect.context.get("task_loop_result", {"ds": ds})
        ds = loop_payload["ds"]
        if not ds:
            return

        d = ds[0]
        LOG.info(d)
        # setting this is required, if not the flow is detected as already run, and will not be started
        idempotency_key = str(uuid4())
        id = create_flow_run.run(
            idempotency_key=idempotency_key,
            flow_name="sub",
            project_name="test",
            parameters={"delta_id": d},
        )
        LOG.warning(id)
        # raising final state will result in the final state of this task being raised
        # this will lead to not calling the loop value
        flow_run = wait_for_flow_run.run(id, stream_logs=True, raise_final_state=False)

        state_signal = signal_from_state(flow_run.state)(
            message=f"{id} finished in state {flow_run.state}",
            result=flow_run,
        )
        if isinstance(state_signal, FAIL):
            raise state_signal

        raise LOOP(message="", result={"ds": ds[1:]})


with Flow(name="sub", executor=LocalDaskExecutor()) as sub:
    delta_id = Parameter(name="delta_id", required=True)
    a = A()(delta_id)
    b = B()(delta_id)
    C()(upstream_tasks=[a, b])


with Flow(name="main") as flow:

    deltas = get_deltas()
    Sequenced()(deltas)

if __name__ == "__main__":
    sub.register("test")
    flow.register("test")
    flow.run()
I would recursively apply the function to the head of the list. I made a POC of this, starting a new flow on successful completion of the delta process. It was fun, but we decided to go another way, which fit the use case better. The whole thing gave me the opportunity to talk about migrating again. At least that is now back on the table =D
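The recursive idea, sketched in plain Python as Anna suggested (no Prefect constructs, so this is a starting point rather than a Prefect 1 or 2 implementation): process the head of the list by running its two independent subtasks in parallel, run the clean-up step, and recurse on the tail only if everything succeeded. The functions passed in as `task_a`, `task_b`, and `clean_up` are hypothetical stand-ins for the A, B, and C tasks in the snippet above, and the `done` list is an illustrative way to remember progress.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

Step = Callable[[int], str]


def process_delta(delta: int, task_a: Step, task_b: Step, clean_up: Step) -> None:
    """Run A and B for one delta in parallel, then the clean-up step.

    .result() re-raises any exception from A or B, so clean-up only runs
    when both succeed.
    """
    with ThreadPoolExecutor(max_workers=2) as pool:
        fut_a = pool.submit(task_a, delta)
        fut_b = pool.submit(task_b, delta)
        fut_a.result()
        fut_b.result()
    clean_up(delta)


def process_in_sequence(deltas: Sequence[int], task_a: Step, task_b: Step,
                        clean_up: Step, done: List[int]) -> None:
    """Process the head, then recurse on the tail; n + 1 never starts if n fails."""
    if not deltas:
        return
    head, tail = deltas[0], deltas[1:]
    process_delta(head, task_a, task_b, clean_up)  # raises on failure
    done.append(head)
    process_in_sequence(tail, task_a, task_b, clean_up, done)
```

After fixing the cause of a failure, a rerun could resume from the failed delta with `process_in_sequence(deltas[len(done):], ...)` instead of starting over. For very long lists, an equivalent `for` loop avoids Python's recursion limit.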

Anna Geller

09/06/2022, 10:41 AM
definitely something that would be much easier to do in Prefect 2, especially if you got it working in Python already. Looping is definitely a viable solution here for Prefect 1, too - LMK if there's anything we can help with. It seems like you have a good grasp on how to handle it and your code looks good

Niels Prins

09/06/2022, 1:16 PM
:gratitude-thank-you:
jeeej, GREEN LIGHT for starting a POC with prefect 2 on k8s ;D

Anna Geller

09/12/2022, 2:40 PM
Great to hear that! 🙌 You can reach out to cs@prefect.io if you need help from infrastructure experts for your PoC