data-tricks-and-tips
  • f

    Falk

    08/16/2022, 8:34 AM
    Hey everyone, quick question about the jira_notifier. I'm trying to set up a Jira notification that is triggered whenever a task fails:
    from prefect import task
    from prefect.utilities.notifications.jira_notification import jira_notifier

    @task(state_handlers=[
        jira_notifier(
            only_states=[Failed],
            options={'project': 'TEST', 'issuetype': {'name': 'Bug'}},
            assignee='tester'
        )
    ])
    def add(x, y):
        return x + y
    However I'm getting the following error:
    Failed to load and execute flow run: NameError("name 'Failed' is not defined")
    Is there an import for States or something like this? Or am I missing something else?
    ✅ 1
    s
    • 2
    • 3
  • a

    Angel Acosta

    08/16/2022, 8:07 PM
    Hello, I have a quick question about how to manually trigger a failed task state in code using 2.0. Essentially, I am looking at an HTTP response, and if the status code is not 200 I want to fail the task so that I can receive an alert. Not sure if this is the most elegant solution, but I would appreciate any suggestions. Thanks!
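A minimal sketch of one way to do this, assuming the usual Prefect 2 behavior that a task run which raises an exception ends in a Failed state. The names ensure_ok and UnexpectedStatusError are hypothetical; in a real flow the check would be called from inside a function decorated with @task.

```python
class UnexpectedStatusError(RuntimeError):
    """Raised when an HTTP response has a status other than 200."""


def ensure_ok(status_code: int, url: str = "<unknown>") -> int:
    """Return the status code if it is 200, otherwise raise.

    Hypothetical helper: calling it inside a task lets the raised
    exception propagate out of the task function.
    """
    if status_code != 200:
        raise UnexpectedStatusError(f"{url} returned HTTP {status_code}")
    return status_code
```

Raising a dedicated exception type keeps the failure reason visible in the logs and in any alert text built from the exception message.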
    ✅ 1
    a
    • 2
    • 2
  • a

    Angel Acosta

    08/25/2022, 2:35 PM
    Hello, I am using Prefect Cloud 2.0. I just started receiving a notification that my work queue uses a deprecated tag-based approach. What is the recommended way to create a work queue and assign a deployment to it? I am also testing using a Docker container to run my flows, and I am curious if there is a tutorial on this. I was able to start a Prefect agent from the Docker container, but it never picks up any work. (I am new to Docker as well.)
    ✅ 1
    a
    • 2
    • 10
  • y

    Yusuf

    09/01/2022, 9:09 PM
    Hey, I just finished going through the Get Started section for the Prefect 2.0 docs. I haven't gotten to the Concepts yet, so bear with me if this is a dumb question. But how do I define dependencies between tasks? What is the equivalent of task.set_upstream from Prefect 1? I know Prefect 2.0 tries to remove the strict dependency on DAGs, but are DAGs gone altogether?
    ✅ 1
    t
    • 2
    • 3
  • a

    Aman Tripathi

    09/04/2022, 5:15 PM
    Hi Team, newbie to 2.0 here. We are in the process of upgrading to 2.0. We are using Prefect + PySpark in our project. In Prefect 1.0 we used resource_manager (the decorator) for SparkContext, so the SparkSession is initialized at the beginning of the flow and remains available throughout the lifetime of the flow. What is the equivalent of the resource_manager decorator in 2.0? Thanks for your help.
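Since a Prefect 2 flow is an ordinary Python function, one option is a plain context manager wrapped around the flow body. The sketch below only illustrates that setup/cleanup pattern; FakeSession and managed_session are hypothetical stand-ins for a SparkSession and its lifecycle, not Prefect or PySpark APIs.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a SparkSession."""

    def __init__(self):
        self.active = True

    def stop(self):
        self.active = False


@contextmanager
def managed_session():
    """Create the session once, hand it to the caller, always stop it."""
    session = FakeSession()  # setup: runs before the with-block body
    try:
        yield session
    finally:
        session.stop()  # cleanup: runs even if the body raises


def run_pipeline():
    # In a real project this function would be the flow body.
    with managed_session() as session:
        was_active = session.active
    # After the with-block the session has been stopped.
    return was_active, session.active
```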
    ✅ 1
    a
    • 2
    • 2
  • g

    gertjan

    09/06/2022, 10:20 AM
    Hi everyone, I’m quite new to Prefect and testing something out, but I’m stuck. I want to use Deployment.build_from_flow. It works when I put this in the same directory as the flows, but what I want is the following:
    - deployments:
       -- deployments.py (file with all my deployments)
    - flows:
       -- flow_a.py
       -- flow_b.py
    When I generate the deployments I get:
    ValueError: '..../updates/flows/flow_a.py' is not in the subpath of '.../updates/deployments' OR one path is relative and the other is absolute.
    It has to do with this line
    entry_path = Path(flow_file).absolute().relative_to(Path(".").absolute())
    -> It does not look like I can edit this “path”. Has anyone done this before? Is this even possible?
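The ValueError follows from how Path.relative_to works: it only succeeds when the target lies underneath the base directory. A small sketch with hypothetical paths that mirror the layout above (the /project/... names are assumptions, not the real paths from the error message):

```python
from pathlib import PurePosixPath

# Hypothetical locations mirroring the layout in the question.
working_dir = PurePosixPath("/project/deployments")
flow_file = PurePosixPath("/project/flows/flow_a.py")


def try_relative(path, start):
    """Return path relative to start, or None if path is not under start."""
    try:
        return path.relative_to(start)
    except ValueError:
        return None


# flows/ is a sibling of deployments/, so relative_to cannot express it.
outside = try_relative(flow_file, working_dir)

# Seen from the common parent, the flow file is a subpath again.
inside = try_relative(flow_file, PurePosixPath("/project"))
```

Under that reading, running the deployments script from the directory that contains both folders is one way to keep every flow file inside the working directory.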
    ✅ 1
    a
    • 2
    • 5
  • f

    Falk

    09/06/2022, 11:30 AM
    Hey everyone, just wondering if anybody is familiar with https://github.com/ydataai/pandas-profiling and managed to implement it in their Prefect Flows? I'm looking for a way to run pandas profiling on a Dataframe and show the results in the artifacts. Has something like this been implemented yet?
    ✅ 1
    a
    • 2
    • 1
  • a

    Angel Acosta

    09/06/2022, 4:28 PM
    Hello again, I am trying to use the GitHub storage block. I created the block; here is the URL: https://github.com/AngelSantiagoAcosta/prefect_sb . I run the agent and run the deployment with defaults. Here is the deployment command: prefect deployment build lis_qc.py:process_lis_data --name lis_qc_deployment -q "lis qc queue" -sb github/my-github --apply . The agent returns the following error: OSError: Failed to pull from remote: 'g' is not recognized as an internal or external command, operable program or batch file.
    c
    • 2
    • 7
  • y

    Yusuf

    09/07/2022, 10:36 PM
    Looking for advice on a pattern I'm trying to implement in Prefect 1 that behaves like a local filesystem sensor. I don't want to rig it up to Lambda/Event Grid for this use case. I have files arriving in a local filesystem. My thought is to have a first task that runs every 10 minutes and checks whether a new file has arrived. It determines what is "new" by checking a log of processed file names that the last task in the flow updates. I can't move the files themselves to a landing/processed area because this sits on top of an existing process that I can't change. My question is: when the first task runs and sees no new files, how do I make the remaining tasks not execute, while the flow and all tasks still show as successful?
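A sketch of the "what is new" check described above, using only the standard library. The function names and the one-name-per-line log format are assumptions. For the skipping part, as far as I know raising the SKIP signal from the first task in Prefect 1 causes downstream tasks to be skipped by default, and skipped states count as successful, but that is not shown here.

```python
from pathlib import Path


def find_new_files(incoming_dir, processed_log):
    """Return file names in incoming_dir that are not listed in processed_log."""
    log_path = Path(processed_log)
    # A missing log means nothing has been processed yet.
    processed = set(log_path.read_text().splitlines()) if log_path.exists() else set()
    names = sorted(p.name for p in Path(incoming_dir).iterdir() if p.is_file())
    return [name for name in names if name not in processed]


def mark_processed(processed_log, names):
    """Append processed file names to the log, one per line."""
    with open(processed_log, "a") as log:
        for name in names:
            log.write(name + "\n")
```

The first task would call find_new_files and stop early on an empty list; the last task would call mark_processed with the names it handled.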
    ✅ 1
    a
    • 2
    • 11
  • g

    gertjan

    09/09/2022, 6:30 AM
    I know you can change profiles via the CLI, but can you change profiles in the python code? For example: I have one flow, 2 tasks, but I want to execute each task on a different profile. (basically different ENV vars) Also from the docs:
    prefect config set VAR2=Y VAR3=Z
    Gives:
    Unknown setting name 'VAR2'.
    And nothing is added to the profiles.toml file. What am I missing?
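The "Unknown setting name" message suggests that prefect config set only accepts Prefect's own setting names, so arbitrary variables such as VAR2 would need another mechanism. If the goal is simply different environment variables per task, one hedged option is a small context manager like the sketch below; temporary_env is a hypothetical helper, not a Prefect API.

```python
import os
from contextlib import contextmanager


@contextmanager
def temporary_env(**overrides):
    """Set environment variables for the duration of a with-block."""
    # Remember the old values (None means the variable was not set).
    previous = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)
    try:
        yield
    finally:
        # Restore the environment exactly as it was before the block.
        for name, old in previous.items():
            if old is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old
```

Each task body could wrap its work in a with temporary_env(VAR2="Y"): block. Note that os.environ is process-wide, so this is only safe when the tasks do not run concurrently in the same process.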
    ✅ 1
    j
    • 2
    • 2
  • l

    Lucas Cavalcanti Rodrigues

    09/09/2022, 9:17 PM
    Hi, everyone. How can I run a flow several times using different parameters? I'm trying to use create_flow_run.map like this:
    dump_to_gcs_flow = create_flow_run.map(
        flow_name=unmapped(utils_constants.FLOW_DUMP_TO_GCS_NAME.value),
        project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
        parameters=tables_to_zip,
        labels=unmapped(current_flow_labels),
        run_name=unmapped("Dump to GCS"),
    )
    Except for parameters, all the other arguments are constants, so I use unmapped on them. tables_to_zip is a list of dictionaries containing the parameter values for each table to be zipped. However, this didn't work. I'm currently receiving the error:
    prefect.exceptions.ClientError: [{'message': 'parsing UUID failed, expected String, but encountered Array', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow_run'], 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
    what am I doing wrong here?
    r
    • 2
    • 1
  • c

    Chern Hong Poh

    10/12/2022, 10:08 AM
    message has been deleted
    ✅ 1
    m
    • 2
    • 1
  • k

    Khyaati Jindal

    10/18/2022, 10:28 AM
    I can see a GUI option to delete a flow, but is there a way to bulk delete flows? Whenever my agent goes down on my VM, the flows get stacked, and when I rerun the agent, all the stacked flows get executed at the same time. I have to manually delete all flows from Prefect Cloud. Can we do this in the CLI or in Python code?
    ✅ 1
    j
    • 2
    • 2
  • s

    Stephen Thibeault

    10/27/2022, 8:06 PM
    Hey team! When using Prefect Cloud for deployment with a GitHub storage block, I keep getting this error and it seems to be causing the flow to be unable to run. Does anyone know what the issue may be? Block document has schema checksum sha256:196e3675d678bda766b96c223c5dd5d87223f556150ce5e63f55f0775feec972 which does not match the schema checksum for class 'GitHub'. This indicates the schema has changed and this block may not load.
    j
    p
    • 3
    • 34
  • a

    Andreas Nigg

    10/28/2022, 1:20 PM
    Hey guys, I have a question for all the dbt data model masterminds out there (I hope this is the right channel). I have a rather huge source table which is partitioned by a column “loaded_at”. I have an incremental model which reads from this source table. To limit which source table partitions are read, I could make use of the _dbt_max_partition scripting variable, something like below:
    {% if is_incremental() %}
      where loaded_at >= coalesce(_dbt_max_partition, '2022-01-01')
    {% else %}
    But the problem is that in my incremental model I do not partition by “loaded_at” but by a different column (due to use-case demands). So _dbt_max_partition would not help here, as it would simply return the maximum partition value of the model (which I can’t use as a filter for the source table). In “native” BigQuery I would simply use a scripting variable as follows:
    declare max_source_partition timestamp;
    set max_source_partition = (select max(loaded_at) as ts from `my_model_table`);
    select * from `my_source_table` where loaded_at > max_source_partition
    How can one implement such a scenario with dbt? Is there a way to create scripting variables as part of my models? Or do I need to add it as an on-run-start hook? Or are there better strategies to exclude partitions in my source without having the same column as the partition field in my model?
    k
    • 2
    • 1
  • m

    Mark

    11/01/2022, 9:00 AM
    Is there a way to run a flow manually, without the Cloud or the UI?
    ✅ 1
    t
    • 2
    • 2
  • b

    Boris Tseytlin

    11/02/2022, 4:34 PM
    Hello! I have a very basic case: I want to list all folders in a MinIO (S3) bucket, then run parallel tasks that count the number of files in each folder. Can I achieve that via blocks? I see that the S3 block only has get_folder, which will download the whole storage to my local drive. I don’t want that. I see two alternative ways:
    • Make an S3 block, use that.
    • Make a secret block, in my code set up a MinIO connection, use that.
    Which one is better?
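Whichever way the connection is configured, the counting itself only needs object keys, not downloaded files. A sketch assuming the keys have already been listed by some S3/MinIO client; the listing step is not shown, and count_files_per_folder and the sample key names are made up for illustration.

```python
from collections import Counter


def count_files_per_folder(keys):
    """Count object keys by their top-level folder (text before the first '/')."""
    counts = Counter()
    for key in keys:
        folder, sep, rest = key.partition("/")
        # Skip objects at the bucket root and bare "folder/" marker keys.
        if sep and rest:
            counts[folder] += 1
    return dict(counts)
```

For parallelism, the keys could instead be grouped per folder first and each group handed to its own task; this version keeps everything in one pass for clarity.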
    ✅ 1
    r
    • 2
    • 5
  • j

    Jaafar

    11/03/2022, 9:30 AM
    Hello, just started using Prefect. I tested the flow locally, configured the Cloud UI and the storage block, and then installed Prefect on a GCP VM to take care of the execution. For now, the trick I found to keep the process from stopping when I quit the SSH session is using screen. It’s working fine. That being said, I have a few questions:
    • Using screen to detach the process from the terminal feels more like a hack than a long-term solution. What would be a better way of having a Prefect agent run in production on a VM?
    • Would you have an example that shows how to use the process block? I am not sure I understand its usage and how it gets connected to the VM. Could it be used to launch a virtual environment or install the packages listed in a requirements.txt file, for example?
    Thanks for your help!
    c
    r
    • 3
    • 7
  • b

    Boris Tseytlin

    11/03/2022, 5:44 PM
    I am running a test flow in a Docker container with image pytorch/pytorch. It fails because Prefect is not installed:
    17:43:10.947 | INFO    | prefect.infrastructure.docker-container - Pulling image 'pytorch/pytorch'...
    17:43:12.610 | INFO    | prefect.infrastructure.docker-container - Creating Docker container 'prudent-goshawk'...
    17:43:12.679 | INFO    | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'created'
    17:43:12.934 | INFO    | prefect.agent - Completed submission of flow run '64e545de-27ff-4d51-85d6-cec4d8eb0afe'
    17:43:12.949 | INFO    | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'running'
    /opt/conda/bin/python: Error while finding module specification for 'prefect.engine' (ModuleNotFoundError: No module named 'prefect')
    Adding prefect to the container block's extra pip packages doesn't help:
    {
      "EXTRA_PIP_PACKAGES": "prefect"
    }
    ✅ 1
    m
    b
    • 3
    • 6
  • s

    Stephen Thibeault

    11/08/2022, 1:45 PM
    Hey team! I am trying to run flows off of an agent on a Windows server and am getting this error while starting a flow run:
    08:32:00.000 | INFO | prefect.agent - Completed submission of flow run 'a441fbfb-26bb-481a-8d87-d992532e38a8'
    'C:\Program' is not recognized as an internal or external command, operable program or batch file.
    I know Windows support is under development; does anyone know what this issue could be?
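That message is typical of a Windows command line in which a path containing a space (for example something under C:\Program Files) was not quoted, although the log alone cannot confirm that this is the cause here. The sketch below only demonstrates the effect; the interpreter path is hypothetical.

```python
import subprocess

# Hypothetical interpreter path; the real one is not shown in the log.
python_exe = r"C:\Program Files\Python310\python.exe"

# Built by plain string joining, the command breaks at the first space,
# so a shell would treat the text before it as the program name.
naive_command = python_exe + " -m prefect.engine"
first_token = naive_command.split(" ")[0]

# list2cmdline applies Windows quoting rules to each argument,
# wrapping the path that contains a space in double quotes.
quoted_command = subprocess.list2cmdline([python_exe, "-m", "prefect.engine"])
```

If this is the cause, installing Python (or the virtual environment) in a path without spaces is a common workaround.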
    ✅ 1
    t
    r
    h
    • 4
    • 4
  • d

    Dan Wise

    11/08/2022, 3:35 PM
    Hi, when creating a deployment in Prefect v2 in Python code using parameters, I cannot seem to get these to register correctly when using a pydantic model as the argument to the flow. Does anyone know of any workaround? If I use normal flow keyword arguments then the deployment registers the parameters correctly.
    k
    • 2
    • 1
  • k

    Khyaati Jindal

    11/29/2022, 6:39 AM
    Hi, I am facing an issue using the CLI command to log in to Prefect Cloud from my Dockerfile. This is what my Dockerfile looks like:
    RUN apt update ; apt upgrade -y
    WORKDIR /project_dir
    COPY requirements.txt .
    RUN pip3 install -r requirements.txt
    RUN prefect cloud login -k <key> --workspace <my_workspace_name>
    COPY . .
    CMD [ "python3", "deployment.py"]
    I am deploying this Docker image using GitHub Actions, but the image build fails. It seems to me that it is expecting input, because I get the following build error:
    Step 6/8 : RUN prefect cloud login -k <key> --workspace <workspace>
    
     ---> Running in a1f954737f0f
    Creating a profile for this Prefect Cloud login. Please specify a profile name: Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
    An exception occurred.
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
        return future.result()
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/usr/local/lib/python3.10/site-packages/prefect/cli/cloud.py", line 209, in login
        cloud_profile_name = app.console.input(
      File "/usr/local/lib/python3.10/site-packages/rich/console.py", line 2102, in input
        result = input()
    EOFError: EOF when reading a line
    The command '/bin/sh -c prefect cloud login -k <key> --workspace <workspace> ' returned a non-zero code: 1
    make: *** [Makefile:16: docker-build] Error 1
    
    Error: Process completed with exit code 2.
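The traceback ends in input() raising EOFError, which is what Python does when a prompt is shown but standard input has nothing to read, as in a non-interactive image build. A standard-library sketch that reproduces just that behavior (prompt_without_stdin is a hypothetical name):

```python
import io
import sys


def prompt_without_stdin():
    """Call input() while stdin is empty, as during an image build."""
    original_stdin = sys.stdin
    sys.stdin = io.StringIO("")  # empty stream: nothing to read
    try:
        return input("Please specify a profile name: ")
    except EOFError as exc:
        return f"EOFError: {exc}"
    finally:
        sys.stdin = original_stdin  # always restore the real stdin
```

One way to avoid the prompt entirely, as far as I know, is to skip the login step in the build and supply the PREFECT_API_KEY and PREFECT_API_URL settings as environment variables when the container runs, which also keeps the key out of the image layers.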
    t
    • 2
    • 3
  • a

    Anders Smedegaard Pedersen

    12/01/2022, 8:34 AM
    Is there no SFTP task in V2?
    k
    • 2
    • 1
  • z

    Zack

    12/01/2022, 3:44 PM
    Is there a comparable orchestration tool in AWS and what would be the advantages prefect has over it? Resources would be great.
    p
    a
    +2
    • 5
    • 18
  • z

    Zack

    12/02/2022, 12:52 AM
    Can Prefect run AWS glue?
    ✅ 2
    a
    • 2
    • 6
  • j

    Jon Martin

    12/02/2022, 8:07 PM
    Hey y'all, I've run into this issue where my task will be created & submitted and then... nothing. It's just stuck in Pending. I'm trying to bring Prefect to my company for data engineering, and I really need to figure this out to move forward. Thanks!
    14:00:11.934 | INFO | Flow run 'dazzling-lorikeet' - Created task run 'read_file-7f2c7640-0' for task 'read_file'
    14:00:11.935 | INFO | Flow run 'dazzling-lorikeet' - Submitted task run 'read_file-7f2c7640-0' for execution.
    I can read the file if I copy-paste the code into the flow, so I know it's not an issue there.
    UPDATE: With no flows running, somehow my concurrency tag said that it was full (3/3 active tasks), so this task never made it into the queue.
    ✅ 1
    r
    • 2
    • 1
  • j

    Jason Ma

    12/02/2022, 10:44 PM
    hello all! 👋 new prefect user here. We’re still trying to learn the tips n tricks surrounding prefect, one question I had was how can we manually stop flow runs via the UI? I might be missing something, but I can’t seem to find it on the UI
    ✅ 1
    a
    j
    • 3
    • 2
  • t

    Tomas Moreno

    12/08/2022, 8:32 PM
    Is there an easy way to schedule a flow for, say, every 30 minutes at the beginning of the day, and then, once the flow succeeds, skip the rest of the runs for that day?
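One way to get this behavior is to keep the 30-minute schedule and let each run exit early once a success has been recorded for the day. A sketch of just that check, assuming the date of the last successful run is stored somewhere the flow can read; should_run is a hypothetical helper and the storage location is left open.

```python
from datetime import date
from typing import Optional


def should_run(last_success: Optional[date], today: date) -> bool:
    """Run only if no successful run has been recorded for today."""
    return last_success is None or last_success < today
```

The flow would call should_run at the start, return immediately when it is False, and record today's date after finishing successfully.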
    ✅ 1
    m
    • 2
    • 4
  • d

    Dang Khoi Vo

    12/21/2022, 4:35 PM
    Hello community 👋, I have a personal project in mind where I want to orchestrate a Python script to run once daily on Prefect. I want to stick to best practices as much as possible, and I read that you shouldn’t be executing processes on the same instances as your orchestrator. Does it make sense for Prefect to call a GCP Cloud Function to execute my Python script? I'm open to ideas!
    👋 4
    a
    t
    a
    • 4
    • 4
  • n

    Nicolas Gastaldi

    01/02/2023, 8:49 PM
    Hi community, what would be a recommended setup with Azure as the host and Prefect as the keystone? About 10 Python scripts are executed daily; 8 are lightweight and 2 of them do a lot of heavy lifting. They are changed very frequently, so overall maintenance can't be too complicated, but for cost optimization some sort of elasticity would be most appreciated, as the heavy scripts need about 10x the hardware of the small ones. Thanks a bunch in advance.
    ✅ 1
    b
    • 2
    • 4
b

Bianca Hoch

01/06/2023, 7:23 PM
Hi Nicolas, I'm not terribly familiar with Azure, but I can point you in the direction for tutorials and guides for Creating a Prefect 2.0 PoC on Azure.
In regard to the flows changing with high frequency, here are some resources for CI/CD that may be of use as well.
I'm sure that others who are more familiar with Azure will be able to share their thoughts here as well. 😄
n

Nicolas Gastaldi

01/06/2023, 9:07 PM
Hi Bianca, thank you so much for the insight, I will look into it. I made a basic PoC using a single VM and Docker, and it worked very well for my requirements, so I wanted to give it a try on Prefect when it gets real “firepower” 😄. Hence the question of whether a VM strategy, Scale Sets, or the Container CI suggested in the tutorials would be the best fit for a heavy workload. Thanks again for the directions.
🙌 1
🚀 1