prefect-community
  • s

    simone

    07/15/2020, 3:02 PM
    Hi, is it possible to also output the logs to a file? Thanks
    j
    2 replies · 2 participants
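A minimal sketch of one way to do this with the standard library alone: attach a `logging.FileHandler` to the logger whose records should also land in a file. The logger name `"prefect"`, the helper name `add_file_handler`, and the file location are assumptions made for illustration; nothing in this thread confirms them.

```python
import logging
import os
import tempfile


def add_file_handler(logger_name: str, path: str) -> logging.Handler:
    """Attach a FileHandler so records from `logger_name` are also written to `path`."""
    logger = logging.getLogger(logger_name)
    handler = logging.FileHandler(path)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s | %(message)s")
    )
    logger.addHandler(handler)
    return handler


# Assumption: "prefect" is the name of the logger hierarchy to capture.
log_path = os.path.join(tempfile.mkdtemp(), "prefect_example.log")
handler = add_file_handler("prefect", log_path)

logger = logging.getLogger("prefect")
logger.setLevel(logging.INFO)
logger.info("this line goes to the existing handlers and to the file")
handler.flush()
```

Existing handlers are left in place, so console output is unaffected; the file handler is simply one more destination.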
  • s

    Sven Teresniak

    07/15/2020, 4:38 PM
    I have a local Prefect test cluster with a single dask-scheduler and dask-worker doing the work. I submit jobs by calling
    flow.register()
    and it's working fine. I see the work in the dask-ui and in the prefect-ui as well. Now the question: is there an easy way to modularize a flow, i.e. to allow imports of local modules (e.g. boilerplate code) so code can be reused in different flows? In Spark I can bundle a boilerplate.jar and publish the jar together with the job for a single job run (or the same jar for every job). Is there any such mechanism in Prefect? Do I have to deploy the boilerplate code to my (every) dask-worker in advance?
    j
    10 replies · 2 participants
  • r

    Robin

    07/15/2020, 5:11 PM
    Dear all, how do we best use
    AWS secrets manager
    with Prefect? 1️⃣ Fetch the credentials within each task 2️⃣ Create a task that fetches the credentials once 3️⃣ Another option. Which one is considered safest? We are aware of Prefect secrets, but would also like to keep all secrets in one place, e.g. AWS secrets manager…
    2️⃣ 3
    s
    j
    +1
    10 replies · 4 participants
  • a

    Ankit

    07/15/2020, 6:01 PM
    Hey guys, I am facing an issue while trying to use a
    Dask executor
    along with prefect ui+server. My prefect ui+server and dask are all on docker containers via a docker-compose file, belonging to the same network. I start up my agent, which is a local agent on a docker container and use
    flow.register()
    to register my flow. When I try to set the executor in flow.environment , I get an error. Not sure how to proceed here. This is how I am setting the dask executor:
    client = Client("scheduler:8786")
    dask_executor = DaskExecutor(address=client.scheduler.address)
    flow.environment = LocalEnvironment(executor=dask_executor)
    flow.register()
    The error that I get is :
    flow.environment = LocalEnvironment(executor=dask_executor)
    TypeError: __init__() got an unexpected keyword argument 'executor'
    I am also getting a warning:
    warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
    j
    5 replies · 2 participants
  • g

    Gary

    07/16/2020, 3:24 AM
    Hi folks, we are evaluating Prefect to handle our data flow. So far everything fits our needs. Our only concern is whether it is okay to generate a massive number of mapped tasks. One of our scenarios is to use Prefect to crawl, calculate, and store financial data (e.g., financial statements and daily trading data of US stocks). Here is a simplified example:
    @task
    def get_stock_id_list():
      # query stock id list from database
      stock_id_list = query_stock_id_list_from_db()
      return stock_id_list
    
    @task
    def crawl_stock_data_from_external_website(stock_id):
      # Crawler related work
      return crawl_stock_data(stock_id)
    
    @task
    def perform_calculation(crawled_data):
      # Some calculation with Pandas
      perform_some_calculation(crawled_data)
    
    
    with Flow('Example flow') as flow:
      # The number of stock id in list is about ten thousand.
      stock_id_list = get_stock_id_list()
    
      crawled_data_list = crawl_stock_data_from_external_website.map(stock_id_list)
    
      perform_calculation.map(crawled_data_list)
    Is it okay for Prefect to generate about 10,000~50,000 mapped tasks within the above flow without any problem? Another scenario is to generate about 1 million tasks within a flow. In this scenario, we query user id list from our database, and perform some calculation about user behavior analysis, (One user is mapped to one mapped task.) Is it okay? Or is there a better way to do this?
    c
    j
    +1
    10 replies · 4 participants
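One option sometimes used to keep the number of mapped tasks manageable is to map over batches of ids rather than single ids. The sketch below shows only the batching step in plain Python; the helper name `chunked` and the batch size of 500 are arbitrary choices for illustration, and this is not a statement about what Prefect can or cannot handle.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive batches of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


# Hypothetical stand-in for the ~10,000 stock ids from the question.
stock_ids = list(range(10_000))
batches = list(chunked(stock_ids, 500))
print(len(batches))  # 20 batches instead of 10,000 single-item tasks
```

Each mapped task would then loop over its batch internally, trading per-item visibility in the UI for far fewer task runs.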
  • s

    simone

    07/16/2020, 8:27 AM
    Hi, I am trying to port our image processing pipeline to Prefect. The analysis we run relies on a couple of configuration YAML files that are loaded by each process when the analysis runs in parallel. I was just wondering if I should keep this approach or add the parameters to the
    prefect.context
    and avoid the file loading. Is there any reason why I should not add the config data to the context? Thanks a lot!
    j
    2 replies · 2 participants
  • a

    Arlo Bryer

    07/16/2020, 9:30 AM
    Hi everyone - could somebody tell me if there's a way to ensure that a flow doesn't start if the previous run of the flow hasn't yet finished?
  • a

    Arlo Bryer

    07/16/2020, 9:31 AM
    i.e. for a scheduled run of a flow (say every two hours), if a flow that started at t0 is still running when the flow at t1(2h later) should kick off, the t1 flow is delayed until t0 has finished. This then propagating to t2, t3, ..., tn?
    j
    2 replies · 2 participants
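The behaviour being asked for can be stated as a rule: each run starts at the later of its scheduled time and the end of the previous run. A small sketch of that rule in plain Python follows. It only models the desired timing; it is not a Prefect feature or API, and the function name and durations are made up for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def serialized_starts(
    scheduled: List[datetime], durations: List[timedelta]
) -> List[Tuple[datetime, datetime]]:
    """Return (start, end) per run when a run may not start before the previous one ends."""
    runs = []
    previous_end = None
    for planned, duration in zip(scheduled, durations):
        start = planned if previous_end is None else max(planned, previous_end)
        end = start + duration
        runs.append((start, end))
        previous_end = end
    return runs


t0 = datetime(2020, 7, 16, 0, 0)
scheduled = [t0 + timedelta(hours=2 * i) for i in range(3)]
# Hypothetical durations: the first run overruns its 2-hour slot by an hour.
durations = [timedelta(hours=3), timedelta(hours=1), timedelta(hours=1)]
for start, end in serialized_starts(scheduled, durations):
    print(start.time(), "->", end.time())
```

In this example the second run is pushed back by one hour, while the third starts on time because the delay has been absorbed by then.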
  • i

    Iain Dillingham

    07/16/2020, 11:27 AM
    Hi everyone. I have a mapped task that returns a list of objects; actually it returns a list of lists of objects, because it's mapped. I'd like to flatten the list of lists, before passing the list of objects to another task. I have written my own flatten task, but was wondering whether I'd missed something obvious: an idiom, a built-in function, etc. I'd appreciate the community's thoughts 🙂
    j
    3 replies · 2 participants
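For reference, the body of a hand-written flatten step can be a one-liner built on `itertools.chain.from_iterable`. This is a sketch in plain Python with sample data invented for illustration; whether Prefect ships an equivalent built-in is not settled in this message.

```python
from itertools import chain
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def flatten(nested: Iterable[Iterable[T]]) -> List[T]:
    """Flatten one level of nesting, preserving order."""
    return list(chain.from_iterable(nested))


mapped_results = [["a", "b"], [], ["c"]]
print(flatten(mapped_results))  # ['a', 'b', 'c']
```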
  • a

    Alfie

    07/16/2020, 12:20 PM
    Hi Team, a question about the Functional API: can I specify the upstream of a task manually? I want a task that just waits for a few seconds, and have it be upstream of a task that executes some actions.
    j
    6 replies · 2 participants
  • a

    Alfie

    07/16/2020, 12:20 PM
    thanks
  • a

    Avi A

    07/16/2020, 12:27 PM
    I’ve tried using
    DaskExecutor
    in the past but it was even worse because every now and then the execution froze and I stopped receiving logs so I left it for the time being because it’s enough for me to run on a strong machine and don’t need a cluster yet.
    l
    j
    +1
    57 replies · 4 participants
  • a

    Avi A

    07/16/2020, 12:27 PM
    Hey, I’m using
    LocalDaskExecutor
    . It has been working well for a while but I’m getting the following errors in the last couple of days, in the middle of running a long flow. IDK what changed, but it seems that the process is unable to spawn a new thread or something. So I reset the server to make sure that stale processes/threads don’t exist in the system, but I’m still getting these.
    👀 1
  • a

    Avi A

    07/16/2020, 12:27 PM
    OpenBLAS blas_thread_init: pthread_create failed for thread 29 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
    OpenBLAS blas_thread_init: pthread_create failed for thread 30 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
    OpenBLAS blas_thread_init: pthread_create failed for thread 31 of 32: Resource temporarily unavailable
    OpenBLAS blas_thread_init: RLIMIT_NPROC 837332 current, 837332 max
        _run_code(code, mod_globals, init_globals,
      File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
        from prefect.cli import cli
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
        import prefect.triggers
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
        from prefect.engine import signals
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 116, in spawn_main
        exitcode = _main(fd, parent_sentinel)
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 125, in _main
        prepare(preparation_data)
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 236, in prepare
        _fixup_main_from_path(data['init_main_from_path'])
      File "/usr/lib/python3.8/multiprocessing/spawn.py", line 287, in _fixup_main_from_path
        main_content = runpy.run_path(main_path,
      File "/usr/lib/python3.8/runpy.py", line 262, in run_path
        return _run_module_code(code, init_globals, run_name,
      File "/usr/lib/python3.8/runpy.py", line 95, in _run_module_code
        _run_code(code, mod_globals, init_globals,
      File "/usr/lib/python3.8/runpy.py", line 85, in _run_code
        exec(code, run_globals)
      File "/home/avi/.virtualenvs/prefect/bin/prefect", line 5, in <module>
        from prefect.cli import cli
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/__init__.py", line 8, in <module>
        import prefect.triggers
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/triggers.py", line 48, in <module>
        from prefect.engine import signals
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/__init__.py", line 8, in <module>
        from prefect.engine.flow_runner import FlowRunner
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 13, in <module>
        from prefect.core import Edge, Flow, Task
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/__init__.py", line 4, in <module>
        from prefect.core.flow import Flow
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/core/flow.py", line 40, in <module>
        from prefect.environments import Environment
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/__init__.py", line 1, in <module>
        from prefect.environments.execution import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/__init__.py", line 8, in <module>
        from prefect.environments.execution.dask import DaskKubernetesEnvironment
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/__init__.py", line 2, in <module>
        from prefect.environments.execution.dask.cloud_provider import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/prefect/environments/execution/dask/cloud_provider.py", line 5, in <module>
        from distributed.deploy.cluster import Cluster
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/__init__.py", line 3, in <module>
        from .actor import Actor, ActorFuture
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/actor.py", line 6, in <module>
        from .client import Future, default_client
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/client.py", line 43, in <module>
        from .batched import BatchedSend
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/batched.py", line 8, in <module>
        from .core import CommClosedError
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/core.py", line 18, in <module>
        from .comm import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/__init__.py", line 1, in <module>
        from .addressing import (
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/comm/addressing.py", line 5, in <module>
        from ..utils import get_ip_interface
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 97, in <module>
        mp_context = _initialize_mp_context()
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/distributed/utils.py", line 88, in _initialize_mp_context
        importlib.import_module(pkg)
      File "/usr/lib/python3.8/importlib/__init__.py", line 127, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/__init__.py", line 142, in <module>
        from . import core
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/__init__.py", line 24, in <module>
        from . import multiarray
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/multiarray.py", line 14, in <module>
        from . import overrides
      File "/home/avi/.virtualenvs/prefect/lib/python3.8/site-packages/numpy/core/overrides.py", line 7, in <module>
        from numpy.core._multiarray_umath import (
    KeyboardInterrupt
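The `pthread_create failed` lines come from OpenBLAS trying to start its worker threads (32 per process here) and running into a resource limit. One thing that may be worth trying, offered as an assumption rather than a confirmed fix for this case, is capping the OpenBLAS thread pool through the `OPENBLAS_NUM_THREADS` environment variable before NumPy is first imported:

```python
import os

# Must be set before NumPy (and therefore OpenBLAS) is first imported in the process.
# The value "1" is an arbitrary choice for illustration.
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")

print(os.environ["OPENBLAS_NUM_THREADS"])
```

Child processes started afterwards inherit the environment, so the cap would apply to spawned workers as well.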
  • s

    Sven Teresniak

    07/16/2020, 12:50 PM
    https://docs.prefect.io/orchestration/recipes/k8s_dask.html Why is work stealing disabled in your example, and what's the purpose of
    $DASK_DISTRIBUTED__SCHEDULER__BLOCKED_HANDLERS
    ?
    j
    5 replies · 2 participants
  • s

    Sven Teresniak

    07/16/2020, 1:49 PM
    How can I change a flow run's name? The flow is triggered by a cron scheduler with time information as a parameter (aggregation for hourly data). I want to tag each run with this parameter. What's the right way to achieve this?
    👀 1
    d
    8 replies · 2 participants
  • c

    Chris Goddard

    07/16/2020, 3:19 PM
    I may just be missing something but I keep getting a result of None when I try to do something with the result of the DbtShellTask and looking at the source code it looks like the
    run
    method of DbtShellTask doesn't return the result of the superclass's run method.
    👀 1
    l
    3 replies · 2 participants
  • s

    Sven Teresniak

    07/16/2020, 4:33 PM
    I want to run a registered flow with
    prefect run server --name flowname
    but I need to set a parameter for this run. Is this possible?
    👀 1
    d
    11 replies · 2 participants
  • l

    Luke Orland

    07/16/2020, 5:26 PM
    I'm attempting to parameterize the name of a secret for a flow. This attempt is not working:
    👀 1
    d
    6 replies · 2 participants
  • j

    Jennifer Meng

    07/16/2020, 8:14 PM
    Hello everyone! I tried to run a flow with GraphQL, but failed to pass parameters. Please help! 🙂
    mutation($parameters: JSON!) {create_flow_run(input: { flow_id: "my_flow_id", parameters: $parameters }) {id}}
    {"parameters":{"param1":"my_param1"}}
    
    Argument 'input' has invalid value {flow_id: \"my_flow_id\", parameters: $parameters}.
    z
    7 replies · 2 participants
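For comparison, a GraphQL request that uses variables is normally sent as a JSON body with a `query` string and a separate `variables` object. The sketch below only builds that body with the standard library, reusing the mutation and values from the message. It does not contact a server, and it is not a confirmed fix for the error above.

```python
import json

query = """
mutation($parameters: JSON!) {
  create_flow_run(input: { flow_id: "my_flow_id", parameters: $parameters }) {
    id
  }
}
"""

variables = {"parameters": {"param1": "my_param1"}}

# Standard GraphQL-over-HTTP body: the query text plus a "variables" object.
payload = json.dumps({"query": query, "variables": variables})
print(payload)
```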
  • m

    Matt Allen

    07/16/2020, 11:38 PM
    I'm trying to run the prefect ui and apollo in kubernetes, and I've got everything almost working. When I try to register a flow from my local machine I get a 400 from the graphql endpoint, but I can't see any other information about what's going wrong with the request. Is it possible to get some kind of debug or verbose output from flow.register?
    c
    m
    4 replies · 3 participants
  • a

    Alfie

    07/17/2020, 5:39 AM
    Hi folks, I want to use a Parameter that is a list, where each item is the input for one task run. I cannot iterate over a Parameter in the definition of the flow. Any suggestions on how to achieve that?
    j
    3 replies · 2 participants
  • m

    Mac Gréco Péralte Chéry

    07/17/2020, 5:41 AM
    Hello everyone, I am trying to pause and unpause the schedule of a flow using the setFlowScheduleState mutation as defined in the documentation, with this GraphQL code:
    mutation {
      setFlowScheduleState(input: { flow_id: "<flow id>", set_active: true }) {
        success
      }
    }
    but i get this error:
    {
      "graphQLErrors": [],
      "networkError": {
        "name": "ServerError",
        "response": {},
        "statusCode": 400,
        "result": {
          "errors": [
            {
              "message": "Cannot query field \"setFlowScheduleState\" on type \"Mutation\". Did you mean \"set_schedule_active\", \"set_flow_run_states\", or \"set_schedule_inactive\"?",
              "locations": [
                {
                  "line": 2,
                  "column": 3
                }
              ],
              "extensions": {
                "code": "GRAPHQL_VALIDATION_FAILED"
              }
            }
          ]
        }
      },
      "message": "Network error: Response not successful: Received status code 400"
    }
    👀 1
    j
    10 replies · 2 participants
  • s

    Sumant Agnihotri

    07/17/2020, 7:34 AM
    Hi all, I'm new to Prefect. Can you help me out with this: This is the basic flow:
    import threading

    def main():
        # list of arguments
        ls = [1, 2, 3, 4]

        # start a thread for the calculate method for each arg in ls
        for each in ls:
            thread(each)

    def thread(arg):
        # args must be a tuple, hence the trailing comma
        t = threading.Thread(target=calculate, args=(arg,))
        t.start()

    def calculate(*args):
        # does time consuming calculations and pushes the result in the db
        pass
    I'm running
    calculate
    method in a python thread, so that multiple threads can run the calculations. Issue is, if
    ls
    is too long, I crash the server (maybe due to too many threads). I want to do this using prefect-dask, but I'm not sure how to do it, or where to look for examples. Any help will be much appreciated. Thanks.
    j
    1 reply · 2 participants
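The crash described here is consistent with starting one thread per list item with no upper bound. As a point of comparison, the sketch below bounds concurrency with the standard library's `ThreadPoolExecutor`. It deliberately uses neither Prefect nor Dask, the `calculate` body is a placeholder, and `max_workers=4` is an arbitrary value.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def calculate(arg: int) -> int:
    """Placeholder for the time-consuming calculation from the question."""
    return arg * arg


def run_all(args: List[int], max_workers: int = 4) -> List[int]:
    """Run calculate over args with at most max_workers threads alive at once."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order in its results.
        return list(pool.map(calculate, args))


print(run_all([1, 2, 3, 4]))  # [1, 4, 9, 16]
```

However long the input list grows, only `max_workers` calculations run at a time; the rest wait in the pool's queue.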
  • s

    Sven Teresniak

    07/17/2020, 8:47 AM
    I have a flow that periodically checks a $thing for some $state. If the check fails I need to run a registered flow. What is the most elegant way to schedule a flow run from within another flow? Writing a query like the
    run.py
    from the CLI code?
    client = Client()
        result = client.graphql(query)
    I don't want to use
    prefect run server …
    from a shell task. I like it pythonic. https://docs.prefect.io/api/latest/client/client.html seems perfect, but it states
    Client for communication with Prefect Cloud
    On the other hand: prefect CLI uses the
    Client
    class for non-cloud schedules as well… 🙂
    e
    2 replies · 2 participants
  • m

    Matthias

    07/17/2020, 9:44 AM
    Hi, I have the following situation: When I run this example class, it creates a persisted result stored on disk when I
    flow.register()
    and run via UI. When run via
    flow.run()
    it does not store anything. What do I have to do to not store anything? This keeps filling up storage on my server. It is just an example, it seems to me to be related to the mapping.
    from prefect import Flow, task
    from string import ascii_lowercase
    
    
    @task(checkpoint=False)
    def extract_things(c):
        return c
    
    with Flow(
        "Do it",
    ) as flow:
        extract_things.map(list(ascii_lowercase))
    j
    4 replies · 2 participants
  • k

    Klemen Strojan

    07/17/2020, 11:35 AM
    I am trying to delete a flow via Interactive API:
    mutation {
      delete_flow(input: {flow_id: "e1bd5dfa-3761-41ca-ac82-2104cf5157c1"}) {
        success,
        error
      }
    }
    Here is the error:
    {
      "graphQLErrors": [
        {
          "path": [
            "delete_flow"
          ],
          "message": "Operation timed out",
          "extensions": {
            "code": "API_ERROR"
          }
        }
      ],
      "networkError": null,
      "message": "GraphQL error: Operation timed out"
    }
    I was unable to delete it in the UI (the older versions were deleted though). We are using Cloud. Any ideas?
    j
    3 replies · 2 participants
  • b

    bruno.corucho

    07/17/2020, 12:26 PM
    Hello Prefect! So I just noticed that the Dask K8s Parallelism issue should now be solved and working after checking this but unfortunately, I couldn't have it running properly so far. Can anyone give me a hand? Source code:
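    # Imports assumed for the Prefect 0.12-era API used in this message.
    import os
    import time

    import prefect
    from prefect import Flow, task
    from prefect.environments import DaskKubernetesEnvironment
    from prefect.environments.storage import Docker

    @task
    def hello_world():
        time.sleep(15)
        logger = prefect.context.get("logger")
        logger.info("I WOKE UP, GUYS!!!")

    with Flow("strdata-test") as test:
        hello_world()
        hello_world()
        hello_world()
        hello_world()
        hello_world()

        # for n in range(4):

    test.storage = Docker(registry_url=os.environ["REGISTRY_URL"],
                          dockerfile="deploy/Dockerfile",
                          image_name="strdata-flow"
                          )

    test.environment = DaskKubernetesEnvironment(max_workers=4)
    test.register(project_name=os.environ["PROJECT_NAME"])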
    Output:
    j
    r
    +2
    29 replies · 5 participants
  • m

    Matt Wong-Kemp

    07/17/2020, 12:27 PM
    Hey there - does anyone know the minimum version of cloudpickle required for Prefect? I suspect I have a version mismatch, as I am getting
    unsupported pickle protocol: 5
    as an error on deserializing a flow inside an image that has
    cloudpickle==1.2.2
    in it. FWIW it serialized fine from inside the same image.
    j
    3 replies · 2 participants
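Pickle protocol 5 was introduced in Python 3.8, so this error typically means the bytes were written by a newer Python (or a backport) than the one reading them. A small diagnostic sketch, using only the standard library, reports the interpreter's highest supported protocol and reads the protocol number from a pickle's header. The helper name `pickle_protocol` and the sample payload are hypothetical.

```python
import pickle
import sys


def pickle_protocol(data: bytes) -> int:
    """Return the protocol number recorded in a pickle's header (protocols 2 and up)."""
    # Pickles written with protocol >= 2 start with the PROTO opcode (0x80)
    # followed by a single byte holding the protocol number.
    if len(data) < 2 or data[0] != 0x80:
        raise ValueError("no PROTO header; protocol 0 or 1, or not a pickle")
    return data[1]


print(sys.version_info[:2], pickle.HIGHEST_PROTOCOL)
blob = pickle.dumps({"flow": "example"}, protocol=pickle.HIGHEST_PROTOCOL)
print(pickle_protocol(blob))
```

Running this in both the environment that serializes the flow and the one that deserializes it would show whether the two interpreters agree on the highest protocol.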
  • k

    karteekaddanki

    07/17/2020, 12:53 PM
    Hey guys, Is 
    FlowRunTask
     expected to update the runs of the child flow in prefect cloud? I'm not seeing it. I'm trying to implement backfill functionality using 
    FlowRunTask
     inside a generic wrapper flow. Any help on the idiomatic way to achieve backfill functionality would be appreciated.
    n
    6 replies · 2 participants
n

nicholas

07/17/2020, 1:15 PM
Hi @karteekaddanki, can you clarify what you mean? The FlowRunTask is only responsible for creating a new Flow Run, which then becomes its own independent entity.
k

karteekaddanki

07/17/2020, 1:49 PM
Sorry. Turns out I need to actually run the
FlowRunTask
that I created for it to actually run. I am trying to get it to work. I'll post any further issues that I encounter in this thread. Thanks for your quick response.
n

nicholas

07/17/2020, 1:59 PM
No worries @karteekaddanki - I've also been notified that the
FlowRunTask
can await the return of the Flow Run it creates, and then take on the status of that Flow Run, so that may be helpful to what you're doing.
k

karteekaddanki

07/17/2020, 4:41 PM
Okay, I made it work. It wasn't working earlier because of the context in which I was initializing the task. This works without any issues:
task = FlowRunTask()
with Flow("foo") as flow:
    task.run()
but this doesn't
with Flow("foo") as flow:
    task = FlowRunTask()    
    task.run()
This distinction is in the docs but might be worth highlighting.
n

nicholas

07/17/2020, 4:51 PM
Glad you got it solved @karteekaddanki! Feel free to PR the docs if you find that could use some clarifying.
k

karteekaddanki

07/17/2020, 5:05 PM
Sure thing. Thanks so much for your help. You did an amazing job with Prefect. It solves a lot of our use cases.
🚀 2