prefect-community
  • f

    Fred Israel

    01/30/2020, 6:56 PM
    Is there a way to do it? Or should I just move this attribute access inside my train_model task?
    c
    • 2
    • 1
  • f

    Fred Israel

    01/30/2020, 7:06 PM
    message has been deleted
  • f

    Fred Israel

    01/30/2020, 7:12 PM
    While we are at it, I just asked Prefect to visualize my pipeline, and it's beautiful 😢 😲 Thanks for the good work 🙂
    ⬆️ 1
    💯 2
    :marvin: 3
  • j

    John Ramirez

    01/30/2020, 8:28 PM
    Does anyone know how to optimize a mapped task over a list of 5000+ values? The task stalls and consumes all the memory on the dask worker.
    j
    • 2
    • 1
  • n

    Nayeentara

    01/31/2020, 7:36 AM
    Hi all, I am new to Prefect and our team just started using it. I am working on a POC to integrate our Prefect Cloud with GCP Kubernetes Engine. Can you direct me to the appropriate documentation, or share any suggestions to help me get started?
    n
    • 2
    • 2
  • a

    Alex Cano

    01/31/2020, 6:12 PM
    Hey guys, I think I'm going insane. I'm trying to set up a little introduction flow (code at the bottom), and I keep getting the following error when trying to run it:
    AttributeError: Context object has no attribute dict
    Can anyone spot something wrong with the flow (structurally or conceptually)?
    from typing import List

    from prefect import Flow, task
    from prefect.environments.storage import Local
    
    @task
    def extract():
        return [1, 2, 3]
    
    
    @task
    def always_process_and_pass_downstream(element: int) -> int:
        return element * 2
    
    
    @task
    def load(elements: List[int]) -> None:
        total = sum(elements)
    
    
    @task
    def process_and_finish(element: int) -> None:
        avg = sum(element) / len(element)
        pass
    
    
    with Flow("test client register flow 1", storage=Local()) as flow:
        extracted = extract()
        processed = always_process_and_pass_downstream(element=extracted, mapped=True)
        process_and_finish(element=processed)
        load(elements=processed)
    
    result = flow.run()
    Thanks!
    👀 2
    c
    • 2
    • 8
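    (For reference, a minimal sketch of the same flow written with the .map() call form rather than the mapped=True keyword; this only shows the more common mapping style and is not a fix for the Context error above. Names are reused from the snippet, storage omitted for brevity.)
    from typing import List

    from prefect import Flow, task


    @task
    def extract() -> List[int]:
        return [1, 2, 3]


    @task
    def always_process_and_pass_downstream(element: int) -> int:
        return element * 2


    @task
    def load(elements: List[int]) -> None:
        total = sum(elements)
        print(total)


    with Flow("test client register flow 1") as flow:
        extracted = extract()
        # .map() creates one task run per element of the upstream list
        processed = always_process_and_pass_downstream.map(element=extracted)
        load(elements=processed)

    state = flow.run()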
  • d

    Dharhas Pothina

    02/01/2020, 4:53 PM
    Hi all. Bit confused on how to use prefect with dask dataframes. Does the following stub code make sense?
    from prefect import Flow, task
    
    @task
    def read_parquet(src_path):
        return df # returns dask df
    
    
    @task
    def retrieve_img(x):
        return img
    
    @task
    def model(img):
        return result
    
    @task
    def write_db(result):
        # write to db
        pass

    @task
    def write_parquet(dst_path, df):
        # write to disk
        pass
    
    with Flow('Test') as flow:
        df = read_parquet(path)
        images = retrieve_img.map(df.iterrows())
        results = model.map(images)
        write_db.map(results)
        write_parquet(df.join(dd.from_sequence(results)))
    Essentially, each row of my input dask dataframe needs to be processed, and the results appended to the dataframe and saved in a database.
    j
    • 2
    • 5
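    (One possible restructuring of the stub above, assuming the rows can be materialized into a plain list inside a task so that .map() operates on concrete values at run time rather than on a dask dataframe handle at flow-build time. Paths and the per-row payloads are illustrative placeholders.)
    import dask.dataframe as dd
    from prefect import Flow, task


    @task
    def read_rows(src_path):
        # materialize the dask dataframe into a list of row dicts so the
        # flow can map over real values when it runs
        df = dd.read_parquet(src_path)
        return df.compute().to_dict("records")


    @task
    def retrieve_img(row):
        return {"row": row, "img": b""}  # placeholder image fetch


    @task
    def model(item):
        return {**item, "score": 0.5}  # placeholder model output


    @task
    def write_db(result):
        pass  # placeholder: write one result to the database


    with Flow("Test") as flow:
        rows = read_rows("input.parquet")
        images = retrieve_img.map(rows)
        results = model.map(images)
        write_db.map(results)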
  • i

    itay livni

    02/02/2020, 6:43 PM
    Hello again - I would like to parallelize a task, make_node, in a LOOP. The current implementation is a task that sits inside a Flow, wrapped up in a looped task. I tried some quick stabs at the Dask Executor; errors arose. Is there a better pattern to achieve parallelizing make_node? Thanks
    Untitled
  • r

    Romain

    02/03/2020, 3:15 PM
    Hi, when using tags for dask resources, I observe a strange behavior while using mapping. Let's consider the following flow:
    map_fn = FunctionTask(lambda x: x + 1, tags=["dask-resource:GPU=1"])
    
    with Flow('test') as flow:
        list = [1]
        map_fn.map(list)
    While running such a flow on a dask cluster with 1 scheduler and 1 worker (started with --resources GPU=1), it turns out that the flow is blocked because 2 tasks have "reserved" the GPU resource (see attached pics from the dask dashboard). It looks like the map task itself reserved 1 GPU, and then the mapped map_fn child cannot be processed because dask waits for the resource to be released. Is that the expected behavior of mapping with dask resources?
    c
    • 2
    • 1
  • j

    Jackson Maxfield Brown

    02/03/2020, 10:20 PM
    Question: is it possible to use the @task decorator on a class method?
    c
    • 2
    • 4
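    (A short sketch of the usual alternative when the decorator and instance methods don't mix well: subclass Task and put the method body in run(). The class and names here are made up for illustration.)
    from prefect import Flow, Task


    class FitModel(Task):
        # state that would have lived on the original class can be
        # stored on the Task instance instead
        def __init__(self, factor: int = 2, **kwargs):
            self.factor = factor
            super().__init__(**kwargs)

        def run(self, x: int) -> int:
            return x * self.factor


    fit_model = FitModel(factor=3)

    with Flow("class-based-task") as flow:
        result = fit_model(x=7)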
  • t

    Tsang Yong

    02/04/2020, 1:28 AM
    Does ShellTask support a timeout?
    c
    • 2
    • 2
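    (A sketch assuming the timeout keyword on the base Task class, measured in seconds, can be passed through ShellTask's constructor the way other Task keyword arguments are; worth confirming against the docs for your Prefect version.)
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    # timeout is an argument on the base Task class (seconds), assumed here
    # to be forwarded through ShellTask's **kwargs
    slow_command = ShellTask(command="sleep 2 && echo done", timeout=60)

    with Flow("shell-with-timeout") as flow:
        output = slow_command()

    flow.run()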
  • f

    Fabian Thomas

    02/04/2020, 1:51 PM
    Hey, Prefect looks great! One important thing for me would be a UI for monitoring flow execution and scheduling. When do you plan to release your UI, and can you recommend any interim solutions?
  • f

    Fabian Thomas

    02/04/2020, 2:27 PM
    The docs say "Prefect’s UI is not yet available to the public", but judging by this issue (https://github.com/PrefectHQ/prefect/issues/1404) you don't plan to release an open-source tool for managing multiple workflows. What would be a good starting point for building a homegrown solution?
    j
    • 2
    • 2
  • d

    Daniel Gazit

    02/04/2020, 2:52 PM
    Hi :)) Question: how does one scale tasks vertically? I have a pipeline that maps some initial values to multiple tasks. I want the entire pipeline to be performed on each initial value before moving to the next value (or for each worker to process the entire pipeline before moving on to the next one).
    j
    • 2
    • 3
  • r

    RyanB

    02/04/2020, 9:37 PM
    After my flow completes successfully
    [2020-02-04 05:17:43,587] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    I try to terminate the dask distributed cluster cleanly, but always get:
    2020-02-04 05:17:44,467 INFO stopping Dask cluster
    distributed.scheduler - INFO - Scheduler closing...
    distributed.scheduler - INFO - Scheduler closing all comms
    distributed.scheduler - INFO - Remove worker <Worker '<tcp://100.96.120.2:37227>', name: <tcp://100.96.120.2:37227>, memory: 0, processing: 0>
    distributed.core - INFO - Removing comms to <tcp://100.96.120.2:37227>
    distributed.batched - INFO - Batched Comm Closed: in <closed TCP>: Stream is closed
    distributed.scheduler - INFO - Remove worker <Worker '<tcp://100.96.120.2:43873>', name: <tcp://100.96.120.2:43873>, memory: 0, processing: 0>
    followed by:
    distributed.scheduler - INFO - Lost all workers
    2020-02-04 05:17:45,604 WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3856212470>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
    2020-02-04 05:17:45,604 WARNING Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f384f59eb38>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
    2020-02-04 05:17:45,605 WARNING Retrying (Retry(total=0, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f384f59e0b8>: Failed to establish a new connection: [Errno 111] Connection refused',)': /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 157, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
    File "/usr/local/lib/python3.6/dist-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
    File "/usr/local/lib/python3.6/dist-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
    ConnectionRefusedError: [Errno 111] Connection refused
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 376, in _make_request
    self._validate_conn(conn)
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 994, in _validate_conn
    conn.connect()
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 300, in connect
    conn = self._new_conn()
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connection.py", line 169, in _new_conn
    self, "Failed to establish a new connection: %s" % e
    urllib3.exceptions.NewConnectionError: <urllib3.connection.VerifiedHTTPSConnection object at 0x7f3874be2fd0>: Failed to establish a new connection: [Errno 111] Connection refused
    During handling of the above exception, another exception occurred:
    Traceback (most recent call last):
    File "/usr/lib/python3.6/weakref.py", line 624, in _exitfunc
    f()
    File "/usr/lib/python3.6/weakref.py", line 548, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
    File "/usr/local/lib/python3.6/dist-packages/dask_kubernetes/core.py", line 623, in _cleanup_resources
    pods = core_api.list_namespaced_pod(namespace, label_selector=format_labels(labels))
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/apis/core_v1_api.py", line 12372, in list_namespaced_pod
    (data) = self.list_namespaced_pod_with_http_info(namespace, **kwargs)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/apis/core_v1_api.py", line 12472, in list_namespaced_pod_with_http_info
    collection_formats=collection_formats)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 334, in call_api
    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 168, in __call_api
    _request_timeout=_request_timeout)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/api_client.py", line 355, in request
    headers=headers)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/rest.py", line 231, in GET
    query_params=query_params)
    File "/usr/local/lib/python3.6/dist-packages/kubernetes/client/rest.py", line 205, in request
    headers=headers)
    File "/usr/local/lib/python3.6/dist-packages/urllib3/request.py", line 76, in request
    method, url, fields=fields, headers=headers, **urlopen_kw
    File "/usr/local/lib/python3.6/dist-packages/urllib3/request.py", line 97, in request_encode_url
    return self.urlopen(method, url, **extra_kw)
    File "/usr/local/lib/python3.6/dist-packages/urllib3/poolmanager.py", line 330, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
    **response_kw
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
    **response_kw
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 760, in urlopen
    **response_kw
    File "/usr/local/lib/python3.6/dist-packages/urllib3/connectionpool.py", line 720, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
    File "/usr/local/lib/python3.6/dist-packages/urllib3/util/retry.py", line 436, in increment
    raise MaxRetryError(_pool, url, error or ResponseError(cause))
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='localhost', port=443): Max retries exceeded with url: /api/v1/namespaces/logflow/pods?labelSelector=app%3Ddask%2Ccomponent%3Dworker%2Cdask.org%2Fcluster-name%3Ddask-root-b81ddee0-5%2Cuser%3Droot (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3874be2fd0>: Failed to establish a new connection: [Errno 111] Connection refused',))
    is there something specific I need to do to properly tear down the dask cluster?
    c
    • 2
    • 2
  • a

    Artem Andrienko

    02/05/2020, 2:00 PM
    Hi all! Trying to construct a simple pipeline. One of my tasks returns a dict that I want to unpack in another one:
    with Flow("Data collection") as flow:
        params = task(params)
        another_task(**params)
    but Python raises an error:
    TypeError: FunctionTask object argument after ** must be a mapping, not FunctionTask
    Is there any way to unpack a dict returned from a task?
    j
    i
    • 3
    • 4
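    (A minimal sketch of one workaround, assuming the dict can be passed whole and unpacked inside the downstream task at run time; ** unpacking in the Flow block happens at build time, when the value is still a Task object rather than a dict. Names are illustrative.)
    from prefect import Flow, task


    @task
    def build_params():
        return {"start": "2020-01-01", "end": "2020-02-01"}


    @task
    def another_task(params: dict):
        # unpack at run time, inside the task, where params is a real dict
        start, end = params["start"], params["end"]
        return f"collecting from {start} to {end}"


    with Flow("Data collection") as flow:
        params = build_params()
        another_task(params)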
  • i

    itay livni

    02/06/2020, 5:42 AM
    Hi - I am getting lost in the documentation regarding deployment. What is the right path to deploy a flow to Fargate? Agent or Task? Thanks
    c
    j
    • 3
    • 5
  • p

    Preston Marshall

    02/06/2020, 2:47 PM
    Is there anything written up on using core alone with dask-kubernetes?
    j
    j
    • 3
    • 29
  • p

    Preston Marshall

    02/06/2020, 3:31 PM
    Ok, another question. Would you mind describing at a high level how I would, on a schedule: reach out to an SFTP server, list the files (dynamic workflows!), copy them somewhere, and send a Slack notification? I think the dynamic part has me the most confused right now; the rest is just stringing together tasks from the library.
    j
    • 2
    • 2
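    (A rough sketch of the dynamic part of the question above: one task lists the files, and map() fans a copy task out over whatever that list turns out to be at run time. The SFTP, GCS, and Slack pieces are stubbed placeholders, and the names are illustrative.)
    from datetime import timedelta

    from prefect import Flow, task
    from prefect.schedules import IntervalSchedule


    @task
    def list_remote_files():
        # stand-in for an SFTP listing; the length of this list is only
        # known at run time, which is what makes the mapping dynamic
        return ["reports/a.csv", "reports/b.csv"]


    @task
    def copy_file(path):
        return f"copied {path}"  # stand-in for the SFTP -> GCS copy


    @task
    def notify(copied):
        print(f"copied {len(copied)} files")  # stand-in for a Slack message


    schedule = IntervalSchedule(interval=timedelta(hours=1))

    with Flow("sftp-sync", schedule=schedule) as flow:
        files = list_remote_files()
        copied = copy_file.map(files)
        notify(copied)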
  • j

    John Ramirez

    02/06/2020, 8:01 PM
    Hey - is there a way in the task map to run a cross product? For example, task_one with inputs [a, b] and [c, d] would run [a, c], [a, d], [b, c], [b, d].
    c
    • 2
    • 1
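    (map() pairs its iterables element-wise, so one way to get a cross product is to build the pairs in an upstream task and map over those; a minimal sketch with illustrative names.)
    from itertools import product

    from prefect import Flow, task


    @task
    def cross(xs, ys):
        # build every (x, y) combination up front
        return list(product(xs, ys))


    @task
    def task_one(pair):
        a, b = pair
        return f"{a}{b}"


    with Flow("cross-product-map") as flow:
        pairs = cross(["a", "b"], ["c", "d"])
        results = task_one.map(pairs)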
  • p

    Preston Marshall

    02/06/2020, 10:36 PM
    It seems I've hit a limitation. I'm working on my sftp thing I mentioned earlier and figured I'd use namedtuples for the parameters. It seems that's not possible?
    c
    • 2
    • 3
  • p

    Preston Marshall

    02/06/2020, 10:58 PM
    Also, is there a way to pass streams across task boundaries? I'd like to stream data from SFTP to GCS if possible, so that it doesn't all need to be downloaded to disk first.
    c
    • 2
    • 6
  • p

    Preston Marshall

    02/07/2020, 3:44 PM
    New question: can you run tasks inside of other tasks, or can you only call tasks from the flow itself?
    j
    • 2
    • 1
  • n

    Nate Atkins

    02/07/2020, 7:43 PM
    I've been working through the tutorials and ran into a Dask KilledWorker error, and was hoping someone might be able to point me to a solution or give me some pointers on how to debug this. I can start dask-scheduler and 2 dask-workers and successfully run the "Deployment: Dask" example: https://docs.prefect.io/core/tutorials/dask-cluster.html. I can also run the "Advanced Features" example through the DaskExecutor section, where the DaskExecutor starts up the dask cluster and processes the URLs in 20% of the time of the single-threaded mode: https://docs.prefect.io/core/tutorials/advanced-mapping.html#outline. If I add the scheduler address to the executor parameters
    executor = DaskExecutor(address="tcp://192.168.86.23:8786")
    flow.run(executor=executor)
    I get the KilledWorker error: ERROR - prefect.Flow: x-files | Unexpected error occured in FlowRunner: KilledWorker('retrieve_url-a032a3d8-fa89-4131-ad4c-4f3c5fbd282c', <Worker 'tcp://192.168.86.23:58923', name: tcp://192.168.86.23:58923, memory: 0, processing: 1>). By commenting things out I'm down to the offending line being
    html = requests.get(url)
    in the retrieve_url function. Any thoughts?
    j
    • 2
    • 1
  • n

    Nate Atkins

    02/07/2020, 7:59 PM
    This runs with line 8 commented out, but raises the KilledWorker when it is run.
    Minimum code to reproduce error
    j
    c
    m
    • 4
    • 7
  • j

    jorwoods

    02/07/2020, 9:13 PM
    I am interested in using Prefect core, but have a few questions about it:
    1. Do I need to start each flow via python flow_x.py?
    2. If I update the flow, I then need to stop the existing process running said flow and restart it with the updated version. Is that correct? Is there a way to refresh this automatically? (Seems like this is part of Prefect Cloud, but wanted to check.)
    3. What if I have a branch in a flow where several mapped tasks share some common dependency that has costly start-up/tear-down? (E.g. several tasks in the flow can run, then start a shared EMR cluster, use the EMR cluster, then when all are complete, tear the cluster down.)
    c
    • 2
    • 1
  • p

    Preston Marshall

    02/07/2020, 11:14 PM
    So I'm using the DaskExecutor with a task that has a map() over a list of files. I would expect this to be done in parallel but it runs serially. How do I allow a flow to run in parallel?
    c
    • 2
    • 32
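    (A minimal sketch relevant to the question above: the default executor runs mapped children one at a time, so the usual way to get parallel map runs is to hand flow.run() a Dask-backed executor. The addresses and file names here are placeholders.)
    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor


    @task
    def process(path):
        return f"processed {path}"


    with Flow("parallel-map") as flow:
        results = process.map(["a.csv", "b.csv", "c.csv"])

    # DaskExecutor() spins up a temporary local cluster; pass an address to
    # use an existing scheduler instead, e.g. DaskExecutor(address="tcp://scheduler:8786")
    flow.run(executor=DaskExecutor())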
  • p

    Preston Marshall

    02/08/2020, 1:30 AM
    Any reason not to spin up a whole dask cluster per flow you're running?
    c
    j
    j
    • 4
    • 6
  • p

    Preston Marshall

    02/08/2020, 1:31 AM
    Also, can I have flows within flows?
    c
    • 2
    • 2
  • i

    itay livni

    02/08/2020, 4:35 PM
    Hi - When I use the DaskExecutor without a host address, the program runs fine. When I spin up a scheduler and workers ahead of time and pass the address to the DaskExecutor, I get two errors:
    KilledWorker
    and
    dask-worker: error: unrecognized arguments: tcp://127.0.0.5:8786
    with the second error always echoing whatever arguments I give the scheduler. Any thoughts on how to profile the DaskExecutor?
    c
    • 2
    • 16
c

Chris White

02/08/2020, 4:36 PM
Hey itay - could you share the code you use to initialize the dask executor?
i

itay livni

02/08/2020, 4:38 PM
@Chris White
executor = DaskExecutor(address="tcp://127.0.0.5:8786", debug=True)
or debug=False
if make_nodes_executor=='dask':
    from prefect.engine.executors import DaskExecutor
    #client = Client()
    executor = DaskExecutor(address="tcp://127.0.0.5:8786", debug=True) # Does not work
else:# make_nodes_executor=="local":
    from prefect.engine.executors import LocalExecutor
    executor = LocalExecutor()
c

Chris White

02/08/2020, 4:47 PM
Hm that code looks fine - how are you creating your dask scheduler / workers?
i

itay livni

02/08/2020, 5:21 PM
The context to this is: there is a Flow within a LOOP task. That task is triggered by another flow run locally.
c

Chris White

02/08/2020, 5:25 PM
Hmmm I’ve never seen this before but it could be caused by running flows within tasks especially if both flows use a dask executor
i

itay livni

02/08/2020, 5:27 PM
Actual code looks like this
Untitled
c

Chris White

02/08/2020, 5:29 PM
Yea I think your problem is running a flow within a task here
i

itay livni

02/08/2020, 5:33 PM
That is what I was thinking.... If that is the case, it's good enough for now, meaning the program is being parallelized where its largest choke point is. Is there a way to get the IP of the dask executor from prefect without specifying it?
I am going to try it as a local agent in the cloud and try to get insight
c

Chris White

02/08/2020, 5:35 PM
Yea, you should be able to access a dask worker client, since this task will be running on a worker, but this is intended for submitting single pieces of work, not entire flows
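(A sketch of what submitting single pieces of work from inside a task might look like with dask's worker client; this assumes the task body is already executing on a dask worker, and it is not a substitute for running a whole nested flow.)
from distributed import worker_client
from prefect import task


def increment(x):
    return x + 1


@task
def fan_out(items):
    # worker_client() gives the task access to the cluster it is running on,
    # so it can submit individual futures and gather their results
    with worker_client() as client:
        futures = client.map(increment, items)
        return client.gather(futures)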
i

itay livni

02/08/2020, 5:39 PM
Is dask the right tool for this (inside the LOOP), or should I use multiprocessing or some other library/method?
@Chris White I can safely say prefect does not like how generate_knowledge_graph is put together. It works, but running it in the cloud or using storage does not. Basically anything that involves cloudpickle fails. I need to learn how to chain and update flows.
c

Chris White

02/08/2020, 7:46 PM
Both would be appropriate, I think; you just will need to access their APIs directly and not through prefect
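(For the multiprocessing route, a minimal sketch of doing the per-item work inside the looped task with a plain process pool, independent of Prefect's executor; the function names are placeholders, not the original generate_knowledge_graph code.)
from multiprocessing import Pool

from prefect import task


def build_one_node(item):
    # must live at module level so the pool can pickle it
    return item * 2


@task
def make_nodes(items):
    # parallelize the per-item work directly with a process pool,
    # rather than going through Prefect's executor
    with Pool() as pool:
        return pool.map(build_one_node, items)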
i

itay livni

02/08/2020, 9:30 PM
@Chris White Thank you
👍 1