prefect-community
  • a

    Abhas P

    09/02/2021, 6:21 PM
    Hi, if I am trying to run a heavy transform task on a large data frame -
    @task
    def transform(df):
        # heavy transformation producing transformed_df
        return transformed_df
    
    with Flow("transform-1") as flow:
        transform(df)                  # current implementation
        for chunk in filtered_dfs:     # suggested way to utilize parallelization on tasks
            transform(chunk)
    1. Will breaking the data frame into small chunks and then calling the transformation task on each batch help me reap the benefits of Prefect parallelization using a Dask executor?
    2. Given that the input to the transform task is a large dataframe, what other steps can I consider to optimize the turnaround time of the flow? (E.g., different run configs and Prefect paradigms for writing the code.)
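    A minimal sketch of the chunk-and-map approach (the chunking helper, chunk count, and executor settings here are illustrative assumptions, not the poster's code):
    import numpy as np
    import pandas as pd
    from prefect import Flow, task
    from prefect.executors import DaskExecutor
    
    @task
    def split(df, n=8):
        # break the large frame into n roughly equal chunks
        return np.array_split(df, n)
    
    @task
    def transform(chunk):
        # heavy per-chunk transformation goes here
        return chunk
    
    with Flow("transform-1") as flow:
        df = pd.DataFrame({"x": range(1_000_000)})  # placeholder input
        chunks = split(df)
        transform.map(chunks)  # each chunk becomes a parallel mapped task
    
    flow.executor = DaskExecutor()
    With a DaskExecutor, each mapped child can run on a separate Dask worker, which is where the parallelism gain comes from.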
    d
    • 2
    • 3
  • j

    Jeremy Yeo

    09/02/2021, 8:24 PM
    Hey prefecters... is there a mechanism for mapping task args to a map? Specifically, I'm trying to set the name of the task... you can see from the example below that only the static name seems to work...
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    dirs = ["dir1", "dir2", "dir3"]
    commands = [f"rm -rf {d}" for d in dirs]
    names = [{"name": f"removing dir {d}"} for d in dirs]
    
    shell = ShellTask()
    
    with Flow("remove-dirs") as f:   # Flow needs a name here
        shell.map(command=commands, task_args={"name": "removing dir"})
        # shell.map(command=commands, task_args={"name": dirs})   # not working
        # shell.map(command=commands, task_args=names)            # not working
    
    f.run()
    Possibly not getting some syntax right. Thanks.
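    One pattern that may help (an assumption, not a confirmed answer: it relies on task_run_name, which only takes effect when running against Cloud/Server, not with a bare f.run()): task_args applies one static value to every mapped child, whereas task_run_name is templated per child from the task's inputs:
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    commands = [f"rm -rf {d}" for d in ["dir1", "dir2", "dir3"]]
    
    # "{command}" is filled in per mapped child from the task's own input
    shell = ShellTask(task_run_name="removing {command}")
    
    with Flow("remove-dirs") as f:
        shell.map(command=commands)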
    c
    m
    k
    • 4
    • 6
  • b

    Brad

    09/03/2021, 1:44 AM
    Hi, I’ve opened https://github.com/PrefectHQ/prefect/discussions/4935 to discuss data lineage if anyone is interested
    👍 3
    k
    • 2
    • 2
  • b

    Brad I

    09/03/2021, 5:41 AM
    Follow-up from my message above about the DaskExecutor not getting the env vars set in the run config: this has caused other issues in the way we have been deploying flows to different environments (test, dev, prod, etc.). Previously, using the LocalDaskExecutor, we could set the executor on the flow itself and it was generic enough to work across all of our environments. Now we also need to change the Dask config depending on our target environment, but it seems like setting the executor outside of the with Flow() definition just gets ignored. I believe this is because the storage and run_config are serialized with the flow but the executor currently is not. An example flow and registration script is attached inside this thread. Does anyone know of a workaround for this issue, or a better way to do this?
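    A possible workaround (an assumption, not an official pattern): since the executor is re-read from the stored flow object at runtime rather than from the serialized metadata, selecting it at import time in the flow's module based on an environment variable should survive registration:
    import os
    from prefect.executors import DaskExecutor, LocalDaskExecutor
    
    # TARGET_ENV is a hypothetical variable set differently per environment
    if os.environ.get("TARGET_ENV") == "prod":
        flow.executor = DaskExecutor(cluster_kwargs={"n_workers": 4})
    else:
        flow.executor = LocalDaskExecutor()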
    k
    m
    • 3
    • 8
  • a

    Arun Dass

    09/03/2021, 6:01 AM
    Is there a setting for scheduling a workflow at a certain interval (say every 1 hr) but having the workflow start instantly, then wait 1 hr for the next run?
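    A sketch of one way to get this (the exact timing of the first run is an assumption): give an IntervalSchedule a start_date of roughly now, so the first run fires almost immediately and subsequent runs follow hourly:
    from datetime import timedelta
    import pendulum
    from prefect.schedules import IntervalSchedule
    
    flow.schedule = IntervalSchedule(
        start_date=pendulum.now().add(seconds=10),  # first run almost immediately
        interval=timedelta(hours=1),
    )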
    k
    • 2
    • 1
  • g

    g.suijker

    09/03/2021, 7:54 AM
    Hi, today I'm getting errors about Prefect not finding my secrets: KeyError: 'The secret xxx was not found. Please ensure that it was set correctly in your tenant: https://docs.prefect.io/orchestration/concepts/secrets.html'. This happens during a flow where I use the same secret multiple times to access a data source; after a while I get this error. But it must have found the secret before, since the data source was accessed earlier in the flow. Is anyone else getting the same errors?
    ✅ 1
    k
    • 2
    • 5
  • b

    Bastian Röhrig

    09/03/2021, 9:49 AM
    Hey everyone, we have a flow that creates another flow and reacts to its results using create_flow_run and get_task_run_result. We unit tested the parent flow using mocks to simulate the possible results of the child flow. However, it would be great to have a basic integration test where we ideally only mock the outside dependencies like SQL and API calls. Do you have any ideas how to achieve that?
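    A rough sketch of one approach (all module and task names here are hypothetical assumptions about the poster's code layout): run the parent flow for real with flow.run(), but patch only the tasks that touch external systems:
    from unittest import mock
    
    # my_flows.tasks.query_sql / call_api are hypothetical task instances
    # that hit the outside dependencies
    with mock.patch("my_flows.tasks.query_sql.run", return_value=[{"id": 1}]), \
         mock.patch("my_flows.tasks.call_api.run", return_value={"ok": True}):
        state = parent_flow.run()
    
    assert state.is_successful()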
    k
    • 2
    • 2
  • p

    Paulo Maia

    09/03/2021, 10:44 AM
    Hi, I need some help forcing a linear dependency between my tasks:
    execution_id = task_run_inference()
    task_decision_process(execution_id, config, upstream_tasks=[task_run_inference])
    task_append_to_m2m(execution_id, config, upstream_tasks=[task_decision_process])
    task_append_to_m2r(execution_id, config, upstream_tasks=[task_decision_process])
    task_run_inference_cleanup(execution_id, config, upstream_tasks=[task_append_to_m2r, task_append_to_m2m])
    This is returning a weird dependency plot where task_run_inference is pointing to task_append_to_m2r. Do I need to explicitly set all tasks as upstream? Won't this create a dependency between them?
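    A likely fix (an assumption based on how Prefect 1.x builds the graph): upstream_tasks should reference the bound task results returned inside the flow, not the task functions themselves, since referencing the originals can add extra copies of those tasks to the flow:
    with Flow("inference-pipeline") as flow:  # flow name is hypothetical
        execution_id = task_run_inference()
        # the execution_id data dependency already orders task_run_inference first
        decision = task_decision_process(execution_id, config)
        m2m = task_append_to_m2m(execution_id, config, upstream_tasks=[decision])
        m2r = task_append_to_m2r(execution_id, config, upstream_tasks=[decision])
        task_run_inference_cleanup(execution_id, config, upstream_tasks=[m2m, m2r])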
    ✅ 1
    a
    • 2
    • 6
  • q

    Qin XIA

    09/03/2021, 1:12 PM
    Hello, I need some information about passing "endpoint_url" to prefect.tasks.aws.s3.S3List.run to change the default endpoint URL. Is it possible in Prefect? Like `aws s3 ls --endpoint-url https://s3.xxx.xxx.xxxx/` in the AWS CLI. Thanks a lot.
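    One possibility to check (hedged: this assumes your Prefect version's AWS tasks accept boto_kwargs, which are forwarded to the boto3 client): boto3 itself understands endpoint_url as a client argument:
    from prefect.tasks.aws.s3 import S3List
    
    # the bucket name and URL are placeholders
    s3_list = S3List(
        bucket="my-bucket",
        boto_kwargs={"endpoint_url": "https://s3.xxx.xxx.xxxx/"},
    )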
    ✅ 1
    k
    b
    • 3
    • 11
  • s

    Shyam

    09/03/2021, 3:45 PM
    Hello everyone, I am new to Prefect and find it interesting. One question I have is: how do I push different versions of workflow code? Whenever I push a new version of the code, Prefect Cloud does not pick it up; I have to create a new flow to reflect the change. 1. Is there a way to update the current version of the code? 2. How do I push a new version of the code? I am using Prefect Cloud.
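    For what it's worth, a minimal sketch (the project name is a placeholder): re-registering a flow under the same name and project creates a new version of the same flow rather than a separate flow, as long as the storage is rebuilt with the updated code:
    # after changing the code, rebuild storage and re-register;
    # the version number of the existing flow is bumped automatically
    flow.register(project_name="my-project")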
    k
    • 2
    • 4
  • k

    Kevin Mullins

    09/03/2021, 4:57 PM
    Hello, hopefully an easy question. What backing cloud provider is Prefect Cloud using? I need to be able to answer this for bureaucratic reasons.
    k
    • 2
    • 1
  • k

    Kevin Mullins

    09/03/2021, 7:52 PM
    I’m having some difficulty deploying the Prefect agent into Kubernetes using a namespace and service account. When trying to trigger a flow I get the following error. It looks like it’s trying to use the default service account in the prefect namespace I’m attempting to use (system:serviceaccount:prefect:default):
    (403)
    Reason: Forbidden
    HTTP response headers: HTTPHeaderDict({'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '0cb3f7e4-3156-4e9e-b025-5c9deb274813', 'X-Kubernetes-Pf-Prioritylevel-Uid': '96071337-eda2-47ce-9026-95ed0ab85b02', 'Date': 'Fri, 03 Sep 2021 19:49:12 GMT', 'Content-Length': '311'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:prefect:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"prefect\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
    I specified SERVICE_ACCOUNT_NAME in the Kubernetes manifest, but it doesn’t seem to be honoring it. Am I perhaps missing another configuration?
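    In case it helps while debugging the agent manifest, the service account can also be pinned per flow on the run config (a sketch; the account name is a placeholder):
    from prefect.run_configs import KubernetesRun
    
    flow.run_config = KubernetesRun(service_account_name="prefect-agent-sa")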
    k
    • 2
    • 26
  • j

    Jessica Smith

    09/03/2021, 9:29 PM
    Quick question: when adding a label to a flow, what is supposed to happen to future scheduled flow runs? Should they get the label applied automatically? I had thought they would, but I had to turn off the schedule and then turn it back on for new flow runs to be generated with the new label.
    k
    • 2
    • 5
  • v

    Vincent

    09/03/2021, 10:07 PM
    Has anyone noticed any degraded service on Prefect Cloud? It seems that my flows keep exiting before completion (no error message on Cloud).
    k
    • 2
    • 9
  • j

    Josiah Berkebile

    09/04/2021, 5:24 PM
    Does Prefect have something analogous to the Airflow Operator for starting, monitoring, and terminating AWS EMR clusters?
    k
    c
    • 3
    • 10
  • t

    Tim Chklovski

    09/04/2021, 7:18 PM
    Has anyone experimented with rigging up Prefect flows that execute Jupyter notebooks? We have folks who author scrapes in a no-code tool, and they may then need to set up scheduled postprocessing of incrementally ingested data. Is anyone doing something similar?
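    Prefect's task library does include a notebook runner; a minimal sketch (the notebook path and parameters are placeholders):
    from prefect import Flow
    from prefect.tasks.jupyter import ExecuteNotebook
    
    # executes the notebook via papermill, injecting the given parameters
    run_notebook = ExecuteNotebook(
        path="postprocess.ipynb",
        parameters={"ingest_date": "2021-09-04"},
    )
    
    with Flow("scheduled-postprocessing") as flow:
        run_notebook()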
    k
    • 2
    • 6
  • j

    Julie Sturgeon

    09/05/2021, 12:37 AM
    @channel Experiencing a weird issue with Prefect Cloud. We have twice experienced flows that have stopped scheduling themselves ~1.5 hours after the scheduler is enabled (see image). This flow was deployed yesterday afternoon and is scheduled to run every 10 minutes. The first instance of the schedule “running out” happened at 2:40 PM (PST). Disabling and re-enabling the flow schedule allowed more flow runs to be scheduled. This happened again at 4:40 PM PST. Tagging @nicholas and also going to follow up with sales@prefect.io. Hoping to get this resolved soon so my devs and I can avoid flipping switches every 1.5 hours and enjoy our long weekend 😉
    j
    c
    • 3
    • 4
  • o

    Omar Sultan

    09/05/2021, 12:19 PM
    Hi Everyone, I am using Webhook storage to register flows on our on-prem Prefect Server, and the flows are configured to use KubernetesRun. I am trying to understand who makes the actual GET API call to the webhook to download the file (agent pod or job pod) and when that happens. Does it happen when the flow is about to run, or when the flow is being registered?
    k
    • 2
    • 2
  • o

    Omar Sultan

    09/05/2021, 1:51 PM
    I'm wondering if anyone has a custom job_template YAML for KubernetesRun that they have used to assign some environment variables. I can find it in the documentation for the legacy components but not for the current one. Thanks.
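    A sketch of what this might look like (hedged: the template fields and values are illustrative assumptions; simple env vars can also be set via KubernetesRun's env argument):
    from prefect.run_configs import KubernetesRun
    
    job_template = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "flow",  # Prefect's default job template names the container "flow"
                            "env": [{"name": "MY_VAR", "value": "my-value"}],
                        }
                    ]
                }
            }
        },
    }
    
    flow.run_config = KubernetesRun(job_template=job_template)
    # or simply: KubernetesRun(env={"MY_VAR": "my-value"})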
    k
    • 2
    • 4
  • h

    Hari Krishna Sunkari

    09/06/2021, 8:14 AM
    Using Prefect, is it possible to implement tasks in different programming languages? We already have our business logic in Node.js and I couldn't find any docs related to this.
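    Tasks themselves are Python, but one common workaround is shelling out to another runtime; a minimal sketch (the script path is a placeholder):
    from prefect import Flow
    from prefect.tasks.shell import ShellTask
    
    # runs the Node.js entry point as a subprocess
    run_node = ShellTask(command="node /app/business_logic.js")
    
    with Flow("node-flow") as flow:
        run_node()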
    j
    k
    • 3
    • 2
  • b

    Benjamin Rousselière

    09/06/2021, 12:24 PM
    Hi! I'm trying to register a flow from a Flask API. The flow registers successfully, but I always get an error in the Prefect logs (see picture). Do you have any idea? Tell me if you need more information.
    k
    • 2
    • 10
  • b

    Benjamin Rousselière

    09/06/2021, 12:24 PM
    Thank you!
  • m

    Michael

    09/06/2021, 12:57 PM
    I have a (maybe silly) question. Is it expected that when mapping tasks, the flow exits when the first element of the map succeeds (i.e. the first branch of the reference task)? That seems to be what’s happening to me, but perhaps I’ve missed an implementation detail. As a follow-up, if I want to wait for all mapped results, do I have to make the final reference task a reduction over the upstream mapped results?
    k
    • 2
    • 3
  • m

    Michael Hadorn

    09/06/2021, 1:36 PM
    Hi there, I'm facing this error again with the current version, 0.15.5:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='host.docker.internal', port=4200): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f43cca967c0>: Failed to establish a new connection: [Errno 111] Connection refused'))
    The first time I got this was in 0.14.19, but it did not occur in 0.15.4.
    k
    • 2
    • 9
  • t

    Tim Pörtner

    09/06/2021, 3:34 PM
    Hello everybody! My colleague (@Greg Roche) and I tried to update our self-hosted Prefect Server (on EC2) today, from 0.14.15 to the latest version, 0.15.5. After updating Prefect we tried running the server, but we already saw some errors in the console. We were still able to connect to the UI, and we saw that all the connections to :4200/graphql were refused. After some trying we gave up and downgraded back to 0.14.15 (it worked again but we lost the data: flows, flow runs, etc.), no big deal. I was curious and thought that if connections were just refused it shouldn't be too difficult to get it running, so I created my own docker-compose.yaml file from docker prefect config and changed some values. I ended up changing the interfaces that the services (graphql, apollo, ui) listen on to 0.0.0.0; after that it worked again. This makes it impossible for us to update our Prefect Server 😞 I couldn't find anything in the prefecthq/server repo that could cause those connection problems (at least from the most recent changes). Could someone please look into this? If you need any more information please let me know 👍 Thanks in advance!
    k
    • 2
    • 3
  • f

    Fabio Grätz

    09/06/2021, 4:16 PM
    Hey Prefect community, I’m building a prototype of an ETL pipeline that is supposed to run on Dask/Kubernetes. Let’s assume the dummy ETL pipeline is supposed to download 50k images from a bucket, rotate them, and write them to another bucket. To do so, I created three separate tasks:
    @task
    def load_image_from_bucket(fn):
        # Load image img from filepath fn in bucket
        return img, fn
    
    @task
    def rotate_image(x):
        img, fn = x
        return img.rotate(90), fn
    
    @task
    def save_image_to_bucket(x):
        # Save rotated image to new bucket
        pass
    The flow looks as follows:
    with Flow(
        'Load images from bucket',
        run_config=KubernetesRun(),
        executor=DaskExecutor(...),
    ) as flow:
        # Get list of image uris `imgs`
        loaded_images = load_image_from_bucket.map(imgs)
        rotated_images = rotate_image.map(loaded_images)
        save_image_to_bucket.map(rotated_images)
    According to the docs, this should create parallel pipelines (without reducing after getting e.g. loaded_images):
    However, if a mapped task relies on another mapped task, Prefect does not reduce the upstream result. Instead, it connects the nth upstream child to the nth downstream child, creating independent parallel pipelines.
    From what I observe when using the `DaskExecutor(…)` and applying the above flow to 500 images, all workers only download images at first and only start rotating once all have been loaded. It either takes multiple minutes until they start rotating and uploading, or the flow just crashes before it reaches that point. I would want depth-first execution: a worker downloads an image, rotates it, pushes it to the bucket, forgets about this image, and moves on to the next one. This appears to be (roughly) the case when running with the following config using executor=LocalDaskExecutor() instead of an ephemeral Dask cluster in k8s:
    with Flow("name") as flow:
       ...
    
    state = flow.run(executor=LocalDaskExecutor())
    Transforming 500 images in a bucket then takes only ~50s. Is there a way to influence the order in which the tasks are executed when using the DaskExecutor with an ephemeral Dask cluster in k8s? Thanks a lot!
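    One workaround that may approximate depth-first behavior (an assumption, not a confirmed Prefect feature): fuse the three steps into a single mapped task so each image is handled end-to-end by one Dask task, at the cost of per-step visibility in the UI:
    from prefect import Flow, task
    from prefect.executors import DaskExecutor
    from prefect.run_configs import KubernetesRun
    
    @task
    def process_image(fn):
        # load_from_bucket / save_to_bucket are hypothetical plain helpers,
        # not Prefect tasks; each image goes through all steps in one task
        img = load_from_bucket(fn)
        save_to_bucket(img.rotate(90), fn)
    
    with Flow("process-images", run_config=KubernetesRun(), executor=DaskExecutor()) as flow:
        imgs = ["bucket/img-0.png", "bucket/img-1.png"]  # placeholder URIs
        process_image.map(imgs)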
    k
    • 2
    • 11
  • f

    Fabio Grätz

    09/06/2021, 5:25 PM
    I’m experimenting with a preprocessing pipeline that uses a DaskExecutor with an ephemeral Dask cluster inside a k8s cluster. I’m seeing this warning:
    //anaconda3/envs/sandbox/lib/python3.8/site-packages/distributed/worker.py:3862: UserWarning: Large object of size 1.65 MiB detected in task graph:
      {'task': <Task: load_image_from_bucket>, 'state':  ... _parent': True}
    Consider scattering large objects ahead of time
    with client.scatter to reduce scheduler burden and
    keep data on workers
    Is it a bad idea to pass e.g. images around between Prefect tasks?
    k
    • 2
    • 10
  • j

    Jeffery Newburn

    09/06/2021, 5:29 PM
    When I run a flow without using all of the parameters, I get an error. Flow:
    with Flow("ROI Report Generator") as flow:
    
        start_date = Parameter(name="start_date")
        end_date = Parameter(name="end_date")
        to_list = Parameter(name="to_list")
        email_domain = Parameter(name="email_domain", default="")
        account_uuid = Parameter(name="account_uuid")
    Run:
    state = flow.run(
        account_uuid=527132950,
        start_date="2020-11-01",
        email_domain="",
        end_date=None,
        to_list=["jeff.newburn@logikcull.com"],
    )
    Error:
    TypeError: run() got an unexpected keyword argument 'account_uuid'
    Can someone help me understand what is going on here? Why do Parameters have to be used in a flow for the flow not to explode?
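    A possible explanation (an assumption based on how Prefect 1.x builds flows): a Parameter only becomes part of the flow graph when it is bound to a task (or added explicitly), so flow.run() rejects keyword arguments for parameters that never made it into the graph. A minimal sketch:
    from prefect import Flow, Parameter, task
    
    @task
    def report(account_uuid):
        print(account_uuid)
    
    with Flow("ROI Report Generator") as flow:
        account_uuid = Parameter("account_uuid")
        report(account_uuid)   # binding the Parameter adds it to the flow
        # an unused Parameter can be added explicitly instead:
        # flow.add_task(Parameter("start_date"))
    
    state = flow.run(account_uuid=527132950)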
    k
    • 2
    • 7
  • b

    Blake List

    09/07/2021, 2:09 AM
    Hi there, what is the best way to trigger a flow from the state of one or more other flows within a flow of flows? I.e., given flows a, b, and c started within parent flow p using StartFlowRun, how can I trigger (or not trigger) flow c, given the states of the StartFlowRun tasks for a and b? Within flows a and b, there is a task that skips downstream tasks (raising a SKIP signal) based on a condition. Thanks in advance.
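    A sketch of one approach (flow and project names are placeholders, and the right trigger depends on how SKIPs should propagate): run a and b with wait=True and gate c on their final states with a trigger:
    from prefect import Flow
    from prefect.tasks.prefect import StartFlowRun
    from prefect.triggers import all_successful
    
    start_a = StartFlowRun(flow_name="a", project_name="proj", wait=True)
    start_b = StartFlowRun(flow_name="b", project_name="proj", wait=True)
    start_c = StartFlowRun(flow_name="c", project_name="proj", wait=True,
                           trigger=all_successful)  # only run c if a and b succeeded
    
    with Flow("p") as flow:
        a = start_a()
        b = start_b()
        c = start_c(upstream_tasks=[a, b])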
    k
    • 2
    • 1
  • o

    Omar Sultan

    09/07/2021, 6:17 AM
    Hi guys, I am trying to understand a specific behavior when flows are being built and sent to the server. In one of my tasks I define the URL for Livy as follows:
    livy_url = f"https://{host}:{port}/gateway/default/livy/v1"
    host and port refer to variables assigned with os.environ.get('LIVY_HOST') and os.environ.get('LIVY_PORT'). The livy_url is picking up the variables from my local machine when the flow is being built, and thus the values for host and port are incorrect.
    j
    • 2
    • 2
j

Julian

09/07/2021, 8:34 AM
If you place the assignments of host and port inside a task, those variables will be evaluated on the machine running your flow instead of the machine where you build and register the flow.
👍 1
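A minimal sketch of that pattern (the task name is a placeholder):
import os
from prefect import task

@task
def get_livy_url():
    # evaluated at run time on the machine executing the flow,
    # not at build/registration time
    host = os.environ.get("LIVY_HOST")
    port = os.environ.get("LIVY_PORT")
    return f"https://{host}:{port}/gateway/default/livy/v1"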
o

Omar Sultan

09/07/2021, 9:00 AM
Hi Julian, I think I see where the problem is. You are right: the assignments were being done in my code and then passed to the task while setting it up. I moved them inside the task and all is good. Thanks for that!
👍 1