Powered by Linen
prefect-community
  • Christopher Stokes

    09/27/2019, 8:40 PM
    Is that intended? If so, is there a way to skip the SKIP signals?
  • Tobias Schmidt

    09/28/2019, 1:57 PM
    I feel like this must be pretty basic: is there a way to combine or string together multiple flows?
  • Jeff Yun

    09/30/2019, 6:27 PM
    How do I map over the product of two parameters? I consulted the docs and issues (notably https://github.com/PrefectHQ/prefect/issues/1311 and https://github.com/PrefectHQ/prefect/issues/674), but still couldn't figure it out. Some attempts:
    1) Passing a task into a "compose" task as a parameter: result = compose.map(ids, unmapped(generate_task), unmapped(dates)) --> got
    TypeError: generate_task() missing 2 required positional arguments:
    2) Calling a known task from inside a "compose" task --> got
    ValueError: Could not infer an active Flow context.
    3) Typecasting the parameter to a list --> Parameter is not iterable
    Attachment: Untitled.py
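One pattern worth trying, sketched here in plain Python under stated assumptions: build the Cartesian product of the two parameter values inside an upstream task (where they are ordinary lists), then map the per-pair work over that flat list of pairs. The functions `make_pairs` and `generate` are hypothetical stand-ins for the poster's tasks; in a Prefect Core flow, each would be decorated with `@task`, and `generate` would be applied with `.map` over the output of `make_pairs`.

```python
import itertools
from typing import List, Tuple


def make_pairs(ids: List[str], dates: List[str]) -> List[Tuple[str, str]]:
    """Return every (id, date) combination as a flat list.

    Inside a task body the parameter values are plain lists, so
    itertools.product works on them directly.
    """
    return list(itertools.product(ids, dates))


def generate(pair: Tuple[str, str]) -> str:
    """Hypothetical per-item work; receives one (id, date) tuple."""
    item_id, date = pair
    return f"{item_id}@{date}"


if __name__ == "__main__":
    pairs = make_pairs(["a", "b"], ["2019-09-01", "2019-09-02"])
    # Plain-Python equivalent of mapping `generate` over the pairs.
    print([generate(p) for p in pairs])
```

Mapping over one list of tuples keeps a single mapped argument, which sidesteps having to pass a task object itself as an `unmapped` input.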
  • Jeff Yun

    09/30/2019, 6:47 PM
    Related to my question above: how do I access the value of a parameter? For example, I want to use
    flattened_lists = itertools.chain(*nested_lists_param)
    but can't iterate through the Parameter itself.
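A likely reason the Parameter can't be iterated is that, while the flow is being built, it is a placeholder task rather than its runtime value; the value only becomes a real list inside a task that receives the Parameter as an argument. A minimal sketch of such a task body, with the hypothetical function `flatten` standing in for the poster's task:

```python
import itertools
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def flatten(nested_lists: Iterable[Iterable[T]]) -> List[T]:
    """Flatten one level of nesting.

    When used as a task body, `nested_lists` is the parameter's runtime
    value (an ordinary list), so it can be iterated normally.
    """
    # chain.from_iterable avoids unpacking with *; list() turns the lazy
    # iterator into a concrete result that can be passed downstream.
    return list(itertools.chain.from_iterable(nested_lists))


if __name__ == "__main__":
    print(flatten([[1, 2], [3], []]))
```

Returning a list rather than the bare `itertools.chain` object matters because the iterator can only be consumed once.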
  • Jeff Yun

    09/30/2019, 11:15 PM
    Dask question here: I am new to distributed systems and want to run a large number of tasks (N = 10k+) in parallel. As far as I know, Prefect+Dask can scale to 10k+ parallel tasks (although I know it's ideal to batch many small tasks into a few longer-running tasks where possible). However, after trying different small toy tasks (and various combinations of --nprocs and --nthreads on the workers), I consistently see the following:
    - Running Client() locally starts immediately, as expected.
    - Running with one worker server, startup takes much longer as N increases. For large N:
    [2019-09-30 22:59:15,781] INFO - prefect.TaskRunner | Task 'stage_0': Starting task run...
    distributed.utils_perf - INFO - full garbage collection released 561.93 MB from 0 reference cycles (threshold: 10.00 MB)
    distributed.core - INFO - Event loop was unresponsive in Worker for 34.31s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
    - Running on multiple servers, the Dask scheduler also takes much longer than the expected ~1 ms overhead per task before any activity happens (on the client servers or on the Dask scheduler dashboard).
    Why is scheduling taking so long? How could I adjust the scheduling policy (https://distributed.dask.org/en/latest/scheduling-policies.html) to speed up running a large number of small tasks?
  • Gregor Müllegger

    10/02/2019, 8:18 AM
    Hi folks, I'm trying Prefect for the first time and have a few questions about best practices that I couldn't find answered in the documentation.
    1. Logging: I'm extracting ~10k IDs from a database and returning them as a list from a task. The next task maps over that list and continues working on the IDs. However, the whole list is printed in the logs, and I don't see any value in that big chunk of log output. Is there a way to disable printing a task's result in the logs for a single task, without disabling logging in general? Or am I misusing tasks, and should I return a generator instead of a list?
    2. How do I integrate a Prefect Flow into a CLI? I currently have a Python CLI tool that takes a database connection string (where the IDs mentioned above come from) and processes the data in a plain loop, and I want to migrate that to a Flow. How should I call the flow from the CLI script so that the script behaves as a CLI should? Most importantly, the exit code should be non-zero when the flow's state is not successful. Is there a guide to best practices for using Prefect in CLIs?
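For the exit-code part of question 2, a minimal sketch: parse arguments with `argparse`, run the flow, and translate its outcome into the process exit code. In Prefect Core, `flow.run()` returns the flow's final State, which offers `is_successful()`; here `run_flow` is a hypothetical callable injected so the sketch stays self-contained, assumed to return that boolean.

```python
import argparse
from typing import Callable, List, Optional


def main(argv: Optional[List[str]], run_flow: Callable[[str], bool]) -> int:
    """Parse CLI arguments, run the flow, and return a process exit code.

    `run_flow` is a hypothetical hook: it receives the connection string and
    returns True if the flow's final state was successful. With Prefect Core
    it might wrap something like `flow.run(...).is_successful()`.
    """
    parser = argparse.ArgumentParser(description="Process IDs from a database.")
    parser.add_argument("connection_string", help="database connection string")
    args = parser.parse_args(argv)

    ok = run_flow(args.connection_string)
    # Conventional CLI behavior: 0 on success, non-zero on failure.
    return 0 if ok else 1


# Typical entry point (commented out so the sketch has no side effects;
# it needs `import sys`):
# if __name__ == "__main__":
#     sys.exit(main(None, run_flow=my_flow_runner))
```

Keeping `main` as a function that returns the code, with `sys.exit` only at the entry point, makes the CLI easy to test without spawning a process.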
  • Garrett Thomas

    10/02/2019, 11:17 AM
    Hi guys! The project looks great so far, and my team and I are investigating the possibility of using it instead of Airflow. I looked through the docs and skimmed the code, and could not find any authentication/access management in Prefect Core like the kind Airflow has. Is this correct? And is there any plan to implement it in the open-source project? Thanks a lot for your time; I am excited to see how the project evolves!
  • Jeff Yun

    10/02/2019, 8:40 PM
    If a Parameter consists of, e.g., a list of length 100k, at what stage and how could it be batched with dask.Bag?
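One common way to cut per-task scheduling overhead, whether for the 10k-task Dask case above or a 100k-item Parameter, is to split the list into fixed-size chunks and map over the chunks rather than the individual items, so the scheduler tracks hundreds of tasks instead of 100k. A minimal plain-Python sketch of the chunking step; `chunk_size` is an assumption to tune, and `process_batch` is a hypothetical per-batch task body. If `dask.bag` is preferred, `dask.bag.from_sequence` with `npartitions` performs comparable partitioning.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], chunk_size: int) -> List[List[T]]:
    """Split `items` into consecutive lists of at most `chunk_size` elements."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [list(items[i:i + chunk_size]) for i in range(0, len(items), chunk_size)]


def process_batch(batch: List[int]) -> int:
    """Hypothetical per-batch work: one task invocation handles many items."""
    return sum(x * x for x in batch)


if __name__ == "__main__":
    ids = list(range(100_000))
    batches = chunk(ids, 1_000)  # 100 batches instead of 100k items
    totals = [process_batch(b) for b in batches]
    print(len(batches), sum(totals))
```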
  • emre

    10/03/2019, 10:53 AM
    Hey all, I have been using local secrets in my production flows for a while. I want to use another key-value store for my secrets, as that is the secure and intended way. I haven't found much in the docs: how would I tell Prefect Core where and how to access this new Secrets store, say, AWS Secrets Manager?
  • Rui Loureiro

    10/03/2019, 6:16 PM
    Hey all, is there any way for a Task to access the result of another Task (in the same flow) other than passing the upstream task to the downstream task's kwargs?
  • Jeff Yun

    10/04/2019, 12:04 AM
    Is there a way to track data dependencies across nodes, as in a data lineage graph?
  • Mitchell Bregman

    10/04/2019, 1:44 PM
    Hey guys, absolutely love the work you are doing... huge fan of Prefect; I see it being the future of workflow engines. Quick question: are you at all considering implementing a CLI?
  • Aiden Price

    10/05/2019, 10:47 AM
    Hi again everyone, our company has its own logger class (based on the excellent Loguru), which we use as a drop-in replacement for the standard logger. Is there a way to use it in Prefect? Can I replace Prefect's logger with ours, or redirect Prefect's logger to ours as a handler? Thank you again, all.
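One approach that needs nothing Prefect-specific: the log lines quoted in this channel come from loggers such as `prefect.TaskRunner`, which suggests Prefect logs through the standard `logging` module under a `prefect` namespace. If so, a custom `logging.Handler` attached to the `prefect` logger can forward every record to another logger. A minimal sketch, where `sink` is a hypothetical stand-in for the company logger (with Loguru, it could call `logger.log(level, message)`):

```python
import logging
from typing import Callable


class ForwardingHandler(logging.Handler):
    """Forward standard-library log records to an arbitrary sink.

    `sink` is a hypothetical callable taking (level_name, message); it could
    wrap a Loguru logger or any in-house logger class.
    """

    def __init__(self, sink: Callable[[str, str], None]) -> None:
        super().__init__()
        self.sink = sink

    def emit(self, record: logging.LogRecord) -> None:
        try:
            self.sink(record.levelname, self.format(record))
        except Exception:
            # Standard Handler behavior: report, but never crash the caller.
            self.handleError(record)


def redirect(logger_name: str, sink: Callable[[str, str], None]) -> logging.Logger:
    """Attach a ForwardingHandler to `logger_name` (e.g. "prefect").

    Child loggers such as "prefect.TaskRunner" propagate to it by default.
    """
    target = logging.getLogger(logger_name)
    target.addHandler(ForwardingHandler(sink))
    return target


if __name__ == "__main__":
    redirect("prefect", lambda level, msg: print(f"[custom] {level}: {msg}"))
    logging.getLogger("prefect.TaskRunner").warning("hello from a child logger")
```

Forwarding through a handler leaves Prefect's own logger objects untouched; whether to also remove Prefect's default handler (to avoid duplicate output) depends on how it is configured in your version.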
  • Jason

    10/05/2019, 3:09 PM
    How does Prefect compare to Kubeflow?
  • Jason

    10/05/2019, 3:22 PM
    Also, does Prefect have an extension point for building custom Tasks?
  • Alex Cano

    10/05/2019, 9:45 PM
    Just saw the announcement of the Prefect scheduler! That is awesome! Congrats on getting that out! Just so I'm understanding it: is its main use as a “trial” version of the Cloud offering? Or could it be used to help roll your own infrastructure and work with Prefect? (I tried to read through as much as I could, but the site isn't very mobile-friendly.)
  • itay livni

    10/07/2019, 2:24 PM
    Hi - I ran into an error with the visualizer on Python 3.7.4. The flow itself did run correctly. Any suggestions?
    Attachment: visualizer error
  • Chris Eberly

    10/08/2019, 12:55 PM
    Hello Prefect, long-time listener, first-time caller: is anybody working on Azure tasks who would be willing to answer a few questions?
  • Julio Azeredo

    10/08/2019, 3:45 PM
    Is there a way to append one Flow to another? My problem is the following: I have one flow for computing data and another for computing the data and plotting it. I'm wondering if there is a way to do this without creating two flows with repeated code. Thank you!
  • Chris Eberly

    10/08/2019, 5:07 PM
    Last question, and apologies if this has been answered: if folks have an existing Dask-ified task but want to use other parts of Prefect with the DaskExecutor, is it recommended to rewrite the existing task using just Prefect, or simply to run the old task separately?
  • Argemiro Neto

    10/08/2019, 5:32 PM
    Hello team! I'm facing a weird error, and I'm not sure whether it is related to Prefect: I'm invoking a Lambda function inside a task and getting the following error:
    [2019-10-08 17:26:55,377] ERROR - prefect.TaskRunner | Unexpected error: TooManyRequestsException('An error occurred (TooManyRequestsException) when calling the Invoke operation (reached max retries: 4): Rate Exceeded.')
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 869, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 79, in timeout_handler
        return fn(*args, **kwargs)
      File "/Users/aneto/Documents/github/data-platform/scheduler/main.py", line 244, in run_sync_loads
        Payload=json.dumps(load)
      File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 357, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 661, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.errorfactory.TooManyRequestsException: An error occurred (TooManyRequestsException) when calling the Invoke operation (reached max retries: 4): Rate Exceeded.
    The function has a 15-minute timeout and is being called as:
    import json
    import boto3

    client = boto3.client('lambda')
    sync_results = client.invoke(
        FunctionName='myfunction',
        Payload=json.dumps(load)  # `load`: the payload built earlier in the task
    )
    The code following this invocation is never reached. I also noticed that the function is called more than once, even before Prefect's retry kicks in.
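Two hedged observations. First, as far as I know botocore's default client read timeout is 60 seconds, and requests that time out are retried, so a synchronous `invoke` of a function that runs much longer than that may be re-sent; that could explain the function being called more than once. `botocore.config.Config` accepts `read_timeout` and `retries` settings to adjust this. Second, `Rate Exceeded` is usually handled by retrying with exponential backoff. A generic plain-Python sketch of the latter follows; `call_with_backoff`, its parameters, and the delay values are assumptions, and `sleep` is injectable so the behavior can be tested without waiting.

```python
import random
import time
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_backoff(
    call: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Call `call()`, retrying on `retry_on` exceptions with exponential backoff.

    The delay before retry n (1-based) is drawn uniformly from
    [0, min(max_delay, base_delay * 2**(n - 1))] ("full jitter").
    The final failure is re-raised unchanged.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    if rng is None:
        rng = random.Random()
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except retry_on:
            if attempt == max_attempts:
                raise
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(rng.uniform(0, cap))
    raise AssertionError("unreachable")  # the loop always returns or raises


# Hypothetical usage with the boto3 client above (not executed here):
# result = call_with_backoff(
#     lambda: client.invoke(FunctionName="myfunction", Payload=json.dumps(load)),
#     retry_on=(client.exceptions.TooManyRequestsException,),
# )
```

Randomized ("jittered") delays spread out retries from many concurrent mapped tasks, so they do not all hit the rate limit again at the same moment.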
  • Aiden Price

    10/09/2019, 9:43 AM
    Hi Prefects, I've run into a problem when trying to merge two conditional branches back together: if the result of the previous step was a Pandas DataFrame, you get these errors:
    [2019-10-09 09:27:02,948] INFO - prefect.TaskRunner | Task 'Merge': Starting task run...
    [2019-10-09 09:27:02,961] ERROR - prefect.TaskRunner | Unexpected error: ValueError('The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().')
    ...
    [2019-10-09 09:27:02,965] INFO - prefect.TaskRunner | Task 'Merge': finished task run for task with final state: 'Failed'
  • Aiden Price

    10/09/2019, 9:44 AM
    Presumably this is because the Merge task compares the result to the NoResult object.
  • Aiden Price

    10/09/2019, 9:45 AM
    Is there something I can use instead of merge, perhaps my own task that determines which of a variadic number of results to pass on?
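The error above is what pandas raises when a DataFrame ends up in a boolean context (for example, an `if` on the result of `df == something`). One workaround is a selector that uses only identity checks (`is` / `is not`) and never asks a result for its truth value. A minimal sketch: `SKIPPED` is a hypothetical sentinel standing in for however a skipped branch's result is represented, and `AmbiguousTruth` mimics a DataFrame's `__bool__` error so the sketch needs no pandas. In a flow, `first_present` would be wrapped as a task that takes the branch results as arguments.

```python
from typing import Any


class _Skipped:
    """Hypothetical sentinel type marking a branch that did not run."""

    def __repr__(self) -> str:
        return "SKIPPED"


SKIPPED = _Skipped()


def first_present(*results: Any) -> Any:
    """Return the first result that is neither SKIPPED nor None.

    Only identity checks are used, so values whose truthiness is ambiguous
    (such as pandas DataFrames) are never evaluated in a boolean context,
    and legitimate falsy results like 0 or [] are kept.
    """
    for value in results:
        if value is not SKIPPED and value is not None:
            return value
    return None


class AmbiguousTruth:
    """Stand-in for a DataFrame: calling bool() on it raises ValueError."""

    def __bool__(self) -> bool:
        raise ValueError("The truth value is ambiguous.")
```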
  • itay livni

    10/10/2019, 8:50 PM
    Hi - Is there a way to pass on an if/else branch?
  • Tobias Schmidt

    10/11/2019, 7:16 AM
    Perhaps I'm just a bit dense, but according to the docs, JinjaTemplate() should accept **kwargs (https://docs.prefect.io/api/unreleased/tasks/strings.html#jinjatemplate). Yet when I pass it keyword arguments, it complains about super().__init__() getting unexpected keyword arguments. Bug?
  • Joe Howarth

    10/13/2019, 2:59 PM
    Hi! Is there a way to use checkpointing as a persistent cache? It doesn't seem to load the checkpointed file by default.
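As a point of comparison for what "checkpointing as a persistent cache" would mean, here is a plain-Python sketch of the load-if-present pattern: reuse a saved result when its file exists, otherwise compute it and save it. This is not Prefect's checkpointing mechanism; the function name `cached_json` and the JSON-on-disk format are assumptions for illustration.

```python
import json
import os
from typing import Any, Callable


def cached_json(path: str, compute: Callable[[], Any]) -> Any:
    """Return the JSON value stored at `path`, computing and saving it if absent."""
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    value = compute()
    # Write to a temporary file first, then atomically replace, so a crash
    # mid-write does not leave a truncated cache file behind.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(value, f)
    os.replace(tmp_path, path)
    return value
```

The key difference from write-only checkpointing is the existence check before computing; a real cache would also need an invalidation rule (inputs, age, or code version) deciding when a stored file is stale.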
  • Brad

    10/14/2019, 8:44 PM
    Hi Prefect team, with the Slack notifier, is it possible to also pass through the parameters that the task was called with?
  • matta

    10/15/2019, 1:01 AM
    Heya. The project looks awesome. I've been experimenting with RAPIDS, which is also kind of part of the Daskverse (basically a way to painlessly use GPUs from the Python data stack). Would Prefect be able to interface with it easily? https://rapids.ai/dask.html
  • itay livni

    10/15/2019, 2:34 AM
    Hi - In a Flow I am looking to: (1) run a set of tasks, and (2) do something with their results in another task, i.e. munge data. What is the suggested way to implement this? Is it using set_upstream? The example I see, say_hello.set_upstream(second_add, flow=flow), waits for one specific task to finish, not a set of tasks. Does this get handled under the hood?
  • Chris White

    10/15/2019, 10:49 AM
    Hey Itay - have a look at task.set_dependencies for setting multiple upstream dependencies at once!
  • itay livni

    10/15/2019, 5:49 PM
    Something like this? How do I access the results of the upstream tasks?
    This is what I am trying to do:
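To illustrate the thread's question (does a downstream task wait for a whole set of upstream tasks, and how does it see their results?), here is a plain-Python model of a task graph, not Prefect's engine: each task lists its upstream dependencies, tasks run in topological order via the standard-library `graphlib` (Python 3.9+), and each task receives its upstream results as keyword arguments. The task names and the `run_graph` helper are hypothetical. In Prefect Core, the analogous effect comes from passing the upstream tasks as arguments when calling the downstream task inside the flow.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Tuple

# Each entry: task name -> (function, list of upstream task names).
Graph = Dict[str, Tuple[Callable[..., Any], List[str]]]


def run_graph(graph: Graph) -> Dict[str, Any]:
    """Run every task after all of its upstream tasks, passing their results.

    A task with several upstream dependencies only starts once all of them
    have produced a result; it receives them as keyword arguments named
    after the upstream tasks.
    """
    order = TopologicalSorter({name: deps for name, (_, deps) in graph.items()})
    results: Dict[str, Any] = {}
    for name in order.static_order():
        fn, deps = graph[name]
        results[name] = fn(**{dep: results[dep] for dep in deps})
    return results


if __name__ == "__main__":
    graph: Graph = {
        "load_a": (lambda: [1, 2], []),
        "load_b": (lambda: [3], []),
        # Hypothetical munging step that depends on both loads.
        "munge": (lambda load_a, load_b: load_a + load_b, ["load_a", "load_b"]),
    }
    print(run_graph(graph)["munge"])
```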