prefect-community
Daniel Komisar (02/16/2022, 8:16 PM)
I am having trouble seeing task runs by task in Prefect Cloud. Here's an example: https://cloud.prefect.io/anaconda/task/b3cfcc12-9ba8-4960-8dee-7c677c23b477?runs. All of these pages that I've looked at are empty. I've tested this through the API and I can't get it to return anything when using a `where` filter with a task id, but it works when I query by flow run id.
Daniel Komisar (02/16/2022, 9:23 PM)
Is there a way to provide custom names for tasks such as `case(False)`?
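For ordinary tasks, Prefect 1 accepts a `task_args` override at call time; whether the `case` context manager exposes the same hook isn't confirmed here, but a minimal sketch of the general renaming mechanism:

from prefect import Flow, task

@task
def check_flag():
    return False

with Flow("naming-example") as flow:
    # task_args overrides Task attributes for this one call;
    # "flag-is-false" is an illustrative custom label
    cond = check_flag(task_args={"name": "flag-is-false"})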
Farid (02/16/2022, 9:32 PM)
Hi! Is there anything to consider when saving JSON as a `Result` using Prefect? I dump the dict objects to `str` JSON using `json.dumps` and then save them using `S3Result` or `LocalResult`, and noticed both of them get some extra characters added to the beginning or end of the JSON object:
��QXQ >> at the beginning
�. >> at the end
which makes it unparseable. Saving the same object using `with open()` on my local machine does not add those extra characters.
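Those leading/trailing bytes look like pickle framing: Prefect 1 results default to a `PickleSerializer`. A minimal sketch of the likely fix, passing an explicit `JSONSerializer` (the `dir` value is a placeholder; the same keyword works for `S3Result`), in which case you return the dict itself rather than a pre-dumped string:

from prefect import Flow, task
from prefect.engine.results import LocalResult
from prefect.engine.serializers import JSONSerializer

@task(result=LocalResult(dir="./results", serializer=JSONSerializer()))
def build_payload():
    # return the dict directly; the serializer handles json.dumps
    return {"status": "ok", "rows": 42}

with Flow("json-results") as flow:
    build_payload()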
Kelly Huang (02/17/2022, 1:37 AM)
Hi, I'm trying to create a flow that is dependent on 2 other flows. Is it possible (using the imperative API) to add tasks to the final flow?
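A minimal sketch of adding tasks and dependencies with Prefect 1's imperative API (the task classes here are illustrative stand-ins, not anything from the question):

from prefect import Flow, Task

class ExtractTask(Task):
    def run(self):
        return [1, 2, 3]

class LoadTask(Task):
    def run(self):
        print("loading")

flow = Flow("final-flow")
extract = ExtractTask()
load = LoadTask()

flow.add_task(extract)
# run `load` only after `extract`, without passing data between them
flow.set_dependencies(task=load, upstream_tasks=[extract])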
An Hoang (02/17/2022, 5:02 AM)
I have a task (`task_B`) downstream of a mapped task (`task_A`). Some of the input for `task_B` (output of `task_A`) is `prefect.engine.signals.SKIP(None)`. In `task_B`, do I have to filter out these skip signals? If the output of `task_B` is mapped to `task_C` ... `task_X`, do I have to do `if isinstance(input, prefect.signals.SKIP): raise prefect.signals.SKIP` for every single task in the chain?
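One pattern that may avoid repeating the isinstance check in every task is to drop the skipped results once with a `FilterTask` before the rest of the chain; a minimal sketch (task names reused from the question, logic illustrative):

from prefect import Flow, task
from prefect.engine.signals import SKIP
from prefect.tasks.control_flow import FilterTask

@task
def task_A(x):
    if x % 2:
        raise SKIP(f"skipping {x}")
    return x * 10

@task
def task_B(x):
    return x + 1

# SKIP signals arrive downstream as exception objects, so filter those out
drop_skips = FilterTask(filter_func=lambda r: not isinstance(r, BaseException))

with Flow("skip-filtering") as flow:
    raw = task_A.map([1, 2, 3, 4])   # some children raise SKIP
    clean = drop_skips(raw)          # filter once, before the rest of the chain
    task_B.map(clean)                # downstream maps see only real values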
Ayah Safeen (02/17/2022, 8:46 AM)
Hi all, hope you are all good and safe. Can I have a real-time task in a Prefect ETL? Thanks in advance 🙏😁
damien michelle (02/17/2022, 10:50 AM)
Hi, I'm new to Prefect and I'm facing a TypeError when I try to register my workflow to Prefect Cloud. Does anyone have a solution? Thanks in advance
Daniel Nilsen (02/17/2022, 12:47 PM)
I have a task that returns an object with an attribute `data`, but when I try to access this in the flow I get `'FunctionTask' object has no attribute 'data'`. How do I access the response.data?

with Flow('data_transformation') as flow:
    response = myTask()
    response.data  # error
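Inside a `with Flow(...)` block, `response` is the Task object, not the value it will eventually return; attribute access has to happen at run time inside another task. A minimal sketch (`my_task` is a stand-in for the task in the question):

from types import SimpleNamespace
from prefect import Flow, task

@task
def my_task():
    return SimpleNamespace(data=[1, 2, 3])

@task
def get_data(response):
    # runs at flow-run time, when `response` is the actual returned object
    return response.data

with Flow("data_transformation") as flow:
    response = my_task()
    data = get_data(response)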
Matthias Roels (02/17/2022, 1:07 PM)
I have a question regarding Orion that is confusing me. With Prefect 1.0, we have the concept of flow storage. Is there an equivalent for Orion, or how does it work? Say I want to run a flow on k8s: can I just package my flow code in a Docker container along with the required dependencies (maybe using the prefect image as base image)? Thanks for clarifying!
Marcel M (02/17/2022, 3:16 PM)
I'm new to Prefect. It took some time to find out what Orion is; it seems to be Prefect 2.0? Should I, as a beginner, go straight to Orion, or stick to Prefect 1?
Marcel M (02/17/2022, 3:17 PM)
Also, I would be interested in seeing a comparison of Dagster and Prefect/Orion. Is anyone aware of such a comparison?
Tim Enders (02/17/2022, 4:04 PM)
Dumb question... how do I pass a parameter to `flow.run()`?
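A minimal Prefect 1 sketch: `flow.run` accepts a `parameters` dict keyed by Parameter name (the parameter and task here are illustrative):

from prefect import Flow, Parameter, task

@task
def greet(name):
    print(f"hello {name}")

with Flow("params-example") as flow:
    name = Parameter("name", default="world")
    greet(name)

# override the default at run time
flow.run(parameters={"name": "Tim"})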
Lorenzo Randazzo (02/17/2022, 4:26 PM)
Hi all! We have an EKS cluster in which we would like to run our flows. We managed to build an architecture purely based on Fargate, but for our purposes we moved to a hybrid cluster with both managed node groups and a Fargate profile. I would like to have a small node (namespace prefect-agent) for my Prefect agent, which runs jobs on the Fargate profile (namespace serverless). I thought it was enough to specify the different namespace in the job_template of the KubernetesRun, but the flow run fails once started with a 400 error and the message: "the namespace of the provided object does not match the namespace sent on the request". I cannot find anything useful about running flows in a different namespace from the agent. Has anyone managed something similar? Thanks in advance
Andrew Lawlor (02/17/2022, 5:37 PM)
For a KubernetesRun, I set a memory request but see errors when the job needs more memory. I didn't set a memory_limit. Is it possible to have it scale up automatically if I need more memory than the request?
David Yang (02/17/2022, 6:43 PM)
Hi all, we have three environments (DEV, QA, and Production) with corresponding Docker images that contain our dbt source code. A Prefect flow is used to run the dbt code. The flow uses DockerRun, but DockerRun requires a Docker image name and tag, and the image name and tag are flow metadata. How can I use one flow for all three environments?
Rio McMahon (02/17/2022, 8:54 PM)
Hello! My flows on Prefect Cloud keep failing. The state message for the failed run is `State Message: {'_schema': 'Invalid data type: None'}`, but I am unclear on what that means. Could you clarify what this might indicate, or ways to get more informative error messages? Thanks.
Heeje Cho (02/17/2022, 9:30 PM)
What are some ways that people execute flows remotely (not from the cloud UI)? If we wanted to execute a flow from a separate application, what is the best way? Is there a webhook that we can call? A CLI command to the agent?
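One common pattern is calling the Prefect 1 API from the other application; a minimal sketch using the Python client (the flow id is a placeholder, and authentication is read from your local config/environment):

from prefect import Client

client = Client()  # picks up the API key from config.toml or the environment

# creates the run server-side; an agent with matching labels picks it up
flow_run_id = client.create_flow_run(
    flow_id="00000000-0000-0000-0000-000000000000",  # placeholder
    parameters={"name": "remote-trigger"},
)
print(flow_run_id)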
Billy McMonagle (02/17/2022, 9:50 PM)
Hi there! I have a question about task inputs and triggers.
Lee Cullen (02/18/2022, 2:21 AM)
Hi all! I keep running into an issue when registering a flow to Prefect Cloud. I can do it successfully via the CLI, but when I switch to registering through flow.register() I get `Invalid Project!` even though the project exists.
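A minimal sketch of the usual fix, assuming the error comes from the project name not being passed (or not matching) when registering programmatically against an existing `flow` object:

# the CLI's --project flag supplies this for you; in code it must be explicit
flow.register(project_name="my-project")  # placeholder project name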
Romain (02/18/2022, 7:21 AM)
Hello folks, is it possible to have a run with some tasks in the TriggerFailed state while there are no upstream Failed tasks? See screenshot below:
Daniel Nilsen (02/18/2022, 10:12 AM)
How do I make one task run after the other even though they do not have any edges between them?
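A minimal Prefect 1 sketch: a state-only dependency can be added with set_upstream, no data passing required (task names are illustrative):

from prefect import Flow, task

@task
def first():
    print("first")

@task
def second():
    print("second")

with Flow("ordering") as flow:
    a = first()
    b = second()
    b.set_upstream(a)  # adds a state edge only; no data flows between them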
Michael Hadorn (02/18/2022, 10:47 AM)
Where and how do we store our application settings in Orion?
Bogdan Bliznyuk (02/18/2022, 11:50 AM)
Hi all! We have an issue with the Prefect Docker agent. We're using Azure and ACR with a system-managed identity assigned to the VM. To log in to ACR, we run `az acr login` every 3 hours via a systemd timer. But it seems the Prefect Docker agent only reads the token during start-up and stores it in memory: unless we restart the agent, it is unable to pull Docker image flows after 3 hours (once the ACR token has expired).
iñigo (02/18/2022, 1:57 PM)
Hello, I've been working with Prefect for the last few months to see if it can replace some of our scripts. I'm having an issue with one of Prefect's built-in tasks: Postgres. The flow I'm trying to achieve is pretty simple: execute a query to gather some data, then transform it and send it to another DB. I'm not having a problem using the built-in task itself, but I can't connect it with the rest of the tasks, as they don't appear in the UI as tasks... Thank you
Andrew Black (02/18/2022, 2:01 PM)
Re: dbt features. Hi everyone, I look after Prefect's partnerships and we're collaborating with dbt on what we should enable together. What would you like to see?
Christopher Schmitt (02/18/2022, 3:09 PM)
Hi Prefect community, I recently started to use Prefect and so far I mostly like it. Sometimes, though, I can't wrap my head around how to apply the Prefect concepts, as in the problem I'm having now: I want to write a script that queries database tables and writes the data to blob storage. The tables have to be queried with a number of different parameters (dates), so what I need is a product of tables x dates. I solved this with a task named "create_input_tuples" that maps over the list of dates and also gets unmapped(tables) as an argument. Inside create_input_tuples I loop over the tables and return a list of tuples (one per date), which I then flatten.

Question 1: Is this the best approach, or are there other ways to achieve it?

After the queries have run, I filter out empty tables (which can happen) with a FilterTask and want to write the data from the remaining tables to the blob. For this I need the table_name and date (for the naming).

Question 2: Is there a way to keep track of the names and dates (also taking the filtering into account)? Usually I had a task returning a list of tables, to which I mapped the querying task, to which I mapped the load_blob task along with the list of tables from the beginning. Without the filter this always worked, since the table_names and the tasks holding the data from the database stayed aligned and could be mapped to the blob task.

To make it a little clearer (I hope) I drew this picture; I wasn't sure if the Prefect visualization was really helping. Maybe you could help me find the prefect solution for this! Christopher
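One way to keep names and dates aligned through the filter is to carry them inside the mapped items themselves, so the metadata survives filtering; a hedged sketch (table/date values and the query logic are placeholders):

from prefect import Flow, task, unmapped, flatten
from prefect.tasks.control_flow import FilterTask

@task
def create_input_tuples(date, tables):
    return [(table, date) for table in tables]

@task
def run_query(pair):
    table, date = pair
    rows = []  # placeholder for the real query
    # keep the metadata with the data so filtering never misaligns them
    return {"table": table, "date": date, "rows": rows}

# drop empty results; table/date travel inside each surviving item
drop_empty = FilterTask(filter_func=lambda r: bool(r["rows"]))

@task
def load_blob(result):
    name = f"{result['table']}_{result['date']}.json"  # naming uses the carried metadata
    print("would upload", name)

with Flow("tables-x-dates") as flow:
    dates = ["2022-01-01", "2022-01-02"]
    tables = ["users", "orders"]
    pairs = flatten(create_input_tuples.map(dates, tables=unmapped(tables)))
    results = run_query.map(pairs)
    load_blob.map(drop_empty(results))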
Pedro Machado (02/18/2022, 4:00 PM)
Hi there. Could someone help me understand why a flow (using Docker storage) is registered with a new version every time I run `prefect register`, even though the code has not changed? The only thing I see that could change is the Docker tag, which I am not setting explicitly. Would this cause it to increment the version number? Docker build log in thread.
Olivér Atanaszov (02/18/2022, 4:06 PM)
Hi, I'm having the probably infamous issue with task slugs:

[2022-02-17 12:29:19+0000] INFO - prefect.wait_for_flow_run | Flow 'affable-piculet-flow-build-dataset': Entered state <Failed>: Unexpected error while running flow: KeyError('Task slug do_someting-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?')

when trying to run a flow of flows like this:

from flows.a import flow as flow_a   # flow's name is "flow-a"
from flows.b import flow as flow_b   # flow's name is "flow-b"

kwargs = {
    "project_name": "foo",
    "labels": ["test"],
}

flow_a.register(**kwargs)
flow_b.register(**kwargs)

with Flow("flow-of-flows", run_config=run_config, storage=storage, result=result) as flow:
    run_id_flow_a = create_flow_run(flow_name="flow-a", **kwargs)
    wait_for_flow_a = wait_for_flow_run(run_id_flow_a, raise_final_state=True)
    output_a = get_task_run_result(run_id_flow_a, task_slug="do_something-1")

    run_id_flow_b = create_flow_run(flow_name="flow-b", parameters={"input": output_a}, **kwargs)
    run_id_flow_b.set_upstream(wait_for_flow_a)

flow.register(**kwargs)
@George Coyne
Donnchadh McAuliffe (02/18/2022, 4:08 PM)
Hi there. I am new to Prefect Orion. I am trying to schedule a flow run on my local Prefect server following the example POST request provided in the documentation. Here is my request:

POST: http://localhost:4200/api/flow_runs/

PAYLOAD:
{
  "name": "my_scheduled_flow",
  "deployment_id": "0ff88fc8-a71c-4b71-b8dd-078d6a36fb39",
  "flow_id": "94374bea-70ae-4e49-8988-e9077daa280a",
  "flow_version": "1.0",
  "state": {
    "type": "SCHEDULED",
    "name": "my_scheduled_flow",
    "message": "a message",
    "state_details": {
        "scheduled_time": "2022-02-18T16:10:49.005Z"
    }
  }
}

The flow run then appears in the server UI (as yellow) but never gets run at the scheduled time. Does anyone have any idea about this? Am I missing an important part of the payload?
David Yang (02/18/2022, 4:27 PM)
Anyone got this error today?

Traceback (most recent call last):
  File "/usr/local/bin/dbt", line 5, in <module>
    from dbt.main import main
  File "/usr/local/lib/python3.9/site-packages/dbt/main.py", line 11, in <module>
    import dbt.version
  File "/usr/local/lib/python3.9/site-packages/dbt/version.py", line 11, in <module>
    import dbt.semver
  File "/usr/local/lib/python3.9/site-packages/dbt/semver.py", line 8, in <module>
    import dbt.utils
  File "/usr/local/lib/python3.9/site-packages/dbt/utils.py", line 9, in <module>
    import jinja2
  File "/usr/local/lib/python3.9/site-packages/jinja2/__init__.py", line 12, in <module>
    from .environment import Environment
  File "/usr/local/lib/python3.9/site-packages/jinja2/environment.py", line 25, in <module>
    from .defaults import BLOCK_END_STRING
  File "/usr/local/lib/python3.9/site-packages/jinja2/defaults.py", line 3, in <module>
    from .filters import FILTERS as DEFAULT_FILTERS  # noqa: F401
  File "/usr/local/lib/python3.9/site-packages/jinja2/filters.py", line 13, in <module>
    from markupsafe import soft_unicode
ImportError: cannot import name 'soft_unicode' from 'markupsafe' (/usr/local/lib/python3.9/site-packages/markupsafe/__init__.py)

It seems we have to downgrade markupsafe; downgrading to 2.0.1 fixes the issue on my side:

pip install markupsafe==2.0.1
Kevin Kho (02/18/2022, 4:28 PM)
Haven't seen it... looks like a dbt dependency version conflict, right?
David Yang (02/18/2022, 4:29 PM)
Yes. It just happened today.
Kevin Kho (02/18/2022, 4:30 PM)
Gotcha, that might be worth reporting on their Discourse if you feel like it. I suspect more people will run into it.
David Yang (02/18/2022, 4:30 PM)
Yeah... seems it's a dbt problem, not Prefect.
Matthias (02/18/2022, 4:33 PM)
Strange, in the current version of dbt the markupsafe package is pinned to the one you downgraded to: https://github.com/dbt-labs/dbt-core/blob/main/core/setup.py#L55
David Yang (02/18/2022, 6:19 PM)
Upgrading dbt to 1.0.2 could solve the issue... I'll let you guys know if it works.
Matthias (02/18/2022, 6:31 PM)
Which version did you use before?
David Yang (02/18/2022, 6:32 PM)
1.0.1
Matthias (02/18/2022, 6:50 PM)
I see, in that version there is no explicit version of markupsafe specified. It is a dependency of Jinja2, but set very broadly (e.g. >=0.23), so that's why you explicitly had to downgrade it. An alternative fix is to pin the version in the requirements of your application and use dbt 1.0.1.
Still think it is worth mentioning on their Discourse/Slack channel though... it seems the release of the latest markupsafe is only 16 hrs old, so that's why it was never a problem before.
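A minimal sketch of the pin Matthias describes, assuming a plain pip/requirements.txt setup (file contents are illustrative):

# requirements.txt
dbt-core==1.0.1
markupsafe==2.0.1  # markupsafe 2.1 removed soft_unicode, which Jinja2 2.x still imports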
David Yang (02/18/2022, 8:19 PM)
After I upgraded dbt to 1.0.2, the error is gone. Either upgrading dbt, or downgrading markupsafe with a dbt version earlier than 1.0.2, works.
Matthias (02/18/2022, 8:26 PM)
Indeed, in version 1.0.2 the markupsafe version is pinned to 2.0.1!