prefect-community
  • j

    Jason Carter

    07/23/2020, 7:22 PM
    Hi everyone, looking for any tips/pointers on where I'm going wrong... I'm a couple of days into Prefect (used Airflow in the past) and I'm trying to set up a hello-world type thing. Using Prefect Core I was able to get it running via the CLI and also got the scheduling working. My problem comes when I try to visualize and "register" a flow in the UI. The UI is up and running, everything is green, and I ran
    prefect backend server, prefect server start and prefect agent start
    in that order, but no flow shows up in the UI.
    import prefect
    from prefect import task, Flow
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")
    
    flow = Flow("hello-flow", tasks=[hello_task])
    
    # flow.run()
    flow.register()
    👀 1
    d
    m
    17 replies · 3 participants
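    A plausible checklist for the setup above, assuming Core ~0.12: registration needs a project to exist, and the agent must be running against the local server ("examples" and hello_flow.py are hypothetical names).

    prefect backend server
    prefect server start
    prefect create project "examples"
    prefect agent start
    python hello_flow.py   # runs flow.register(), ideally flow.register(project_name="examples")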
  • a

    Ashish Arora

    07/23/2020, 11:08 PM
    Hello everyone, is there a way to register a flow (which gives you the UI URL on localhost) and then visualize a particular run of the flow that was executed from the Python code itself (using flow_name.run()), or does that only work for manual UI runs and scheduled jobs?
    j
    1 reply · 2 participants
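    A hedged note on the question above: flow.run() executes entirely in-process and is not reported to the backend, so those runs never appear in the UI; only runs created through the server (UI, CLI, or Client) are tracked there. A minimal sketch ("examples" is a hypothetical project name):

    from prefect import Client

    flow_id = flow.register(project_name="examples")  # returns the flow's id
    Client().create_flow_run(flow_id=flow_id)         # this run will show up in the UI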
  • j

    Jeremy Knickerbocker

    07/24/2020, 5:31 AM
    Hi Everyone, I am trying to come up with the best flow storage/deployment strategy for easy maintenance with a small team, one that is not going to be super painful to scale as we grow. We are running the Prefect Core server and have it configured properly, but I am struggling with the best approach to register flows and store them. I have pieced together a lot of different ideas from the documentation and Slack, but I still think I am missing something.

    We have an ETL VM that we will be using as our host machine; it has Docker installed and also hosts the Core UI. We use Azure for all of our infrastructure (VMs, SQL Server, Blob Storage, DevOps Repos, Container Registries) and are tied to that platform, but are open to using any other services available in Azure. We also have several internal Python packages that we are actively developing (read: changing often) for interacting with our EHR and other systems.

    Looking at the available agents, the Docker Agent seems like the best fit for us right now. However, I am struggling with the best approach to get our packages from our private repo and our flows into a Docker image. Since we are using SQL Server, we have to customize our images to install pyodbc and the MSSQL ODBC drivers; the smallest image we have been able to build has been ~600 MB. We are currently using a multi-stage build to clone our internal packages from Azure DevOps, then copying them into the image and setting the Python path. This works, but every time one of the packages changes we need to rebuild the image, push to the registry, and pull on the host machine; it feels extremely wasteful of cycles and storage. I was considering cloning the repositories to our host VM and using Docker volumes to share the packages and flows, so we would not have to build the image every time we made a change.

    I was really excited to read the 0.12.5 release notes; I am hopeful the script storage functionality will benefit our use case, but I still need to explore it further. Does anyone have any suggestions on how to better architect this? I looked at https://docs.prefect.io/orchestration/execution/storage_options.html#non-docker-storage-for-containerized-environments but that seems to only cover flow storage, and I need to handle additional files that update very often. Thank you for any recommendations you may have!
    👀 1
    j
    j
    2 replies · 3 participants
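    One direction worth sketching for the thread above, assuming the 0.12.5 script storage and Azure Blob Storage (all names illustrative): keep flow code in Blob Storage and run it on a static, pre-built image, so the image only needs rebuilding when drivers or packages change, not on every flow edit.

    from prefect.environments.storage import Azure

    flow.storage = Azure(
        container="flows",                                      # hypothetical container
        connection_string="<AZURE_STORAGE_CONNECTION_STRING>",  # elided secret
    )
    flow.register(project_name="etl")  # hypothetical project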
  • j

    Jason Carter

    07/24/2020, 5:44 PM
    Hello, I have another "quick" question branching from my previous issue. 1. If I make a change to my file, say
    first_flow.py
    do I have to run
    python first_flow.py
    again for the change to be registered? (
    flow.register()
    is in my py file.) 2. I'm now seeing my flow in the UI, but when I try to run it via the UI (as a one-off run) it goes into "scheduled to run", shows up as late, and never runs (I just mark it as cancelled after 3 minutes).
    import prefect
    from prefect import task, Flow
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")
    
    flow = Flow("hello-flow", tasks=[hello_task])
    
    # flow.run()
    flow.register()
    https://prefect-community.slack.com/archives/CL09KU1K7/p1595532174143800
    m
    n
    6 replies · 3 participants
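    For question 1: yes, re-running the script is how a new version gets registered. For question 2, a run that sits in "scheduled" and goes late usually means no agent is claiming it (agent not running, pointed at the wrong backend, or a label mismatch). A minimal, hedged check:

    prefect backend server   # make sure the agent targets the local server, not Cloud
    prefect agent start      # leave running; its labels must match the flow's labels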
  • j

    Jan Feřtek

    07/24/2020, 5:57 PM
    Hi all, I am trying to solve the exact same issue: https://prefect-community.slack.com/archives/CL09KU1K7/p1585863863351300 The given solution is not working for me. I want to use a Parameter in a shell command:
    from prefect import task, Flow, Parameter
    from prefect.tasks.templates import StringFormatter
    from prefect.tasks.shell import ShellTask
    
    
    msg_task = StringFormatter(template='{my_string}')
    
    shell_task = ShellTask()
    
    with Flow("string-template") as flow:
        name = Parameter('name')
    
        msg_output = msg_task(my_string=name)
        shell_output_static = shell_task(command=f"echo Arthur")
        shell_output_dynamic = shell_task(command=f"echo {msg_output}")
    
    
    flow_state = flow.run(name='Marvin')
    
    
    print(flow_state.result[msg_output].result)
    # Marvin
    
    print(flow_state.result[shell_output_static].result)
    # Arthur
    
    print(flow_state.result[shell_output_dynamic].result)
    # Command failed with exit code 2
    👀 1
    n
    4 replies · 2 participants
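    A minimal sketch of the usual fix for the snippet above: the f-string is evaluated at flow definition time, so it interpolates the Task object's repr rather than its runtime result. Building the whole command with StringFormatter and passing it to ShellTask works, since the command argument is then supplied at runtime:

    from prefect import Flow, Parameter
    from prefect.tasks.shell import ShellTask
    from prefect.tasks.templates import StringFormatter

    cmd_task = StringFormatter(template="echo {my_string}")
    shell_task = ShellTask()

    with Flow("string-template") as flow:
        name = Parameter("name")
        cmd = cmd_task(my_string=name)          # renders "echo Marvin" at runtime
        shell_output = shell_task(command=cmd)  # command is now a concrete string

    flow_state = flow.run(name="Marvin")
    print(flow_state.result[shell_output].result)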
  • a

    Avi A

    07/24/2020, 7:21 PM
    Hey there, I’ve been working on my prefect workflows for a while now and I want to take it to production grade. To that end, I’d like to have my flow turn on a dask cluster, run for a while and then turn it off at the end (even if it fails). I was thinking about having tasks for that but since I’m configuring my flow to run on the cluster, these tasks would need the cluster running in order to operate. I guess there’s something here I’m missing regarding best practices for deployment, can someone here advise based on their own setup, or point me to the right place in the documentation? Thanks!
    j
    j
    39 replies · 3 participants
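    One hedged pattern for the cluster-lifecycle question above: let the environment own the cluster rather than tasks, so it is created for the flow run and torn down afterwards even on failure. If the infrastructure is Kubernetes, for example:

    from prefect.environments import DaskKubernetesEnvironment

    flow.environment = DaskKubernetesEnvironment(min_workers=1, max_workers=10)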
  • p

    Pedro Machado

    07/24/2020, 8:12 PM
    Hi. I have a scheduled, parameterized flow that missed a few flow runs. Is there a quick way to re-run a failed flow with the same parameters? My flow relies on the flow start date (context variable). I am using Prefect Cloud.
    j
    5 replies · 2 participants
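    A hedged sketch of one quick way, via the Client: re-create each missed run with the same parameters and, since the flow depends on the start date from context, an explicit scheduled_start_time (flow id and parameter names below are placeholders):

    import pendulum
    from prefect import Client

    Client().create_flow_run(
        flow_id="<flow-id>",                       # copy from the UI
        parameters={"report_date": "2020-07-20"},  # hypothetical parameter
        scheduled_start_time=pendulum.datetime(2020, 7, 20),
    )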
  • t

    Tsang Yong

    07/24/2020, 8:48 PM
    Hi, if I have certain run time conditions to check before defining some upstream tasks, can I still use the same task variable for dependencies?
    j
    3 replies · 2 participants
  • m

    Matt Wong-Kemp

    07/24/2020, 8:58 PM
    When using the
    DaskExecutor
    is it safe to share the dask cluster between the execution of the tasks and direct use of dask? The use case here is I have a large number of small tasks I'd like to do in parallel, followed by some large joined data analysis. I'd like to gain the concurrency from running on a dask cluster in my flows, but at the same time I want to perform some data analysis using the dask
    Dataframe
    class and distribute this across a cluster as well. If I provision the cluster myself, is it safe to share the scheduler between the flow and the dataframe library? Or should I expect to need to provision my own dask cluster inside a task to run my Dataframe code on?
    j
    13 replies · 2 participants
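    A hedged sketch of the sharing pattern asked about above: a task running on a Dask worker can reach the same cluster through worker_client and hand Dataframe work to it, rather than nesting a second cluster inside a task (paths and column names are illustrative):

    import dask.dataframe as dd
    from distributed import worker_client
    from prefect import task

    @task
    def analyze(paths):
        with worker_client() as client:
            ddf = dd.read_parquet(paths)                        # hypothetical input
            future = client.compute(ddf.groupby("key").size())  # runs on the shared cluster
            return future.result()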
  • m

    Mike Ninov

    07/24/2020, 11:16 PM
    What are the capabilities of Prefect?
    n
    j
    4 replies · 3 participants
  • a

    Avi A

    07/25/2020, 8:39 PM
    Hey there, I’m having a flow with many mapped tasks (about 5k), running on a
    DaskExecutor
    . The flow fails due to high memory usage by the Dask worker, which is killed again and again endlessly. Questions: 1. It seems that Dask/Prefect don't serialize completed tasks' outputs when the worker reaches high memory usage. I'm not that proficient in Dask; I used Spark a lot, and I know it dumps to disk whenever memory runs low. How can I configure Dask to do the same (or is it a Prefect thing)? Worth mentioning that the Prefect agent, Dask scheduler, and worker are all on the same machine. 2. Is there some way to have the flow fail in this case? It keeps restarting the worker but never failing, so I don't get a message that this failure is happening. Thanks!
    👍 2
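    On question 1: the spill-to-disk thresholds are plain dask.distributed settings rather than Prefect ones. A hedged sketch of tuning them; these must be in effect in the process that starts the workers (e.g. via Dask's YAML config or before creating the cluster):

    import dask

    dask.config.set({
        "distributed.worker.memory.target": 0.6,      # start spilling to disk at 60% RAM
        "distributed.worker.memory.spill": 0.7,       # spill more aggressively
        "distributed.worker.memory.pause": 0.8,       # pause accepting new work
        "distributed.worker.memory.terminate": 0.95,  # restart the worker
    })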
  • d

    Daniel

    07/25/2020, 10:41 PM
    I have a kind of stupid question. I want to put my tasks into separate files to keep the code modular, but I want to assign the same state_handler to all the tasks. Is there a way to do this from the Flow?
    1 reply · 1 participant
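    A minimal sketch of one way, assuming the tasks are imported into the file that assembles the Flow: attach the shared handler in a loop after construction, so the task modules stay clean (alert_on_failure is a hypothetical handler):

    def alert_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            print(f"{task.name} failed")  # hypothetical notification
        return new_state

    for t in flow.tasks:
        t.state_handlers.append(alert_on_failure)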
  • a

    Alfie

    07/26/2020, 10:07 AM
    Hello folks, I register a flow in this way:
    import prefect
    from prefect import task, Flow
    
    @task
    def hello_task():
        logger = prefect.context.get("logger")
        logger.info("Hello, Cloud!")
    
    flow = Flow("hello-flow", tasks=[hello_task])
    
    flow.register(labels=['public'])
    But the labels of the flow run are:
    "type": "LocalEnvironment",
      "labels": [
         "public",
         "AppledeMacBook-Pro.local"
      ],
    Is there a way to get rid of the "AppledeMacBook-Pro.local" label? It prevents the agent from picking up the flow run. Thanks!
    k
    5 replies · 2 participants
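    The hostname label comes from the default Local storage. A hedged workaround that avoids changing the flow is to start the agent with the same labels so it can claim the run (whether the default label can be suppressed on the storage side depends on the Core version):

    prefect agent start -l public -l AppledeMacBook-Pro.local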
  • v

    Vikram Iyer

    07/27/2020, 5:45 AM
    Hi everyone, I was wondering what configuration is required to avoid the error below. By default,
    PREFECT_SERVER__GRAPHQL_URL
    is set to http://localhost:4200/graphql, but I want to change it to some other URL. How can I achieve this?
    n
    9 replies · 2 participants
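    If the variable named above is what the deployment reads, a hedged sketch is to export the override before bringing the server up (the host below is a placeholder):

    export PREFECT_SERVER__GRAPHQL_URL=http://my-host:4200/graphql
    prefect server start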
  • d

    delphi

    07/27/2020, 7:09 AM
    Hello guys! I am new to Prefect. When running Prefect Core locally on my Mac, I just cannot get the agent to start. I followed the steps the documentation describes and got: prefect.utilities.exceptions.AuthorizationError: No agent API token provided. I don't know where to find the API token for my local server. Maybe I missed some information in the docs? Please help me get it up and running, thank you!
    🚀 1
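    For readers hitting the same error: it typically appears when the agent is still pointed at Prefect Cloud, which requires an API token. Switching the backend to the local server avoids it (hedged, for Core ~0.12):

    prefect backend server
    prefect agent start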
  • d

    delphi

    07/27/2020, 7:26 AM
    Problem solved thanks to another thread with the same problem :)
    🚀 3
  • r

    Robin

    07/27/2020, 8:45 AM
    Dear all, I am currently trying to use AWS ECR as a container registry, after having successfully experimented with Docker Hub. What is the best way to authenticate Prefect to AWS ECR? 🙂
    ✅ 1
    b
    n
    24 replies · 3 participants
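    A hedged sketch, assuming AWS CLI v2: log Docker in to ECR before registering, then point Docker storage's registry_url at the ECR repository; the build/push triggered by flow.register() reuses that login session.

    aws ecr get-login-password --region us-east-1 | \
      docker login --username AWS --password-stdin <account-id>.dkr.ecr.us-east-1.amazonaws.com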
  • a

    Adam

    07/27/2020, 9:05 AM
    Hi everyone, hope you all had a lovely weekend. Would anyone have some examples of deploying flows on a CI server? I'm trying to get a better understanding of when to call storage.build() and how we can abstract that a bit for our analysts. Thanks!
    z
    2 replies · 2 participants
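    A hedged sketch of a CI registration step (names illustrative): flow.register() builds and pushes Docker storage by itself, so an explicit storage.build() is mainly useful when several flows share one image, as in karteekaddanki's snippet further down this page.

    from prefect.environments.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com",  # hypothetical registry
        image_name="analyst-flows",
    )
    flow.register(project_name="analytics", build=True)  # CI calls this on merge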
  • a

    Arsenii

    07/27/2020, 10:18 AM
    Does anyone else experience the issue with the parameter field breaking up in the run view (in Cloud UI)? Firefox on Win/Chrome on Mac
    👀 1
    b
    2 replies · 2 participants
  • m

    Matthew Maldonado

    07/27/2020, 12:06 PM
    Hi, I'm running into problems running MySQL commands from Prefect. I've tried pymysql, mysqlconnector, and SQLAlchemy. I can't figure out if this is a MySQL problem, a setting I'm missing, or a Prefect problem. The code runs and ends fine in a Jupyter notebook. However, when Prefect runs it, it never gets the signal that the command has ended. I even tried packaging it up in a stored procedure and everything ran perfectly fine; Prefect just never got the response that it completed, and I can't figure out why. Any help would be appreciated. I also have other SQL commands in other tasks that run just fine. I've attached the code.
    sql query.txt
    a
    z
    19 replies · 3 participants
  • m

    Matthew Maldonado

    07/27/2020, 1:12 PM
    I'm also a little new to using the built-in tasks. I want to try the MySQLExecute task, but how do I pass upstream tasks to it?
    n
    2 replies · 2 participants
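    A hedged sketch, assuming the task library's MySQLExecute: built-in task instances are called inside the Flow block like any other task, and pure ordering dependencies go through upstream_tasks (load_staging_table is a hypothetical upstream task):

    from prefect import Flow
    from prefect.tasks.mysql.mysql import MySQLExecute

    refresh = MySQLExecute(
        db_name="mydb", user="user", password="***",  # illustrative credentials
        host="db.example.com", query="CALL refresh_table();",
    )

    with Flow("mysql-example") as flow:
        staged = load_staging_table()     # hypothetical upstream task
        refresh(upstream_tasks=[staged])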
  • k

    karteekaddanki

    07/27/2020, 1:18 PM
    Hi, I cannot seem to get my flow to run on a LocalDaskEnvironment from the cloud UI. When I use
    flow.run()
    locally it seems to work. Here is the code snippet I am using for registering my flow. I am using
    Docker
    storage.
    if args.base_image:
        storage = Docker(
            base_image=args.base_image,
            registry_url=args.registry_url,
            local_image=args.local_image)
        for flow in flows:
            path = storage.add_flow(flow)
            print(f"storing flow {flow.name} at {path} in the image.")
        storage = storage.build()
        for flow in flows:
            flow.storage = storage
            flow.environment = LocalEnvironment(
                executor=LocalDaskExecutor(),
                labels=args.labels)
            flow.register(project_name=args.project, build=False)
    Am I doing something wrong? I am on the developer version of the cloud backend. EDIT: To be more precise, when I run
    flow.run()
    I see concurrency on the mapped tasks, but not when using the cloud API (even for a single flow).
    👀 1
    n
    2 replies · 2 participants
  • a

    Adam

    07/27/2020, 2:50 PM
    Hi everyone, a quick question. What is the relation between the Docker container and Dask? I'm struggling to make the link between the two. To my knowledge Dask is not aware of containers? (I've never used Dask but used k8s/docker extensively so just looking to find the link between how the container is run against Dask)
    c
    3 replies · 2 participants
  • l

    Leo Meyerovich (Graphistry)

    07/27/2020, 3:13 PM
    Anyone have experience / recs for doing Prefect <> GCP serverless (any form)? This is part of some citizen-journalism web scraping efforts (Twitter, FB, ...), so we will need to vet & recycle IPs, and thus expect nodes to be somewhat short-lived (1 min-1 mo); with many native serverless platforms, 15 minutes is the limit anyway.
    👀 1
    n
    3 replies · 2 participants
  • l

    Leo Meyerovich (Graphistry)

    07/27/2020, 3:14 PM
    (We already have prefect for our normal GPU jobs on our stable cluster)
  • j

    Jared

    07/27/2020, 3:21 PM
    Using Prefect Cloud, let's say I want to set flow SLAs (to be explicit: I want to be able to set cloud hooks, or something similar, to fire when a flow does not complete within a certain amount of time after its scheduled start). Is there a good way to do this, integrated with Cloud? I see some copy about workflow SLAs on the website, so I assume something like this exists, but I don't see any other references in the docs/GitHub/Slack.
    c
    1 reply · 2 participants
  • z

    Zach

    07/27/2020, 5:17 PM
    Can someone tell me what the little yellow exclamation point next to the "SCHEDULE" toggle button means?
    n
    k
    2 replies · 3 participants
  • t

    Tarcisio

    07/27/2020, 6:01 PM
    Hi everyone, is there a way to stop storing task results?
    k
    c
    +1
    7 replies · 4 participants
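    A hedged note on the question above: result persistence (checkpointing) can be turned off per task, or globally via the PREFECT__FLOWS__CHECKPOINTING environment variable.

    from prefect import task

    @task(checkpoint=False)
    def no_saved_result():
        return "not persisted"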
  • s

    Sven Teresniak

    07/27/2020, 6:41 PM
    Hi. Is it possible to define a timeout on a flow run's (not a task's!) runtime? Because sometimes a flow is stalled: the flow is in state RUNNING but every one of the flow's tasks is in state PENDING. I have a prefect-server setup with a Dask scheduler and some Dask workers doing the heavy lifting. All components run in containers, but Prefect is NOT spawning containers (static server setup). Unfortunately I had to set a lifetime (=auto-restart) for the Dask workers (because after maybe some hundreds of finished tasks the workers' CPU AND RAM consumption keeps increasing; even when no flow is running, the CPU is at >60%!). The RUNNING flow state with PENDING tasks seems to be persisted in the DB but is never updated, checked, or deleted. After a complete restart of all components (Apollo, Dask, agent, etc.) the flow run(s) are still in the same state and nothing happens. This stalled flow run blocks every other task because it's started by a scheduler. Could it be helpful to define a ResultHandler to allow the agent to pick up the flow run's crappy state and maybe continue or abort the broken flow run? I'm using an NFS to share data between the Dask workers and the agent (mounted under
    $PREFECT__HOME_DIR
    ). I see my pickled flows and a lot of pickled results, but Prefect seems not to be aware of them. The last log line is, by the way,
    Flow 'session-creation-watchdog': Handling state change from Scheduled to Running
    . In a successful run, the next log line would be the start of my first task. I don't know where to start debugging this. What can I do to better understand the problem? Is there a timeout, or can I force a "PING" of the flow to identify its real state?
    k
    m
    +1
    9 replies · 4 participants
  • m

    mithalee mohapatra

    07/27/2020, 7:07 PM
    Hi. How can we run an existing flow version in S3 from the Prefect UI? Any directions?
    k
    2 replies · 2 participants
k

Kyle Moon-Wright

07/27/2020, 7:13 PM
Hey @mithalee mohapatra, If you've defined the code to be stored with AWS S3 storage on your flow, then that version of your flow will have to be registered to Prefect Cloud. The latest version of your registered flow will be the 'active' version of that flow group for you to be able to kick off flow runs from the UI.
m

mithalee mohapatra

07/27/2020, 7:25 PM
Thank you.
👍 1