Powered by Linen
prefect-community
  • a

    aaron

    06/10/2022, 5:17 PM
    quick question on the Prefect-maintained Orion collections — are there plans to port the 1.0 Jupyter task to Orion?
    ✅ 1
    k
    • 2
    • 1
  • a

    Andreas Nigg

    06/10/2022, 6:17 PM
    Hey, quick question, prefect 2.0: I can't figure out how to set time to live or successful/failed job history limits for k8s jobs which were spawned from a kubernetes orion agent. Everything works fine, but I've got hundreds of completed/failed jobs (and corresponding pods) lying around. Is there a way to configure job history limits, or configure automatic cleanup? (Now that I think about it, I could also use a prefect flow to clean up, but maybe that's too much 😄 )
    ✅ 1
    k
    • 2
    • 1
  • c

    Christopher Haack

    06/10/2022, 7:55 PM
    Prefect 2.0 beginner here- Is there a way to retry a subflow in prefect 2.0 upon failure? Or are retries only reserved for tasks?
    ✅ 1
    a
    • 2
    • 2
  • b

    Boggdan Barrientos

    06/10/2022, 8:31 PM
    Hi, is it possible to name a schedule of a flow in Cloud? I run many schedules of the same flow with different parameters, and I would like to name them to tell them apart.
    k
    • 2
    • 1
  • p

    Parth Patel

    06/10/2022, 9:42 PM
    Hello, we’ve been running Prefect for some of our jobs for a while now (3 months) and runs usually go through successfully. In our PROD environment, every job has been getting this error since yesterday: “No heartbeat detected from the remote task; marking the run as failed.” Any thoughts on how I should go about debugging this?
    k
    • 2
    • 1
  • y

    Yehor Anisimov

    06/11/2022, 5:41 AM
    Hi, I'm trying to run DbtShellTask using the command "dbt run" but I get this error:
    [2022-06-11 08:35:49+0300] ERROR - prefect.CloudTaskRunner | Task 'DbtShellTask': Exception encountered during task execution!
    Traceback (most recent call last):
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\engine\task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\tasks\dbt\dbt.py", line 192, in run
        return super(DbtShellTask, self).run(
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\tasks\shell.py", line 145, in run
        line = raw_line.decode("utf-8").rstrip()
    UnicodeDecodeError: 'utf-8' codec can't decode byte 0xad in position 7: invalid start byte
    Any ideas what is the issue?
    ✅ 1
    a
    k
    • 3
    • 3
  • m

    Massimiliano Fanciulli

    06/12/2022, 10:21 AM
    Hello, i'm stuck on the following issue setting up Prefect + a Dask cluster. When the flow is executed, multiple tasks are run on the Dask worker. When it runs i get an exception (see thread)
    a
    • 2
    • 19
  • v

    Volker L

    06/12/2022, 11:06 AM
    Hello, I have an async function which fetches some data from a REST API (using aiohttp). The task returns a pandas DataFrame. A second function saves the pandas DataFrame as a parquet file into an s3 bucket. Running these functions in python is really fast (<1s). However, if I wrap both functions into prefect tasks and place them into a prefect flow, the flow execution takes up to 5 minutes. I do not understand why the pure python solution is so much faster than the prefect one. I have posted some sample code inside the thread.
    ✅ 1
    a
    • 2
    • 6
  • m

    mahmoud elhalwany

    06/12/2022, 11:47 AM
    hello -- i have installed prefect-2 but I got this error
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/cli/base.py", line 59, in wrapper
        return fn(*args, **kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_core/_eventloop.py", line 56, in run
        return asynclib.run(func, *args, **backend_options)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 233, in run
        return native_run(wrapper(), debug=debug)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
        return await func(*args)
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/cli/base.py", line 162, in version
        if is_ephemeral:
    UnboundLocalError: local variable 'is_ephemeral' referenced before assignment
    An exception occurred.
    a
    • 2
    • 2
  • j

    Joshua Greenhalgh

    06/12/2022, 1:48 PM
    Hi - just wondering if anyone knows of a way to point studio.apollographql - at the cloud api? It has nice query building and introspection features I would like to use? Not sure how I would do the authentication?
    ✅ 1
    a
    • 2
    • 3
  • d

    Dung Khuc

    06/13/2022, 8:04 AM
    Hello again, do agents work with Azure's Storage Blob Data Reader privilege to pull Flow from Azure Blob Storage?
    ✅ 1
    a
    • 2
    • 2
  • j

    Joshua Greenhalgh

    06/13/2022, 10:40 AM
    Could someone help me understand what is happening with this flow_run?
    063bd788-bc0c-46f5-85e4-1092c2b19297
    ✅ 1
    a
    • 2
    • 10
  • p

    Priyank

    06/13/2022, 11:14 AM
    Hi all, how can I skip all the tasks in the flow if there is holiday on that day or any particular date? Assuming I already have a list of holidays. What can be the best approach?
    ✅ 1
    a
    • 2
    • 3
  • i

    Izu

    06/13/2022, 11:28 AM
    Hi all, is there a way I can specify a task to run only once within a flow? I have noticed some of my tasks run concurrently, this is an issue when I am writing to a table
    ✅ 1
    a
    • 2
    • 8
  • a

    Anna Geller

    06/13/2022, 11:38 AM
    Hi everyone, Whenever you post a question, please let us know whether your question is about Prefect 2.0 or Prefect 1.0. Otherwise, we all have to waste time figuring out which product you are asking about- that's negative engineering - let's eliminate it. Thanks a lot!
    👍 2
  • j

    Jelle Vegter

    06/13/2022, 1:36 PM
    Hi all, I'm switching to DockerRun (image on Azure container registry) and GitStorage (github) on Prefect 2.0. I'm running into a ValueError when attempting to run a flow. It doesn't throw an error on a specific task so I'm not sure where the problem lies. My agent runs on a VM that is authenticated with "docker login myregistry.azurecr.io". The agent shows that, for the run, the image was successfully pulled. Any pointers where the issue could be? Thanks!
    ✅ 1
    a
    • 2
    • 7
  • o

    Oscar Krantz

    06/13/2022, 2:29 PM
    Hi. Unclear if this is the correct forum for this so please correct me if not. I'm evaluating prefect as a long-term solution for my company, and from a cursory glance prefect 1.0 looks like a perfect fit, with the one exception of the dependency on docker which we're not thrilled about. I tested a copy of 2.0 locally and whilst the solution works well the UI leaves a bit to be desired compared to 1.0. Is the plan to completely replace the 1.0 functionality with 2.0 in the future? I understand that DAGs are now out and so no more schematic diagrams and such in the future
    ✅ 1
    k
    a
    • 3
    • 4
  • r

    Renuka

    06/13/2022, 2:59 PM
    By when can we expect to have databricks notebook triggering feature in prefect 2.0 ?
    ✅ 1
    k
    • 2
    • 1
  • y

    yu zeng

    06/13/2022, 3:20 PM
    ModuleNotFoundError("No module named '/root/'"), has anybody run into this problem when running a flow on a k8s agent?
    ✅ 1
    k
    a
    • 3
    • 3
  • b

    Brian Phillips

    06/13/2022, 5:44 PM
    Can I scale an (1.0) agent to 3x replicas with the same service account / api key? Or does each replica need its own service account and api key?
    ✅ 1
    k
    • 2
    • 10
  • m

    Mary Clair Thompson

    06/13/2022, 7:08 PM
    Apologies if this has already been answered--but in the 'old' Prefect we started up agents with the -p flag so that we can point them to local modules that our flows needed to import. This functionality seems to have gone away with Orion. Is there some analogous way to indicate to the agent the location of local files that need to be imported? For reference, the short stack trace I'm getting is
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
    File "/tmp/flow-script-calculate-blob-size4cemrmn_.py", line 1, in <module>
    ModuleNotFoundError: No module named 'local_module'
    ✅ 1
    k
    a
    • 3
    • 13
  • s

    Slackbot

    06/13/2022, 7:17 PM
    This message was deleted.
    k
    b
    • 3
    • 4
  • m

    Mitchell Bregman

    06/13/2022, 8:01 PM
    Hi there - I am running into an issue with the `DbtShellTask`; locally it all works as expected. When I deploy to Prefect Cloud, I am getting a `dbt: command not found`. Requirements include `dbt-redshift`. My flow storage is Docker. When I try to build the Docker image locally, inside the container I also get `dbt: command not found` even though it is installed in my python environment packages. Has anyone run into this before?
    ✅ 1
    👀 1
    m
    a
    l
    • 4
    • 14
  • k

    Ken Nguyen

    06/13/2022, 11:15 PM
    Is there a way to make a state handler do something different if a specific task fails? For example, I have a chain of tasks that depend on each other: `a -> b -> c`. I want it so that if `a` or `b` fails, it sends a Slack notification saying “Flow failed”. But if `c` fails, I want it to alter the Slack message by adding b’s output
    k
    • 2
    • 30
  • w

    Wei Mei

    06/14/2022, 12:32 AM
    Hi! Is there a difference between placing a Parameter() before and after the flow context?
    k
    • 2
    • 3
  • m

    Mitchell Bregman

    06/14/2022, 1:58 AM
    Follow up to the above thread - we are running into an issue now with connecting dbt to our Redshift warehouse in Cloud... have verified that all the environment variables are working correctly, do we need to install anything explicitly in our Docker image?
    01:13:51  Running with dbt=1.1.0
    01:13:51  Partial parse save file not found. Starting full parse.
    01:13:52  Found 9 models, 75 tests, 0 snapshots, 0 analyses, 196 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
    01:13:52
    01:13:52  Encountered an error:
    Database Error
      could not connect to server: No such file or directory
      	Is the server running locally and accepting
      	connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5439"?
    ✅ 1
    a
    m
    • 3
    • 5
  • m

    Michelle Brochmann

    06/14/2022, 3:10 AM
    Hi, I have a question about subclassing the `Result` object. I am doing operations on spark `DataFrames` in `Tasks` and would like to pass a `DataFrame` from `Task` to `Task` via the `Result` object. If the `DataFrame` is too large to fit in memory simply passing a custom serializer won’t work. So I am thinking I could create a subclass of `Result` called `SparkDataFrameResult` where the `read` and `write` methods are overridden to use the spark `load` and `save` methods. 1. Is there any reason this wouldn’t work or wouldn’t be recommended? 2. Are there any best practices for doing something like this?
    ✅ 1
    a
    k
    • 3
    • 26
  • w

    William Jamir

    06/14/2022, 9:01 AM
    Hi, How can I execute a map inside a task? Something like this (code in thread):
    ✅ 1
    a
    • 2
    • 6
  • s

    Satnam Singh

    06/14/2022, 9:22 AM
    Hey Prefect team, I am using the edge function in my prefect workflow. Here is the code
    ✅ 1
    a
    • 2
    • 13
  • f

    Florian Guily

    06/14/2022, 10:12 AM
    Hey, i was wondering if it is possible to use a map inside an apply map
    a
    k
    • 3
    • 20
f

Florian Guily

06/14/2022, 10:12 AM
Hey, i was wondering if it is possible to use a map inside an apply map
in prefect 1
a

Anna Geller

06/14/2022, 11:15 AM
apply_map is used for more complex mapped tasks, usually involving conditional branching, but it's just a normal function that gets applied over some list of inputs and thus creates a task. In theory, this normal function could call map with .run(), but I wouldn't recommend that; there is certainly a better way. (Btw, you don't need any of that in Prefect 2.0, so it may be easier to migrate later if you keep things simple.) Can you explain what you are trying to do? Talking about features without context is rarely helpful 😁
k

Kevin Kho

06/14/2022, 1:50 PM
This is not possible, I think. It results in a two-stage mapping, which you can’t do
f

Florian Guily

06/15/2022, 8:44 AM
this was mainly to mimic nested for loops
i know prefect 2 is perfectly suited for this but this is on prefect 1 for now
the goal is to write records to the database, but checks have to be done first, and some elements of each record have to be written to the database before the record itself.
python code should look like this:
records = get_records()
for record in records:
    tags = []
    record_id = get_record_id_if_exist()
    if record_id is None:
        for tag in record["tags"]:
            tag_id = get_record_tag_if_exist()
            if tag_id is None:
                tag_id = create_tag()
            tags.append(tag_id)
        record_id = create_open_data(record['open_data'], tags)
    version_id = create_version(record_id, record['version'])
maybe i can do an apply_map for the first for loop and do the second one sequentially?
a

Anna Geller

06/15/2022, 12:42 PM
how much visibility do you need here? if it's enough to see this as one "node" in your workflow graph, you can totally put the entire logic here into a single task without mapping - it depends a lot on your observability needs
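For illustration, the single-unit option could look roughly like the plain-Python sketch below. The in-memory dictionaries stand in for the real database, and the `name` key, `get_or_create_tag`, and `write_records` are hypothetical names that do not come from the thread. In Prefect 1 the outer function would be the body of one task.

```python
# Hypothetical in-memory stand-ins for the real database tables.
TAGS = {}         # tag name -> tag id
RECORDS = {}      # record name -> {"id": ..., "tags": [...]}
VERSIONS = set()  # (record id, version) pairs


def get_or_create_tag(name):
    """Return the id of an existing tag, creating the tag if it is missing."""
    if name not in TAGS:
        TAGS[name] = len(TAGS) + 1
    return TAGS[name]


def write_records(records):
    """Run the whole nested loop as one unit of work."""
    for record in records:
        entry = RECORDS.get(record["name"])
        if entry is None:
            # Tags are written before the record that references them.
            tag_ids = [get_or_create_tag(name) for name in record["tags"]]
            entry = {"id": len(RECORDS) + 1, "tags": tag_ids}
            RECORDS[record["name"]] = entry
        VERSIONS.add((entry["id"], record["version"]))
    return len(VERSIONS)


sample = [
    {"name": "r1", "tags": ["geo", "open"], "version": "v1"},
    {"name": "r2", "tags": ["open"], "version": "v1"},
]
write_records(sample)
# A second run changes nothing, because every write checks for existing data first.
assert write_records(sample) == 2
```

The trade-off is that a retry of this unit repeats the whole loop, so the check-before-write steps are what keep a repeated run safe.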
k

Kevin Kho

06/15/2022, 12:55 PM
You can create a task that creates all possible combinations and make it a one stage map
f

Florian Guily

06/15/2022, 1:07 PM
if possible i would like to have observability on all of the create functions, as they have to be retried if they fail (the writes to the database)
k

Kevin Kho

06/15/2022, 1:10 PM
Ah let me rephrase. You can create a task that creates all possible combinations and then feed that to apply_map
f

Florian Guily

06/15/2022, 1:49 PM
what do you mean by "all possible combinations"?
k

Kevin Kho

06/15/2022, 2:04 PM
Let me make a sample here
Starting from the Prefect example on `apply_map`, I just add another input called `y` but it doesn’t do anything:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x,y):
    return x + 1

@task
def negate(x,y):
    return -x

@task
def is_even(x,y):
    return x % 2 == 0

def inc_or_negate(x, y):
    cond = is_even(x,y)
    with case(cond, True):
        res1 = inc(x,y)
    with case(cond, False):
        res2 = negate(x,y)
    return merge(res1, res2)
I believe your goal is like this:
for x in [1,2,3,4]:
    for y in ["A","B","C","D"]:
        inc_or_negate(x, y)
So we can make another task `cross_product`:
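@task(nout=2)
def cross_product(x_list, y_list):
    # Build every (x, y) pair, in nested-loop order
    res = []
    for x in x_list:
        for y in y_list:
            res.append((x,y))
    # Split the pairs into two parallel lists of equal length
    res_x = [pair[0] for pair in res]
    res_y = [pair[1] for pair in res]
    return res_x, res_y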
and then run the Flow:
with Flow("apply-map example") as flow:
    x, y = cross_product(range(4), ["A","B","C", "D"])
    result = apply_map(inc_or_negate, x, y)

flow.run()
So `x` and `y` already represent all the possible combinations
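The flattening step can be checked in plain Python without Prefect. This is a minimal sketch using only the standard library; `cross_product_lists` is a hypothetical name. `itertools.product` yields the pairs in the same order as the nested loops, and `zip(*pairs)` splits them into the two parallel lists.

```python
from itertools import product


def cross_product_lists(x_list, y_list):
    """Return two parallel lists that together cover every (x, y) combination."""
    pairs = list(product(x_list, y_list))
    if not pairs:
        return [], []
    xs, ys = zip(*pairs)
    return list(xs), list(ys)


xs, ys = cross_product_lists(range(4), ["A", "B", "C", "D"])

# Reference: the same pairs produced by explicit nested loops.
nested = [(x, y) for x in range(4) for y in ["A", "B", "C", "D"]]
assert list(zip(xs, ys)) == nested
```

Both lists have one entry per combination, so a single mapping stage over them visits the same pairs as the two nested loops.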
f

Florian Guily

06/15/2022, 2:15 PM
oooh ok i see
k

Kevin Kho

06/15/2022, 2:17 PM
Yeah it becomes a 1 stage mapping problem instead of 2
👍 1
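For the records case earlier in the thread, the inner list differs per record, so the flattening would pair each record with its own tags rather than build a full cross product. A minimal plain-Python sketch, assuming each record is a dict with a `tags` list as in the pseudocode; `flatten_record_tags` is a hypothetical name:

```python
def flatten_record_tags(records):
    """Pair each record index with each of its own tags, as two parallel lists."""
    record_idx, tag_names = [], []
    for i, record in enumerate(records):
        for tag in record["tags"]:
            record_idx.append(i)
            tag_names.append(tag)
    return record_idx, tag_names


records = [
    {"tags": ["geo", "open"]},
    {"tags": []},          # a record without tags contributes no pairs
    {"tags": ["open"]},
]
idx, tags = flatten_record_tags(records)
```

Records without tags drop out of the flattened lists, so the record-level step would still need to run over the original list.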
f

Florian Guily

06/15/2022, 2:17 PM
i'll give it a try thanks !