Powered by Linen
prefect-community
  • Rob McInerney

    05/23/2022, 4:36 PM
    Hi! I’m trying to configure reference tasks for a flow with case statements and am having some difficulty. Even though tasks fail, the flow still succeeds. What am I doing wrong? My code follows this pattern:
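    from prefect import Flow, Parameter, case

    # FLOW_NAME, set_storage, set_run_config and the tasks below are defined elsewhere
    with Flow(FLOW_NAME, storage=set_storage(FLOW_NAME), run_config=set_run_config(),) as flow: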
        dataset = Parameter("dataset", default={})
        
        dataset_is_valid = check_dataset_is_valid(dataset)
        
        with case(dataset_is_valid, True):
           
            access_token = get_access_token()
            response = refresh_dataset(dataset, access_token)
    
            result = check_result(dataset, response, access_token)
            flow.set_reference_tasks([response, result])
        
        with case(dataset_is_valid, False):
            flow.set_reference_tasks([dataset_is_valid])
    4 replies · 2 participants
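A note on the reference-task question above: case only changes how the tasks created inside the block are wired, while a call such as flow.set_reference_tasks(...) runs immediately while the flow is being built. Both calls therefore execute, and the second replaces the first. The only reference task left is dataset_is_valid, which finishes successfully whether it returns True or False, so the flow run succeeds. The toy model below reproduces that behaviour in plain Python; it is not Prefect's code, ToyFlow, toy_case and final_flow_state are hypothetical, and it assumes, as in Prefect 1, that a Skipped state counts as success.

```python
from contextlib import contextmanager
from typing import Dict, Iterable, Iterator

# Toy model only: mimics how Prefect 1 evaluates reference tasks, not its real classes.
SUCCESSFUL = {"Success", "Skipped"}  # assumption: Skipped counts as success, as in Prefect 1


class ToyFlow:
    """Hypothetical stand-in for a flow that only tracks its reference tasks."""

    def __init__(self) -> None:
        self.reference_tasks = set()

    def set_reference_tasks(self, tasks: Iterable[str]) -> None:
        # Runs immediately at build time; each call replaces the previous set.
        self.reference_tasks = set(tasks)


@contextmanager
def toy_case(condition: str, value: bool) -> Iterator[None]:
    # A case block conditions the tasks created inside it, not plain method
    # calls such as set_reference_tasks, so this toy version does nothing.
    yield


def final_flow_state(flow: ToyFlow, task_states: Dict[str, str]) -> str:
    """The flow succeeds only if every reference task ended in a successful state."""
    ok = all(task_states[name] in SUCCESSFUL for name in flow.reference_tasks)
    return "Success" if ok else "Failed"


flow = ToyFlow()
with toy_case("dataset_is_valid", True):
    flow.set_reference_tasks(["response", "result"])
with toy_case("dataset_is_valid", False):
    flow.set_reference_tasks(["dataset_is_valid"])  # overwrites the call above

states = {"dataset_is_valid": "Success", "response": "Failed", "result": "TriggerFailed"}
print(flow.reference_tasks)            # {'dataset_is_valid'}
print(final_flow_state(flow, states))  # Success, despite the failed tasks
```

In this model, calling set_reference_tasks once, outside both case blocks, with response and result gives Failed for the same states. In the False branch those tasks would be Skipped, which still counts as success, so an invalid dataset would need a task that fails explicitly.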
  • Ilhom Hayot o'g'li

    05/23/2022, 5:58 PM
    Hi! Should the child flow be registered in the scheduled flow each time? (Prefect version 1.2)
    10 replies · 2 participants
  • Josh

    05/23/2022, 8:59 PM
    I can’t find an environment variable in my flow run. The error includes this message:
    This may be due to one of the following version mismatches between the flow build and execution environments:
     - python: (flow built with '3.7.13', currently running with '3.7.12')
    I guess the CI/CD system registering my flow is using Python 3.7.13. But how do I find out which Python version the execution environment uses when the flow runs from a Docker image via a Prefect Docker agent?
    21 replies · 2 participants
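On the Python-version question above: one way to see which interpreter the Docker image really runs is to report it from inside the flow run. A minimal sketch in plain Python; check_python_version is a hypothetical helper, and '3.7.13' is the build version from the error message. In a Prefect 1 task, the returned string could be logged with prefect.context.get("logger"). Running docker run --rm <image> python --version against the image the agent uses should give the same answer without running the flow.

```python
import platform
import sys


def check_python_version(built_with: str) -> str:
    """Compare the build-time Python version with the interpreter running this code."""
    running = platform.python_version()  # e.g. "3.7.12"
    if running == built_with:
        return f"match: {running}"
    return f"mismatch: built with {built_with}, running {running} at {sys.executable}"


print(check_python_version("3.7.13"))
```

The error message only lists version mismatches as possible causes, so a patch-level difference such as 3.7.12 vs 3.7.13 is not necessarily why the environment variable is missing.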
  • Josh

    05/23/2022, 10:47 PM
    Is there a way to name flow runs in a particular way, similar to how task runs can be named based on parameters?
    3 replies · 3 participants
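For naming flow runs from parameters, the name itself is ordinary string formatting. A minimal sketch; build_flow_run_name and the template are hypothetical. In Prefect 1, the rendered string would still have to be applied to the run from inside the flow, for example with the RenameFlowRun task that, as far as I know, lives in prefect.tasks.prefect (check the docs for your version).

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_flow_run_name(template: str, parameters: Dict[str, Any],
                        now: Optional[datetime] = None) -> str:
    """Render a flow-run name from parameter values plus the current UTC time."""
    now = now or datetime.now(timezone.utc)
    return template.format(now=now, **parameters)


name = build_flow_run_name(
    "refresh-{dataset}-{now:%Y%m%d}",
    {"dataset": "sales"},
    now=datetime(2022, 5, 23, tzinfo=timezone.utc),
)
print(name)  # refresh-sales-20220523
```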
  • HaoboGu

    05/24/2022, 2:27 AM
    Hello folks, I’m looking for a tool that could automate the data processing and PyTorch model training steps in our machine learning pipeline. Is Prefect a good tool for this task? I can see Prefect is good at ETL, but I’m not sure whether it has good integrations with PyTorch distributed training. Any suggestions? Thanks
    17 replies · 3 participants
  • Dylan Lim

    05/24/2022, 2:52 AM
    Hello! I’m trying to run a registered flow, but I got this error message in the agent logs. Could anyone help explain what it means?
    36 replies · 3 participants
  • Guillaume Latour

    05/24/2022, 6:57 AM
    Hello, I have an issue with the intermediate results of a flow's tasks taking up too much space. My setup is as follows:
    • 1 Prefect server (launched with docker-compose) and 1 local Prefect agent on the same server (sharing the ~/.prefect/config.toml configuration)
    • 1 Dask cluster (launched with Docker on another server) with the Python libraries needed for the flows to execute correctly; these containers do not have any Prefect configuration
    So I am using a DaskExecutor, which is correctly configured in that ~/.prefect/config.toml file. What I see is that the Docker containers create a /home/<user>/.prefect/results folder, with <user> being the user who launches the Prefect server and agent on the other server (and who does not exist inside the containers). I've added this line to the configuration:
    flows.checkpointing = false
    then relaunched the server and agent, but nothing has changed: the results folder is still being filled with intermediate task results. Am I doing something wrong? Is it possible to prevent these intermediate backups? Thanks in advance
    14 replies · 3 participants
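On the checkpointing question above: the ~/.prefect/config.toml on the agent's machine is not visible inside the Dask worker containers on the other server, so settings in it may never reach the code that writes the results. Prefect 1 also reads configuration from environment variables named PREFECT__<SECTION>__<KEY>, so flows.checkpointing corresponds to PREFECT__FLOWS__CHECKPOINTING, which could be passed to the worker containers (e.g. docker run -e PREFECT__FLOWS__CHECKPOINTING=false <worker image>). Disabling checkpointing per task with @task(checkpoint=False) is another option. The sketch below is a toy model of that naming convention in plain Python, not Prefect's own config loader; env_overrides is hypothetical.

```python
from typing import Any, Dict, Mapping

PREFIX = "PREFECT__"


def env_overrides(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Toy model: turn PREFECT__SECTION__KEY variables into nested config keys."""
    config: Dict[str, Any] = {}
    for name, raw in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables are ignored
        *sections, key = name[len(PREFIX):].lower().split("__")
        node = config
        for section in sections:
            node = node.setdefault(section, {})
        # "true"/"false" become booleans; anything else stays a string
        node[key] = {"true": True, "false": False}.get(raw.strip().lower(), raw)
    return config


print(env_overrides({"PREFECT__FLOWS__CHECKPOINTING": "false", "HOME": "/root"}))
# {'flows': {'checkpointing': False}}
```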
  • Emma Rizzi

    05/24/2022, 7:17 AM
    Hello! I'm trying to implement some unit tests for tasks, which are supposed to run with pytest in GitLab CI. The problem is that some tasks use the Prefect logger, which is not available in the test environment. Is there a recommended way to unit test tasks? I thought about using an environment variable to detect whether the code is executed in a flow context, but I didn't find one already defined by Prefect in the docs.
    5 replies · 3 participants
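For the unit-testing question above, one common pattern (a general suggestion, not a Prefect-specific API) is to keep a task's logic in a plain function that accepts an optional logger and falls back to the standard logging module. The flow's @task wrapper can pass in the Prefect logger (in Prefect 1, prefect.context.get("logger") inside a task run), while tests use stdlib logging and assertLogs. clean_rows is a hypothetical example.

```python
import logging
import unittest
from typing import List, Optional


def clean_rows(rows: List[str], logger: Optional[logging.Logger] = None) -> List[str]:
    """Task logic as a plain function; the logger is injected so tests need no flow context."""
    logger = logger or logging.getLogger("tasks.clean_rows")
    cleaned = [row.strip() for row in rows if row.strip()]
    logger.info("kept %d of %d rows", len(cleaned), len(rows))
    return cleaned


class CleanRowsTest(unittest.TestCase):
    def test_cleans_and_logs(self) -> None:
        with self.assertLogs("tasks.clean_rows", level="INFO") as captured:
            result = clean_rows([" a ", "", "b"])
        self.assertEqual(result, ["a", "b"])
        self.assertIn("kept 2 of 3 rows", captured.output[0])
```

pytest collects unittest.TestCase classes, so the test runs unchanged in GitLab CI; python -m unittest works as well.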
  • Ilhom Hayot o'g'li

    05/24/2022, 8:38 AM
    Hello! Is it mandatory to create an agent when running a flow of flows? I have been reading this example, but there aren't any agents in it. If I do create one, why is it running so many processes? Should the agent be killed after each scheduled run, or is that done by the program? Otherwise it eats too much RAM, and the usage increases each time it runs. How can I kill the agent after the process finishes?
    13 replies · 3 participants
  • Kayvan Shah

    05/24/2022, 8:49 AM
    I'm getting the following traceback while shutting down the Orion server (running locally). I didn't get such errors a few days ago. I'm confused about what this stack trace is trying to say.
    Attachment: Error.py
    2 replies · 2 participants
  • Todd de Quincey

    05/24/2022, 9:45 AM
    Prefect 2.0 - Snowflake PUT command from local file error
    19 replies · 2 participants
  • Ed Burroughes

    05/24/2022, 11:59 AM
    Hey all, apologies if this is the wrong channel for this. We've run into a bit of an issue with creating a dynamic flow in Prefect 1.0; see below:
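    from prefect import Flow, Parameter

    # clean, load_data, test_raw_data and transform_task are @task functions defined elsewhere
    with Flow("tax_assessor_inc") as flow: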
        
        raw_s3_keys = Parameter("raw_s3_keys")
        
        flow_tasks = []
        for s3_key in raw_s3_keys:
            _clean = clean([s3_key])
            _load_data = load_data(_clean)
            _test_raw_data = test_raw_data()
            _transform_task = transform_task()
            flow_tasks.append([_clean, _load_data, _test_raw_data, _transform_task])
    
        reduced_flow_tasks = sum(flow_tasks, [])
        for i in range(len(reduced_flow_tasks) - 1):
            reduced_flow_tasks[i].set_downstream(reduced_flow_tasks[i + 1])
    The issue arises when trying to iterate over the raw_s3_keys parameter, as it's not iterable. It works when hardcoding the S3 paths, but obviously that's not what we want; it would be great to define that list dynamically. Also, annoyingly, the tasks have to run one after another, otherwise we'd use something like Prefect's map. Firstly, is it possible to iterate over the output of a Parameter object? If not, is there a better way of solving this problem? Sorry for the long-winded message. Thank you 🙂
    9 replies · 2 participants
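On the dynamic-flow question above: the for loop runs while the flow is being built, when raw_s3_keys is still a Parameter task rather than a list, which is why it isn't iterable. One possible workaround, assuming the per-key steps can live in ordinary Python functions, is to move the loop to run time, e.g. into the body of a single Prefect 1 task that receives the parameter's value. A minimal plain-Python sketch; process_keys_sequentially and the step functions are hypothetical stand-ins for clean, load_data, test_raw_data and transform_task.

```python
from typing import Callable, List, Sequence, Tuple

Step = Callable[[str], str]


def process_keys_sequentially(s3_keys: Sequence[str],
                              steps: Sequence[Step]) -> List[Tuple[str, str]]:
    """Run every step for one key before starting the next key; return the run order."""
    order: List[Tuple[str, str]] = []
    for key in s3_keys:  # at run time this is a real list, not a Parameter
        value = key
        for step in steps:
            value = step(value)
            order.append((key, step.__name__))
    return order


# Hypothetical stand-ins for the real tasks.
def clean(value: str) -> str:
    return value.strip()


def load_data(value: str) -> str:
    return value


def validate_raw_data(value: str) -> str:
    return value


def transform(value: str) -> str:
    return value


steps = [clean, load_data, validate_raw_data, transform]
print(process_keys_sequentially(["a.csv", "b.csv"], steps))
```

The trade-off is that Prefect then sees one task instead of four per key, so per-step retries and states are not tracked separately.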
  • Frank Embleton

    05/24/2022, 1:21 PM
    What are my options for alerting with Prefect 2.0? I guess I can create alert tasks and call them as part of a flow on failure. Is there any plan or ability to handle that at a higher level, such as in Cloud?
    2 replies · 2 participants
  • Rainer Schülke

    05/24/2022, 1:41 PM
    Good afternoon 🙂 I am on Prefect 1.0 and was wondering, mainly out of curiosity, whether it is possible to exclude a specific element when calling .map(), store that element and process it at a later point within the flow? (I'm not talking about unmapped.)
    7 replies · 3 participants
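For the question above about holding one element back from .map(), one approach, assuming the element can be recognised by a predicate, is to split the list in an upstream task: one part feeds .map() and the other feeds an ordinary task later in the flow. In Prefect 1, a task returning a tuple can be unpacked into two outputs if it is declared with @task(nout=2). The splitting logic itself in plain Python; split_for_mapping and the predicate are hypothetical.

```python
from typing import Callable, List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_for_mapping(items: Sequence[T],
                      hold_back: Callable[[T], bool]) -> Tuple[List[T], List[T]]:
    """Return (items to map now, items held back for a later step), keeping order."""
    to_map: List[T] = []
    deferred: List[T] = []
    for item in items:
        (deferred if hold_back(item) else to_map).append(item)
    return to_map, deferred


to_map, deferred = split_for_mapping(["a.csv", "special.csv", "b.csv"],
                                     lambda name: name.startswith("special"))
print(to_map)    # ['a.csv', 'b.csv']
print(deferred)  # ['special.csv']
```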
  • Vamsi Reddy

    05/24/2022, 2:33 PM
    Hi there, we are using an ECS agent to run our flows. Is it possible to set the CPU and memory per flow? I tried setting the CPU and memory for the flow in the ECSRun config, but the task that gets spun up still uses the defaults specified in the YAML file, i.e. CPU 1024 and memory 2048. I want to change the CPU and memory per flow. My current ECSRun config is as follows:
    from prefect.run_configs import ECSRun

    RUN_CONFIG = ECSRun(
        labels=["dev"],
        task_role_arn="arn:aws:iam::xxxxx:role/prefectTaskRole",
        execution_role_arn="arn:aws:iam::xxxxx:role/prefectECSAgentTaskExecutionRole",
        image="xxxxxx.dkr.ecr.us-east-1.amazonaws.com/prefect-orchestration:latest",      # this is our custom image that also has our flows
        run_task_kwargs=dict(cluster="prefectEcsClusterDev", launchType="FARGATE", overrides=dict(
            containerOverrides=[
                dict(
                    name="flow",
                    cpu=4096,
                    memory=8192,
                )])),
        cpu=4096,                    #tried specifying here but still no luck
        memory=8192
    )
    20 replies · 2 participants
  • Todd de Quincey

    05/24/2022, 2:36 PM
    Prefect 2.0 Add users & edit permissions
    9 replies · 3 participants
  • JK

    05/24/2022, 2:42 PM
    Hello, I’m not seeing logs on our flow runs. I have a feeling it may be something on our account; is there a way to double-check?
    1 reply · 2 participants
  • Madison Schott

    05/24/2022, 4:50 PM
    Hi 🙂 wondering if anyone has resources for creating a (near) real-time data pipeline using Fivetran and Prefect?
    4 replies · 2 participants
  • Constantino Schillebeeckx

    05/24/2022, 6:02 PM
    Question about environment variables: if I set something like "FOO": "BAR" in the run config, and then do something like os.environ["FOO"] = "BAZ" in flow.py, what will FOO be set to when the flow runs?
    2 replies · 2 participants
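For the environment-variable question above, the process-level behaviour is easy to check in plain Python: variables from the run config are already in the environment when the flow-run process starts, and an assignment in flow.py replaces the value whenever that code executes. Whether flow.py's module-level code runs again during the flow run depends on the storage (script-based storage re-executes the file; with pickle-based storage it may not), so treat that part as an assumption to verify. In this sketch, FLOW_PY and run_with_env are hypothetical, and the child process stands in for the flow run.

```python
import os
import subprocess
import sys

# Hypothetical flow.py body: overrides FOO at import time and reports what it sees.
FLOW_PY = 'import os\nos.environ["FOO"] = "BAZ"\nprint(os.environ["FOO"])\n'


def run_with_env(extra_env: dict) -> str:
    """Start a child Python process with extra env vars (like a run config would)."""
    env = {**os.environ, **extra_env}
    result = subprocess.run([sys.executable, "-c", FLOW_PY], env=env,
                            capture_output=True, text=True, check=True)
    return result.stdout.strip()


print(run_with_env({"FOO": "BAR"}))  # BAZ: the in-process assignment wins
```

Code that reads FOO before the assignment has run would still see BAR.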
  • Patrick Tan

    05/24/2022, 6:39 PM
    Hi, today I set up a personal account for development using my personal email address. Projects and flows are registered under my account. For production deployment, we want multiple people to be able to monitor flow runs and access the Prefect Cloud UI. What is the best practice for setting this up? Should we set up a service account (with a shared email address) and have everyone log in to the same account?
    3 replies · 2 participants
  • Tom Manterfield

    05/24/2022, 7:07 PM
    Hello all! Are there any release notes for the 2.0b5 release anywhere?
    7 replies · 2 participants
  • Christian Vogel

    05/24/2022, 7:18 PM
    Hi all! I am currently trying to use Prefect together with Ray using the @flow(task_runner=RayTaskRunner()) decorator. While everything works fine on the head node of the cluster, on the workers the flows continuously fail with the message:
    (begin_task_run pid=1659, ip=172.31.2.206) File "/home/ray/anaconda3/lib/python3.7/site-packages/prefect/orion/models/task_runs.py", line 282, in set_task_run_state
    (begin_task_run pid=1659, ip=172.31.2.206) raise ValueError(f"Invalid task run: {task_run_id}")
    (begin_task_run pid=1659, ip=172.31.2.206) ValueError: Invalid task run: 3e46e4ce-c740-47d8-9ac7-79951b6fd98f
    Any idea why? I am using Ray 1.12.1, Prefect 2.0b5 and Python 3.7.7.
    20 replies · 5 participants
  • Kevin Kho

    05/24/2022, 7:30 PM
    message has been deleted
  • Sander

    05/24/2022, 8:45 PM
    Hi, I was trying to find some information on using Redis as a store to persist data between task runs (or flow runs). Is there any? Thanks!
    23 replies · 3 participants
  • PRASHANT GARG

    05/25/2022, 5:39 AM
    Hi, what could be the reasons for a flow storage error while running a flow in Prefect Cloud?
    3 replies · 2 participants
  • Thomas Opsomer

    05/25/2022, 10:16 AM
    Hello 🙂 We're having issues with tasks that require manual validation: they won't start 😕
    5 replies · 2 participants
  • Naga Sravika Bodapati

    05/25/2022, 12:08 PM
    In Prefect 1.0, is there a way to run the next scheduled flow run only if the previous run was successful? I'm looking for programmatic solutions, but others are also welcome! Thanks.
    13 replies · 2 participants
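For the question above about gating a scheduled run on the previous run's outcome, the decision itself can be separated from how the run history is fetched. A minimal sketch in plain Python; previous_run_succeeded and the state names are hypothetical. In Prefect 1, a first task (or a flow state handler) could query the backend for recent runs of the flow and raise prefect.engine.signals.SKIP when this returns False, so that downstream tasks are skipped; fetching the history is left out here.

```python
from datetime import datetime
from typing import Iterable, Tuple

# States that mean a run actually finished. Skipped runs are ignored, so a
# failure keeps blocking later runs until some run (e.g. a manual one) succeeds.
FINISHED = {"Success", "Failed", "Cancelled", "TimedOut"}


def previous_run_succeeded(runs: Iterable[Tuple[datetime, str]]) -> bool:
    """runs: (scheduled_start, state) pairs for earlier runs of the same flow."""
    finished = [run for run in runs if run[1] in FINISHED]
    if not finished:
        return True  # nothing to compare against, e.g. the very first run
    _, latest_state = max(finished, key=lambda run: run[0])
    return latest_state == "Success"


history = [
    (datetime(2022, 5, 24, 6), "Success"),
    (datetime(2022, 5, 25, 6), "Failed"),
    (datetime(2022, 5, 25, 12), "Skipped"),
]
print(previous_run_succeeded(history))  # False
```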
  • Kayvan Shah

    05/25/2022, 12:14 PM
    Will decorating class constructors as tasks, and a function that pipes those tasks together as a flow, work with Prefect 2.0 in cases where we create deployments? What is your opinion, and what do you suggest? Would it work better to create a function that calls the constructor and decorate that function with @flow?
    7 replies · 3 participants
  • Daniel Sääf

    05/25/2022, 12:29 PM
    Hi! This morning I was refactoring some code and also upgraded to Prefect 2.0b5, but when I tried to deploy and run my agent on a Compute Engine instance, I ran into problems. (I don't think it’s related to 2.0b5, since the problem can be reproduced in b4 as well.) When I run a flow from the Compute Engine instance, I get an exception saying that the client cannot be pickled:
    _pickle.PicklingError: Pickling client objects is explicitly not supported.
    Clients have non-trivial state that is local and unpickleable.
    After reading up on this, it sounds like this is caused by some other exception that results in an unpickleable object. Is that right? Is there any way I can reach the underlying exception? Traceback and code are in the thread.
    13 replies · 3 participants
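On the pickling error above: that message is raised when a client object itself is being pickled, which can happen when the client is passed to or returned from a task, or is referenced by a result or exception that gets serialized to another process. A common workaround is to pass only plain configuration between tasks and create the client inside the task that uses it. The sketch mimics this in plain Python; FakeClient and task_creating_client are hypothetical, and FakeClient raises the same message as the traceback.

```python
import pickle


class FakeClient:
    """Hypothetical SDK client that, like many real ones, refuses to be pickled."""

    def __init__(self, project: str) -> None:
        self.project = project

    def __reduce__(self):
        raise pickle.PicklingError(
            "Pickling client objects is explicitly not supported."
        )

    def query(self, sql: str) -> str:
        return f"{self.project}: {sql}"


def task_creating_client(project: str, sql: str) -> str:
    # Only plain, picklable arguments cross the task boundary; the client is
    # created (and discarded) inside the task.
    client = FakeClient(project)
    return client.query(sql)


try:
    pickle.dumps(FakeClient("my-project"))  # what happens when a client is a task input/result
except pickle.PicklingError as exc:
    print("cannot pickle client:", exc)

payload = pickle.dumps(("my-project", "SELECT 1"))  # plain data pickles fine
print(task_creating_client(*pickle.loads(payload)))  # my-project: SELECT 1
```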
  • Jonathan Mathews

    05/25/2022, 12:44 PM
    Hi, we’ve got a flow of flows, which runs every time a file is added and then at the end runs a flow that updates a dataset in Power BI. Since we’re only able to refresh Power BI 8 times a day, is there a way to group the Power BI refreshes together in case multiple files are added? Just wondering what might be the best way to architect that, or if anyone’s done something similar.
    7 replies · 3 participants
Anna Geller

05/25/2022, 12:55 PM
Iterating some business logic over a runtime-discoverable list of files looks like a perfect use case for mapping. Did you try that?
Alternatively, you could have a flow that fires any time a new file is added, in an event-driven way. If I misunderstood your question, could you explain in more detail what you are trying to do?
To group multiple flow runs or task runs, Prefect 2.0 has a flexible concept of tags, and the UI provides filter functionality that even lets you build custom dashboards based on them. This topic may help.
Jonathan Mathews

05/25/2022, 12:59 PM
Thanks! We have it set up in an event-driven way, so a new flow is triggered every time a file is added. The problem is when a new file is added, then another one 2 minutes later, and so on…
Ideally it would be something like merging flows, or having the Power BI refresh run on some kind of delay.
Then, if the refresh flow had already been triggered and was pending, it would not be triggered again.
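The batching described above is essentially a trailing-edge debounce: each new file pushes the pending refresh back, and the refresh only runs once no new file has arrived for a quiet period. A minimal sketch of that timing logic in plain Python, assuming a fixed quiet period; debounced_refresh_times is hypothetical, and wiring it to Prefect (for example scheduling the refresh flow run with a delay and cancelling superseded runs) is not shown.

```python
from datetime import datetime, timedelta
from typing import List


def debounced_refresh_times(arrivals: List[datetime], quiet: timedelta) -> List[datetime]:
    """Refresh once per burst of files, `quiet` after the burst's last file."""
    ordered = sorted(arrivals)
    refreshes: List[datetime] = []
    for current, following in zip(ordered, ordered[1:] + [None]):
        # A pending refresh is superseded if another file lands before it fires.
        if following is None or following - current >= quiet:
            refreshes.append(current + quiet)
    return refreshes


day = datetime(2022, 5, 25)
files = [day.replace(hour=9), day.replace(hour=9, minute=2),
         day.replace(hour=9, minute=4), day.replace(hour=13)]
for t in debounced_refresh_times(files, timedelta(minutes=15)):
    print(t.strftime("%H:%M"))  # 09:19, then 13:15
```

With a 15-minute quiet period, the three files between 09:00 and 09:04 cost one refresh instead of three. A debounce alone does not enforce the 8-per-day cap if files keep arriving in separate bursts.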
Kevin Kho

05/25/2022, 2:35 PM
I think there are things you can do there, but the workload sounds too unpredictable. Is this Prefect 1 or 2? For Prefect 1, you can use a state handler to check the last flow run and, if it’s too recent, cancel the current run. The “merging” is partly possible with that state handler. The parent flow can also schedule the child flow with a delay, but the problem is that you don’t know the exact delay to use here. I think going this route makes the architecture far more complicated; trying to bump up the limit of 8 refreshes a day would be significantly less work.
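The check described above (look at the last run and cancel the current one if it is too recent) reduces to a small decision function. A sketch in plain Python, assuming the previous refresh times are already known and that the limit is counted over a rolling 24-hour window; allow_refresh and its parameters are hypothetical, and fetching the last runs from Prefect is not shown. Spreading 8 refreshes evenly over 24 hours gives a minimum gap of 24 / 8 = 3 hours.

```python
from datetime import datetime, timedelta
from typing import Sequence

DAILY_LIMIT = 8
MIN_GAP = timedelta(hours=24) / DAILY_LIMIT  # 3 hours when spread evenly


def allow_refresh(now: datetime, previous: Sequence[datetime],
                  min_gap: timedelta = MIN_GAP, daily_limit: int = DAILY_LIMIT) -> bool:
    """Return False if this refresh would be too soon or over the daily quota."""
    window = [t for t in previous if now - t < timedelta(hours=24)]
    if len(window) >= daily_limit:
        return False  # quota for the rolling 24-hour window is used up
    if window and now - max(window) < min_gap:
        return False  # last refresh is too recent; cancel or postpone this one
    return True


now = datetime(2022, 5, 25, 15, 0)
print(allow_refresh(now, [datetime(2022, 5, 25, 14, 0)]))  # False (only 1 hour ago)
print(allow_refresh(now, [datetime(2022, 5, 25, 11, 0)]))  # True (4 hours ago)
```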