prefect-community

    ciaran

    03/25/2021, 11:28 AM
    Is there a way to parameterise a Flow's storage? i.e. I want to use S3, but the bucket name is likely to change across deployments of my infrastructure - can I pass this name to the flow before running it?
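    A minimal sketch of one workaround, assuming the bucket name is available to the registration script as an environment variable (FLOW_STORAGE_BUCKET is a hypothetical name): storage is resolved when the flow is registered, not when it runs, so the deployment environment can supply the bucket at registration time.
    import os

    from prefect import Flow
    from prefect.storage import S3

    # hypothetical variable set by the infrastructure deployment
    bucket = os.environ["FLOW_STORAGE_BUCKET"]

    with Flow("my-flow") as flow:
        ...

    # storage is baked in at registration time, so read the bucket here
    flow.storage = S3(bucket=bucket)
    flow.register(project_name="my-project")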

    ciaran

    03/25/2021, 12:04 PM
    Also I've noticed that the docs for
    0.14.3
    are not aligned to the commands I'm able to run...
    $ prefect version
    0.14.3
    $ prefect register -h
    Usage: prefect register [OPTIONS] COMMAND [ARGS]...
    
      Register flows
    
      Usage:
          $ prefect register [OBJECT]
    
      Arguments:
          flow    Register flows with a backend API
    
      Examples:
          $ prefect register flow --file my_flow.py --name My-Flow
    
    Options:
      -h, --help  Show this message and exit.
    I'd expect to see
    Register one or more flows into a project.
    
    Options:
      --project TEXT     The name of the Prefect project to register this flow in.
                         Required.
    
      -p, --path TEXT    A path to a file or a directory containing the flow(s) to
                         register. May be passed multiple times to specify
                         multiple paths.
    
      -m, --module TEXT  A python module name containing the flow(s) to register.
                         May be passed multiple times to specify multiple modules.
    
    ...
    As defined in the docs at https://docs.prefect.io/api/latest/cli/register.html

    ciaran

    03/25/2021, 1:48 PM
    Does someone have the policy definition an ECS Agent itself requires? Just had mine fall over because it needed to Describe VPCs; would be handy to just give it all the permissions it needs rather than adding them one by one 😄

    Ryan Abernathey

    03/25/2021, 1:52 PM
    Hi @ciaran

    Ryan Abernathey

    03/25/2021, 1:56 PM
    A Dask question. I frequently use
    Client.register_worker_plugin
    (https://docs.dask.org/en/latest/futures.html#distributed.Client.register_worker_plugin) with my Dask clusters to install custom pip packages (e.g. installing from a dev branch for debugging). If I am using Prefect with an existing Dask scheduler, how can I call
    client.register_worker_plugin
    ? I tried
    executor = DaskExecutor(
        address=cluster.scheduler_address,
        client_kwargs={"security": cluster.security}
    )
    executor.client.register_worker_plugin
    but it looks like the
    client
    attribute on
    executor
    is None.
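    One possible workaround, sketched rather than confirmed: the executor only creates its client inside flow.run(), so register the plugin with a separate short-lived distributed.Client pointed at the same scheduler before the flow starts. This assumes a distributed version that ships PipInstall, and reuses the cluster object from the question; the package spec is a placeholder.
    from distributed import Client
    from distributed.diagnostics.plugin import PipInstall

    # register the plugin before the flow runs, using our own client
    # connected to the same scheduler the executor will later use
    with Client(cluster.scheduler_address, security=cluster.security) as client:
        client.register_worker_plugin(
            PipInstall(packages=["git+https://github.com/org/repo@dev-branch"])
        )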

    Amit Gal

    03/25/2021, 2:11 PM
    Hi all! Does anyone have a best practice for a “prefect-equivalent” list comprehension on the result of a mapped task? Trying:
    from prefect import Flow, task
    
    @task()
    def some_task(x):
        return x+1
    
    some_list = [1, 2, 3, 4]
    
    with Flow("list_comp") as list_comp_flow:
        mapped_result = some_task.map(some_list)
        list_comp = [result+1 for result in mapped_result]
    results in the following error:
    TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.
    Which makes sense of course, since while building the flow,
    mapped_result
    is still a
    Task
    , not a
    list
    .
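    One common pattern, sketched under the assumption that the comprehension's body can live in its own task: the Prefect equivalent of a list comprehension is simply a second mapped task over the first map's output.
    from prefect import Flow, task

    @task
    def some_task(x):
        return x + 1

    @task
    def add_one(x):
        return x + 1

    with Flow("list_comp") as list_comp_flow:
        mapped_result = some_task.map([1, 2, 3, 4])
        # element-wise "comprehension": map a second task over the first map
        list_comp = add_one.map(mapped_result)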

    Steve Aby

    03/25/2021, 4:13 PM
    Hi everyone. Not sure if this is the right forum for this question but let me give it a try. I want to set up a 45-day schedule where I would run a set of tasks each day. My question is how do I go about testing this? I would like to "change" the date on each pass in a loop controlling the test. I could not find any guidance on how to test a 45-day schedule like this in an easy way without running it for 45 days. I was originally trying IntervalClock or DatesClock. I assume there is an easier way but could not find anything. Any guidance is most appreciated!!
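    A sketch of one way to test this without waiting 45 days: Prefect schedules can emit their upcoming run times directly, so the clock logic can be checked in a plain loop (the start date here is a placeholder).
    from datetime import timedelta

    import pendulum
    from prefect.schedules import IntervalSchedule

    schedule = IntervalSchedule(
        start_date=pendulum.datetime(2021, 4, 1, tz="UTC"),
        interval=timedelta(days=1),
    )

    # inspect the next 45 fire times without running anything
    for run_time in schedule.next(45):
        print(run_time)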

    Marwan Sarieddine

    03/25/2021, 4:17 PM
    Hi folks, what is the current approach to "force interrupt" a flow and its tasks when cancelling? The reason I ask is that currently, when one sets the flow's state to "Cancelled", Prefect tries to cancel the running tasks gracefully (i.e. it waits for the task to complete before cancelling).

    Josh Greenhalgh

    03/25/2021, 4:54 PM
    Hi, I am getting this error on CI when I try to register my flow (which uses the
    RunNamespacedJob
    task); how can I force it to defer trying to access the cluster until it's actually running?

    ciaran

    03/25/2021, 5:38 PM
    Any idea how I can progress with:
    Failed to load and execute Flow's environment: StorageError("An error occurred while unpickling the flow:\n  TypeError('code() takes at most 15 arguments (16 given)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - python: (flow built with '3.8.6', currently running with '3.7.10')")
    ? For background I'm using an ECS Agent.

    ciaran

    03/25/2021, 6:19 PM
    Can you point https://docs.prefect.io/orchestration/flow_config/executors.html#connecting-to-an-existing-cluster at an ECS cluster? i.e. I have an ECS Agent deployed as a task in an AWS ECS cluster; can I point the DaskExecutor at the same cluster to contain everything?
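    A sketch of how this might look with dask-cloudprovider (the ARN and worker count are placeholders): ECSCluster accepts an existing cluster's ARN, so the Dask scheduler and workers can be launched as tasks on the same ECS cluster the agent lives in.
    from prefect.executors import DaskExecutor

    flow.executor = DaskExecutor(
        cluster_class="dask_cloudprovider.aws.ECSCluster",
        cluster_kwargs={
            # placeholder ARN for the ECS cluster the agent already runs in
            "cluster_arn": "arn:aws:ecs:eu-west-2:123456789012:cluster/my-cluster",
            "n_workers": 2,
        },
    )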

    Renzo Becerra

    03/25/2021, 6:21 PM
    Hi, I am passing a networkConfiguration yaml file at agent start, yet running into this botocore error on flow execution. I'm unable to start an ECS agent without a path to a network config (I get an "infer default config" error), yet flows are failing to run. Any insight here? Thanks.
    botocore.exceptions.ParamValidationError: Parameter validation failed: Unknown parameter in input: "networkConfiguration", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators

    Berty

    03/25/2021, 7:30 PM
    Is there a Prefect Cloud status page? I was getting a Cloudflare error this morning and now I'm getting
    NET::ERR_CERT_COMMON_NAME_INVALID
    for prefect.io

    Jack Sundberg

    03/25/2021, 7:48 PM
    Hey everyone! Quick question -- when you reduce flow concurrency limits while usage is already at 100%, will some active flow runs be canceled to meet the new limit? Or will this have a "draining" effect, where running flows are allowed to finish first?

    Trevor Kramer

    03/25/2021, 10:15 PM
    How can I get more information about why a task failed?
    [2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'AWSClientWait': Handling state change from Running to Success
    [2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'AWSClientWait': Finished task run for task with final state: 'Success'
    [2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Starting task run...
    [2021-03-25 17:12:55-0500] DEBUG - prefect.TaskRunner | Task 'pruning': Handling state change from Pending to Failed
    [2021-03-25 17:12:55-0500] INFO - prefect.TaskRunner | Task 'pruning': Finished task run for task with final state: 'Failed'
    It seems to be failing before it even gets called.

    matta

    03/25/2021, 11:30 PM
    Is there a way to force the Docker storage to rebuild Python packages from scratch? We're having some trouble with cached versions causing problems between flows. Ops is trying to fix it from the CI/CD pipeline side, but I wonder if there's a way to do it from within Prefect?
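    A sketch of one possible approach from the Prefect side, assuming your version's Docker storage forwards build_kwargs to the docker build call: nocache=True should force every layer, including the pip installs, to rebuild (registry and dependencies are placeholders, and flow is assumed to be your Flow object).
    from prefect.storage import Docker

    flow.storage = Docker(
        registry_url="registry.example.com/team",        # placeholder
        python_dependencies=["pandas", "internal-lib"],  # placeholder deps
        # forwarded to the docker build; disables layer caching entirely
        build_kwargs={"nocache": True},
    )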

    Po Stevanus

    03/26/2021, 8:09 AM
    Hi guys, I hope this is the correct place to ask this question. I have an error when running my flow, and this is the error message:
    Failed to load and execute Flow's environment: Forbidden('GET https://storage.googleapis.com/storage/v1/b/bonza-dev-files?projection=noAcl&prettyPrint=false: Caller does not have storage.buckets.get access to the Google Cloud Storage bucket.')
    Background:
    • Agent: Kubernetes Autopilot
    • Storage: Google Cloud Storage
    What I've done:
    1. I prepared a service account with the Storage.Admin role; I called it OLYMPUS_DEV_SA
    2. When declaring the storage I referenced it:
    storage = GCS(bucket="bonza-dev-files", project="bonza-dev", secrets=["OLYMPUS_DEV_SA"])
    If it helps, I've attached the code to this thread. Is there a way to debug this? 🙇
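    One way to narrow this down, sketched with the plain google-cloud-storage client (the key-file path is hypothetical): authenticate as the same service account outside Prefect and make the same bucket call directly. If this also raises Forbidden, the problem is the credentials or IAM binding rather than the flow.
    from google.cloud import storage
    from google.oauth2 import service_account

    creds = service_account.Credentials.from_service_account_file(
        "olympus_dev_sa.json"  # hypothetical path to the SA key file
    )
    client = storage.Client(project="bonza-dev", credentials=creds)

    # the same storage.buckets.get call the error complains about
    print(client.get_bucket("bonza-dev-files"))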

    Mahesh

    03/26/2021, 10:23 AM
    Hello team, I am trying to execute the flow below. It works fine with flow.run(), but when I trigger the same flow with a quick run in the Prefect UI with a local agent, it succeeds within a few seconds and the queries are never submitted to the Snowflake account.

    ciaran

    03/26/2021, 11:48 AM
    Hey folks 👋 Having some difficulties installing
    dask-cloudprovider[aws]==2021.3.0
    when I have
    prefect[aws]==0.14.13
    installed. It seems they have different requirements for botocore:
    There are incompatible versions in the resolved dependencies:
      botocore<1.19.53,>=1.19.52 (from aiobotocore==1.2.2->dask-cloudprovider[aws]==2021.3.0->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 6))
      botocore<1.21.0,>=1.20.38 (from boto3==1.17.38->prefect[aws]==0.14.13->-r /var/folders/kf/93zlmdv15vz6sjhr2xd0j7y40000gn/T/pipenv_bvj4rpkrequirements/pipenv-1_o8bqwg-constraints.txt (line 5))
    Is there a specific version of
    dask-cloudprovider
    that
    prefect
    works with?

    Greg Roche

    03/26/2021, 3:18 PM
    Hi folks, I'm running into an issue when trying to register a flow which has some labels already added to the run config. Calling
    flow.register()
    on such a flow fails with error
    'list' object has no attribute 'update'.
    A reproducible example is in the thread. Has anybody come across this before, or know how to resolve it?

    Ben Fogelson

    03/26/2021, 3:26 PM
    Is there a way for a running task to know whether its upstream tasks succeeded or failed? The idea is I want a task whose trigger is
    all_finished
    but that does different things depending on whether it would have been triggered by
    all_successful
    or
    some_failed
    .
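    A sketch of one pattern, under the assumption (worth verifying on your version) that with an all_finished trigger a failed upstream's result arrives as the raised exception: the downstream task can then branch on an isinstance check.
    from prefect import Flow, task
    from prefect.triggers import all_finished

    @task
    def risky(x):
        if x == 2:
            raise ValueError("boom")
        return x

    @task(trigger=all_finished)
    def report(results):
        # assumption: failed upstream results show up as Exception instances
        failed = [r for r in results if isinstance(r, Exception)]
        if failed:
            print(f"{len(failed)} upstream task(s) failed")
        else:
            print("all upstream tasks succeeded")

    with Flow("branch-on-upstream-state") as flow:
        report(risky.map([1, 2, 3]))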

    Ryan Abernathey

    03/26/2021, 3:37 PM
    I had a flow die 80% of the way through a big mapped task. (It died due to a problem with my Dask cluster that has since been resolved.) Is there a way to restart the flow from where it left off? I'm not using Cloud.

    liren zhang

    03/26/2021, 3:47 PM
    Hi experts, just wondering if
    StartFlowRun(flow_name="A", project_name="examples", wait=True)
    can execute a child flow that has not been registered. Basically, I would like to test everything locally before I register anything.

    kevin

    03/26/2021, 4:39 PM
    is it kosher to invoke a task inside a task?

    Christian Eik

    03/26/2021, 6:06 PM
    hey everyone, beginner question: I have a flow that looks like this:
    with Flow('fiege_po_uploads', schedule=cron_schedule) as fiege_po_uploads:
        po_nos_query = build_po_nos_query.map(brand=brands)
        po_nos = snowflake_queryfetch.map(
            query=po_nos_query, upstream_tasks=[po_nos_query])
        po_query = build_po_data_query.map(
            po_no=po_nos, brand=brands, upstream_tasks=[po_nos])
    brands is a list of 2 strings. In the third task I'm trying to use 2 mapped arguments, and I can kind of understand why this doesn't really work. However, what is the proper way to do something like this? I'm a complete beginner to both Prefect and anything functional, so I probably have a conceptual misunderstanding here. Essentially what I'm trying to do is have an outer loop over brands, an inner loop over po_nos, and be able to use the value of brands in every inner iteration.
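    A sketch of one way to express the nested loop, assuming a Prefect version with flatten: pair each brand with its PO numbers in an intermediate task, then flat-map over the pairs so the brand travels with every inner iteration (the brand and PO values below are placeholders).
    from prefect import Flow, task, flatten

    @task
    def pair_with_brand(brand, po_nos):
        # cross one brand with all of its PO numbers
        return [(brand, po_no) for po_no in po_nos]

    @task
    def fetch_po_data(pair):
        brand, po_no = pair
        return f"data for {brand}/{po_no}"  # placeholder query result

    brands = ["brand_a", "brand_b"]                    # placeholder
    po_nos_per_brand = [["po1", "po2"], ["po3"]]       # one list per brand

    with Flow("nested_map") as flow:
        pairs = pair_with_brand.map(brands, po_nos_per_brand)
        results = fetch_po_data.map(flatten(pairs))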

    Vincent

    03/26/2021, 6:09 PM
    Hi all. I was wondering if it is possible to set a default context when registering a flow (similar to how a Parameter can have a default). I know that the client can submit a context, but it would be a nice feature to also specify a persistent default context for every run.

    Mary Clair Thompson

    03/26/2021, 6:24 PM
    Hi folks! This is a really simple question, but I'm not finding the answer in the documentation. I have a particular flow that occasionally fails due to temporary network outages, and when it does fail I'd like to trigger the entire flow to run again. I'm not finding anything super straightforward, such as a flow-level parameter that retries the flow. Can you point me to documentation for how to do this?

    Louis-David Coulombe

    03/26/2021, 7:56 PM
    Hi guys! I have a small issue with GitHub storage for my flow. My flow has grown to the point where I need to split tasks and functions into different files. My flows are stored in a flows folder (with __init__.py) and I have submodules inside the flows folder where I store my common functions. When I try to register my flow, I get ModuleNotFoundError: No module named 'flows' when I use from flows.aws.s3 import, and I get KeyError: "'__name__' not in globals" when I use from .aws.s3 import. Any idea?

    Luke Orland

    03/26/2021, 8:29 PM
    If I cancel a flow run, should I expect the Fargate task executing the flow to automatically be stopped? I've found that I have to manually stop it.

    Colin Dablain

    03/27/2021, 12:36 AM
    Hello! Conceptual question: how do you create a flow that starts parallel tasks from a generator? I have a large file that I want to transform in chunks because it is too big to load into memory, so I wrote a generator that yields chunks of the file. My first thought was to use the LocalDaskExecutor and try mapping the generator to my transform task, but because generators aren't subscriptable, I get
    TypeError: Cannot map over unsubscriptable object of type <class 'generator'>:
    Is there something I'm missing conceptually, or am I framing my problem incorrectly? I can provide code samples that I've tried if that would help. Thanks!

matta

03/27/2021, 1:29 AM
I think you might be able to do this with a Dask Bag instead of a Generator? https://examples.dask.org/bag.html
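A minimal sketch of the bag idea (the file path and transform are placeholders): read_text can lazily partition a single large file by blocksize, so nothing has to be pre-split, although how cleanly it splits depends on the file format.
import dask.bag as db

# lazily partition one big text file into ~64 MiB blocks
bag = db.read_text("big_file.txt", blocksize="64MiB")

def transform(line):  # placeholder per-line transform
    return line.upper()

results = bag.map(transform).compute()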

Michael Adkins

03/27/2021, 1:54 AM
You could also look into task looping: https://docs.prefect.io/core/advanced_tutorials/task-looping.html#introducing-task-looping
We're considering supporting yield/generators in tasks, but there's not even a design document yet.
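A sketch of the looping pattern from that doc, with read_chunk and transform as hypothetical helpers: the task re-runs itself via the LOOP signal, carrying a file offset forward instead of holding a generator open.
import prefect
from prefect import task
from prefect.engine.signals import LOOP

@task
def process_file(path):
    # pick up where the previous iteration left off
    payload = prefect.context.get("task_loop_result", {"offset": 0})
    offset = payload["offset"]

    chunk = read_chunk(path, offset)  # hypothetical chunked reader
    if chunk is None:
        return offset  # no more data: exit the loop

    transform(chunk)  # hypothetical per-chunk work
    raise LOOP(result={"offset": offset + len(chunk)})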

Colin Dablain

03/27/2021, 2:55 AM
@matta Thanks! I looked into the bags; that would work if I had already divided the large file into smaller files, but it didn't work with a single file to start. @Michael Adkins I'll check that out! Digging around the Dask docs some more, it seems like you'd have to use the Dask advanced API and async to implement generators.