prefect-community

    Rafael Sá

    03/15/2022, 6:03 PM
    Hi everyone, I'm new to Prefect and I'm running into the following issue. I defined a task and call it inside a flow block. I expect the task to return a list of URLs (variable "extratos"), but I'm getting the following error when len(arquivos_extrato) is run: "object of type 'GetItem' has no len()". sharepoint4 is a custom class that I built, and listar_arquivos() is one of its methods. My code is something like this: Can someone help me understand why I'm getting a GetItem object and not a list?
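For readers hitting the same thing: in Prefect 1.x, calling a task inside a "with Flow(...)" block does not execute it — it returns a build-time placeholder (a Task, or a GetItem when you index into one) that is only resolved when the flow runs, so len() must itself be deferred into a task. A toy model of the mechanism (these are not Prefect's real classes):

```python
class TaskPlaceholder:
    """Toy stand-in for the build-time object Prefect returns from a task call."""
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def run(self):
        # resolve upstream placeholders first, then execute the function
        resolved = [a.run() if isinstance(a, TaskPlaceholder) else a
                    for a in self.args]
        return self.fn(*resolved)

def listar_arquivos():
    # stand-in for sharepoint4.listar_arquivos()
    return ["https://example/extrato1.pdf", "https://example/extrato2.pdf"]

extratos = TaskPlaceholder(listar_arquivos)  # build time: a placeholder, not a list
# len(extratos) here would fail -- the placeholder has no len()

contar = TaskPlaceholder(len, extratos)      # defer len() to run time instead
print(contar.run())  # -> 2
```

The same idea applies in real Prefect 1.x code: wrap the len() call in its own @task so it receives the materialized list at run time.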

    Sarah Floris

    03/15/2022, 6:27 PM
    Okay, so I have my container built in Azure Container Registry, but Docker() storage refuses to connect. How do I choose the container in Azure Container Registry?

    Rafael Sá

    03/15/2022, 8:16 PM
    Hi folks, is it possible to call a task inside another task? Something like this:
    def function_a():
        c = a + b
        return c

    @task
    def do_something():
        y = function_a()
        return y
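A likely answer (hedged, Prefect 1.x): a @task may freely call plain, undecorated Python functions; it's directly calling another @task-decorated callable that causes trouble, because tasks are only orchestrated when invoked inside a flow. A standalone sketch with the decorator stubbed out so it runs without Prefect:

```python
def task(fn):
    # stand-in for prefect.task; the real decorator wraps fn in a Task object
    return fn

def function_a(a, b):
    # a plain helper -- safe to call from inside any task body
    return a + b

@task
def do_something(a, b):
    y = function_a(a, b)  # calling an undecorated function inside a task is fine
    return y

print(do_something(2, 3))  # -> 5
```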

    Sarah Floris

    03/15/2022, 8:38 PM
    so I have my flow in one file and then the tasks in another file. I finally got them to play nicely with Docker storage, but now in the RUN python /opt/prefect/healthcheck.py, it can't find the file that is in the same folder as the flow.
    System Version check: OK
    /opt/prefect/healthcheck.py:130: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/storage.html#docker>
      flows = cloudpickle_deserialization_check(flow_file_paths)
    Traceback (most recent call last):
      File "/opt/prefect/healthcheck.py", line 130, in <module>
        flows = cloudpickle_deserialization_check(flow_file_paths)
      File "/opt/prefect/healthcheck.py", line 43, in cloudpickle_deserialization_check
        flows.append(cloudpickle.loads(flow_bytes))
    ModuleNotFoundError: No module named "Module"

    Brad

    03/15/2022, 9:04 PM
    hey team - orion 2.0b1 is throwing some typer errors on success (I can see they are succeeding but it looks potentially misleading)
    Set default storage to 'local'.
    Traceback (most recent call last):
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 58, in wrapper
        return fn(*args, **kwargs)
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
        return run_async_in_new_loop(async_fn, *args, **kwargs)
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
        return anyio.run(partial(__fn, *args, **kwargs))
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 56, in run
        return asynclib.run(func, *args, **backend_options)
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 233, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
        return future.result()
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
        return await func(*args)
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/storage.py", line 136, in create
        exit_with_success(f"Set default storage to {name!r}.")
      File "/home/brad/.local/pipx/venvs/prefect/lib/python3.9/site-packages/prefect/cli/base.py", line 193, in exit_with_success
        raise typer.Exit(0)
    click.exceptions.Exit: 0
    An exception occurred.

    Darshan

    03/15/2022, 9:33 PM
    Orion question - is there a way to see the flow schematic in the Orion UI outside of a flow run, like in Prefect 1.0?

    Brad

    03/15/2022, 10:11 PM
    Another Orion question; I'm trying to run a simple flow with an existing Dask SSH cluster and it's failing on trying to open the Orion sqlite db on the remote host

    Nitin Bansal

    03/16/2022, 2:34 AM
    Has anybody consumed the Prefect API from a C# application?

    davzucky

    03/16/2022, 6:21 AM
    How can I set up the task runner at deployment in Orion? I want to be able to change the mode between local dev and prod. I can only see how to do that for flow runners at the deployment level

    Serge Tarkovski

    03/16/2022, 8:45 AM
    hi all, maybe a newbie Orion question - how can I use more than one Python file to describe a flow with multiple tasks (we've got tons of internal ML libraries)? apart from putting all those in a Docker image, I haven't found a way for now

    Muddassir Shaikh

    03/16/2022, 9:16 AM
    Can we schedule each task at a different time inside a single flow? Example: a Flow is cronned for 3pm and it has TASK A, B, C. Task A should start at 3 pm but Task B and Task C should start at 4:30 pm and 5pm respectively.
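Prefect 1.x schedules attach to flows rather than individual tasks, so one workaround (an assumption on my part, not an official feature) is a small "wait until" task placed upstream of B and C. A standalone sketch of the helper, with the clock and sleep injectable so it can be tested:

```python
from datetime import datetime, time as dtime

def seconds_until(target: dtime, now: dtime) -> float:
    """Seconds from `now` until `target` on the same day (0 if already past)."""
    today = datetime.today().date()
    delta = (datetime.combine(today, target)
             - datetime.combine(today, now)).total_seconds()
    return max(delta, 0.0)

def wait_until(target: dtime, now=None, sleep=None):
    # inside Prefect this body would live in a @task upstream of tasks B and C
    import time
    now = now or datetime.now().time()
    sleep = sleep or time.sleep
    sleep(seconds_until(target, now))

# flow cron fires at 3 pm; task B should not start until 4:30 pm
print(seconds_until(dtime(16, 30), dtime(15, 0)))  # -> 5400.0
```

Note the trade-off: the flow run (and its worker slot) stays alive while waiting, so for long gaps separate flows on separate schedules are usually cleaner.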

    Junhyun Park

    03/16/2022, 10:53 AM
    Hello, I have some questions. Please look at my figures. I understood that Prefect works like figure 1 (one Pod or container for the Agent) with autoscaling by Dask. Did I understand correctly? Also, is there a way to use Prefect like figure 2 (N Pods per flow)?

    Vadym Dytyniak

    03/16/2022, 11:26 AM
    Hello. Is it possible, using prefect.Client or GraphQL, to get upstream task_run_ids for a given task_run_id?

    Chris Reuter

    03/16/2022, 12:26 PM
    Hey all! Quick reminder that PrefectLive is happening today at 3p Eastern with Chris White. Hope to see you on Twitch! https://prefect-community.slack.com/archives/C036FRC4KMW/p1647289203959699

    Noam polak

    03/16/2022, 12:35 PM
    Hey all, I have a flow with state_handlers for Slack and an API. I have two questions about that: 1. When I change the flow's state from the UI to Failed or Cancelled, I don't get any message from the handler; it's as if it doesn't trigger the state handlers. Am I correct? 2. I want to make an API call to the Prefect server to change state or cancel a flow_run; is that supposed to trigger the state handlers?
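For reference, a Prefect 1.x state handler is just a callable taking (task_or_flow, old_state, new_state) and returning the (possibly modified) new state. A runnable toy sketch with the State class stubbed out; it deliberately says nothing about whether UI-initiated state changes reach handlers, since that depends on where the handler code actually executes:

```python
class State:
    """Minimal stub of prefect.engine.state.State."""
    def __init__(self, name):
        self.name = name
    def is_failed(self):
        return self.name == "Failed"

notifications = []

def slack_handler(obj, old_state, new_state):
    # Prefect 1.x signature: (task_or_flow, old_state, new_state) -> new_state
    if new_state.is_failed():
        notifications.append(f"{obj} entered Failed")  # e.g. post to Slack here
    return new_state

slack_handler("my-flow", State("Running"), State("Failed"))
print(notifications)  # -> ['my-flow entered Failed']
```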

    Azer Rustamov

    03/16/2022, 12:36 PM
    Hello everyone! I have a question about Orion S3 storage. When I created a test deployment, a file was created in my bucket, but when I tried to run the deployment, I got this error (in the thread). Of note: I use a corporate AWS account and do not have full S3 access; could this be the core of the issue?

    Brett Naul

    03/16/2022, 2:11 PM
    we just (yesterday?) started running into a cloudpickle error when starting flow runs:
    prefect.engine.signals.FAIL: Flow finished in state <Failed: "Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n  AttributeError("\'Comment\' object has no attribute \'_end\'")')">
    the flows are stored in GCS, python is 3.9.10 both locally (MacOS) and remote (docker+k8s). same prefect+cloudpickle versions. I've had the same thing happen before when python versions don't match but bumping has always fixed it, not really sure what else to look into now. any suggestions...?

    Adam Roderick

    03/16/2022, 2:25 PM
    Can I reuse a single Docker storage object for multiple registered flows?
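In Prefect 1.x this was a supported pattern: a single Docker storage object can hold several flows via add_flow(), be built once, and each flow then registered with build=False. A hedged configuration sketch, not runnable as-is — the registry URL, project name, and flow_a/flow_b are placeholders for your own objects:

```python
from prefect.storage import Docker

storage = Docker(registry_url="myregistry.example.com", image_name="my-flows")

for f in (flow_a, flow_b):   # flow_a / flow_b: your existing Flow objects
    storage.add_flow(f)      # both flows travel in the same image
    f.storage = storage

storage.build()              # build and push the image once

for f in (flow_a, flow_b):
    f.register(project_name="my-project", build=False)  # skip per-flow rebuilds
```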

    Bradley Hurley

    03/16/2022, 2:43 PM
    Hi Prefect Experts - Is it possible to not display generated tasks like List and Dict in the visual schematic displayed in the UI?

    Chris Reuter

    03/16/2022, 3:00 PM
    Highlighting this morning's news here - today we announced Club 42, our ambassador program! Some of our most experienced and dedicated community members have come together to help the community grow and thrive. 🥂 Here's to the Prefect community! https://prefect-community.slack.com/archives/CKNSX5WG3/p1647439185450549

    Juan David Barreto

    03/16/2022, 3:19 PM
    Hi, I'm trying to run Databricks tasks using a Docker agent. However, the flow fails when importing the Databricks tasks. When writing from prefect.tasks.databricks.databricks_get_job_id import DatabricksGetJobID, it fails with: 'ModuleNotFoundError: No module named 'pydantic''. I'm using the latest-python3.8 Docker image

    Arun Giridharan

    03/16/2022, 3:48 PM
    Hi, I have a flows.json that contains the metadata for a flow. I want to register this flow inside of another prefect flow. Is that possible?

    Rajan Subramanian

    03/16/2022, 6:36 PM
    hello, I am having issues installing Prefect 2.0 on my AWS instance. I have Ubuntu 18.04 running, which comes with an older version of sqlite3; Prefect 2.0 requires a version greater than 3.24. Is there any way I can skip the sqlite3 dependency when installing Prefect? @Kevin Kho @Anna Geller

    Chris Reuter

    03/16/2022, 6:48 PM
    Hi all! 👋 Sorry for the late cancellation but we had to move PrefectLive to tomorrow at 2p Eastern as @Chris White had some issues come up. Apologies again, I know you probably had your popcorn popped but we'll see you LIVE 📺 in under 24 hours.

    E Li

    03/16/2022, 8:13 PM
    Hi, I am running on the Prefect UI and getting this attribute error: can't get attribute 'get_target_name' on <module 'prefect_scripts.tasks.trade_cache_tasks'>. The trade_cache_tasks module is like below; a similar structure works fine in other scripts. Anyone have any ideas?
    def get_target_name(x, **kwargs):
    	…
    	return target_name
    
    def get_task_name(x, **kwargs):
    	…
    	return task_name
    
    @task(task_run_name=get_task_name,target=get_target_name,checkpoint=True,result=LocalResult())
    def task_a(x, y, z):
    	…
    	return …

    Jared Robbins

    03/16/2022, 8:47 PM
    Want to check if what I read about Orion is accurate... Right now my team runs a bunch of tasks via cron jobs. Would it be possible to start with 2.0 by only using the scheduler aspect? It'd be cool to have a UI for what has run. (edit: I don't have experience with 1.0 or 2.0)

    Darshan

    03/16/2022, 10:13 PM
    Hello, signed up for cloud beta but now stuck on the new workspace page. When I am trying to create a new workspace, everything is greyed out. Do I need to complete any additional steps to make this work ?

    dherincx

    03/16/2022, 10:21 PM
    I've built a flow_of_flows orchestration for my prefect project. Is it possible to see the task dependencies under each nested flow in the UI schematic? Right now I just see 2 flows, but I can't seem to drill down further into each flow

    Shaoyi Zhang

    03/16/2022, 11:56 PM
    Hi! For Kubernetes agent, is there a way to set environment variables in the Kubernetes jobs that are spun up by prefect agent pod?
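One Prefect 1.x approach, as a hedged configuration sketch: set env on the flow's KubernetesRun run config, which is merged into the job the agent spins up (the agent's start command also accepted --env KEY=VALUE, if memory serves). The variable names below are placeholders:

```python
from prefect.run_configs import KubernetesRun

# environment variables injected into the Kubernetes job for this flow
flow.run_config = KubernetesRun(
    env={"DB_HOST": "db.internal", "LOG_LEVEL": "DEBUG"},  # placeholder values
)
```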

    dherincx

    03/17/2022, 2:03 AM
    So I have an ETL flow that I'm executing (shown below). There is a task named execute_ddls that only runs if new_ddl_exist is True. When execute_ddls is skipped, all my downstream dbt tasks are skipped, but I want all downstream tasks to run regardless of whether the case statement is entered. I tried skip_on_upstream_skip=False on the DbtShellTask but it doesn't work. I'm sure I'm missing something trivial...
    with Flow('bi_test_flow') as flow:

        # new DDLs (if any)
        ddls = new_ddls_to_run(loaded_files, os.listdir(DDL_PATH))

        # execute new DDLs ONLY if they exist
        new_ddl_exist = do_new_ddl_scripts_exist(ddls)
        with case(new_ddl_exist, True):
            execute_ddls = execute_sql(ddls)

        # note: binding the result back to the name `dbt` would shadow the
        # task, so use a distinct name for the first invocation's result
        dbt_views = dbt(
            command="dbt run -m anlyz_base.views",
            upstream_tasks=[execute_ddls],
        )
        dbt_operations = dbt(
            command="dbt run-operation materialize_views"
        )

Kevin Kho

03/17/2022, 2:21 AM
This happens because the mechanism for skipping a task is propagating a skip signal, so execute_ddls raises SKIP and then dbt raises SKIP. To avoid that, set dbt's trigger to always_run. You can find more on triggers here

dherincx

03/17/2022, 2:43 AM
Hey Kevin, when I try this, I get an unexpected keyword argument: trigger. I understand the propagation, but I'm not sure why the trigger parameter isn't recognized here, despite inheriting from the Task class
dbt_operations = dbt(
        command="dbt run-operation materialize_views",
        trigger=all_successful
    )

Kevin Kho

03/17/2022, 2:53 AM
The trigger needs to be defined at the task definition
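To make that concrete, a hedged Prefect 1.x sketch: the trigger is passed to the DbtShellTask constructor (task definition), not to the call inside the flow block:

```python
from prefect import Flow
from prefect.triggers import always_run
from prefect.tasks.dbt import DbtShellTask

# trigger is fixed when the task object is defined...
dbt = DbtShellTask(trigger=always_run)

with Flow("bi_test_flow") as flow:
    # ...not here, where the task is called
    dbt_operations = dbt(command="dbt run-operation materialize_views")
```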

dherincx

03/17/2022, 2:55 AM
Oh gosh, duh.....my gosh...it's been a long day...of course....
Thanks Kevin!