Powered by Linen
prefect-community
  • j

    Jacqueline Riley Garrahan

    03/01/2022, 9:46 PM
    Any advice for handling float values as Parameters? Will selecting integer in this menu coerce my values?
    6 replies · 4 participants
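The thread doesn't say how the UI casts a value when the parameter type is set to integer. If it casts the way Python's int() does, the fractional part is simply dropped. A defensive option is to normalize the value inside the flow. This is a minimal sketch, and as_float_param is a hypothetical helper, not part of Prefect:

```python
def as_float_param(value):
    """Normalize a parameter value (int, float, or numeric string) to float.

    Hypothetical helper: rejects booleans and other types instead of
    silently coercing them.
    """
    if isinstance(value, bool):
        # bool is a subclass of int, so check it first and treat it as an error
        raise TypeError("expected a number, got a bool")
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        return float(value.strip())  # raises ValueError for non-numeric text
    raise TypeError(f"unsupported parameter type: {type(value).__name__}")


# An integer cast in the style of Python's int() truncates toward zero:
print(int(2.7), int(-2.7))    # 2 -2
print(as_float_param("2.7"))  # 2.7
print(as_float_param(3))      # 3.0
```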
  • w

    Wesley Jin

    03/01/2022, 11:45 PM
    What difference, if any, is there between the labels set during flow.register vs. on the run_config object, e.g. ECSRun? If there is none, is there a preferred place to set them?
    5 replies · 2 participants
  • s

    soner

    03/01/2022, 11:50 PM
    Hey everyone, how can I get the flow run ID of the currently running flow from within the flow definition?
    1 reply · 2 participants
  • m

    Max Lei

    03/01/2022, 11:58 PM
    I'm trying to use DockerRun with LocalStorage, but I have a config.py file that is not packaged in my package; it sits beside flow.py. However, it seems that flow.py is not able to access it. The working directory in Docker is set to the directory containing both config.py and flow.py. Is there a setting I can configure?
    5 replies · 2 participants
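Python resolves imports against sys.path. When another process loads the flow, sys.path does not necessarily include the directory that contains flow.py. This is a minimal stdlib sketch that assumes config.py sits next to flow.py. The hypothetical helper load_sibling_module imports a module by file path, so it does not depend on the working directory:

```python
import importlib.util
import sys
from pathlib import Path


def load_sibling_module(anchor_file, module_name):
    """Import `module_name` from the directory containing `anchor_file`.

    Hypothetical helper: it loads the module by file path, so it does not
    depend on the current working directory or on sys.path.
    """
    path = Path(anchor_file).resolve().parent / f"{module_name}.py"
    spec = importlib.util.spec_from_file_location(module_name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {module_name!r} from {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # later `import module_name` reuses this object
    spec.loader.exec_module(module)
    return module


# In flow.py this would read: config = load_sibling_module(__file__, "config")
```

A simpler alternative, under the same assumption, is sys.path.insert(0, str(Path(__file__).resolve().parent)) before import config. Whether either approach helps depends on how the flow is loaded inside the container, which the thread doesn't show.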
  • a

    Aqib Fayyaz

    03/02/2022, 6:31 AM
    Can I use Docker storage for a flow with Ubuntu 20.04 as the base image in the Dockerfile, instead of a Prefect image?
    6 replies · 3 participants
  • y

    Yas Mah

    03/02/2022, 8:50 AM
    Hi, I have noticed that the execution time for a flow run through the server is much longer than when I run the flow locally without the server's monitoring. I know that certain metadata is exchanged between the server and the execution environment, but is there a way to optimize it?
    1 reply · 2 participants
  • e

    Edvard Kristiansen

    03/02/2022, 10:44 AM
    Machine Learning Model Written in R? Hi guys, I have tried searching around without any luck. Is it theoretically possible to run a machine learning model written in R through an R Docker image with the Docker agent? Or is a Vertex custom model using the Vertex agent the only viable option here?
    2 replies · 3 participants
  • a

    Aqib Fayyaz

    03/02/2022, 11:35 AM
    Hi, I have a flow containing Spark that runs our data pipeline. I'm using Docker storage for the flow, and the server is deployed on GKE. When I register the flow from my local system, it also starts executing the flow in my local environment. Is this normal behaviour? What if I don't want to run the flow while registering?
    5 replies · 2 participants
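The thread doesn't show the flow file, so the following is only an assumption about the cause. A common way for registration to also run a flow is a module-level flow.run() call, because top-level code executes whenever the file is run or imported. This minimal stdlib illustration shows the usual `if __name__ == "__main__":` guard. The script contents are a hypothetical stand-in for a flow file:

```python
import importlib.util
import runpy
import tempfile
from pathlib import Path

# Hypothetical flow script: the "run" only happens under the __main__ guard.
FLOW_SCRIPT = '''
CALLS = []

def run_flow():
    CALLS.append("ran")

if __name__ == "__main__":
    run_flow()  # executes only when the file is run directly
'''


def import_without_running(path):
    """Import a file as a module; its __name__ is the module name, not '__main__'."""
    spec = importlib.util.spec_from_file_location("flow_script", str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / "flow_script.py"
    script.write_text(FLOW_SCRIPT)
    imported = import_without_running(script)
    as_main = runpy.run_path(str(script), run_name="__main__")
    print(imported.CALLS)    # []  (importing skipped the guarded call)
    print(as_main["CALLS"])  # ['ran']  (running as __main__ executed it)
```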
  • t

    Tomer Cagan

    03/02/2022, 12:25 PM
    Hi, are there any best practices for running a library that has its own parallelism? For example, in our code base we sometimes use solvers (e.g. https://github.com/coin-or/pulp, or https://developers.google.com/optimization/introduction/overview) which bring their own parallelism...
    4 replies · 3 participants
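One general way to reason about this (not something stated in the thread): each parallel task runs a solver that spawns its own threads, so the thread counts multiply. A budget of roughly cores ÷ parallel tasks per solver avoids oversubscribing the machine. This is a minimal sketch. threads_per_solver is a hypothetical helper, and how each solver accepts a thread count depends on that solver:

```python
import os


def threads_per_solver(n_workers, total_cores=None):
    """Split the available cores evenly across concurrently running solvers.

    Hypothetical helper: always returns at least 1 so every solver can run.
    """
    if n_workers < 1:
        raise ValueError("n_workers must be >= 1")
    cores = total_cores if total_cores is not None else (os.cpu_count() or 1)
    return max(1, cores // n_workers)


# e.g. 16 cores shared by 4 parallel tasks -> give each solver 4 threads
print(threads_per_solver(4, total_cores=16))  # 4
print(threads_per_solver(8, total_cores=4))   # 1
```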
  • a

    Ayah Safeen

    03/02/2022, 1:53 PM
    Hi all, I'm trying to install Prefect Orion on Linux CentOS, but I keep getting this error when I use either of these commands:
    pip install -U "prefect>=2.0a"
    pip install -U "prefect==2.0a9"
    Does anyone have any idea about this?
    ERROR: Could not find a version that satisfies the requirement prefect==2.0a9 (from versions: 0.5.0, 0.5.1, 0.5.2, 0.5.3, 0.5.4, 0.5.5, 0.6.0, 0.6.1, 0.6.2, 0.6.3, 0.6.4, 0.6.5, 0.6.6, 0.6.7, 0.7.0, 0.7.1, 0.7.2, 0.7.3, 0.8.0, 0.8.1, 0.9.0, 0.9.1, 0.9.2, 0.9.3, 0.9.4, 0.9.5, 0.9.6, 0.9.7, 0.9.8, 0.10.0, 0.10.1, 0.10.2, 0.10.3, 0.10.4, 0.10.5, 0.10.6, 0.10.7, 0.11.0, 0.11.1, 0.11.2, 0.11.3, 0.11.4, 0.11.5, 0.12.0, 0.12.1, 0.12.2, 0.12.3, 0.12.4, 0.12.5, 0.12.6, 0.13.0, 0.13.1, 0.13.2, 0.13.3, 0.13.4, 0.13.5, 0.13.6, 0.13.7, 0.13.8, 0.13.9, 0.13.10, 0.13.11, 0.13.12, 0.13.13, 0.13.14, 0.13.15, 0.13.16, 0.13.17, 0.13.18, 0.13.19, 0.14.0, 0.14.1, 0.14.2, 0.14.3, 0.14.4, 0.14.5, 0.14.6, 0.14.7, 0.14.8, 0.14.9, 0.14.10, 0.14.11, 0.14.12, 0.14.13, 0.14.14, 0.14.15, 0.14.16, 0.14.17, 0.14.18, 0.14.19, 0.14.20, 0.14.21, 0.14.22, 0.15.0, 0.15.1, 0.15.2, 0.15.3, 0.15.4, 0.15.5, 0.15.6, 0.15.7, 0.15.8, 0.15.9, 0.15.10, 0.15.11, 0.15.12, 0.15.13, 1.0rc1, 1.0.0)
    ERROR: No matching distribution found for prefect==2.0a9
    8 replies · 3 participants
  • b

    Brett Naul

    03/02/2022, 1:53 PM
    q about mapping and PAUSE signals: the mapping docs say
    Even though the user didn't create them explicitly, the children tasks of a mapped task are first-class Prefect tasks. They can do anything a "normal" task can do, including succeed, fail, retry, pause, or skip.
    this seems kinda true, but when you try to raise PAUSE inside a mapped task it seems to just loop infinitely, whereas for a normal task you see "<Task: pause> is currently Paused; enter 'y' to resume:". Anyone have a strong opinion on whether this is a bug or just not a supported usage?
    from prefect import Flow, task
    from prefect.engine.signals import PAUSE
    
    @task
    def pause(i):
        import prefect
        if not prefect.context.get('resume'):
            raise PAUSE("pausing")
        return i
    
    with Flow("f") as f:
        # pause.map(i=[1])  # infinite loop
        pause(i=1)          # works normally
    
    f.run()
    14 replies · 5 participants
  • d

    Daniel Nilsen

    03/02/2022, 2:59 PM
    Hi, I am using Docker storage and I am trying to build a Docker image with my flow, but I am getting an error. Any idea how I can solve this?
    Step 15/15 : RUN python /opt/prefect/healthcheck.py '["flow-data_transformation.py"]' '(3, 10)'
     ---> Running in 2853f6886cd5
      File "/opt/prefect/healthcheck.py", line 17
        def system_check(python_version: str):
                                       ^
    SyntaxError: invalid syntax
    8 replies · 2 participants
  • a

    Adi Gandra

    03/02/2022, 4:16 PM
    Hey, I have a task that has run successfully in the past, but now it gets stuck on "Task 'X': Starting task run...". All other tasks run fine, and if I manually kill the job and restart it, it runs fine again. I'm running on EKS, so it spins up a pod to run this task. Any ideas for debugging? The task just doesn't seem to start.
    8 replies · 3 participants
  • r

    Rajan Subramanian

    03/02/2022, 4:22 PM
    Hello, I have a question about Prefect Orion. Is the shellTask capability that was available in Prefect Core also available in Orion?
    Another question, about implementation: I have an interesting use case. I work as a cryptocurrency software engineer and need to build streaming pipelines from various exchanges that trade cryptocurrencies. For each exchange, I have two Python files that stream different data, so with 4 exchanges at the moment, that's 8 files of streaming data for a particular currency. I wanted all these files to run at the same time, and after every 500 records streamed, I push them to a Redis stream. A second file transfers data from the Redis stream to a PostgreSQL database and clears out the stream; it is scheduled every 5 minutes. All of this is done asynchronously.
    Initially, when I got into Prefect, I decided to create a shellTask and make shell calls using the Dask executor. So I created a file called exchange_to_redis_pipeline.py where I create a shellTask and store the paths to the 8 files that stream the exchanges. I then loop through the list and execute the shellTask using the Dask executor. This works, and I see the pipelines executing on Prefect Cloud. However, after an hour or so, some of the tasks lose their heartbeat and just stop working. I am not sure whether this is due to the websocket or due to Prefect losing the heartbeat on tasks it expects to end.
    Hence my attempt at using Orion now. There are quite a few changes in this library compared with Prefect Core: the Flow() constructor is no longer available, and neither is the shellTask() capability. I'm curious whether my method is the standard way of achieving this in Prefect. Thank you
    60 replies · 3 participants
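For the shellTask question, this is a minimal stdlib stand-in for a shell task. It is a sketch under stated assumptions, not an Orion API. The name run_in_shell is hypothetical and matches the run_in_shell(command=...) call in a later message in this channel, whose actual implementation isn't shown. In a flow, a function like this could be wrapped in a task:

```python
import subprocess


def run_in_shell(command, timeout=None):
    """Run a shell command and return its stdout; raise if it exits non-zero.

    Hypothetical stand-in for a shell task, built on subprocess.run.
    """
    result = subprocess.run(
        command,
        shell=True,           # interpret the string with the system shell
        capture_output=True,  # collect stdout/stderr instead of printing them
        text=True,            # decode output bytes to str
        timeout=timeout,      # None means wait indefinitely
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"command failed with exit code {result.returncode}: {result.stderr.strip()}"
        )
    return result.stdout


print(run_in_shell("echo hello").strip())  # hello
```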
  • x

    Xavier Babu

    03/02/2022, 5:55 PM
    I get the following error while running Prefect on Windows: ImportError: cannot import name 'ThreadedChildWatcher' from 'asyncio' (C:\Users\xbabu\Babu\anaconda3\envs\prefect-dev\lib\asyncio\__init__.py)
    14 replies · 3 participants
  • r

    Rajan Subramanian

    03/02/2022, 6:23 PM
    Hello, how does one create a schedule in Orion similar to Prefect Core? I had:
    from datetime import timedelta
    
    from prefect import flow
    from prefect.orion.schemas.schedules import IntervalSchedule
    
    # DaskTaskRunner, build_command and run_in_shell come from imports/helpers not shown here
    schedule = IntervalSchedule(interval=timedelta(minutes=5))
    
    @flow(
        name="redis_to_postgres_pipeline", task_runner=DaskTaskRunner(), schedule=schedule
    )
    def run_flow(shell_task: str):
        cmd = build_command(shell_task)
        run_in_shell(command=cmd)
    I'm getting an error: flow doesn't accept schedule as an argument anymore.
    3 replies · 2 participants
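Conceptually, an interval schedule like IntervalSchedule(interval=timedelta(minutes=5)) produces run times at a fixed spacing from an anchor time. This minimal stdlib sketch shows that idea. next_run_times is hypothetical and is not how Orion computes or attaches schedules:

```python
from datetime import datetime, timedelta, timezone


def next_run_times(anchor, interval, after, n):
    """Return the next `n` times of the form anchor + k * interval strictly after `after`.

    Hypothetical helper illustrating a fixed-interval schedule.
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if after < anchor:
        first = 0  # the anchor itself is the first run
    else:
        # whole intervals elapsed since the anchor, plus one to land strictly after `after`
        first = (after - anchor) // interval + 1
    return [anchor + (first + i) * interval for i in range(n)]


anchor = datetime(2022, 3, 2, 18, 0, tzinfo=timezone.utc)
runs = next_run_times(anchor, timedelta(minutes=5), anchor + timedelta(minutes=7), 3)
print([r.strftime("%H:%M") for r in runs])  # ['18:10', '18:15', '18:20']
```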
  • v

    Vamsi Reddy

    03/02/2022, 7:42 PM
    Hi everyone, we have been experimenting with ECSRun and set it up successfully. To test whether the autoscaling feature works, I created a script that submits 100 flow runs in a loop. Most of the runs succeeded, but a few of them failed with this error:
    An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 2): Rate exceeded
    Does anyone know why some of the runs failed?
    7 replies · 3 participants
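The thread doesn't settle where the throttling should be absorbed. A generic pattern for AWS "Rate exceeded" errors is to retry with exponential backoff and jitter, and to space out bulk submissions. This is a minimal stdlib sketch. retry_with_backoff is hypothetical, and matching on the message text is only for the demo. Real code would check the error code AWS returns, such as ThrottlingException:

```python
import random
import time


def retry_with_backoff(fn, is_retryable, max_attempts=5, base_delay=0.5,
                       max_delay=20.0, sleep=time.sleep):
    """Call fn(), retrying retryable errors with exponential backoff and full jitter.

    After failed attempt k (k = 0, 1, ...), wait a random time in
    [0, min(max_delay, base_delay * 2**k)] seconds before trying again.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            # Re-raise non-retryable errors, and give up after the last attempt.
            if not is_retryable(exc) or attempt == max_attempts - 1:
                raise
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))


# Usage with a fake call that is throttled twice before succeeding.
calls = {"n": 0}


def flaky_submit():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("Rate exceeded")
    return "submitted"


result = retry_with_backoff(
    flaky_submit,
    is_retryable=lambda exc: "Rate exceeded" in str(exc),
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(result, calls["n"])  # submitted 3
```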
  • c

    Chris Reuter

    03/02/2022, 7:45 PM
    Hey everyone 👋! @Nate & I are showing up & showing out on PrefectLive in 15 minutes (3p Eastern). We're going to work with Orion caching this week and the featured theme is........ ♟ CHESS ♟ See you there: twitch.tv/prefectlive
    🚀 4
    ♟ 3
  • l

    Leo Kacenjar

    03/02/2022, 8:02 PM
    Hi there, I'm trying to automate registering/updating flows via a Jenkins job. I pull and build our Docker image, log in via the CLI, and then register the flows. This all works well. However, every time I run this process, I get a new version of every flow, even when nothing but the Docker image ID and container ID has changed. Is it possible to avoid creating these new versions, or is this by design? I've tried registering with numerous variations of the command. Thanks!
    6 replies · 3 participants
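A flow can be judged "changed" by a key derived from its whole serialized form. If that form includes a field that differs on every build, such as an image tag, every registration looks new. This stdlib sketch hashes a canonical JSON form of the metadata and leaves out volatile fields. The field names are hypothetical, and this is not Prefect's actual hashing logic:

```python
import hashlib
import json


def stable_hash(metadata, volatile_keys=("image_tag", "image_id")):
    """SHA-256 of `metadata` as canonical JSON, ignoring top-level volatile keys.

    Hypothetical helper: sort_keys makes the hash independent of dict key order.
    """
    kept = {k: v for k, v in metadata.items() if k not in volatile_keys}
    canonical = json.dumps(kept, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


build_1 = {"name": "etl", "tasks": ["extract", "load"], "image_tag": "2022-03-02t20-00"}
build_2 = {"tasks": ["extract", "load"], "name": "etl", "image_tag": "2022-03-03t08-15"}
print(stable_hash(build_1) == stable_hash(build_2))  # True: only the image tag differs
```

Prefect 1.x's Flow.register accepts an idempotency_key argument. A key computed along these lines is one way to avoid new versions when only volatile fields change, assuming you can decide which fields count as volatile.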
  • p

    Patrick Tan

    03/02/2022, 8:29 PM
    Hi, I want to register a flow to Prefect Cloud with the flow script uploaded to S3 storage, but it gives me "An error occurred (AccessDenied) when calling the PutObject operation". How do I pass AWS credentials to this registration step?
    9 replies · 2 participants
  • c

    Christian Nuss

    03/02/2022, 8:33 PM
    Hey all again! Do you have recommendations on how to handle an Agent killed due to OOM? We currently see Flows stay in a Running state; then we cancel them via the API, which leaves them in a Cancelling state.
    13 replies · 2 participants
  • h

    Hedgar

    03/02/2022, 9:31 PM
    I think I have a problem with my flow: it's running twice instead of once (at the scheduled time). If the last line of my flow code is flow.register(projectname, flowname, etc.), would it still be necessary to run prefect register... on the command line before doing prefect agent local start?
    9 replies · 2 participants
  • j

    Jason Noxon

    03/02/2022, 9:40 PM
    Hi all! Anyone using Prefect Agent from the Azure Marketplace?
    4 replies · 3 participants
  • m

    Max Lei

    03/03/2022, 4:03 AM
    Is there an option to set the disk space for a Fargate cluster? Something along the lines of:
    "ephemeralStorage": {
       "sizeInGiB": 200
    }
    8 replies · 2 participants
  • s

    Sudharshan B

    03/03/2022, 4:38 AM
    Is there a way to get the status of a flow that is already created and scheduled (whether it is running or not) without using the run_id in Python 3, e.g. by using the flow name instead? We use Prefect Cloud and run the Python script in the Prefect agent.
    2 replies · 3 participants
  • d

    Daniel Nilsen

    03/03/2022, 9:53 AM
    Hi, I have a problem running my flow. The problem has occurred before with the local agent, and the fix for that was to run the agent from the folder containing the flow. How do I fix this for the Docker agent when the flow is dockerized?
    Failed to load and execute flow run: ModuleNotFoundError("No module named 'parameters'")
    13 replies · 2 participants
  • v

    Valantis Hatzimagkas

    03/03/2022, 10:08 AM
    Hello! I have noticed the following behavior and I cannot figure out what is going on (I am doing all of this on my local machine). I have done the following:
    1. Created a flow which has a task that gets some data
    2. Used Docker storage
    3. Done the required steps (building a base image, passing dependencies, running the server with --expose, adding a Docker agent that communicates on the same network as my server)
    My issues: I added a for loop in my task, and this causes the task to be stuck in the Running state. I also attempted a dictionary comprehension inside my task, and it caused SystemError('Objects/dictobject.c:1555: bad argument to internal function').
    I tried the same piece of code with local storage and it works fine, so I think there is something about Docker storage that I am missing.
    7 replies · 3 participants
  • s

    Sen

    03/03/2022, 10:28 AM
    I have tried the suggestion from @Abhishek by following the GitHub project at https://github.com/kvnkho/demos/tree/main/prefect/docker_with_local_storage But it still doesn't work as expected. I believe I am missing some configuration.
    19 replies · 2 participants
  • n

    Nico Neumann

    03/03/2022, 12:37 PM
    Hi! I have a question about running Prefect on AWS ECS. I am currently using Fargate to launch my flows. I have a pretty big Docker image (~2-3 GB uncompressed) that adds some dependencies (not only Python) on top of the Prefect Docker image. The problem is that for every flow, Fargate pulls the image from AWS ECR (in the same VPC), so each run takes multiple minutes to start. Most of the runs are small, so I wait a couple of minutes for them to start and then they finish within a few seconds. If I start 100 flows a day, that means pulling 200-300 GB of the same image.
    My first idea was to split the image into multiple images and use subflows; then every subflow could specify which image and dependencies it needs. Or I could try to reduce the image size somehow. But in both cases, even at 0.5 GB per image, it would still mean pulling 50 GB a day.
    I found this AWS issue regarding image caching: https://github.com/aws/containers-roadmap/issues/696 Unfortunately, caching is currently only supported for EC2, not for Fargate. So my second idea was to use EC2 instead, but I am not sure how well it scales. EC2 instances would start up and shut down depending on how many flows are running, so it might just shift the startup problem, since flows might need to wait for another EC2 instance to start.
    I used this tutorial to set everything up for Fargate: https://towardsdatascience.com/how-to-cut-your-aws-ecs-costs-with-fargate-spot-and-prefect-1a1ba5d2e2df (thanks to @Anna Geller, this works great!) But I could not figure out how to do it properly for EC2. If I understand correctly, EC2 has one IP per instance while Fargate has one IP per flow, so the setup would be a little different.
    My main problem is the long startup time of multiple minutes, and I am not sure what's the best way to deal with it. Maybe someone has experienced the same problem and found a better solution?
    7 replies · 3 participants
  • y

    Yas Mah

    03/03/2022, 1:57 PM
    Hello 🙂 what options are there for accessing the result of a task and using it in another operation within the flow that is not a task?
    import pathlib
    from pathlib import Path
    
    from prefect import Flow, Parameter, task
    
    @task
    def get_access_paths(base_path: Path):
        return base_path
    
    with Flow("flow") as flow:
        base_path = Parameter("base_path", default=pathlib.Path(__file__).parent.parent.resolve())
        data_access = get_access_paths(base_path)
        files = [str(Path.joinpath(data_access, x)) for x in data_access.glob('*') if x.is_file()]
    
        input = Parameter("input", default=files)
    2 replies · 2 participants

Kevin Kho

03/03/2022, 2:04 PM
Hi @Yas Mah 🙂 I think the thinking is the other way around. You need to turn the non-tasks into tasks to defer their execution to run time. Otherwise, they will run while the Flow (DAG) is being constructed.
🙏 1
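Kevin's point can be illustrated without Prefect. In this toy model, the hypothetical task decorator and Node class mimic deferred execution. Calling a decorated function while building the graph only records the call, and real values exist only when the graph runs. That's why an expression like data_access.glob('*') fails at build time, and why the file listing has to become its own task. This is not Prefect's implementation:

```python
import tempfile
from pathlib import Path


class Node:
    """A deferred call recorded at build time (toy model of a task in a DAG)."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args

    def run(self):
        # Resolve upstream nodes first, then call fn with concrete values.
        values = [a.run() if isinstance(a, Node) else a for a in self.args]
        return self.fn(*values)


def task(fn):
    """Hypothetical decorator: calling the function builds a Node instead of running it."""
    def build(*args):
        return Node(fn, args)
    return build


@task
def get_access_paths(base_path):
    return Path(base_path)


@task
def list_files(base):
    # The former list comprehension now runs inside a task, i.e. at run time.
    return sorted(str(p) for p in base.glob("*") if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "a.csv").write_text("x")
    Path(tmp, "sub").mkdir()
    data_access = get_access_paths(tmp)   # build time: just a Node
    files = list_files(data_access)       # build time: another Node
    print(hasattr(data_access, "glob"))   # False: no real Path exists yet
    result = files.run()                  # run time: values are computed
    print([Path(f).name for f in result])  # ['a.csv']
```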

Yas Mah

03/03/2022, 2:49 PM
Thank you @Kevin Kho, it makes sense!