Powered by Linen
prefect-community
  • p

    Prashob Nair

    11/24/2021, 6:23 PM
    Hello! I'm trying to use
    create_flow_run
    to start a new flow 2 mins after the previous task. I have set the parameter below as follows, but the flow run starts immediately instead. Please let me know where I'm going wrong. Thanks!
    scheduled_start_time=pendulum.now().add(minutes=2),
    j
    a
    y
    • 4
    • 20
  • p

    Pedro Machado

    11/24/2021, 8:33 PM
    Hi there. I am working on a flow that needs to query an API endpoint 60k times. The API is slow so I need to use concurrency. I have a python class that uses a
    requests
    session to make the API requests. This class also implements rate limiting. I'd like to confirm that if I use the `LocalDaskExecutor` with threads, I can pass a single instance of the class to a mapped task and it will effectively rate limit across all mapped tasks. Also, is there a benefit to using a resource manager task to instantiate the class that queries the API?
    a
    • 2
    • 5
  • h

    haf

    11/24/2021, 11:34 PM
    Is there any way to access the Dask client from within Prefect and do very granular parallelism using the dask.{dataframe,delayed} APIs? I have "large ticket" embarrassingly parallel computations that I map with prefect.task, but then inside these tasks, I want to enable Dask to work on chunks of data arrays. Of course I'd also like to control this parallelism to some extent, or there will be thousands of concurrently running tasks.
    j
    k
    • 3
    • 9
  • a

    aaron

    11/25/2021, 1:15 AM
    are there any plans to add support for secrets management to Orion?
    j
    • 2
    • 1
  • a

    Aaron Ash

    11/25/2021, 1:30 AM
    is it possible to retrieve the
    project
    name of the currently executing flow from the context or somewhere else?
    j
    a
    • 3
    • 5
  • a

    Aaron Ash

    11/25/2021, 1:35 AM
    i'm just starting to look into running dependent flows and am likely going to need to retrieve the project name programmatically
  • m

    Madhavi

    11/25/2021, 3:31 AM
    Hi I am trying to test out orion, getting the following error: sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow [SQL: INSERT INTO flow (id, created, updated, name, tags) VALUES (?, ?, ?, ?, ?) ON CONFLICT (name) DO NOTHING] [parameters: ('3120dd0f-fbcc-481e-8597-3b522b697520', '2021-11-25 03:07:12.340375', '2021-11-25 03:07:12.340409', 'main-flow', '[]')] Did I miss any step in the installation? Thanks
    k
    • 2
    • 2
  • u

    夏文思

    11/25/2021, 6:44 AM
    💃 Just joined!
    👋 4
  • y

    Yong Tian

    11/25/2021, 8:03 AM
    🎉Glad to Join! Good Job! Prefect Inc.
    👋 3
  • a

    André Petersen

    11/25/2021, 10:08 AM
    Hello again! I want to create a very rough price estimate based on the Prefect setup described here (https://towardsdatascience.com/how-to-cut-your-aws-ecs-costs-with-fargate-spot-and-prefect-1a1ba5d2e2df) by @Anna Geller. I don't have much experience with AWS yet, which makes it difficult to estimate the price using the price estimator. We will only have a couple of batch jobs running a couple of times a day, and the processing will take place exclusively on Snowflake, so the Prefect flows will only delegate work. We would also probably use Prefect Cloud. I would guess that in the price calculator here https://calculator.aws/#/createCalculator/Fargate we could go with Linux OS, 2 tasks/pods running 2 hours on average with 2 vCPU allocated, 4 GB memory (minimum), and 20 GB of storage (minimum). 1. Do you agree? If not, why? I can't really believe that this would only cost $14 per month. 2. Do I need to calculate additional costs for the ECS tasks? If I understand correctly, these are included in the Fargate price estimation. I know that we could save money by using spot instances. My target is to make a pessimistic/conservative price estimate. Thanks in advance! Really appreciate the help in this channel!
    a
    • 2
    • 10
  • d

    Daniil Ponizov

    11/25/2021, 10:39 AM
    Hi! Is it possible to pass some information to the flow run in the UI? Often the problem is that a run depends on the current time: for example, it downloads data with a timestamp equal to the previous day, and if this run fails and you only find out after that day, you have to run the script manually with the appropriate arguments.
    a
    a
    • 3
    • 7
  • h

    haf

    11/25/2021, 1:28 PM
    KeyError: 'data'
    — have you seen this error message before?
    a
    • 2
    • 8
  • h

    haf

    11/25/2021, 1:30 PM
    What might be causing Heartbeat failures from Prefect / Dask? How would one go about debugging this?
    a
    k
    • 3
    • 26
  • v

    Vince Bob

    11/25/2021, 2:17 PM
    hello, I am struggling on a great_expectations integration problem. I obviously use RunGreatExpectationsValidation task on a checkpoint I created in GE with:
    validation_task(
    context_root_dir=root_dir,
    checkpoint_name=expectation_checkpoint_name
    )
    When I run the command on GE (great_expectations --V3-api checkpoint run my_checkpoint), it works, but on prefect task, I have an exception: With GE V3 api:
    .....
    for batch in ge_checkpoint["batches"]:
    TypeError: 'Checkpoint' object is not subscriptable
    The same with GE V2 api
    ...
    for batch in ge_checkpoint["batches"]:
    TypeError: 'LegacyCheckpoint' object is not subscriptable
    great_expectations=0.13.43 (also tried with version 0.12.10), prefect=0.15.9. Has anyone experienced this problem?
    a
    • 2
    • 5
  • e

    Elijah Roussos

    11/25/2021, 3:38 PM
    Hi all! I got a quick question if anyone can answer. We’ve got a couple flows running on ECS with a JSON secret for postgres DB access. I want to be able to test flows locally without deploying to Prefect cloud, but obviously that means setting the secret in
    ~/.prefect/config.toml
    locally. From the docs it seems like you can only set strings in the toml, but I need JSON. I’ve tried setting it as a JSON string and also in toml syntax to no avail. Is there any way to set a local JSON secret?
    a
    • 2
    • 3
  • a

    Adam Everington

    11/25/2021, 3:48 PM
    .map().... is there an upper limit? ...and i'm sure i've asked before, but there's no way of batching it so say like 5 run at a time?
    a
    • 2
    • 1
  • a

    Anh Nguyen

    11/26/2021, 2:36 AM
    Hi all. I can't run my flow. How can I fix that?
    a
    • 2
    • 2
  • b

    Bruno Murino

    11/26/2021, 11:17 AM
    Hello everyone — I’m using Prefect Cloud and I’m trying to get a count of tasks ran this month but the account -> usage view is fully empty for me saying 0 tasks have been run today. We’ve been running tasks in prefect cloud for about a month now so this seems like a bug? Or is there a better way?
    a
    • 2
    • 12
  • j

    John Shearer

    11/26/2021, 12:42 PM
    Hi. I'd like to check the meaning of
    date
    in the Prefect context. The docs say "an actual datetime object representing the current time". The datetime value appears to be the same across all tasks within a flow, so I assume this is actually the start time of the flow? This behaviour is what I want, but I'd like to confirm my assumption.
    a
    • 2
    • 6
  • g

    Giovanni Giacco

    11/26/2021, 1:41 PM
    Hello. From the Prefect Cloud GUI I can change the memory/CPU request for a KubernetesRun. How can I do that when I start a flow from Python? And is there a way to change the executor too? I'd like to change the memory/CPU request for the Dask worker pods depending on the effort of the computation requested.
    a
    • 2
    • 3
  • a

    Aleksandr Liadov

    11/26/2021, 2:44 PM
    Hello, could I change the task name at runtime? (The main problem is that I need to cast a dict to a BaseModel, but I need to keep the parameter name.) I provide a minimal example with the flow in a comment.
    a
    • 2
    • 18
  • p

    Prasanth Kothuri

    11/26/2021, 3:41 PM
    Hi all, I want to schedule a Prefect flow every minute and, within the flow, check whether a file in S3 has changed. If the file has changed, a bunch of tasks are executed; otherwise the flow exits. For this I need to maintain state across flow runs. How can I do that? Thanks a ton.
    a
    a
    • 3
    • 10
  • j

    Jinho Chung

    11/26/2021, 4:46 PM
    I'm new to Prefect, and have been experimenting with running flows on my local machine with a local server as well. So far things have been very intuitive! However I have another nodejs app running locally, and want to try to have that app call the local Prefect server to start a parameterized flow. I can start flows through the UI at localhost:8080 and I've been going through the documentation but things just haven't clicked and I'm not sure how to do this. (Additional disclaimer - I have virtually no experience with GraphQL which probably isn't helping). Thanks for the help and this amazing product!
    a
    • 2
    • 2
  • e

    Erick House

    11/26/2021, 11:48 PM
    Hi all, where do I post questions about potential bugs? There is some issue with SQLite creating tables as I go through the basic Orion tutorial.
    a
    • 2
    • 1
  • i

    itay livni

    11/27/2021, 3:48 AM
    Hi - I have a couple of questions about Orion: 1. How do you use Radar? 2. Can a task be a class? -- Thanks
    a
    • 2
    • 8
  • h

    haf

    11/27/2021, 12:05 PM
    I've managed to get around most of the problems I had with retries and stability on Dask, but this one eludes me. I'm getting the
    KilledWorker
    error which seemingly fails the whole flow. Despite this, the workers are alive and fine (more in thread)
    a
    k
    +2
    • 5
    • 59
  • j

    Jake Watson

    11/27/2021, 3:52 PM
    Hi all, I'm excited for the additional features Orion / 2.0 will bring, though is there a list of what 1.0 features that won't be included in 2.0? The one feature we use most currently in 1.0 that doesn't seem to be implemented in 2.0 yet is flow and task state handlers via the state_handler argument (apologies if it is!).
    a
    • 2
    • 2
  • e

    Erick House

    11/27/2021, 4:29 PM
    from prefect import flow
    
    
    @flow
    def my_favorite_function():
        print("This function doesn't do much")
        return 42
    
    
    print(my_favorite_function())
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow [SQL: INSERT INTO flow (id, created, updated, name, tags) VALUES (?, ?, ?, ?, ?) ON CONFLICT (name) DO NOTHING] [parameters: ('f4971b70-0675-41c8-af7b-efcf8e3c2254', '2021-11-27 16:24:21.653684', '2021-11-27 16:24:21.653699', 'my-favorite-function', '[]')] (Background on this error at: https://sqlalche.me/e/14/e3q8) ❯ prefect version 2.0a5 ❯ sqlite3 version SQLite version 3.36.0 2021-06-18 18:58:49
    a
    • 2
    • 2
  • l

    Lana Dann

    11/27/2021, 4:34 PM
    Hi! https://docs.prefect.io/api/latest/run_configs.html#ecsrun Is there any way we can configure
    ECSRun
    to take the most recent (or the only revision) of a
    task_definition_arn
    ? Otherwise we’d have to update and deploy the flow every time we update a task definition which is not ideal
    a
    • 2
    • 5
  • i

    itay livni

    11/27/2021, 9:37 PM
    Hi - I have one task in orion I am trying to run in a flow that returns a pandas dataframe. The function runs correctly without the task decorator. But the function returns a disambiguation error in a flow. Code in thread
    a
    a
    +2
    • 5
    • 19
i

itay livni

11/27/2021, 9:37 PM
Hi - I have one task in orion I am trying to run in a flow that returns a pandas dataframe. The function runs correctly without the task decorator. But the function returns a disambiguation error in a flow. Code in thread
@task
def get_oup_deinitions(node_input_dict):
    
    oupDefs = oup.OUPDefinitions("en-us", creds.OUP)
    oup_df = oupDefs.run(node_input=node_input_dict)
    
    return oup_df

@flow
def get_node_resources(node_input_dict):
   
    oup_df = get_oup_deinitions(node_input_dict=node_input_dict)

    return oup_df
This is the traceback:
16:03:19.983 | Flow run 'pumpkin-pony' encountered exception:
Traceback (most recent call last):
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 377, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 48, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(context.run, call, cancellable=True)
  File "py39lmap/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 805, in run_sync_in_worker_thread
    return await future
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 743, in run
    result = func(*args)
  File "/egm_make_lmap/extract_resources/get_resources.py", line 83, in get_node_resources
    oup_df = get_oup_deinitions(
  File "py39lmap/lib/python3.9/site-packages/prefect/tasks.py", line 271, in __call__
    return enter_task_run_engine(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 450, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "py39lmap/lib/python3.9/site-packages/prefect/utilities/asyncio.py", line 59, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "py39lmap/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
    return asynclib.run_async_from_thread(func, *args)
  File "py39lmap/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 830, in run_async_from_thread
    return f.result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "py39lmap/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 510, in create_and_submit_task_run
    future = await flow_run_context.executor.submit(
  File "py39lmap/lib/python3.9/site-packages/prefect/executors.py", line 180, in submit
    self._results[task_run.id] = await run_fn(**run_kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/client.py", line 59, in wrapper
    return await fn(*args, **kwargs)
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 620, in orchestrate_task_run
    terminal_state = await user_return_value_to_state(
  File "py39lmap/lib/python3.9/site-packages/prefect/engine.py", line 734, in user_return_value_to_state
    if is_state(result) or is_state_iterable(result):
  File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
  File "py39lmap/lib/python3.9/site-packages/pandas/core/generic.py", line 1537, in __nonzero__
    raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
16:03:19.989 | Shutting down executor `SequentialExecutor`...
16:03:20.066 | Flow run 'pumpkin-pony' finished in state Failed(message='Flow run encountered an exception.', type=FAILED)
Failed(message='Flow run encountered an exception.', type=FAILED)
a

Anna Geller

11/28/2021, 10:49 AM
You got a Pandas error - this may help: https://stackoverflow.com/questions/36921951/truth-value-of-a-series-is-ambiguous-use-a-empty-a-bool-a-item-a-any-o
a

Anurag Bajpai

11/28/2021, 11:45 AM
@Anna Geller This looks like an issue in orion actually:
File "py39lmap/lib/python3.9/site-packages/prefect/orion/states.py", line 17, in is_state_iterable
    if isinstance(obj, IterableABC) and obj:
casting obj, which in this case is a pandas dataframe, to bool causes that error
✅ 1
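A minimal sketch of the failure mode described above, runnable without pandas installed. `AmbiguousFrame` is a hypothetical stand-in for a DataFrame, and `is_state_iterable_naive` only mirrors the shape of the condition quoted from the traceback; neither is actual Prefect or pandas code.

```python
from collections.abc import Iterable


class AmbiguousFrame:
    """Hypothetical stand-in for a pandas DataFrame (not a real pandas class)."""

    def __iter__(self):
        # Like a DataFrame, iterating yields column labels.
        return iter(["A", "B", "C", "D"])

    def __bool__(self):
        # pandas raises ValueError when a DataFrame is used in a boolean context.
        raise ValueError("The truth value of a DataFrame is ambiguous.")


def is_state_iterable_naive(obj):
    # Same shape as the condition in the traceback:
    # `and obj` forces bool(obj), which is where the error comes from.
    if isinstance(obj, Iterable) and obj:
        return True
    return False


try:
    is_state_iterable_naive(AmbiguousFrame())
except ValueError as exc:
    print(f"ValueError: {exc}")
```

The object passes the `isinstance` check because it defines `__iter__`, so Python goes on to evaluate its truth value, and that second step is what raises.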
a

Anna Geller

11/28/2021, 1:06 PM
I see. In that case, it would be good to have a look at the flow code. @itay livni could you share your flow file so that I can replicate the issue?
i

itay livni

11/28/2021, 2:52 PM
Hi @Anna Geller - Below is a reproducible flow that breaks with pandas
from prefect import flow, task

import numpy as np
import pandas as pd

@task
def pandas_task(config: dict):
    df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))
    return df

@flow
def main_flow():
    # do some things
    # then call another flow function
    df = pandas_task(config={})
    # do more things

# run the flow
flow_state = main_flow()
a

Anna Geller

11/28/2021, 3:01 PM
Thanks for the example and for reporting that. Looks like a problem with serialization of task results. I’ll open an issue for it. As a workaround for now, if instead of returning a dataframe, you would return
df.to_dict()
, this should work.
i

itay livni

11/28/2021, 3:05 PM
@Anna Geller Thanks. FYI - I am basically porting my prefect core code to orion.
a

Anna Geller

11/28/2021, 3:08 PM
Gotcha. If you do it to test out Orion, then that’s great. But later, we will also provide migration guides and porting Core to Orion will be easier. Thanks again! 👍
i

itay livni

11/28/2021, 3:54 PM
Hi @Anna Geller - Sorry to be a pain here, but can you share a short example of how you would run a method in a class with Orion? I am getting all sorts of errors. FWIW: I don't mind being a guinea pig testing Orion.
a

Anna Geller

11/28/2021, 4:18 PM
@itay livni can you share those errors? You can call arbitrary code in Orion, if you look for some class scaffold:
from prefect import flow, task


class SomeDataProcessor:
    def __init__(self):
        pass

    def run_etl(self):
        pass


@task
def some_task():
    SomeDataProcessor().run_etl()


@flow
def main():
    some_task()


if __name__ == "__main__":
    main()
i

itay livni

11/28/2021, 4:22 PM
Ah - So you need a helper function with
@task
. I was hoping to get rid of those.
a

Anna Geller

11/28/2021, 4:25 PM
You could call it directly in the flow if you prefer:
@flow
def main():
    SomeDataProcessor().run_etl()
The trade-off here is that the more tasks you have, the more fine-granular visibility you have when something fails.
✔️ 1
m

Michael Adkins

11/28/2021, 5:51 PM
This is definitely an issue with the way we’re checking if the return value is a state
We can fix this by changing our evaluation. I do hate that pandas raises an exception for truth checks though -.-
👍 2
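One way the evaluation could be changed, shown only as a sketch and not as the fix that was actually shipped: check the container type and length explicitly instead of relying on the object's truth value. `State` and `AmbiguousFrame` are hypothetical stand-ins, and restricting the check to `list`, `tuple`, and `set` is an assumption made for this example.

```python
class State:
    """Hypothetical stand-in for a Prefect state object."""


class AmbiguousFrame:
    """Hypothetical stand-in for a pandas DataFrame (not a real pandas class)."""

    def __iter__(self):
        return iter(["A", "B", "C", "D"])

    def __bool__(self):
        # Same behaviour as a DataFrame in a boolean context.
        raise ValueError("The truth value of a DataFrame is ambiguous.")


def is_state(obj):
    return isinstance(obj, State)


def is_state_iterable(obj):
    # Only inspect plain built-in containers, and use len() rather than bool(),
    # so arbitrary user return values are never truth-tested.
    if isinstance(obj, (list, tuple, set)) and len(obj) > 0:
        return all(is_state(item) for item in obj)
    return False


print(is_state_iterable(AmbiguousFrame()))    # no exception is raised
print(is_state_iterable([State(), State()]))  # a non-empty list of states
```

The trade-off in this sketch is that custom iterable types holding states would no longer be recognised; whether that matters depends on what return values the engine is meant to support.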
i

itay livni

11/28/2021, 11:34 PM
Hi @Anna Geller - Is there a place to follow Orion updates? Basically I want to take up migrating my flow again when the pandas serialization bug is fixed. Here is my unsolicited feedback on migrating from Prefect Core to Orion (so far) for a hobby project. 1. Tasks that were previously classes require more code because there is no way to run
SomeClass(Task)
without making a helper function (I could be missing something) 2. I enjoy not having to register flow to get to a UI. 3. Overall less template code i.e. flow.visualize, flow.register ... 4. Radar is cool. I cannot wait to see how the subflows and recursive nature of my flow is represented. 5. The new graph showing flow runs is less intuitive to understand than the current simple bar chart. But I imagine as I get used to it things will change.
:thank-you: 2
👀 3
a

Anna Geller

11/29/2021, 12:08 AM
Thank you so much for your feedback! I will direct it to the team. Regarding updates, we usually post updates about new releases in the announcement channel.
:upvote: 1
n

nicholas

11/29/2021, 8:44 PM
Hi @itay livni - on your flow run graph point; can you expand further on that? I’d be curious to hear how you’re currently interacting with that chart and what functionality you’re expecting/would like to see there. There are some notable differences including grouping flow runs on time buckets for a more consistent experience (e.g. if you’re looking at last month’s runs vs. this month’s runs you should have the same number of bars, whereas in the current chart you couldn’t choose a time interval because it’s based on a fixed number of runs (100 or so))
i

itay livni

11/30/2021, 12:10 AM
Hi @nicholas - Firstly, my use case with Prefect is limited to a "couple" of flows for personal use with an eye to production. In short, take my use case with a heavy grain of salt. I basically glance quickly at the Flow Run Graph (and like how the runs are not grouped; IMO this could be a view?) -> look at whether a flow took longer than average to run -> click on any bar/flow that did not run or is of interest because of time -> analyze -> fix/refactor if needed. I think you hit the nail on the head: I'm not used to looking at flows grouped. Hope this helps. Let me know if this needs more clarification or you want more feedback on the UI, e.g. Radar takes one too many clicks to get to the view 🙂
n

nicholas

11/30/2021, 12:12 AM
Thanks @itay livni - this is helpful 🙂