Powered by Linen
data-tricks-and-tips

    Lucas Cavalcanti Rodrigues

    09/09/2022, 9:17 PM
    Hi, everyone. How can I run a flow several times using different parameters? I'm trying to use create_flow_run.map like this:
    dump_to_gcs_flow = create_flow_run.map(
        flow_name=unmapped(utils_constants.FLOW_DUMP_TO_GCS_NAME.value),
        project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
        parameters=tables_to_zip,
        labels=unmapped(current_flow_labels),
        run_name=unmapped("Dump to GCS"),
    )
    Except for `parameters`, all the other arguments are constants, so I wrap them in `unmapped`. `tables_to_zip` is a list of dictionaries containing the parameter values for each table to be zipped. However, this didn't work. I'm currently receiving the error:
    prefect.exceptions.ClientError: [{'message': 'parsing UUID failed, expected String, but encountered Array', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow_run'], 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
    what am I doing wrong here?
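In Prefect 1, `.map` calls the task once per element of each mapped argument and passes `unmapped(...)` values to every call unchanged, so the snippet above should create one flow run per dict in `tables_to_zip` and return a list of flow run IDs. The error says an array reached a filter (`flow_run.args.where.id._eq`) that expects a single UUID, which suggests that list was later handed, unmapped, to something that takes one flow run ID. That downstream code isn't shown, so this is an assumption; if it is `wait_for_flow_run`, it would need `.map` as well. The stdlib sketch below only models the fan-out semantics; `Unmapped` and `expand_map_calls` are hypothetical names, not Prefect's implementation.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Unmapped:
    """Hypothetical stand-in for prefect.unmapped: a value broadcast to every call."""
    value: Any


def expand_map_calls(**kwargs: Any) -> list:
    """Build one kwargs dict per mapped element, the way a mapped task fans out."""
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constants = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) != 1:
        raise ValueError("need at least one mapped argument, all of the same length")
    (n,) = lengths
    # Call i gets element i of every mapped argument plus all the constants.
    return [{**constants, **{k: v[i] for k, v in mapped.items()}} for i in range(n)]
```

With two parameter dicts (hypothetical `table_id` keys), this yields two calls, and each call's result would be one flow run ID; the mapped task as a whole produces a list of them.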

    Chern Hong Poh

    10/12/2022, 10:08 AM
    message has been deleted

    Khyaati Jindal

    10/18/2022, 10:28 AM
    I can see a GUI option to delete a flow run, but is there a way to bulk delete flow runs? Whenever my agent goes down on my VM, the flow runs pile up, and when I restart the agent, all the stacked runs execute at the same time. I have to delete them all manually from Prefect Cloud. Can we do this from the CLI or in Python code?
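Whatever tool does the deleting, the first step is deciding which runs count as backlog. A minimal sketch of that selection, assuming each run is summarized by an ID, a state name, and its scheduled start time (`RunSummary` and `stale_backlog` are hypothetical). In Prefect 2 the deletion itself would go through the Python client returned by `prefect.get_client()`, which, as far as I know, exposes a `delete_flow_run` method; check your version's client reference before relying on it.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable

# States in which a run is still waiting to start (assumed names, matching the UI labels).
BACKLOG_STATES = frozenset({"Scheduled", "Late"})


@dataclass(frozen=True)
class RunSummary:
    """Hypothetical minimal view of a flow run; not a Prefect class."""
    id: str
    state_name: str
    scheduled_start: datetime


def stale_backlog(runs: Iterable[RunSummary], now: datetime, max_age: timedelta) -> list:
    """IDs of runs still waiting to start whose scheduled time is older than max_age."""
    return [
        run.id
        for run in runs
        if run.state_name in BACKLOG_STATES and now - run.scheduled_start > max_age
    ]
```

Filtering by age keeps runs that were scheduled only recently, so restarting the agent would execute just those instead of the whole pile.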

    Stephen Thibeault

    10/27/2022, 8:06 PM
    Hey team! When using Prefect Cloud for deployment with a GitHub storage block, I keep getting this error, and it seems to be preventing the flow from running. Does anyone know what the issue may be? Block document has schema checksum sha256:196e3675d678bda766b96c223c5dd5d87223f556150ce5e63f55f0775feec972 which does not match the schema checksum for class 'GitHub'. This indicates the schema has changed and this block may not load.

    Andreas Nigg

    10/28/2022, 1:20 PM
    Hey guys, I have a question for all the dbt data model masterminds out there (I hope this is the right channel). I have a rather huge source table which is partitioned by a column “loaded_at”. I have an incremental model which reads from this source table. To avoid reading unneeded source-table partitions, I could make use of the _dbt_max_partition scripting variable, something like below:
    {% if is_incremental() %}
        where loaded_at >= coalesce(_dbt_max_partition, '2022-01-01')
    {% endif %}
    The problem is that my incremental model is not partitioned by “loaded_at” but by a different column (due to use-case demands). So _dbt_max_partition would not help here, as it simply returns the maximum partition value of the model, which I can't use as a filter for the source table. In “native” BigQuery I would simply use a scripting variable, as follows:
    declare max_source_partition timestamp;
    set max_source_partition = (select max(loaded_at) as ts from `my_model_table`);
    select * from `my_source_table` where loaded_at > max_source_partition
    How can one implement such a scenario with dbt? Is there a way to create scripting variables as part of my models? Or do I need to add it as an on-run-start hook? Or is there a better strategy for excluding source partitions without using the same column as the partition field in my model?
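The BigQuery script above is a two-step pattern: read a high-water mark from the model, then filter the source with it as a constant. In dbt, one way to get the same shape is to run the `max(loaded_at)` query with the `run_query` macro inside `{% if is_incremental() %}` (guarded by `{% if execute %}`) and render the result into the `where` clause as a literal; treat that as a suggestion to check against the dbt docs. The Python sketch below only models the logic, with `sqlite3` standing in for BigQuery. Table and column names come from the message, except the `id` column, which is added for the demo; `rows_newer_than_model` is a hypothetical helper.

```python
import sqlite3


def rows_newer_than_model(conn: sqlite3.Connection) -> list:
    """Two-step incremental filter: read the model's high-water mark, then use it as a constant."""
    # Step 1: like `set max_source_partition = (select max(loaded_at) ...)`, with the
    # same '2022-01-01' fallback that the coalesce() above uses on a first run.
    (max_loaded_at,) = conn.execute(
        "SELECT COALESCE(MAX(loaded_at), '2022-01-01') FROM my_model_table"
    ).fetchone()
    # Step 2: filter the source with the value bound as a literal parameter.
    return conn.execute(
        "SELECT id, loaded_at FROM my_source_table WHERE loaded_at > ? ORDER BY id",
        (max_loaded_at,),
    ).fetchall()
```

Because step 2 sees a plain value rather than a subquery, the filter on the source does not depend on how the model itself is partitioned.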

    Mark

    11/01/2022, 9:00 AM
    Is there a way to run a flow manually, without Prefect Cloud or the UI?

    Boris Tseytlin

    11/02/2022, 4:34 PM
    Hello! I have a very basic case: I want to list all folders in a MinIO (S3) bucket, then run parallel tasks that count the number of files in each folder. Can I achieve that via blocks? I see that the S3 block only has `get_folder`, which would download the whole storage to my local drive. I don't want that. I see two alternatives:
    • Make an S3 block and use that.
    • Make a secret block, set up a MinIO connection in my code, and use that.
    Which one is better?
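S3-compatible stores such as MinIO have no real folders, only key prefixes, so listing "folders" does not require downloading anything: boto3's `list_objects_v2` with `Delimiter="/"` returns the top-level prefixes in `CommonPrefixes`, and each prefix could then be handed to one parallel counting task. The sketch below shows only the counting logic over a list of object keys (as would be collected from the `Key` fields of paginated `list_objects_v2` responses). `count_files_per_folder` is a hypothetical helper, and it counts everything under a prefix, including nested subfolders.

```python
from collections import Counter
from typing import Iterable


def count_files_per_folder(keys: Iterable[str]) -> dict:
    """Count objects under each top-level 'folder' (the key prefix up to the first '/')."""
    counts = Counter()
    for key in keys:
        folder, sep, rest = key.partition("/")
        # Keys without a "/" sit at the bucket root; keys ending right after the
        # first "/" are zero-byte folder markers some tools create. Skip both.
        if not sep or not rest:
            continue
        counts[folder + "/"] += 1
    return dict(counts)
```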

    Jaafar

    11/03/2022, 9:30 AM
    Hello, I just started using Prefect. I tested the flow locally, configured the Cloud UI and the storage block, and then installed Prefect on a GCP VM to handle execution. For now, the trick I found to keep the process from stopping when I quit the SSH session is using screen. It's working fine. That being said, I have a few questions:
    • Using screen to detach the process from the terminal feels more like a hack than a long-term solution. What would be a better way of running a Prefect agent in production on a VM?
    • Would you have an example that shows how to use the Process block? I'm not sure I understand its usage and how it connects to the VM. Could it be used to launch a virtual environment or install the packages listed in a requirements.txt file, for example?
    Thanks for your help!

    Boris Tseytlin

    11/03/2022, 5:44 PM
    I am running a test flow in a Docker container with the image `pytorch/pytorch`. It fails because Prefect is not installed:
    17:43:10.947 | INFO    | prefect.infrastructure.docker-container - Pulling image 'pytorch/pytorch'...
    17:43:12.610 | INFO    | prefect.infrastructure.docker-container - Creating Docker container 'prudent-goshawk'...
    17:43:12.679 | INFO    | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'created'
    17:43:12.934 | INFO    | prefect.agent - Completed submission of flow run '64e545de-27ff-4d51-85d6-cec4d8eb0afe'
    17:43:12.949 | INFO    | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'running'
    /opt/conda/bin/python: Error while finding module specification for 'prefect.engine' (ModuleNotFoundError: No module named 'prefect')
    Adding prefect to the container block's extra pip packages doesn't help:
    {
      "EXTRA_PIP_PACKAGES": "prefect"
    }

    Stephen Thibeault

    11/08/2022, 1:45 PM
    Hey team! I am trying to run flows off of an agent on a Windows server and am getting this error while starting a flow run: "08:32:00.000 | INFO | prefect.agent - Completed submission of flow run 'a441fbfb-26bb-481a-8d87-d992532e38a8' 'C:\Program' is not recognized as an internal or external command, operable program or batch file." I know Windows support is under development; does anyone know what this issue could be?
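The error shows the command being cut off at `C:\Program`, which is what `cmd.exe` reports when an executable path containing a space (for example under `C:\Program Files`) is not quoted. That points to quoting of the interpreter path rather than anything in the flow, though this is inferred from the error text alone. A stdlib sketch of the difference, using a hypothetical install path:

```python
import subprocess

# Hypothetical interpreter location containing a space, as on many Windows machines.
python_exe = r"C:\Program Files\Python310\python.exe"
args = [python_exe, "-m", "prefect.engine"]

# Joining with plain spaces leaves the path unquoted, so a shell that splits on
# whitespace sees "C:\Program" as the program name, matching the error above.
naive = " ".join(args)

# subprocess.list2cmdline applies the MS C runtime quoting rules and wraps
# arguments that contain spaces in double quotes.
quoted = subprocess.list2cmdline(args)
```

Passing an argument list (rather than a pre-joined string) to `subprocess` lets Python do this quoting itself.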

    Dan Wise

    11/08/2022, 3:35 PM
    Hi, when creating a deployment in Prefect v2 in Python code using parameters, I cannot seem to get these to register correctly when using a pydantic model as the argument to the flow. Does anyone know of any workaround? If I use normal flow keyword arguments then the deployment registers the parameters correctly.

    Khyaati Jindal

    11/29/2022, 6:39 AM
    Hi, I am facing an issue using the CLI command to log in to Prefect Cloud from my Dockerfile. This is what my Dockerfile looks like:
    RUN apt update ; apt upgrade -y
    WORKDIR /project_dir
    COPY requirements.txt .
    RUN pip3 install -r requirements.txt
    RUN prefect cloud login -k <key> --workspace <my_workspace_name>
    COPY . .
    CMD [ "python3", "deployment.py"]
    I am building and deploying this Docker image with GitHub Actions, but the image build fails. It seems to be waiting for input, because I get the following build error:
    Step 6/8 : RUN prefect cloud login -k <key> --workspace <workspace>
    
     ---> Running in a1f954737f0f
    Creating a profile for this Prefect Cloud login. Please specify a profile name: Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
        return fn(*args, **kwargs)
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
    An exception occurred.
      File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
        return future.result()
      File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/usr/local/lib/python3.10/site-packages/prefect/cli/cloud.py", line 209, in login
        cloud_profile_name = app.console.input(
      File "/usr/local/lib/python3.10/site-packages/rich/console.py", line 2102, in input
        result = input()
    EOFError: EOF when reading a line
    The command '/bin/sh -c prefect cloud login -k <key> --workspace <workspace> ' returned a non-zero code: 1
    make: *** [Makefile:16: docker-build] Error 1
    
    Error: Process completed with exit code 2.
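The traceback shows `prefect cloud login` stopping at an interactive prompt for a profile name (`app.console.input`), which cannot be answered during `docker build`. A commonly used non-interactive alternative is to skip the login step and give the container the `PREFECT_API_KEY` and `PREFECT_API_URL` settings as environment variables at run time (for example from GitHub Actions secrets), which also keeps the key out of the image layers. For Prefect Cloud 2 the API URL has the form `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`. The helper below is a hypothetical sketch that assembles those two variables, assuming both IDs are UUIDs:

```python
import uuid

PREFECT_CLOUD_API = "https://api.prefect.cloud/api"


def prefect_cloud_env(api_key: str, account_id: str, workspace_id: str) -> dict:
    """Build the two settings a non-interactive container needs to reach Prefect Cloud."""
    for value in (account_id, workspace_id):
        uuid.UUID(value)  # raises ValueError if an ID is not a well-formed UUID
    return {
        "PREFECT_API_KEY": api_key,
        "PREFECT_API_URL": (
            f"{PREFECT_CLOUD_API}/accounts/{account_id}/workspaces/{workspace_id}"
        ),
    }
```

The resulting values could then be supplied when the container starts, e.g. with `docker run -e PREFECT_API_KEY=... -e PREFECT_API_URL=...`, instead of running a login step in the Dockerfile.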

    Anders Smedegaard Pedersen

    12/01/2022, 8:34 AM
    Is there no SFTP task in V2?

    Zack

    12/01/2022, 3:44 PM
    Is there a comparable orchestration tool in AWS, and what would be the advantages of Prefect over it? Resources would be great.

    Zack

    12/02/2022, 12:52 AM
    Can Prefect run AWS glue?

    Jon Martin

    12/02/2022, 8:07 PM
    Hey y'all, I've run into an issue where my task is created and submitted and then... nothing. It's just stuck in Pending. I'm trying to bring Prefect to my company for data engineering, and I really need to figure this out to move forward. Thanks!
    14:00:11.934 | INFO | Flow run 'dazzling-lorikeet' - Created task run 'read_file-7f2c7640-0' for task 'read_file'
    14:00:11.935 | INFO | Flow run 'dazzling-lorikeet' - Submitted task run 'read_file-7f2c7640-0' for execution.
    I can read the file if I copy-paste the code into the flow, so I know the problem isn't there.
    UPDATE: Even with no flows running, my concurrency tag somehow showed as full (3/3 active tasks), so this task never made it into the queue.
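The update identifies the cause: the tag's concurrency limit reported 3/3 slots in use with nothing running, so the new task run could never take a slot and stayed Pending. A toy model of that failure mode (not Prefect's implementation; `TagConcurrencyLimit` is hypothetical): slots held by runs that ended without releasing them block every later run until they are cleared. In Prefect 2, the limit can be inspected, deleted, and recreated with the `prefect concurrency-limit` CLI commands.

```python
class TagConcurrencyLimit:
    """Toy tag limit: a fixed number of slots, each held by a task-run ID until released."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = set()

    def try_acquire(self, task_run_id: str) -> bool:
        if len(self.active) >= self.limit:
            return False  # no free slot: the task run stays Pending
        self.active.add(task_run_id)
        return True

    def release(self, task_run_id: str) -> None:
        self.active.discard(task_run_id)
```

If three runs crash before calling `release`, the limit stays at 3/3 forever in this model, which matches the symptom in the update.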

    Jason Ma

    12/02/2022, 10:44 PM
    Hello all! 👋 New Prefect user here. We're still learning the tips and tricks around Prefect. One question I had: how can we manually stop flow runs via the UI? I might be missing something, but I can't seem to find it in the UI.

    Tomas Moreno

    12/08/2022, 8:32 PM
    Is there an easy way to schedule a flow to run, say, every 30 minutes at the beginning of the day, and then, once the flow succeeds, skip the rest of the runs for that day?
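One way to get this without changing the schedule is to keep the every-30-minutes schedule and have the flow check first whether a run already succeeded that day, returning early if so. A minimal sketch of that guard, assuming a marker file per day records success (`run_once_per_day` and the marker scheme are hypothetical). In a real deployment the record would need to live somewhere all runs can see, such as shared storage or a query to Prefect's API for that day's completed runs.

```python
import datetime as dt
from pathlib import Path
from typing import Callable


def _marker(marker_dir: Path, day: dt.date) -> Path:
    # One hypothetical marker file per calendar day, e.g. success-2022-12-08
    return marker_dir / f"success-{day.isoformat()}"


def already_succeeded(marker_dir: Path, day: dt.date) -> bool:
    return _marker(marker_dir, day).exists()


def run_once_per_day(marker_dir: Path, day: dt.date, work: Callable[[], object]) -> str:
    """Skip if the day's work already succeeded; otherwise run it and record success."""
    if already_succeeded(marker_dir, day):
        return "skipped"
    work()  # if this raises, no marker is written, so the next scheduled run tries again
    marker_dir.mkdir(parents=True, exist_ok=True)
    _marker(marker_dir, day).touch()
    return "ran"
```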

    Dang Khoi Vo

    12/21/2022, 4:35 PM
    Hello community 👋, I have a personal project in mind: I want to orchestrate a Python script to run once daily on Prefect. I want to stick to best practices as much as possible, and I read that you shouldn't execute processes on the same instances as your orchestrator. Does it make sense for Prefect to call a GCP Cloud Function to execute my Python script? I'm open to ideas!

    Nicolas Gastaldi

    01/02/2023, 8:49 PM
    Hi community, what would be a recommended setup with Azure as the host and Prefect as the keystone? About 10 Python scripts run daily: 8 are lightweight and 2 do a lot of heavy lifting. They change frequently, so overall maintenance can't be too complicated, but for cost optimization some sort of elasticity would be much appreciated, since the heavy scripts need 10x the hardware of the small ones. Thanks a bunch in advance!

    Darren Liu

    01/25/2023, 3:05 AM
    Hi prefectionists, first of all, great work on a promising product! I am looking for a recommended solution for this use case. Suppose there are events that start at unknown times. Once an event is detected to have started, a flow runs in a loop with some persistent state until the event is detected to have ended. Multiple events can happen at once, so there can be multiple loops, but only as many as there are events: exactly one loop per event, since there are no safeguards against multiple loops working on the same event with the same state and inserting identical data. Normally, I would have a repeating event-monitoring job that detects the start of events and publishes a job with the initial state into a work queue, plus job consumers that carry out the job and, once completed, re-publish it into the queue with the latest state so the next iteration can begin. The same job can detect the end of the event and complete without publishing back into the queue. What would be the ideal setup using Prefect? Thanks in advance for reading the long rant!
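The monitor-and-queue design described above can be sketched with the standard library to make the one-loop-per-event rule explicit. This is a model of the design only, not a Prefect API: `Job` and `EventLoopCoordinator` are hypothetical, a per-event `iteration` counter stands in for the real persistent state, and an in-memory queue stands in for a durable one.

```python
import queue
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Job:
    event_id: str
    state: dict = field(default_factory=dict)


class EventLoopCoordinator:
    """At most one active loop per event; state travels with the job through a queue."""

    def __init__(self) -> None:
        self.jobs: "queue.Queue[Job]" = queue.Queue()
        self.active = set()

    def on_event_started(self, event_id: str) -> bool:
        """Called by the monitor; returns False if this event already has a loop."""
        if event_id in self.active:
            return False
        self.active.add(event_id)
        self.jobs.put(Job(event_id, {"iteration": 0}))
        return True

    def process_next(self, event_has_ended: Callable[[str], bool]) -> bool:
        """Run one iteration of the next job; return True if it was re-published.

        Raises queue.Empty if no job is waiting.
        """
        job = self.jobs.get_nowait()
        job.state["iteration"] += 1  # stand-in for the real per-iteration work
        if event_has_ended(job.event_id):
            self.active.discard(job.event_id)  # loop ends; the event may start again later
            return False
        self.jobs.put(job)  # re-publish with the latest state for the next iteration
        return True
```

Keeping the set of active event IDs next to the queue is what prevents a repeated detection from starting a second loop on the same event.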

    Ishan Anilbhai Koradiya

    01/31/2023, 5:33 AM
    Hi community, I just want to understand: is there a way to interact with the Prefect APIs (not the Cloud version) to handle multi-tenancy? Can I create workspaces in open-source Prefect using the APIs?

    Yusuf

    02/01/2023, 7:27 AM
    Is there a workaround to run `prefect cloud login` without verifying SSL certs? I'm getting an SSL "unable to get local issuer certificate" error. I can resolve it with IT, but it'll take a week and a bit to go through the whole process, and I'm just prototyping some stuff. I checked here: https://discourse.prefect.io/t/how-to-disable-the-ssl-verification-when-setting-up-a-pr[…]verify-failed-unable-to-get-local-issuer-certificate/597/2 but the solutions didn't seem to work. I'm using Windows, by the way.

    Yusuf

    02/01/2023, 8:13 AM
    Also, if I do put in a ticket with IT, what domain do I need to whitelist? *.prefect.cloud?

    Haotian Li

    02/08/2023, 9:09 AM
    Hi Prefect Team, first, thank you for the wonderful job you have done creating this platform! I have a question about heterogeneous flow runs:
    1. Is there any way to create a flow that runs on different types of hardware in the queue? I need a flow that runs partially on a CPU Kubernetes queue and partially on GPU-enabled nodes. There doesn't seem to be a way to do this; each flow can only run entirely on CPU or entirely on GPU.
    2. If not, what's the best practice for doing this? Should I break a flow into separate subflows, deploy them separately, and run each step by hand? Or can I trigger this automatically by having a task run another deployed flow in a different queue?
    3. Is there any plan to add a per-task tagging system for agents, so that when an agent pulls from a queue it only pulls tasks with a certain tag?

    Evan Curtin

    02/08/2023, 5:14 PM
    Is this a supported pattern in Prefect 2?
    • Task A generates some output
    • Remote storage is used for persistence
    • Task B retrieves the output from storage
    I want to be able to:
    • Control the file name in Azure Blob Storage (right now Prefect gives it some random nonsense name)
    • Skip task A if the file already exists
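The control flow being asked for (a caller-chosen file name, and skipping task A when that file already exists) can be sketched independently of Prefect's result persistence. A minimal version, assuming a local directory stands in for the Azure Blob container and JSON for the serializer; `load_or_compute` is a hypothetical helper, not a Prefect feature.

```python
import json
from pathlib import Path
from typing import Any, Callable


def load_or_compute(path: Path, compute: Callable[[], Any]) -> Any:
    """Return the saved result at `path` if present; otherwise compute, save, and return it."""
    if path.exists():
        return json.loads(path.read_text())  # file already there: skip task A
    result = compute()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(result))  # caller-chosen name instead of a random one
    return result
```

Task B would then read the same `path`, so both tasks agree on the name up front instead of passing an opaque reference between them.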

    Stephen

    02/09/2023, 1:45 AM
    Hi - it's been touted that Prefect could be used to run arbitrary workflows in languages other than Python (via Docker or some other means). Maybe it's a rare use case, but I haven't seen an example of this. Are there any toy examples around?
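A toy illustration of the idea: a Python task only has to launch the other program and check its exit status, so the step itself can be written in any language. Here `sh` stands in for an arbitrary non-Python executable; the same call could just as well launch a container (`docker run ...`). `run_external` is a hypothetical helper, and wrapping it in a Prefect `@task` is left out.

```python
import subprocess


def run_external(cmd: list) -> str:
    """Run a non-Python program and return its stdout; raise if it exits non-zero."""
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return completed.stdout
```

Because `check=True` raises `CalledProcessError` on a non-zero exit, a failing external step would surface as a failed task rather than being silently ignored.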

    Jacob Bedard

    02/13/2023, 10:27 PM
    I'm just upgrading to Prefect 2.0 and I'm finding that secrets don't work when I try to `get()` them. There's a post in the community forum, but it's not resolved. Has anyone run into this problem
    AttributeError: 'coroutine' object has no attribute 'get'
    and resolved it?
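The error says `.get()` was called on a coroutine rather than on a secret. In Prefect 2, `Secret.load(...)` is sync-compatible, so when it is called inside an `async def` flow or task it returns an awaitable that has to be awaited before calling `.get()`; this assumes the call happens in async code. A stdlib sketch of the failure and the fix, with `FakeSecret` and `load_secret` as hypothetical stand-ins for the real block:

```python
import asyncio


class FakeSecret:
    """Hypothetical stand-in for a secret block with a .get() method."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get(self) -> str:
        return self._value


async def load_secret(name: str) -> FakeSecret:
    # Hypothetical async loader, playing the role Secret.load plays in async code.
    await asyncio.sleep(0)
    return FakeSecret(f"value-of-{name}")


async def wrong() -> str:
    # Calling without await yields a coroutine object, which has no .get():
    # AttributeError: 'coroutine' object has no attribute 'get'
    return load_secret("my-secret").get()


async def right() -> str:
    secret = await load_secret("my-secret")
    return secret.get()
```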

    Ravi

    02/21/2023, 11:49 AM
    Hi all, I am curious whether there is a method in Prefect 2 for manually reading persisted result files. I see that the PersistedResult object has a .get() method; however, from what I gather, this is meant to be used behind the scenes. If I can use it manually, could someone show an example?

    Kelvin DeCosta

    02/22/2023, 9:38 AM
    Hey everyone! Just wanted to share something I found interesting. When submitting many `async` tasks, either via `.submit` or `.map`, I've found that using a `SequentialTaskRunner` causes the flow to run much faster than using `ConcurrentTaskRunner`. I'd like to know more about this behavior. Any feedback is appreciated!

Peyton Runyan

02/22/2023, 4:58 PM
Interesting! I'll give it a look in just a sec and see what's going on there. The `SequentialTaskRunner` currently submits asynchronously, not sequentially, on map. That's going to be changed soon. But I don't know why it would be faster.
You can see the two submit methods here: https://github.com/PrefectHQ/prefect/blob/62c8a59e45561ed5194f1451934f0af0afbd9aba/src/prefect/task_runners.py#L200 https://github.com/PrefectHQ/prefect/blob/62c8a59e45561ed5194f1451934f0af0afbd9aba/src/prefect/task_runners.py#L248 It looks like there's just less setup overhead
Do you have an example of the async tasks running faster using `.submit`? I can't reproduce this.

Kelvin DeCosta

02/23/2023, 7:18 AM
Hey @Peyton Runyan, I mean that the same flow code, which has `async` tasks with `.submit` or `.map`, seems to be faster when using a `SequentialTaskRunner` instead of the default:
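import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

@task(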
    name="Greet",
    task_run_name="{name}",
)
async def greet(name: str) -> str:
    message = f"Hello, {name}!"

    await asyncio.sleep(2)

    print(message)

    return message

@flow(
    name="Multi Greeter",
    task_runner=SequentialTaskRunner(),
)
async def multi_greeter():
    names = [
        "Kelvin",
        "Peyton",
    ]
    
    message_states = await greet.map(name=names, return_state=True)

    messages = await asyncio.gather(
        *[state.result(fetch=True) for state in message_states if state.is_completed()]
    )
    return messages


@flow(name="Main")
def main():
    multi_greeter()

if __name__ == "__main__":
    main()
Running this locally, with and without `SequentialTaskRunner`, produces runs with different durations. Adding more `names` makes the difference larger.