prefect-community

    Chris Hart

    07/31/2019, 6:32 PM
    I tried switching from the LocalExecutor, which was working fine, to the DaskExecutor example from this page in the docs: https://docs.prefect.io/guide/tutorials/dask-cluster.html. After making sure the scheduler and workers are running, when I try the flow I get this error before it exits:
    [2019-07-31 18:29:25,853] INFO - prefect.FlowRunner | Starting flow run.
    distributed.protocol.pickle - INFO - Failed to serialize <bound method FlowRunner.run_task of <FlowRunner: open_states_to_elasticsearch>>. Exception: can't pickle _thread.lock objects
    [2019-07-31 18:29:25,901] INFO - prefect.FlowRunner | Unexpected error: TypeError("can't pickle _thread.lock objects")
    [2019-07-31 18:29:25,902] ERROR - prefect.Flow | Unexpected error occured in FlowRunner: TypeError("can't pickle _thread.lock objects")

    Chris Hart

    07/31/2019, 6:33 PM
    might be a Dask issue?

    Chris Hart

    07/31/2019, 6:41 PM
    hmm seems to be this: https://github.com/dask/dask/issues/1683 or this: https://github.com/dask/dask/issues/4349

    Chris Hart

    07/31/2019, 6:42 PM
    running Python 3.7 with
    dask==2.1.0
    distributed==2.1.0
    prefect==0.6.0

    Chris White

    07/31/2019, 6:58 PM
    What objects are your tasks returning? This looks to me like you are returning an object which has a reference to a thread lock, which isn’t allowable because thread locks are not serializable
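A minimal sketch of the failure mode Chris describes (not from the thread; ApiClient is a made-up example). Dask ships task results between workers with (cloud)pickle, so returning anything that holds a thread lock raises exactly this TypeError:

    import pickle
    import threading

    class ApiClient:
        # stand-in for a client/connection object that holds a lock
        def __init__(self):
            self._lock = threading.Lock()

    # Returning an object like this from a task fails under the DaskExecutor:
    try:
        pickle.dumps(ApiClient())
    except TypeError as exc:
        print(exc)  # can't pickle _thread.lock objects

    # Returning plain data (dicts, lists, DataFrames, ...) serializes fine:
    pickle.dumps({"rows": [1, 2, 3]})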

    Brian McFeeley

    07/31/2019, 9:30 PM
    I've made some good progress getting a more "real" Dask cluster set up, but I'm still running into some issues with tasks/workers being lost. Anecdotally, I see this problem arise more frequently when I create more workers. Things will hum along smoothly, then right as the flow should be ending, it seems like the connectivity between workers and scheduler is disrupted:
    2019-07-31T21:26:08.029Z [dask-cluster-worker b47e4b0c1a70]: distributed.worker - INFO - Stopping worker at <tcp://elb-trialspark-19870.aptible.in:45604>
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fb4fcd2d950>, <Future finished exception=TypeError("'NoneType' object is not subscriptable")>)
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: Traceback (most recent call last):
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:   File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 758, in _run_callback
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:     ret = callback()
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:   File "/usr/local/lib/python3.7/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:     return fn(*args, **kwargs)
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:   File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:     future.result()
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:   File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:     yielded = self.gen.send(value)
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:   File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 796, in heartbeat
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]:     if response["status"] == "missing":
    2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: TypeError: 'NoneType' object is not subscriptable
    2019-07-31T21:26:08.036Z [dask-cluster-worker b47e4b0c1a70]: distributed.nanny - INFO - Closing Nanny at '<tcp://172.17.0.67:43533>'
    2019-07-31T21:26:08.037Z [dask-cluster-worker b47e4b0c1a70]: distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
    This causes us to rerun a large portion of the previously completed tasks whose results were not persisted. I still strongly suspect the issue lies in our deployment environment -- Aptible, our PaaS, routinely kills and restarts containers that meet or exceed their memory limits, for example. I've reached out to them to get some logs to see if they're restarting these containers, but if this problem is at all familiar let me know if you have a workaround.

    Brian McFeeley

    07/31/2019, 9:31 PM
    I had a 🐋 of a time getting the network firewalling to play nice, exposing the right ports and getting workers stood up listening and advertising on the right ports

    Brian McFeeley

    07/31/2019, 9:45 PM
    Any ideas for logging, etc. that I could turn on to get better detail would be helpful! Otherwise, I'll keep digging and let you know if I figure it out.

    Chris Hart

    07/31/2019, 11:09 PM
    this might be a silly question but are there any alternatives to dask for parallelization that could just do “native” multiprocessing/async? (I’ve discovered that some of our chosen 3rd party libraries don’t work on dask, but the tradeoff of not having them is not worth it)

    Alex Cano

    08/01/2019, 6:42 PM
    Is there any way to limit concurrency when mapping over a task? I’m wanting to send a bunch of API requests, but the endpoint I’m hitting is pretty fragile. I’m wanting the exact map functionality (feed in a list, and have each execute), but limit the execution rate. Either sequentially or limiting to N concurrent requests would be great. I’ve achieved the same functionality by arbitrarily breaking up the number of requests I’d need to send into chunks, but then those either succeed or fail as a group, which I’m hoping to avoid.
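A hedged sketch of the chunking workaround Alex describes, just to make it concrete (the task names and URLs are made up); each chunk runs as one mapped task, so its requests still succeed or fail as a group, which is the drawback he wants to avoid:

    from prefect import task, Flow

    @task
    def chunk_urls(urls, size=5):
        # split the full list of requests into fixed-size batches
        return [urls[i:i + size] for i in range(0, len(urls), size)]

    @task
    def make_requests(batch):
        # placeholder for the real API calls; everything in the batch
        # shares the fate of this single task run
        return [f"response for {url}" for url in batch]

    with Flow("chunked-api-calls") as flow:
        batches = chunk_urls([f"https://api.example.com/item/{i}" for i in range(20)])
        responses = make_requests.map(batches)

    # flow.run()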

    Brian McFeeley

    08/01/2019, 7:27 PM
    I have a bit of a philosophical question about chaining mapped tasks to produce parallel pipelines. Let's imagine a simple flow of tasks like:
    items = get_list(...)
    t1 = transformation.map(items)
    t2 = transformation2.map(t1)
    ...
    Say we expect, due to some data noise, a low but nonzero failure chance in one of the steps late in the series of transformations, and only for certain subsets of the data. From a business-logic perspective, what we'd like to happen is: every success lands as it happens, and individual task failures don't torpedo the whole pipeline; we just do some failure handling (maybe quarantining the input data in a separate bucket in S3 for analysis and notifying the data engineers, for example). The run is still a "success" in business terms if we get a large majority of tasks completed, but we do want to act on the failures by either creating new test cases or finding better filters for dirty data. I've seen docs on state transition handling, but I'm wondering how to use that in the context of a larger flow to both 1. get a batch-wide view of the incident frequency of error transitions and 2. not tank the whole pipeline as failed, unless maybe we exceed some configurable threshold of error rate. Does this run counter to y'all's programming model?
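Not an answer from the thread, but a minimal sketch of the state-handler mechanism Brian mentions, attached to a mapped task; the quarantine/notification logic is hypothetical, and each mapped child gets its own handler calls:

    from prefect import task, Flow
    from prefect.engine import state

    def quarantine_on_failure(task_obj, old_state, new_state):
        # called on every state transition of the task it is attached to
        if isinstance(new_state, state.Failed):
            # hypothetical handling: quarantine the input / notify engineers
            print(f"{task_obj.name} failed: {new_state.message}")
        return new_state

    @task(state_handlers=[quarantine_on_failure])
    def transformation(item):
        if item % 7 == 3:  # simulate occasional dirty data
            raise ValueError(f"bad item: {item}")
        return item * 2

    with Flow("tolerant-map") as flow:
        results = transformation.map(list(range(20)))

    # flow.run()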

    Andrew Fulton

    08/07/2019, 10:43 PM
    I am trying to build a DAG that runs a PySpark job. Does anyone have any experience doing anything like this? Ideally I could have it set up like the code here: https://docs.prefect.io/guide/getting_started/next-steps.html, but I'm having trouble understanding how it is run. Would the create_cluster function create the Spark context, and if it does, how is the job then submitted?
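Not from the thread — a hedged sketch of one way to wire PySpark into tasks (the task names are made up). Note that a SparkSession generally isn't serializable, so passing it between tasks like this assumes the LocalExecutor rather than Dask:

    from prefect import task, Flow
    from pyspark.sql import SparkSession

    @task
    def create_spark_session():
        # local[*] is just for illustration; point this at your cluster master
        return SparkSession.builder.master("local[*]").appName("prefect-pyspark").getOrCreate()

    @task
    def run_job(spark):
        df = spark.range(100)
        return df.count()

    @task
    def stop_spark(spark, result):
        spark.stop()
        return result

    with Flow("pyspark-flow") as flow:
        spark = create_spark_session()
        result = run_job(spark)
        stop_spark(spark, result)

    # flow.run()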

    Alex Kravetz

    08/08/2019, 1:22 PM
    I'd like to use prefect with Snowflake as our data warehouse backend. Is there any interest in having SnowflakeQuery added back upstream? (I'm assuming to tasks/database?)

    Brett Naul

    08/09/2019, 2:09 AM
    are there any doc examples of using `Secret`s in prefect core? the only page I could find just says vaguely what they are and that you could set them in the context, but not how to actually do so
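A hedged sketch of local Secrets in Prefect Core (the secret name is made up, and the exact configuration hooks may differ by version): Secret("...").get() is resolved at runtime, and for a local run the value can be supplied through prefect.context (or config.toml / environment variables):

    import prefect
    from prefect import task, Flow
    from prefect.client import Secret

    @task
    def use_token():
        token = Secret("API_TOKEN").get()  # resolved at runtime, not at build time
        return len(token)

    with Flow("secret-example") as flow:
        use_token()

    # supply the secret for a local run via context
    with prefect.context(secrets={"API_TOKEN": "abc123"}):
        flow.run()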

    rich

    08/09/2019, 7:36 PM
    Is there an example of prefect working with dbt? I'd like to use prefect to control dbt and other workflows. https://github.com/fishtown-analytics/dbt

    rich

    08/09/2019, 7:37 PM
    Would it just be a matter of using the ShellTask? https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/shell.py
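A minimal sketch of the ShellTask approach rich suggests (the project path is a placeholder):

    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    shell = ShellTask(name="dbt")

    with Flow("dbt-flow") as flow:
        # /path/to/dbt/project is a placeholder
        dbt_run = shell(command="cd /path/to/dbt/project && dbt run")
        dbt_test = shell(command="cd /path/to/dbt/project && dbt test",
                         upstream_tasks=[dbt_run])

    # flow.run()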

    Alex Kravetz

    08/10/2019, 10:17 PM
    Will there be a UI outside of the paid cloud UI?

    Henry H

    08/11/2019, 5:53 PM
    Did a quick search in slack just to make sure it wasn't answered - is it possible to schedule multiple flows within the same python process without resorting to forking? Just wanted to make sure there wasn't an easier way than kicking off 2 different python processes each with their own flow and schedule.

    Joe Schmid

    08/14/2019, 6:04 PM
    Hi Prefect team, We had a question on monitoring flow progress when running with the DaskExecutor. We've made great progress running some initial data science workflows on Prefect Core -- last week we were able to map over a thousand items and run successfully on a 100-node Dask cluster. (What would have taken ~50 hours completed in half an hour -- so cool!) During a large run like that, it's hard to monitor progress since any logging in tasks gets output on Dask workers. We tried adding StateHandlers on our Tasks thinking we might be able to log from those and see the output in a Jupyter notebook where we're running the flow, but those also output on the workers. Is there a way to better monitor progress (e.g. what tasks have completed for mapped elements) of a flow while running with DaskExecutor?

    KJ

    08/15/2019, 7:46 PM
    Thanks for the invite @Dylan and fixing the link for others.

    James Watt

    08/19/2019, 4:03 AM
    Hello, how can I get the currently running task's retry attempt? Is it possible to get the elapsed time of the retry delay during the run time as well? Thanks in advance.
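Not from the thread — a hedged sketch of reading the current attempt from prefect.context; task_run_count is the context key documented for this, though available keys can vary by version:

    from datetime import timedelta

    import prefect
    from prefect import task, Flow

    @task(max_retries=3, retry_delay=timedelta(seconds=10))
    def flaky():
        attempt = prefect.context.get("task_run_count", 1)  # 1 on the first run
        print(f"attempt number {attempt}")
        if attempt < 3:
            raise RuntimeError("try again")
        return attempt

    with Flow("retry-count") as flow:
        flaky()

    # flow.run()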

    James Watt

    08/19/2019, 6:03 AM
    @Chris White It seems that once Prefect enters the Retrying state, there is no API that can be called back to periodically check its context during the retry delay period. Am I missing something here?

    Akshay Verma

    08/19/2019, 1:04 PM
    I am getting a 'FunctionTask' object is not iterable error. Can anyone point out what it is?
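Not from the thread — a guess at the usual cause, sketched below: unpacking or looping over a task's result while building the flow fails, because at that point it is still a Task object rather than the value it will return at runtime:

    from prefect import task, Flow

    @task
    def make_pair(x):
        return x, x + 1

    with Flow("unpack-example") as flow:
        # a, b = make_pair(1)   # raises: 'FunctionTask' object is not iterable
        pair = make_pair(1)
        first = pair[0]         # indexing is deferred to runtime, so this works
        second = pair[1]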

    James Watt

    08/20/2019, 3:03 AM
    Hi, I'm trying to get the next scheduled retry time through prefect.context.scheduled_start_time. I expected it would be something like what's shown in the INFO log, e.g. "[2019-08-20 01:59:21,499] INFO - prefect.Flow | Waiting for next available Task run at 2019-08-20T01:59:31.428285+00:00". Obviously, herein the retry task is scheduled to run 10 seconds later. However, prefect.context.scheduled_start_time just gave me a now() time. How can I get the time info regarding when the next retry takes place?

    Akshay Verma

    08/20/2019, 9:50 AM
    Hi, I am trying the following flow as a short example of what I want:
    # Flow #########################
        @task
        def generate_context(wfr_df, lvnr, tol):
            print(wfr_df, lvnr, tol)
            return wfr_df, tol
    
        with Flow('TDBB') as flow:
            wfr_df = Parameter("wfr_df")
            modeldef = Parameter("modeldef")
            tol = Parameter("tol")
            lvnr = Parameter("lvnr")
    
            context_level = generate_context(wfr_df, lvnr, tol)
            context_level_ml = context_level[0]
            context_level_ci = context_level[1]
            print(context_level_ml)
            print(context_level_ci)
    
        flow.run(parameters=dict(
            wfr_df=mls,
            modeldef=("tx", "ty", "mwx", "mwy", "rwx", "rwy", "mx", "my", "rx", "ry"),
            tol=1e6,
            lvnr=3,
        ))
    I am getting the following error:
    Flow.run received the following unexpected parameters: modeldef
    When I check self.parameters() from flow.py, I get the following response:
    {<Parameter: tol>, <Parameter: lvnr>, <Parameter: wfr_df>}
    Can anyone point out what I am doing wrong here?

    Jerry Thomas

    08/22/2019, 4:49 AM
    Hi, is there a way to pass values instead of arrays for tasks that use map? Map passes items of a list to a task, and I have a situation with a function that has two parameters: I want the data parameter to be split across multiple instances but the value parameter to be the same across all tasks. I can achieve this by using the example below:
    @task
    def convert(data, value):
        data['x'] *= value
        return data
    
    data = [{"x": x, "y": x % 5, "z": x % 3} for x in range(10)]
    
    with Flow("convert-using-value") as flow:
        res = convert.map(data, [2 for i in range(10)])
    However, is there a better way to, say, mark the value parameter as shared across all executions of a call?
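A hedged sketch of one way to do this with Prefect's unmapped wrapper, which keeps an argument constant across all mapped children (mirroring Jerry's example):

    from prefect import task, Flow, unmapped

    @task
    def convert(data, value):
        data["x"] *= value
        return data

    data = [{"x": x, "y": x % 5, "z": x % 3} for x in range(10)]

    with Flow("convert-using-unmapped") as flow:
        # unmapped(2) passes the same value to every mapped call,
        # so there is no need to build a list of repeated values
        res = convert.map(data, unmapped(2))

    # flow.run()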

    mooncake4132

    08/22/2019, 5:35 AM
    Hi! I'm looking for some clarification on the release of Prefect's UI. Is it correct to say the UI will only be released with the cloud version, so we won't be able to use it with our self-hosted OSS version?

    AkashB

    08/22/2019, 6:04 AM
    Hi, is there a way to create a workflow by reading all tasks and dependencies from a JSON/XML file?
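Not from the thread — a hedged sketch of driving flow construction from a JSON spec with flow.add_edge; the spec format and the Noop task are made up for illustration:

    import json
    from prefect import Flow, Task

    spec = json.loads("""
    {
      "tasks": ["extract", "transform", "load"],
      "dependencies": [["extract", "transform"], ["transform", "load"]]
    }
    """)

    class Noop(Task):
        def run(self):
            print(f"running {self.name}")

    with Flow("from-json") as flow:
        tasks = {name: Noop(name=name) for name in spec["tasks"]}
        for upstream, downstream in spec["dependencies"]:
            # add_edge registers both tasks and the dependency between them
            flow.add_edge(tasks[upstream], tasks[downstream])

    # flow.run()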

    Jason Damiani

    08/22/2019, 3:42 PM
    Hey there, I have a package I've developed that contains a module which exposes most of the functionality of the package wrapped as FunctionTasks. It's not clear to me how I can add retries to a task that was not defined with the @task decorator when mapping:
    import culvert.tasks as ct
    
    with Flow("some flow") as f:
       <some other task>
       copy_batches_local = ct.copy_batch_local.map(
            src_conn_str=unmapped(src_conn_str),
            batch_range=create_batches,
            src_table=unmapped(src_table),
            split_by=unmapped(split_by),
            data_path=unmapped(data_path),
            task_args={"max_retries": 3},
        )
    TypeError: got an unexpected keyword argument 'task_args'
    Also tried:
    copy_batches_local = ct.copy_batch_local(task_args={"max_retries": 3}).map(
            src_conn_str=unmapped(src_conn_str),
            batch_range=create_batches,
            src_table=unmapped(src_table),
            split_by=unmapped(split_by),
            data_path=unmapped(data_path),
        )
    TypeError: missing a required argument: 'src_conn_str'
    Is there a way to partially bind my ct.copy_batch_local task?

Jeremiah

08/22/2019, 3:50 PM
Hey @Jason Damiani, great question. You're right, task_args isn't exposed for mapping the way it is for normal calls, but it should be! I'm going to open an issue to address that asap. In the meantime, that task_args keyword is actually just getting passed to a task copy statement, so you should be able to recreate it like this:
ct.copy_batch_local.copy(max_retries=3).map(...)
Each task's copy statement accepts the task_args that you want to overwrite.
Will that solve the problem?

Jason Damiani

08/22/2019, 3:51 PM
Awesome, thanks @Jeremiah
I believe so

Jeremiah

08/22/2019, 3:51 PM
If it doesn't, let us know.
https://github.com/PrefectHQ/prefect/issues/1390