prefect-community
  • a

    Alex Rogozhnikov

    10/11/2021, 7:10 PM
    Hello, some questions about Orion (I am new, so I decided to start with Orion):
    • Is there a way to run it on a Fargate cluster?
    • Is there a complete example of using one's own SSH cluster?
    • When I run things from Jupyter, is this scenario considered supported?
    • If I change something in the package I develop, how do I get the same change delivered to agents/executors?
    n
    • 2
    • 6
  • t

    Tony Hill

    10/11/2021, 8:21 PM
    👋🏽
    👋 8
    k
    • 2
    • 2
  • t

    Tadej Svetina

    10/11/2021, 8:21 PM
    Would you consider creating multi-arch Docker images (specifically for ARM processors)? This would enable me to use Prefect on the latest-generation AWS instances.
    m
    • 2
    • 4
  • m

    Matt Alhonte

    10/11/2021, 9:11 PM
    #RubberDucking So, we're using Papermill for a lot of stuff. I think the code that runs in Papermill isn't actually being distributed with the Dask scheduler, and is basically just being treated as one function call. I guess the way to have the Papermill code get Dask-ified would be:
    • Prefect flow starts in a lightweight container
    • Prefect flow spins up a Dask cluster
    • Modify Papermill notebooks so that they take an address of a Dask cluster as an argument, and then submit the code that runs in them to the cluster
    That sound right?
    k
    • 2
    • 29
  • s

    Shaoyi Zhang

    10/11/2021, 11:08 PM
    Hey team, is it possible to set up an automation that receives an email address and invites that address to a Prefect tenant? We are trying to build a self-service platform where stakeholders can request Prefect accounts by themselves.
    z
    • 2
    • 3
  • j

    jake lee

    10/12/2021, 2:19 AM
    Hi! I’ve been setting the same common parameters on all my tasks, such as
    log_stdout=True, name="<task name>", state_handlers=[state_handler])
    and I would like to define these common parameters once and simply reuse them. Is there a sample where this was done? Or is there a way to set common parameters that are used across my company's flows?
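One common pattern for this, sketched below, is `functools.partial`, which presets keyword arguments on a decorator factory. To keep the sketch runnable without Prefect installed, `task` here is a hypothetical stand-in that mimics the call shape of `prefect.task` in 0.15.x (`task(fn=None, **kwargs)`); `state_handler` and `company_task` are likewise illustrative names. The assumption is that, with Prefect installed, `from prefect import task` could replace the stand-in.

```python
from functools import partial


def task(fn=None, **task_kwargs):
    """Stand-in for prefect.task: usable as @task or @task(name=...)."""
    def wrap(func):
        # Record the settings on the function so the effect is visible.
        func.task_kwargs = task_kwargs
        return func
    # Used as @task, fn is the function; used as @task(...), fn is None.
    return wrap(fn) if fn is not None else wrap


def state_handler(obj, old_state, new_state):
    """Hypothetical handler; a real one might send notifications."""
    return new_state


# Preset the options that every task in the company should share.
company_task = partial(task, log_stdout=True, state_handlers=[state_handler])


@company_task(name="extract")
def extract():
    return [1, 2, 3]


@company_task  # no extra arguments: only the shared defaults apply
def load(rows):
    return len(rows)
```

Per-task options such as `name` are still passed at the decoration site, while the shared defaults live in one place.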
    k
    • 2
    • 2
  • r

    Ryan Sattler

    10/12/2021, 5:35 AM
    I find it a bit clunky that we have to either write all our code in one giant file, or rebuild + upload a docker image every time any of the other files change. Is Orion going to have a cleaner way to structure flows in multiple files?
    👀 1
    :upvote: 2
    c
    e
    +3
    • 6
    • 15
  • d

    Darren Fleetwood

    10/12/2021, 7:26 AM
    Hi all! I have a Kubernetes cluster via AKS that’s running the Kubernetes agent. I’m trying to use the cluster autoscaler on node pools to scale the cluster depending on the CPU and memory requests of the Prefect flows. We’re using Prefect Cloud for orchestration. The jobs are being submitted fine, but they don’t seem to be consistently triggering scale-up. Sometimes the exact same flow, with the same resource requests, will trigger the autoscaler, sometimes it won’t. When I take a look at the pods, the ones that run successfully and the ones that sit in pending have the same resource requests, etc., so they’re definitely feeding through OK to Kubernetes. We’re not maxing out the node limits of the pools (i.e. this is still a problem when scaling from 0->1) and the resources of the nodes (CPU/memory) are sufficient to run the flows. I’m at a loss for other reasons why flow runs that appear identical would sometimes work and sometimes not. I wanted to check if anyone had experienced this or had any ideas of where to look next?
    a
    • 2
    • 2
  • t

    Thomas Furmston

    10/12/2021, 8:16 AM
    Hi I've been playing around with prefect. Firstly, I think it is a really great tool!
  • t

    Thomas Furmston

    10/12/2021, 8:17 AM
    I have a couple of questions that I hope you can help with to clarify my understanding.
  • t

    Thomas Furmston

    10/12/2021, 8:17 AM
    Initially I had my example flow running in a Docker container (on my local machine) through the use of the
    DockerRun
    class and using
    LocalStorage
    . I had this running on a DockerAgent just fine.
  • t

    Thomas Furmston

    10/12/2021, 8:17 AM
    I then changed the storage to
    DockerStorage
    , which again I got to work. I was just wondering though, does the use of the
    DockerRun
    class do anything when I am using
    DockerStorage
    ? At first glance, it would seem not as the use of
    DockerStorage
    would imply the run would run on docker anyway. Am I missing something?
    a
    • 2
    • 7
  • t

    Thomas Furmston

    10/12/2021, 8:23 AM
    Also, it is currently not clear to me the difference between putting a selection of jobs in one flow as separate tasks vs having a selection of flows (of individual tasks) that have the dependencies set between them. In particular, I am interested in the execution resources for the different jobs.
  • t

    Thomas Furmston

    10/12/2021, 8:23 AM
    I currently have a selection of jobs that run on Kubernetes that I would like to migrate to Prefect. I am trying to understand how I would make the decision of which approach to take. Is a flow on Prefect deployed to a Kubernetes pod, with all the tasks deployed on that pod? If so, and the current tasks have different configurations to run on Kubernetes, would it be best to split the existing jobs into different flows?
    a
    k
    • 3
    • 10
  • u

    张强

    10/12/2021, 11:16 AM
    Can I call
    subflows
    in the
    parent flow
    ?
    k
    • 2
    • 2
  • r

    Ruslan Aliev

    10/12/2021, 1:25 PM
    Hi, folks. I have 3 virtual machines (VM1, VM2, VM3), accessible via a corporate VPN.
    1. Can I run Prefect Cloud on VM1?
    2. How can I run agents on VM2 and VM3 for the Prefect server on VM1?
    j
    k
    • 3
    • 8
  • t

    Tony Yun

    10/12/2021, 4:08 PM
    Hi, is there a way to make scheduled runs execute one after another? For example, if the first job hasn’t finished, the second one has to wait.
    k
    • 2
    • 16
  • c

    chicago-joe

    10/12/2021, 4:28 PM
    Hey guys, I'm running into an issue with registering a flow that uses CronClock with parameter default inputs. On v. 0.15.5
    ValueError: Flow could not be deserialized successfully. Error was: ValidationError({'schedule': {'clocks': {0: {'parameter_defaults': defaultdict(<class 'dict'>, {'accnt': {'value': ['Field may not be null.']}, ....
    std_inputs = { 'report':False,
                   'accnt':               None,  # string - delimited list
                   'strategy':            None,  # string - delimited list
                   'institutionalAdvisor':None,  # string - delimited list
                   'endDate':             None,  # string - 'YYYY-MM-DD'
                   'shadow':              None,  # SPY, treasury
                   'benchmark':           None,  # SPY, PUT, BXM
                   'fees':                False,  # Bool
                   'output':              'daily'  # monthly, daily
                   }
    
    with Flow('Update Performance tables',
            schedule = Schedule(
                    clocks = [
                          CronClock("7 6 * * 1-5",
                                    start_date=pendulum.now("America/Chicago"),
                                    parameter_defaults = std_inputs)]...
    Any help would be greatly appreciated!
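The validation error points at `None` values inside `parameter_defaults` ("Field may not be null"). One possible workaround, sketched here as an assumption rather than a confirmed fix, is to pass the clock only the entries that have a concrete value and to leave the `None` defaults on the flow's `Parameter` objects. The dictionary is copied from the message above; `clock_defaults` is an illustrative name.

```python
std_inputs = {
    'report': False,
    'accnt': None,                 # string - delimited list
    'strategy': None,              # string - delimited list
    'institutionalAdvisor': None,  # string - delimited list
    'endDate': None,               # string - 'YYYY-MM-DD'
    'shadow': None,                # SPY, treasury
    'benchmark': None,             # SPY, PUT, BXM
    'fees': False,                 # Bool
    'output': 'daily',             # monthly, daily
}

# Keep only entries with a concrete value: False is kept, None is dropped.
clock_defaults = {k: v for k, v in std_inputs.items() if v is not None}

# clock_defaults could then be passed as parameter_defaults=clock_defaults.
print(clock_defaults)
```

The comparison is `is not None` rather than a truthiness check, so the `False` flags survive the filter.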
    k
    n
    • 3
    • 10
  • n

    Nick Hart

    10/12/2021, 5:40 PM
    Hi, I have some parameters in the host_config for DockerRun that are in the child flows that I want to be able to edit/set when I go to run the parent flow. Is this currently possible in Prefect? • Is there a way for me to be able to define/set the host_config for the child process in the parent process?
    k
    • 2
    • 5
  • h

    Hugo Slepicka

    10/12/2021, 6:32 PM
    Hi, I have a flow that was registered with a local Prefect server. If I invoke
    flow.run()
    , instead of it running on the registered server with the local agent it is running on my notebook. I could not find on the docs how to trigger the execution of the registered flow using the local server. Could you point me to an example or docs (which I likely missed)? What I am looking for is a way to execute the registered flow on my server via Python.
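For context, `flow.run()` executes the flow locally in the calling process. To start a run of the registered flow through the backend, Prefect 0.15.x exposes `Client.create_flow_run`; the sketch below assumes that version, a reachable backend, and a flow ID such as the one returned by `flow.register(...)`. The helper name `trigger_registered_flow` and its `client` argument are illustrative, and the import is deferred so the function can be exercised with any client-like object.

```python
def trigger_registered_flow(flow_id, parameters=None, client=None):
    """Ask the backend to schedule a run of an already registered flow."""
    if client is None:
        # Deferred import: requires Prefect 0.15.x / 1.x to be installed.
        from prefect import Client
        client = Client()
    # The backend creates the run; an agent picks it up for execution.
    return client.create_flow_run(flow_id=flow_id, parameters=parameters or {})
```

Called as `trigger_registered_flow(flow_id)`, this returns the ID of the new flow run, which a running agent would then pick up.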
    k
    • 2
    • 16
  • a

    Adam Brusselback

    10/12/2021, 6:45 PM
    I have a flow that has 4 tasks, first being a parameter, second returning an intermediary result, third using that result to get some more intermediary results < fails here, fourth using all these intermediary results to run a query against my database. In testing I caused some failures on the 3rd step, and when trying to restart the failed job the intermediary results from the second step are set as None
    k
    • 2
    • 11
  • d

    Dominic Pham

    10/13/2021, 1:00 AM
    Hi all, does anyone have experience importing and calling scrapy API scripts in Prefect? I am running into a
    KeyError: 'Spider not found: 'scraper'
    when I try to debug my program. However, when I manually define the function the task is calling, it works as intended.
    n
    • 2
    • 7
  • s

    Sean Talia

    10/13/2021, 1:51 AM
    Hi All, We're starting to onboard more and more people from our organization onto Prefect, and as we do so, one of the things we're noticing is that the flows that many people are authoring have the same handful of Python library dependencies – maybe 4 or 5 of the same libraries (e.g.
    numpy
    ,
    pandas
    ,
    snowflake-connector-python
    ). Then our users will go and write a handful of their own custom Python classes and modules that they need for their flow. In order to make these custom modules available for use in their flows, people have been creating slight variations of the same docker image that have that same set of python packages installed in it, and then just
    COPY
    their project's code into the image – at that point, their RunConfig image has everything they need in it to run their flow. One of the issues I'm foreseeing with this approach is that it's going to lead to a lot of image bloat in terms of the number of images we'll have in use across our flows – images whose Dockerfiles might be found across several different repositories – so we'll be maintaining a lot of images that hardly differ from one another save for a handful of custom Python modules that people copy into them. I'm trying to see if there's an approach that avoids this – or at least avoids it in a way that has a favorable tradeoff. Maybe instead of these custom modules needing to be available at registration/build time, they can simply be retrieved at runtime from S3, for example? If that were possible, the management overhead now moves to S3 rather than our image repository, but I think that's easier to deal with; plus many of our users who need/want to build these flows don't necessarily want to be in the business in building and managing Docker images.
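The runtime-retrieval idea could look roughly like this: package the custom modules as a zip archive, fetch it when the flow starts, and put it on `sys.path`, since Python can import pure-Python modules directly from zip files. The download from S3 is deliberately left out (it is an assumption that something like boto3's `download_file` would produce the local archive); `add_archive_to_path`, `team_helpers`, and the demo archive are illustrative.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def add_archive_to_path(archive_path):
    """Make pure-Python modules inside a zip archive importable."""
    if archive_path not in sys.path:
        sys.path.insert(0, archive_path)
    # Clear the import system's finder caches so the new entry is searched.
    importlib.invalidate_caches()


# Demo: build a tiny archive locally in place of the S3 download.
archive = os.path.join(tempfile.mkdtemp(), "team_modules.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("team_helpers.py", "def double(x):\n    return 2 * x\n")

add_archive_to_path(archive)
team_helpers = importlib.import_module("team_helpers")
print(team_helpers.double(21))
```

Compiled extension modules cannot be imported from a zip archive, so shared libraries such as `numpy` and `pandas` would still belong in a common base image; only the project-specific pure-Python code would travel this way.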
    👀 1
    s
    k
    c
    • 4
    • 5
  • m

    Martin Durkac

    10/13/2021, 11:54 AM
    Hi guys, I am using the Prefect scheduler to repeat a flow. Unfortunately, our task lasts longer than the schedule interval, which messes some things up. Is it possible to set the scheduler so that the same flow is rerun, again and again, only after the previous run has finished successfully?
    👋🏽 1
    👋 1
    a
    k
    • 3
    • 10
  • m

    Madison Schott

    10/13/2021, 2:54 PM
    Hi all, I'm getting an error in my Prefect run saying a column in my dbt model doesn't exist, but when I run the model locally everything works fine. I recently updated the Docker container so the code matches. Any ideas why I could be getting this?
    a
    • 2
    • 24
  • m

    Madison Schott

    10/13/2021, 3:53 PM
    Anyone else getting an issue like this with their Fivetran connectors in Prefect?
    Error during execution of task: HTTPError('404 Client Error: Not Found for url: <https://api.fivetran.com/v1/connectors/%0Alethal_conservation>')
    m
    • 2
    • 2
  • l

    Lon Nix

    10/13/2021, 4:31 PM
    Hello everyone! First-time user here, but excited to get going. I'm installing into Kubernetes via Helm and I'm not able to get the chart to create a tenant. It looks like it's expecting Apollo on localhost:4200, but it's running in a different pod, so I keep getting connection issues. Thoughts?
    👋 2
    👍 1
    n
    • 2
    • 18
  • g

    Greg Adams

    10/13/2021, 4:33 PM
    Hi! I’ve been using prefect for a pandas-heavy pipeline and I started to write some very redundant code. I can’t help but think there’s a nicer way than what I’m doing to achieve the desired result. I have a couple decorators I’m inserting into a function inside the task, then calling that function at the end. It works, but was wondering if I could extend the Task class or create one decorator with all the task calling stuff. My brain feels like a pretzel, is there any python superhero who’s been through this?
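One way to collapse a stack of decorators into a single one is a small composition helper. The sketch below uses only the standard library; `compose`, `log_shape`, `drop_empty`, and `pandas_step` are hypothetical names, plain lists stand in for DataFrames, and the assumption is that Prefect's `task` could be added as the outermost decorator in the same way.

```python
from functools import wraps


def compose(*decorators):
    """Combine decorators so that compose(a, b)(f) equals a(b(f))."""
    def combined(func):
        for decorator in reversed(decorators):
            func = decorator(func)
        return func
    return combined


def log_shape(func):
    @wraps(func)
    def wrapper(rows):
        result = func(rows)
        print(f"{func.__name__}: {len(rows)} rows in, {len(result)} rows out")
        return result
    return wrapper


def drop_empty(func):
    @wraps(func)
    def wrapper(rows):
        # Remove None entries before the wrapped step sees the data.
        return func([r for r in rows if r is not None])
    return wrapper


# One reusable decorator that replaces the repeated stack.
pandas_step = compose(log_shape, drop_empty)


@pandas_step
def double(rows):
    return [r * 2 for r in rows]
```

Each pipeline step then needs a single `@pandas_step` line, and changing the shared behavior means editing one `compose(...)` call.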
    a
    m
    • 3
    • 4
  • e

    ek

    10/13/2021, 5:36 PM
    Hi everyone, I'm trying to set up Prefect to use an external DB on AWS RDS, and I have verified that I can connect to my RDS instance with my user and password. Here is my helm command:
    NAMESPACE=prefect-server
    VERSION=2021.09.02
    
    kubectl create namespace $NAMESPACE
    helm install -n $NAMESPACE $NAMESPACE \
    --version $VERSION \
    --values ./helm/values.yaml \
    prefecthq/prefect-server
    Here is my helm
    values.yml
    postgresql:
      postgresqlDatabase: postgres
      postgresqlUsername: postgres
      existingSecret: [secret]
      servicePort: 5432
      externalHostname: "x.x.x.x"
      useSubChart: false
    Is there anything else I'm missing in my values.yml? Much appreciated!
    n
    • 2
    • 3
  • s

    Sergey Shamsuyarov

    10/13/2021, 7:30 PM
    Hi, I have a problem with Prefect Core. I dynamically create cron clocks with different parameters and add them to the schedule. When I run them in one flow and their execution times overlap, only the first is executed. I thought this was solved in https://github.com/PrefectHQ/prefect/pull/3394, but I have the same problem.
    def gen_prefect_cronclock(config, report_conf_path):
        clocks = []
        for report, cron in get_all_reports_cron_dict(report_conf_path).items():
            for tz, ppk_name_list in get_tz_ppk_dict(config).items():
                clocks.append(CronClock(cron,
                                        start_date=pendulum.datetime(
                                            1970, 1, 1, tz=tz),
                                        parameter_defaults={
                                            'report_path': report, 'ppk_name_list': ppk_name_list}
                                        )
                              )
        return clocks
    run flow
    with Flow("main-report-flow", main_schedule) as main_flow:
        conf = Parameter('config', default=config)
        report_config_path = Parameter('report_config_path',
                                       default=REPORT_CONF_PATH)
        report_path = Parameter('report_path', default="")
        ppk_name_list = Parameter('ppk_name_list', default=[])
        tasck_execute_send_report.map(unmapped(conf),
                                      unmapped(report_config_path),
                                      unmapped(report_path),
                                      ppk_name_list)
    a
    • 2
    • 6
a

Anna Geller

10/13/2021, 8:04 PM
Hi @Sergey Shamsuyarov, I was trying to reproduce the issue but the parameters attached to the schedule are working properly. The following flow generates 2 flow runs every minute: one with the parameter Sergey, and one with parameter Anna.
from prefect import Flow, Parameter, task
from prefect.schedules.clocks import CronClock
from prefect.schedules import Schedule
import pendulum


clock_1 = CronClock(
    "*/1 * * * *",
    start_date=pendulum.now(),
    parameter_defaults={"user_input": "Sergey"},
)

clock_2 = CronClock(
    "*/1 * * * *", start_date=pendulum.now(), parameter_defaults={"user_input": "Anna"}
)

schedule = Schedule(clocks=[clock_1, clock_2])


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}")


with Flow("test-flow", schedule=schedule) as flow:
    param = Parameter("user_input", default="Marvin")
    hw = hello_world(param)
Can it be that perhaps your schedule doesn’t get attached properly to the flow object? The overlapping schedule doesn’t seem to be the issue here.
s

Sergey Shamsuyarov

10/14/2021, 6:11 AM
Hi, thanks for the help. I ran your test and got output with only one parameter:
[2021-10-14 06:02:21+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:03:00+00:00
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:03:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:03:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:03:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:04:00+00:00
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'test-flow'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'user_input': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | hello Anna
[2021-10-14 06:04:00+0000] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-10-14 06:04:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-10-14 06:04:00+0000] INFO - prefect.test-flow | Waiting for next scheduled run at 2021-10-14T06:05:00+00:00
I use Prefect 0.15.1 in a JupyterLab environment based on Ubuntu 20.04.3 LTS.
a

Anna Geller

10/14/2021, 7:50 AM
To test overlapping schedules, it’s best if you register your flow and check the output in Prefect Cloud or Server UI. The built-in scheduler that you get with Prefect Core is more of a convenience method than a real scheduler. You can find more about it here.
s

Sergey Shamsuyarov

10/14/2021, 11:18 AM
OK, thanks, I will try bringing up a local server. But is this behavior correct for Prefect Core? When I create clocks with different start times, the parameters from the clocks are passed through normally.
a

Anna Geller

10/14/2021, 11:25 AM
If this is the first time you want to deploy your Prefect Core flows, it’s much easier to get started with Prefect Cloud. You can sign up for a Starter or Standard tier here, and your first 10000 successful tasks per month are free, so you can try everything this way. Once you have signed up, you can create an API key, authenticate with it, and start registering and scheduling your flows. More details on that are here.
s

Sergey Shamsuyarov

10/14/2021, 2:01 PM
Yes, I tried my code with a local agent; when I use flow.register with Prefect Cloud, it always works fine, as in the docs.
👍 1