Powered by Linen
prefect-community
  • m

    Marko Herkaliuk

    09/16/2021, 8:11 AM
    Hello, I have a mapped task B in which, depending on the data obtained, I either generate the data (artifact) or not. This data is passed to the next mapped task C. If I do not generate data in task B, then I want to skip it and task C. What is an easy way for me to do this?
    z
    2 replies · 2 participants
  • a

    Andor Tóth

    09/16/2021, 9:09 AM
    Hello! How can I start an agent that will run flows without labels? I can't find it in the documentation.
    ✅ 1
    k
    3 replies · 2 participants
  • l

    Lucas Beck

    09/16/2021, 9:21 AM
    Hi again, after getting some great help here, I have two new things I am struggling with; perhaps someone here has some input.
    Challenge 1: Firstly, we have a pretty common use case where we have a flow that receives a list of inputs `[x1,x2,...xn]` that can be processed in parallel through a DAG. Out of these `n` inputs, let's say that `k` of them fail for various reasons. We would then find the issues, fix them, and want to rerun the flow for those inputs. When `n` and `k` are large, it is quite some work to go through the UI and/or logs to find all the inputs that failed in order to start the next run. What I am doing right now is naming the tasks with their inputs, so that I can filter failed tasks in the UI. This is not optimal, as it still involves quite some manual work copying and pasting stuff. I initially thought about logging which inputs failed at the end of the flow execution. However, I cannot think of a trivial way of doing this without it involving some form of global variable that needs to be passed around to different tasks and be thread safe for reads and writes. It just does not sound like the proper way of tackling this. How do others approach this?
    Challenge 2: Second, my tasks are spinning up Kubernetes jobs/pods to run computationally heavy workloads. If the task execution goes well, then I use `DeleteNamespacedJob` to clean up once a job completes. But when I manually cancel those flow runs, or once a job fails, I end up with lots of clutter that I need to manually clean up in the k8s cluster. I have been thinking about using state handlers to deal with this, calling `DeleteNamespacedJob` once the new state is `Cancelled` or `Failed`. However, the state handler needs to know the name of the job it needs to delete, which is a piece of information that the task knows about. My struggle is how to send the `job_name` from the task to the state handler. As I understand it, the state handler has a specific signature, so I cannot pass extra parameters. I tried using the context for that, but what I have written to the context within the task does not seem to be propagated to the state handler. Any ideas here? Thanks!
    a
    k
    12 replies · 3 participants
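A minimal sketch of one way to approach Challenge 1 above, written in plain Python rather than Prefect's API. Instead of a shared global, each unit of work returns its input together with either a result or the error it hit, and a single collection step separates the two. The names `process`, `safe_process` and `run_all` are hypothetical, and `process` is only a stand-in for the real per-input work.

```python
from concurrent.futures import ThreadPoolExecutor


def process(item: str) -> str:
    """Hypothetical stand-in for the per-input work; fails on some inputs."""
    if item.startswith("bad"):
        raise ValueError(f"cannot process {item}")
    return item.upper()


def safe_process(item: str) -> tuple:
    """Run process() and return (input, result, error) instead of raising."""
    try:
        return item, process(item), None
    except Exception as exc:
        return item, None, exc


def run_all(items: list) -> tuple:
    """Process items in parallel and return (results, failed_inputs)."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map yields outcomes in the same order as the inputs.
        outcomes = list(pool.map(safe_process, items))
    results = {item: out for item, out, err in outcomes if err is None}
    failed = [item for item, _, err in outcomes if err is not None]
    return results, failed


results, failed = run_all(["x1", "bad_x2", "x3", "bad_x4"])
print("failed inputs:", failed)
```

The `failed` list can be logged once at the end of the run and passed straight back in as the input list for the rerun. Whether a final reduce-style task in a flow can receive failures in this shape depends on the Prefect version and trigger used, so that part is an assumption.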
  • q

    Qin XIA

    09/16/2021, 9:43 AM
    Has anyone tried running a PySpark task (Spark cluster in local Docker) from a local agent in Docker without using a bash command?
    a
    10 replies · 2 participants
  • d

    Daniil Ponizov

    09/16/2021, 3:41 PM
    Hello! does Prefect support asynchronous tasks?
    k
    z
    +1
    10 replies · 4 participants
  • s

    Shaoyi Zhang

    09/16/2021, 5:19 PM
    Hi, how would I change the `PREFECT__CLOUD__AGENT__AGENT_ADDRESS` field for the Kubernetes deployment without modifying the YAML file directly? Is there a command line argument that I can use? I suspected `--agent-address`, but that’s not available.
    - name: PREFECT__CLOUD__AGENT__AGENT_ADDRESS
              value: http://:8080
    Below is the command I’m testing
    prefect agent kubernetes install --backend cloud --api "https://api.prefect.io" --namespace xxxxx --tenant-id xxxxx --service-account-name xxxx --key xxxx --rbac | kubectl apply --namespace=xxxxx -f -
    k
    19 replies · 2 participants
  • a

    Andy Dyer

    09/16/2021, 5:27 PM
    Is it possible to increase the TTL of the idempotency key?
    k
    m
    5 replies · 3 participants
  • k

    Kevin Weiler

    09/16/2021, 5:40 PM
    Hi there - I have a flow where I would like to (at flow run time) get a date (as a string) from a function defined in my codebase and provide it to ALL of the tasks in the flow. This sounds like a Parameter, but there are a couple of problems with this:
    1. a function call is not JSON serializable
    2. I don’t really want every single task to depend on one node that is just getting a string - the graph view would be extremely cluttered
    Any ideas on how to accomplish this?
    k
    m
    29 replies · 3 participants
  • k

    Kathryn Klarich

    09/16/2021, 9:33 PM
    Are there any limits on what should be stored in the Prefect context? I.e. is it a bad idea to store a dataframe in prefect.context so that I can pull this dataframe from a state_handler? My alternative (I think) would be writing the dataframe to disk, then setting the location of that dataframe in the context, and then pulling the dataframe location from the context in the state handler.
    k
    4 replies · 2 participants
  • b

    Ben Muller

    09/17/2021, 12:26 AM
    Hey Community, As we have been scaling out our prefect usage, we are starting to be quite hampered by a few limitations with how prefect might be handling ECS and its fargate task registration. I keep running into this error:
    an error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family
    I have followed the instructions here, but have had no change in the behaviour. We run about 2500 flows every 24 hours and they are generally short running ~60 seconds and run every 15 or so minutes. I use the aws cdk and this is how I have injected the variables into my fargate ecs agent.
    fargate_task_definition.add_container(
        id="prefect-fargate-agent-container",
        container_name="prefect-fargate-agent-container",
        image=ecs.ContainerImage.from_ecr_repository(
            prefect_agent_image_repo, "latest"
        ),
        cpu=256,
        memory_limit_mib=256,
        logging=ecs.AwsLogDriver(stream_prefix="prefect-fargate-agent-container"),
        environment={
            "AWS_DEFAULT_REGION": "ap-southeast-2",
            "LAUNCH_TYPE": "FARGATE",
            "AWS_RETRY_MODE": "adaptive",
            "AWS_MAX_ATTEMPTS": "100",
            "TASK_ROLE_ARN": task_role.role_arn,
            "LABEL": "prefect-agent-fargate",
            "NAME": "prefect-fargate-agent",
        },
        secrets={
            "PREFECT__CLOUD__AGENT__AUTH_TOKEN": ecs.Secret.from_ssm_parameter(
                prefect_cloud_agent_auth_token
            )
        },
    )
    Is there anyone who might be able to help me fix this issue? It's really hurting us at the moment. Cheers
    z
    r
    15 replies · 3 participants
  • a

    Andreas Tsangarides

    09/17/2021, 7:27 AM
    Hi all! new member here, so hoping not to be asking an already answered question! Q: How do you set up a project folder structure that allows locally developing and registering multiple flows? If using a single script (not a package) having
    if __name__ == '__main__':
        # run/register flow
    works fine. But if you have modules, Python does not like it. I was hoping for a structure like the screenshot attached below. For now, I import the flow I am working on into the `main.py` file, and that's the one I use to register my flow and test locally (with docker-compose, local agent and the UI)
    k
    d
    9 replies · 3 participants
  • l

    Lucas Beck

    09/17/2021, 8:48 AM
    Hi, does anyone know how to name task runs based on inputs, but without using the `@task` decorator? I am subclassing `Task` and would like to name it based on the inputs given to `run`.
    ✅ 1
    g
    2 replies · 2 participants
  • j

    Jacob Blanco

    09/17/2021, 8:51 AM
    Is there a way to retrieve a full list of users on a Cloud tenant? I couldn’t find anything on the GraphQL docs
    k
    e
    7 replies · 3 participants
  • d

    Daniil Ponizov

    09/17/2021, 9:37 AM
    Hi, how can I pass environment variables to the Docker agent without restarting it? Are there any options to set them in the flow, for example passing the path to a *.env file, or making variables from the current terminal session visible to the Local agent?
    b
    k
    3 replies · 3 participants
  • h

    haf

    09/17/2021, 12:38 PM
    Can I make tasks read from a local folder as a cache using
    snapshotting
    ?
    k
    9 replies · 2 participants
  • b

    Benjamin Holzknecht

    09/17/2021, 1:10 PM
    Hey guys, I'm new to working with Prefect (love it so far :bowtie:). Our plan is to use it as a job scheduler for our DWH. In general we are planning to do ELT with Fivetran and dbt, so we are also looking into dbt Cloud. My first idea was to use the Python requests library to simply trigger the dbt job via the API, but I'm wondering if you guys have any smarter ideas, or what the Prefect-ish way to solve this is. I also looked into the dbt shell execution, but does it also work together with dbt Cloud? Thanks in advance, Ben
    k
    m
    +1
    6 replies · 4 participants
  • p

    Pedro Machado

    09/17/2021, 1:15 PM
    Hi there. I recently started getting some errors on VS Code using pylance. For example, the following line
    from prefect import Flow, Parameter, task, unmapped
    produces
    "Flow" is not exported from module "prefect"
    Import from "prefect.core" instead Pylance(reportPrivateImportUsage)
    
    "Parameter" is not exported from module "prefect"
    Import from "prefect.core" instead Pylance(reportPrivateImportUsage)
    
    "task" is not exported from module "prefect"
    Import from "prefect.utilities.tasks" instead Pylance(reportPrivateImportUsage)
    
    "unmapped" is not exported from module "prefect"
    Import from "prefect.utilities.edges" instead Pylance(reportPrivateImportUsage)
    Also, I am getting pylance errors when passing Parameters to tasks that expect
    str
    arguments. I don't believe this was happening before. Do you know if something changed recently in Pylance or in how we are supposed to import modules? I am using Prefect 0.15.4. Thanks!
    k
    5 replies · 2 participants
  • c

    Constantino Schillebeeckx

    09/17/2021, 3:40 PM
    I understand that for local execution, we can set local secrets; is the same possible for the key-value store? My unit tests are failing because I don't auth to Prefect, so I'm wondering whether I should mock
    get_key_value
    or if there's an ENV I can alternatively set.
    k
    z
    4 replies · 3 participants
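A minimal sketch of the mocking option mentioned above, using only `unittest.mock` from the standard library. Everything named here is hypothetical: `kv_backend` is a stand-in namespace for whichever module exposes `get_key_value` in the real code, and `load_threshold` is an invented function under test. The stand-in raises to imitate an unauthenticated call, so the test only passes if the patch is in effect.

```python
from types import SimpleNamespace
from unittest import mock


def _remote_get_key_value(key: str) -> str:
    """Hypothetical stand-in for the real lookup; fails without auth."""
    raise RuntimeError("not authenticated")


# Stand-in for the module that exposes get_key_value (an assumption).
kv_backend = SimpleNamespace(get_key_value=_remote_get_key_value)


def load_threshold() -> int:
    """Code under test: resolves get_key_value at call time, so patches apply."""
    return int(kv_backend.get_key_value("threshold"))


def test_load_threshold() -> int:
    fake_store = {"threshold": "42"}
    # Replace the lookup with a dict-backed fake for the duration of the block.
    with mock.patch.object(
        kv_backend, "get_key_value", side_effect=fake_store.__getitem__
    ) as fake:
        value = load_threshold()
    fake.assert_called_once_with("threshold")
    return value


print(test_load_threshold())
```

The patch has to target the name where the code under test looks it up. If the real code does `from some_module import get_key_value`, the name to patch is the one in the importing module, not the one in `some_module`.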
  • j

    Jacob Goldberg

    09/17/2021, 4:49 PM
    I am hoping to have a task be blocked by an upstream task whose execution depends on a parameter, and that does not necessarily provide any input to the downstream task. With my current flow structure the downstream task is only executed when my parameter is set to `false`. How can I get it to execute in both cases, whether the parameter is `true` or `false`? Sample code in thread
    k
    5 replies · 2 participants
  • j

    Jason Prado

    09/17/2021, 5:27 PM
    Can I look at a prefect flow run and tell what image it used in the
    KubernetesRun
    ? The name of the image or ideally its hash?
    z
    2 replies · 2 participants
  • j

    Jason Prado

    09/17/2021, 6:32 PM
    I’m trying to understand how
    KubernetesAgent
    (code here) uses the environment variables used in the k8s deployment, particularly
    IMAGE_PULL_POLICY
    , to build the podspec for a flow run. It looks like if a flow specifies
    run_config=KubernetesRun(…)
    at all, then
    IMAGE_PULL_POLICY
    from the deployment env is ignored? Is that right?
    k
    8 replies · 2 participants
  • k

    Kevin Weiler

    09/17/2021, 8:58 PM
    related to this thread, I’m trying to set a key in the context in one task, and then read it in a subsequent task. Is that something that should work? Here is the task that sets the key:
    class SetTrDate(Task):
        def run(self, tr_date: str):
            prefect.context.tr_date = tr_date or get_last_tr_date()
    and then I read it in subsequent tasks with :
    prefect.context.get('tr_date')
    The subsequent tasks seem to think that it’s set to
    None
    Should this work?
    k
    25 replies · 2 participants
  • d

    Daniel Saxton

    09/18/2021, 3:43 PM
    new to Prefect here, but how do we avoid crash-looping when using an
    IntervalSchedule
    if the workflow is raising an exception (so we'd like to simply cancel the workflow as soon as an error occurs)?
    z
    k
    14 replies · 3 participants
  • j

    Jacolon Walker

    09/19/2021, 4:55 AM
    Hello folks! Is there any way to cancel a scheduled flow from the prefect CLI?
    k
    1 reply · 2 participants
  • n

    Nadav Nuni

    09/19/2021, 9:25 AM
    hey =] couldn’t find a clear explanation in the docs - what could be a reason for me to get a
    task is already running
    (for a KubernetesRun)? and then
    Finished task run for task with final state: 'Running'
    … (other than that, it seems like the task doesn’t really run).
    k
    8 replies · 2 participants
  • n

    Nadav Nuni

    09/19/2021, 9:28 AM
  • w

    William Burdett

    09/19/2021, 3:18 PM
    I seem to be having an issue when registering a flow with the cloud. It fails here:
    File "/usr/local/lib/python3.9/site-packages/marshmallow/fields.py", line 1410, in _serialize
        return int(value.total_seconds() / base_unit.total_seconds())
    AttributeError: 'int' object has no attribute 'total_seconds'
    In the comments I have the full stack trace
    k
    19 replies · 2 participants
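The failing line in the traceback above calls `total_seconds()` on the value being serialized, so the error means an `int` reached a field that expected a `datetime.timedelta`. Which argument carried the `int` is not shown in this message; a duration-style setting such as a retry delay or schedule interval is only a guess. The sketch below reproduces the same expression in isolation, with a hypothetical `serialize_seconds` helper and a base unit of one second assumed.

```python
from datetime import timedelta


def serialize_seconds(value) -> int:
    """Mirror the failing expression, assuming a base unit of one second."""
    base_unit = timedelta(seconds=1)
    return int(value.total_seconds() / base_unit.total_seconds())


def try_serialize(value) -> str:
    """Return a short status string instead of raising, for comparison."""
    try:
        return f"ok: {serialize_seconds(value)}"
    except AttributeError as exc:
        return f"error: {exc}"


print(try_serialize(timedelta(minutes=5)))  # a timedelta serializes cleanly
print(try_serialize(300))                   # a bare int reproduces the error
```

If that is the cause, wrapping the offending value as `timedelta(seconds=...)` where it is defined would be the fix to try first.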
  • h

    haf

    09/20/2021, 8:30 AM
    If you return a python set from a FunctionTask and try to use it in another task B, Prefect fails B silently without reason.
    e
    22 replies · 2 participants
  • a

    Abhishek

    09/20/2021, 11:06 AM
    Problem running the ECS agent: I am running an ECS agent as follows:
    prefect agent ecs  start --key $KEY --task-role-arn $TASK_ARN --log-level INFO --label s3_sync --name farget-dev --execution-role-arn $EXEC_ROLE_ARN
    and it's giving me an error:
    ValueError: Failed to infer default networkConfiguration, please explicitly configure using `--run-task-kwargs`
    I created an ECS cluster (using the defaults). I used the following command to create the cluster:
    aws ecs create-cluster
    which creates a default cluster. Can anyone point out if I am missing anything?
    a
    k
    +3
    75 replies · 6 participants
  • a

    Abhishek

    09/20/2021, 11:07 AM
    I see this is the default task definition template: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/task_definition.yaml But I am not sure if I need to pass anything or create a task definition template.
    k
    1 reply · 2 participants
k

Kevin Kho

09/20/2021, 1:33 PM
Hey @Abhishek, ECSRun will get merged with that. You don’t need to pass anything. Just saw Anna responded above. Will read that.