prefect-community
  • d

    Dan Stoner

    09/01/2021, 2:28 PM
    I registered a Flow to the wrong Project. How do I delete it from that Project? The CLI seems like it should, but it does not seem to understand anything other than `delete project`.
    $ prefect delete --help                                                                                                                                                     
    Usage: prefect delete [OPTIONS] COMMAND [ARGS]...
    
      Delete commands that refer to mutations of Prefect API metadata.
    
      Usage:
          $ prefect delete [OBJECT]
    
      Arguments:
          project    Delete projects
    ...
    k
    • 2
    • 3
  • m

    marios

    09/01/2021, 2:41 PM
    Hey! How can I mount volumes on a specific flow without affecting the global configuration of docker agent as described here https://docs.prefect.io/orchestration/agents/docker.html#mounting-volumes
    👀 2
    k
    m
    • 3
    • 4
  • a

    An Hoang

    09/01/2021, 3:28 PM
    Hi, is there a way to set one `Parameter` as the default for another? Here's what I did:
    # task
    @task(target="{output_folder_path}/result")
    def example_task(output_folder_path):
        ...
    
    # in flow
    step1_folder_path = Parameter("step1_folder_path")
    output_folder_path = Parameter("output_folder_path", default=step1_folder_path)
    When I do this, I got a warning
    <ipython-input-226-3b472991f1ef>:8: UserWarning: A Task was passed as an argument to Parameter, you likely want to first initialize Parameter with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
    
      my_task = Parameter(...)  # static (non-Task) args go here
      res = my_task(...)  # dynamic (Task) args go here
    
    see <https://docs.prefect.io/core/concepts/flows.html#apis> for more info.
      output_folder_path = Parameter("output_folder_path", default = step1_folder_path)
    When I execute the flow, it runs but the `target` doesn't work as intended: the task still runs every time even though the target files are there. When I provide a hardcoded string as the default of `output_folder_path`, everything works fine.
    k
    • 2
    • 1
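The warning above says that a `Parameter` default should be a static value. One possible workaround, sketched here in plain Python, is to leave the second value unset (`None`) and resolve the fallback in a small helper, which could then be wrapped in a task. The function name `resolve_output_folder` is hypothetical and not part of Prefect.

```python
def resolve_output_folder(step1_folder_path, output_folder_path=None):
    """Fall back to step1_folder_path when no output folder was given."""
    if output_folder_path is None:
        return step1_folder_path
    return output_folder_path


# No explicit output folder: the step 1 folder is used.
default_case = resolve_output_folder("/data/step1")

# Explicit output folder: it takes priority.
override_case = resolve_output_folder("/data/step1", "/data/out")
```

Whether the resolved value is then picked up by the `target` template depends on how it is passed to the task, which this sketch does not cover.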
  • s

    Samuel Hinton

    09/01/2021, 4:01 PM
    Hi all! Just wondering if anyone has noticed memory leaking when using prefect with dask workers? I am already deleting the local flow variables, calling gc.collect(), and have completely disabled checkpointing with prefect. And yet, this is typical for my workers until they OOM and die over the course of a few days. All the little bumps and step-up lines (green, red, blues) at the top are the dask workers in the swarm that keep growing.
    k
    • 2
    • 3
  • k

    Kieran

    09/01/2021, 4:52 PM
    Hey, We use CircleCI to test and register our flows (Prefect v0.14.12) which we use Docker storage for. In our recent attempts to register a new flow we have been hit with this error and failing to build:
    docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
    (full trace in this thread) From reading around it looks like a potential docker issue, an inability to connect to a socket?! Has anyone else been hit with this?
    m
    • 2
    • 53
  • a

    Abhas P

    09/01/2021, 4:56 PM
    Hi, while trying to execute a task that connects to a db(mongo) and extracts a collection I get this error :
    Unexpected error: TypeError("cannot pickle '_thread.lock' object")
    The task blueprint looks like this :
    def mongo_connect():
        db = get_db()  # wrapper on top of pymongoclient to connect to the specified db
        collection = get_collection(db)  # wrapper on top of pymongoclient to get a specified collection from the db
        return collection
    I understand that prefect needs to pickle all the tasks , but this code works fine while being run as an independent script (without prefect decorators). How can I make the connection threadlock safe or rather pickle safe?
    k
    • 2
    • 8
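A common cause of this kind of error is pickling an object that holds a thread lock, such as a live database client. The sketch below uses a hypothetical `FakeClient` class in place of the real Mongo client (none of these names come from Prefect or pymongo) to show why pickling fails, and how opening the connection inside the function and returning only plain data sidesteps it.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a database client that holds a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()

    def fetch(self):
        # Pretend these documents came from a collection.
        return [{"_id": 1, "name": "a"}, {"_id": 2, "name": "b"}]


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def extract_documents():
    # Open the connection inside the function and return only plain data,
    # so the client itself never has to be pickled.
    client = FakeClient()
    return client.fetch()
```

Here `can_pickle(FakeClient())` is false because of the lock, while the list returned by `extract_documents()` pickles without trouble.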
  • l

    Leon Kozlowski

    09/01/2021, 5:35 PM
    Can a flow be registered without storage configured? Let’s say I want to use docker storage, but I have a separate build and push workflow that I’m using and then just pass the image to my run configuration
    k
    • 2
    • 3
  • g

    Gonzalo

    09/01/2021, 6:01 PM
    Hello everyone 🙌 I'm having a problem with scheduling flows, all of my flows did not schedule automatically overnight and I can't activate them through the interactive API (when I try to `set_schedule_active` I always get `"success": false`). When I try to manually toggle the flow through the GUI the entire server hangs until I manually kill the GraphQL process on the server. Can anyone help or provide some insight? I'm acting as an interpreter for the tech team so I have to do some back and forth with my questions.
    🆘 1
    👀 1
    k
    z
    • 3
    • 40
  • w

    Wilson Bilkovich

    09/01/2021, 6:05 PM
    Is there a way to specify an alternative location for `~/.prefect/config.toml`? Maybe an environment variable, because I don't see any CLI flags about it?
    k
    • 2
    • 2
  • m

    Martin

    09/01/2021, 6:08 PM
    Hello! I was wondering what are some of the patterns that people use to set up flows with a configurable number of workers for Dask? Given that you need to pass the executor to a flow it seems like you can't use parameters (as far as I can tell)? Right now we're using KubeCluster to spin up ephemeral Dask clusters in K8s but our n_workers is hardcoded, and we'd like to configure it per environment or possibly based on task inputs in the future. Thanks!
    k
    e
    m
    • 4
    • 10
  • s

    Sean Talia

    09/01/2021, 6:37 PM
    Does anyone have any experience with dynamically configuring a state handler for a flow or some subset of a flow's tasks? We have a flow that basically serves as a "flow template" (specifically, for running different `dbt` projects) that multiple teams at my org use. The individual teams will set up an orchestrator flow that has one task (the `StartFlowRun` task), and they'll pass in the various parameters that they need to pass in so that their specific `dbt` project will run. They also will directly attach state handlers that they configure to their `StartFlowRun` task within their orchestrator flow. Of course, in order for the status of the underlying template flow to bubble up to the orchestrator flow, we have to set `wait=True` on that flow. The issue with this setup is that it unnecessarily eats up our flow concurrency, since this setup always leads to at least 2 flows running at a given time. We don't really need the orchestrator flow to wait around; we're only keeping it waiting as a means to capture the template flow's status, which then gets passed along to the custom state handler that the respective team within our org has configured.
    k
    • 2
    • 8
  • c

    Charles Leung

    09/01/2021, 10:08 PM
    Hey Team! it seems like the interactive GraphQL API doesn't reflect my role permissions. I created a tenant using GraphQL, then tried to assign it my license with the assign_tenant_license mutation. It's forbidden; then i just created my tenant through the UI and it worked, automatically attached with my enterprise license. Now i have to delete the old zombie tenant but i don't have permissions to do that either 😞 . Any resolution to this?
    k
    d
    • 3
    • 3
  • d

    Donny Flynn

    09/01/2021, 10:10 PM
    For a Cloud Automation to a webhook, it mentions that you can use a Prefect Secret for the Webhook you want to send. But I cannot figure out how to get the secret that I made to be filled in here. Anyone have any ideas?
    k
    j
    m
    • 4
    • 4
  • d

    Dan Stoner

    09/01/2021, 10:56 PM
    Should `prefect register -p my_flow.py` be updating the version of the Flow? I have changed the code and run `register` again, multiple changes and multiple registers, but the VERSION is still `1` in `prefect get flows`.
    z
    • 2
    • 3
  • o

    Omar Sultan

    09/01/2021, 11:34 PM
    Hi Guys, We're looking to use Webhook Storage to store the flows in our local environment. The Prefect server will be hosted in a K8 Cluster, any recommendations for a storage service that would fit that requirement?
    k
    • 2
    • 8
  • c

    Camila Solange

    09/02/2021, 12:16 AM
    Hello! when I see the calendar on the board it appears: Reminder! The Prefect Scheduler will only schedule 10 runs in advance. How can I program more scheduling??
    z
    k
    • 3
    • 3
  • b

    Ben Muller

    09/02/2021, 6:01 AM
    Hey, I have been trying to delete a project in the UI for half of the day now. Keep getting this error
    z
    • 2
    • 12
  • a

    Alireza Taghizadeh

    09/02/2021, 6:29 AM
    Hi everyone, I am new to prefect, so maybe my question seems naive! is it possible to use @task for methods inside a class? I have a class with several methods that can be set as task.
    z
    • 2
    • 2
  • a

    Abhishek

    09/02/2021, 7:19 AM
    Hello folks, I am trying to find a way to give a flow run a name (currently it's generated dynamically in Prefect, see screenshot). Is there a way I can configure a flow run name in code? I don’t see any parameter in `flow.register()`. I looked into the documentation but couldn’t find it.
    g
    r
    • 3
    • 4
  • k

    Konstantin

    09/02/2021, 8:46 AM
    Hey there! In the Prefect settings, you can use Parameters with the data type 'list', 'dict'. Tell me, what is the maximum number of elements a list and a dictionary in a parameter can contain?
    👍 1
    b
    z
    • 3
    • 5
  • d

    Didier Marin

    09/02/2021, 1:38 PM
    Hello there! Is there a way to perform a mapping with a specific batch size? For example, let's say I have two tasks `f` and `g`:
    ys = f.map(xs)
    zs = g.map(ys)
    Is there a generic way to give `g` batches of y values, for example 10, rather than one at a time? (And obviously, without having to wait for all the ys to be computed.) Or is this something that is specific to the executor used?
    z
    • 2
    • 2
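One generic option for the batching part of this question is a small chunking helper placed between `f` and `g`. The sketch below is plain Python, and `batched` is a hypothetical helper name rather than a Prefect feature. It does not address the "without waiting for all the ys" requirement: used as an intermediate step, it would need the full list of results first.

```python
from itertools import islice


def batched(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 25 values split into batches of 10: two full batches and one partial batch.
batches = list(batched(range(25), 10))
batch_sizes = [len(batch) for batch in batches]
```

Each element of `batches` could then be handed to `g` as a single mapped input, so `g` would receive a list of up to 10 values per call.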
  • f

    Felipe Saldana

    09/02/2021, 1:54 PM
    Hello Prefect. I have a question about dealing with Flow Parameters. What's the preferred way to concatenate flow parameters that will be passed into a number of tasks? Let me type up a simplified example below
    k
    • 2
    • 6
  • j

    Jacob Hayes

    09/02/2021, 2:09 PM
    Did the "Mapped Tasks" panel go away/move recently?
    k
    • 2
    • 9
  • b

    Brad I

    09/02/2021, 4:30 PM
    Hi, does anyone know if the env vars set on the prefect agent (deployed in k8s) get injected into a dask cluster automatically? Or do we also have to duplicate them when making the pod spec for dask? Our executor is defined like:
    flow.executor = DaskExecutor(
        cluster_class=lambda: KubeCluster(
            name=f"dask-{flow_image_name}",
            pod_template=make_pod_spec(
                image=prefect.context.image,
                cpu_request=worker_cpu,
                memory_request=worker_mem,
                threads_per_worker=threads_per_worker
            )
        ),
        adapt_kwargs={
            "minimum": min_workers,
            "maximum": max_workers
        },
    )
    k
    • 2
    • 6
  • c

    Constantino Schillebeeckx

    09/02/2021, 4:31 PM
    We have a CI/CD pipeline that automatically registers flows for us (e.g. when a PR is opened); in my registration script I do something like:
    if not args.dry_run:
        logger.log(SUCCESS, f"Registering flow {flow.name}")
        flow.register(project_name=args.project, idempotency_key=flow.serialized_hash())
    Everything is working as expected, except I'm getting more versions of the flow (after multiple registrations) than I was previously expecting. It makes me wonder what I don't understand about the `idempotency_key`. Could someone explain to me what could cause the serialized_hash to change unexpectedly? For example, if a different VM registers flows, would the hash change?
    k
    • 2
    • 11
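As a general illustration of how a content hash behaves as an idempotency key, the sketch below hashes a JSON-serializable dict with sorted keys. It is an assumption-laden stand-in, not a statement about how `serialized_hash()` is implemented: the `content_hash` helper and the example dicts (including the `image_tag` field) are hypothetical.

```python
import hashlib
import json


def content_hash(serialized):
    """Hash a JSON-serializable dict; key order does not affect the result."""
    payload = json.dumps(serialized, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


# Hypothetical serialized flows: same tasks, but a different image tag.
flow_a = {"name": "etl", "tasks": ["extract", "load"], "image_tag": "2021-09-02t16-00"}
flow_b = {"name": "etl", "tasks": ["extract", "load"], "image_tag": "2021-09-02t16-05"}

# Same content as flow_a, with keys written in a different order.
flow_a_reordered = {"image_tag": "2021-09-02t16-00", "tasks": ["extract", "load"], "name": "etl"}

same_content = content_hash(flow_a) == content_hash(flow_a_reordered)
different_tag = content_hash(flow_a) != content_hash(flow_b)
```

Under this model the machine doing the registration does not matter, but any field that varies between builds (a timestamped tag, for instance) would produce a new key and therefore a new version.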
  • m

    Michael

    09/02/2021, 4:40 PM
    Hey there. I’ve been playing around with Docker storage today, trying to get all source code packaged together with the flows each time they are registered, and am using the `files` and `env_vars` attributes as outlined in the Docs. But it seems that my `.dockerignore` file (in the directory from which I am registering the flow) is ignored by this build process. I have massive data directories nested at different levels inside the codebase so it’s essential there be a way to include this somehow. Anyone come across this / have any ideas? I’m curious about playing around with a custom Dockerfile, but the code that copies flows to a tmp Dockerfile in `create_docker_file` is dynamic so it seems tough that I could write a standalone Dockerfile to handle this case.
    k
    z
    m
    • 4
    • 16
  • j

    John Shearer

    09/02/2021, 4:55 PM
    Hi there! I have some (not all) tasks that need PandasSerializer, and also need to use different result types depending on if dev or production. I have a solution for this (below), but wondered if anyone had a cleaner solution?
    condition = True
    resultType = LocalResult if condition else S3Result
    
    @task()
    def create_list():
        return [1,2,3]
    
    @task(result=resultType(serializer=PandasSerializer(file_type='parquet')))
    def to_df(a_list):
        return pd.DataFrame({'col1': a_list})
    
    with Flow(
            name="my-flow",
            result=resultType(serializer=PickleSerializer())
    ) as my_flow:
        my_list = create_list()
        my_df = to_df(my_list)
    d
    • 2
    • 2
  • c

    Charles Leung

    09/02/2021, 5:47 PM
    Hey prefect team! I'm trying to register multiple flows generated in a loop in my file. However, it seems to only register the latest flow in the loop. What's the right way to do this?
    k
    • 2
    • 14
  • a

    Alex Furrier

    09/02/2021, 5:48 PM
    Is there a way to pass indexed items as arguments to a mapped task? E.g. if I have some inputs that are a list of tuples that I'm mapping over, can I specify specific elements to pass as args when mapping over them? Something like this:
    @task
    def gen_records():
        return [('item', 1), ('item', 2), ('item', 3)]
    
    
    @task
    def subscript_input(x, y):
        return x+str(y)
    
    
    with Flow(
        name="Test Flow",
    ) as test_flow:
        records = gen_records()
        total = subscript_input.map(x=records[0], y=records[1])
    k
    • 2
    • 2
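In the snippet above, `records[0]` would refer to the first tuple rather than to the first element of every tuple. One way to get per-position inputs, sketched in plain Python with a hypothetical `split_columns` helper, is to split the list of tuples into one list per position before mapping.

```python
def split_columns(records):
    """Turn [(x1, y1), (x2, y2), ...] into ([x1, x2, ...], [y1, y2, ...])."""
    if not records:
        return [], []
    xs, ys = zip(*records)
    return list(xs), list(ys)


def subscript_input(x, y):
    # Same logic as the task in the question, as a plain function.
    return x + str(y)


records = [("item", 1), ("item", 2), ("item", 3)]
xs, ys = split_columns(records)

# Plain-Python equivalent of mapping over both lists in lockstep.
totals = [subscript_input(x, y) for x, y in zip(xs, ys)]
```

Wrapped in a task, the two lists returned by `split_columns` could then serve as the `x` and `y` inputs of the mapped task.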
  • n

    Nadav

    09/02/2021, 6:05 PM
    Hi, I have a flow with docker storage in ECR and a KubernetesRun with image_pull_policy Always. I have an agent running on my EKS cluster. For some reason, when I change the flow code, build a new image and run the flow from cloud, the flow runs with the old image. I validated this: when I change the image tag, the new code is used. Any clues how to solve this?
    k
    b
    z
    • 4
    • 15
k

Kevin Kho

09/02/2021, 6:07 PM
Can I see how you set up KubernetesRun?
n

Nadav

09/02/2021, 6:10 PM
flow.run_config = KubernetesRun(image="*************.dkr.ecr.us-west-2.amazonaws.com/semantic-layer:copy_semantic_views_to_snowflake2", image_pull_policy="Always")
k

Kevin Kho

09/02/2021, 6:21 PM
This looks good to me. Will ask the team for any ideas
Hey, would you be able to check if the `image_pull_policy` is applied to the job that the agent created?
b

Brad I

09/03/2021, 5:23 AM
I may have run into the same issue in GKE: https://github.com/PrefectHQ/prefect/issues/4939
k

Kevin Kho

09/03/2021, 5:39 AM
Ok I’ll try this tom
z

Zach Angell

09/03/2021, 2:53 PM
@Brad I a couple questions to help in debugging:
• are you defining anything on your Kubernetes Agent? (e.g. a job template?)
• how exactly are you running the flow? (UI, CLI, via a Schedule)
• if running the flow in the UI, how exactly are you creating the flow run?
b

Brad I

09/03/2021, 3:08 PM
Hi @Zach Angell
• We are deploying the standard agent yaml config from the command: `prefect agent kubernetes install`
• No custom job template
• `IMAGE_PULL_POLICY` is left blank (assume that means `IfNotPresent`)
• We are running the flow both from the UI and API
• In the UI, select the flow, click ‘Run’, setting our run inputs, and clicking ‘Run’
• API is similar, sending the input dictionary to the `create_flow_run` mutation
• The `KubernetesRun` config is set on registration and everything works except the `image_pull_policy` flag
z

Zach Angell

09/03/2021, 3:17 PM
Thank you for the detail here! For the UI, is image pull policy respected if you just click "Quick Run"? For the API, could you share the `create_flow_run` mutation you're using? (Feel free to DM instead of post in the thread)
I haven't had a chance to test, but for the UI I suspect we need to add an "Image Pull Policy" field to the "Run" tab. If you edit the flow's run config while creating a flow run, it will overwrite the run config specified at registration. For the API, it's possible the Flow Group's run config is overriding the Flow's run config, although this is not a common scenario. If you specify the full `run_config` in `create_flow_run`, I would expect it to respect the `image_pull_policy` argument.
b

Brad I

09/03/2021, 7:48 PM
I can try that from the API, we currently don’t override anything other than the input parameters when creating a run since the client doesn’t know the correct backend environment variables to use. Our query looks like:
const query = gql`
      mutation ($input: create_flow_run_input!) {
        create_flow_run(input: $input) {
          id
        }
      }
    `;

    const variables = {
      input,
    };
Where the input is defined like:
parameters = {
        some_var: 42,
        uuid: fileId,
}
In general, we don’t really want our users to have to configure every run_config parameter and should just use the default that the flow was registered with (just like all the default env vars we set).
z

Zach Angell

09/07/2021, 3:58 PM
"In general, we don’t really want our users to have to configure every run_config parameter and should just use the default that the flow was registered with"
Understood and 100% agree that is how the system should work. For the UI, we need to add a new field. I've opened an issue here https://github.com/PrefectHQ/ui/issues/1040. For the API, I can do some further testing. Could you DM me the ID of your Flow and Flow Group? Both can be found in the "Flow" page -> "Overview" tab -> "Details" option on the "Overview" tile.
@Brad I I did some testing against the API and I'm not able to reproduce the issue you're seeing. Unless `run_config` is provided as an input to `create_flow_run`, Prefect should use the Flow's run config. I've also confirmed the Flow Group does not have a `run_config` that would be overriding this behavior. The UI definitely has a bug, but I'm a bit perplexed on the API behavior
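The precedence described in this thread can be read as: a `run_config` passed to `create_flow_run` wins, then the Flow Group's run config, then the run config the Flow was registered with. The sketch below models that reading in plain Python. It is an interpretation of the messages above, not Prefect's actual implementation, and `resolve_run_config` and the example dicts are hypothetical.

```python
def resolve_run_config(flow_run_config, flow_group_run_config=None, input_run_config=None):
    """Return the first run config that is set, in the order described in the thread."""
    for candidate in (input_run_config, flow_group_run_config, flow_run_config):
        if candidate is not None:
            return candidate
    return None


# Run config stored with the flow at registration time.
registered = {"type": "KubernetesRun", "image_pull_policy": "Always"}

# Run config submitted with a flow run, missing the image pull policy field.
submitted = {"type": "KubernetesRun"}

# Nothing overrides the flow: the registered config is used.
from_registration = resolve_run_config(registered)

# A run config supplied at run creation replaces the registered one entirely.
from_run_input = resolve_run_config(registered, input_run_config=submitted)
```

Under this model, a run config submitted without an image pull policy field would replace the registered one wholesale and drop the setting, which matches the suspected UI bug.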
b

Brad I

09/10/2021, 12:40 PM
Thanks @Zach Angell, we’ve been doing a lot of testing just from the UI, so I’ll double check from the API today.
👍 1