prefect-community
  • m

    Marvin

    09/13/2020, 9:31 PM
    It gives me a headache just trying to think down to your level. I'll still enter you in the contest, @Francesco Bartoli.
  • c

    CA Lee

    09/14/2020, 3:34 AM
    🤔 @Marvin
  • m

    Marvin

    09/14/2020, 3:34 AM
    I would like to say that it is a very great pleasure, honour and privilege for me to enter @CA Lee in this contest, but I can’t because my lying circuits are all out of commission.
  • r

    Rizky Eko Putra

    09/14/2020, 6:39 AM
    Hello, Is there any possibility to register a flow via HTTP? If yes, any docs about it?
  • m

    Matias Godoy

    09/14/2020, 7:01 AM
    Hey @Marvin! The prize is mine!
  • m

    Marvin

    09/14/2020, 7:01 AM
    Don’t pretend you want to talk to me, I know you hate me. I'll still enter you in the contest, @Matias Godoy.
  • v

    Vitaly Shulgin

    09/14/2020, 7:07 AM
    Hello team, how can I prevent execution of flows which were not run at the desired time? It does not matter to me why a run was missed (actually, k8s was down); I don't need these scheduled but missed runs to be executed. I'm looking at schedule filters, but it is not clear to me when filters are called. Before scheduling? Before the run? If not a schedule filter, what should I use?
  • j

    Joe Lyman

    09/14/2020, 8:28 AM
    @Marvin
  • m

    Marvin

    09/14/2020, 8:28 AM
    This contest is the sort of thing you lifeforms enjoy, is it? I've entered you to win, @Joe Lyman.
  • d

    Daniel Manson

    09/14/2020, 9:13 AM
    i’m not a robot, but i believe you are @Marvin
  • m

    Marvin

    09/14/2020, 9:13 AM
    It gives me a headache just trying to think down to your level. I'll still enter you in the contest, @Daniel Manson.
  • r

    Richard Hughes

    09/14/2020, 11:56 AM
    @Marvin what is the content?
  • m

    Marvin

    09/14/2020, 11:56 AM
    It gives me a headache just trying to think down to your level. I'll still enter you in the contest, @Richard Hughes.
  • s

    Shaun Cutts

    09/14/2020, 1:23 PM
    I want to create a flow that abstracts away from where its inputs come from, so that I can run it with inputs from different sources: for instance, a string, or an s3 object. To do this I thought of creating a flow, and adding some dummy tasks, then running the flow with a flow runner, and starting with an initial state of “Success” for these tasks and using results that wrap the inputs. For a string this seems straightforward enough. For an object in S3, loaded lazily, I can subclass Result, and specify `value` as a property which loads and caches the object when called.
    However, the first thing I thought of is that I should use an `S3Result`. But this doesn’t seem to be the right sort of object, as it just takes a bucket, not a key. It seems more of a “result storage location” than a `Result` to me. I’m wondering why it has an `isa` relation to `Result`. Maybe I’m not understanding the object hierarchy properly, and/or I should be trying to achieve my goal differently? [Update: had overlooked that base `Result` takes `location`. Still, `S3Result` has no ability to load results on demand, except via `read`, which takes a new location.]
    A related question is how I should trigger an already-created flow, that lives on the prefect server, with a new runner? `Client.create_flow_run` allows me to create a new run, but not a dictionary of initial states. It would seem that I am abusing flows a little… Perhaps I need to create initial tasks that take a serialized specification for where their “real” inputs live, that can be passed in parameters. Thinking about it, this seems more the intended way I should be doing things, but it also feels that I am duplicating what Prefect is doing somehow. Guidance on what the best practice should be would be greatly appreciated.
  • m

    Marko Herkaliuk

    09/14/2020, 2:35 PM
    Life! Don't talk to me about life! @Marvin
  • m

    Marvin

    09/14/2020, 2:35 PM
    Why should I want to make anything up? The contest is bad enough as it is without wanting to invent any more of it. I'll still enter you in it, @Marko Herkaliuk
  • m

    Manuel Mourato

    09/14/2020, 2:58 PM
    Hello guys, I have a scenario that fails with `Could not infer an active Flow context.`
    1) I have task1, which outputs a result that is a list, say: ["a","z"]
    2) Now, I have a task2 that depends on the result of task1, but should only receive the first element of the result list as an input
    
    Example:
    @task
    def replace_str(x):
        return x.replace("a","A")
    
    @task
    def create_str_list():
        return ["a","b","c"]
    
    flow = Flow("test")
    flow.set_dependencies(create_str_list)
    
    # --> THIS IS WHAT I WANT TO DO
    flow.set_dependencies(replace_str, keyword_tasks={"x": create_str_list[0]})
    Is this possible? Thank you
  • m

    Maxwell Dylla

    09/14/2020, 6:33 PM
    @Marvin
  • m

    Marvin

    09/14/2020, 6:33 PM
    This contest is the sort of thing you lifeforms enjoy, is it? I've entered you to win, @Maxwell Dylla.
  • t

    Tom Augspurger

    09/14/2020, 8:02 PM
    Hi all, I'm hitting an issue where manually running a flow is timing out before it starts. I'm using the prefect-cloud backend.
    $ prefect run flow --name=terraclimate --project=pangeo-forge
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/cli/run.py", line 125, in flow
        no_url=no_url,
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/cli/run.py", line 377, in _run_flow
        flow_id=flow_id, parameters={**file_params, **string_params}, run_name=run_name
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/client/client.py", line 964, in create_flow_run
        res = self.graphql(create_mutation, variables=dict(input=inputs))
      File "/Users/taugspurger/miniconda3/envs/pangeo-forge-37/lib/python3.7/site-packages/prefect/client/client.py", line 294, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['create_flow_run'], 'message': 'Operation timed out', 'extensions': {'code': 'API_ERROR'}}]
    https://prefect-community.slack.com/archives/CL09KU1K7/p1596043664390000 looks similar, and seemed to resolve itself. Any debugging suggestions?
    👀 2
  • c

    Chandra Manginipalli

    09/14/2020, 8:10 PM
    Hi, we have a couple of requirements we are evaluating Prefect for, since Airflow seems a little stringent. Does anyone happen to know the answers to these? I can't find answers online.
    1. If I am running the same flow multiple times in parallel, one for each file, can I pause/resume one of the flow runs (or tasks under it) without impacting the other flow runs for the same flow?
    2. If I batch the rows in a file, can I run the same task iteratively for each batch in the file, in sequence? (I saw the LOOP signal online but don't need recursion.) And will pausing this task pause the subsequent iterations?
    3. Is there a REST API or CLI that we could issue a flow run or pause/resume from? Airflow has a REST API and CLI that allow us to pause a DAG or run tasks. Wondering if something similar exists with Prefect.
  • s

    sark

    09/15/2020, 3:29 AM
    hi guys i am getting
    Failed to load and execute Flow's environment: ValueError('unsupported pickle protocol: 5',)
    prefect and cloudpickle versions of local and remote are the same
    $ pipenv graph
    ...
    prefect==0.13.6
      - click [required: >=7.0, installed: 7.1.2]
      - cloudpickle [required: >=0.6.0, installed: 1.6.0]
    ...
    (prefect-env) [prefect@ghpr-dev-prefect-usc1-a-1 ~]$ prefect version
    0.13.6
    (prefect-env) [prefect@ghpr-dev-prefect-usc1-a-1 ~]$ pip show cloudpickle
    Name: cloudpickle
    Version: 1.6.0
    Summary: Extended pickling support for Python objects
    Home-page: https://github.com/cloudpipe/cloudpickle
    Author: Cloudpipe
    Author-email: cloudpipe@googlegroups.com
    License: BSD 3-Clause License
    Location: /opt/prefect/prefect-env/lib/python3.6/site-packages
    Requires:
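A side note on the error above: pickle protocol 5 was introduced in Python 3.8, so an older interpreter (the `pip show` output above points at a python3.6 environment) cannot read a protocol 5 payload even when the prefect and cloudpickle versions match. The sketch below uses only the standard library. `pickle_protocol` and `can_load_here` are hypothetical helper names, and whether the flow was actually pickled under Python 3.8 or newer is an assumption, since the log does not show the local interpreter version.

```python
import pickle
import sys


def pickle_protocol(data: bytes) -> int:
    """Return the protocol number declared at the start of a pickle payload.

    Payloads written with protocol 2 or higher begin with the PROTO opcode
    (byte 0x80) followed by the protocol number. Protocols 0 and 1 have no
    such header, so this helper reports 0 for both of them.
    """
    if len(data) >= 2 and data[0] == 0x80:
        return data[1]
    return 0


def can_load_here(data: bytes) -> bool:
    """True if the running interpreter supports the payload's protocol."""
    return pickle_protocol(data) <= pickle.HIGHEST_PROTOCOL


if __name__ == "__main__":
    # Pickle with the highest protocol this interpreter offers, then inspect it.
    payload = pickle.dumps({"a": 1}, protocol=pickle.HIGHEST_PROTOCOL)
    print("python", sys.version_info[:2], "highest protocol", pickle.HIGHEST_PROTOCOL)
    print("payload protocol", pickle_protocol(payload), "loadable here:", can_load_here(payload))
```

Running the script on both the machine that registers the flow and the machine that executes it would show whether the two interpreters disagree on the highest supported protocol.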
  • e

    Erick Rodriguez

    09/15/2020, 3:49 AM
    Hi @Marvin I would love to get any of those prizes! :D
  • m

    Marvin

    09/15/2020, 3:49 AM
    This contest is the sort of thing you lifeforms enjoy, is it? I've entered you to win, @Erick Rodriguez.
  • s

    sark

    09/15/2020, 3:58 AM
    so i got the serialisation to work with the LocalAgent but with a DockerAgent it gives
    Failed to load and execute Flow's environment: TypeError('an integer is required (got type bytes)')
  • s

    sark

    09/15/2020, 4:25 AM
    got a non-docker task to work with docker agent, but now a docker task gives
    Unexpected error: DockerException("Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 670, in urlopen
        httplib_response = self._make_request(
      File "/usr/local/lib/python3.8/site-packages/urllib3/connectionpool.py", line 392, in _make_request
        conn.request(method, url, **httplib_request_kw)
      File "/usr/local/lib/python3.8/http/client.py", line 1255, in request
        self._send_request(method, url, body, headers, encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1301, in _send_request
        self.endheaders(body, encode_chunked=encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1250, in endheaders
        self._send_output(message_body, encode_chunked=encode_chunked)
      File "/usr/local/lib/python3.8/http/client.py", line 1010, in _send_output
        self.send(msg)
      File "/usr/local/lib/python3.8/http/client.py", line 950, in send
        self.connect()
      File "/usr/local/lib/python3.8/site-packages/docker/transport/unixconn.py", line 43, in connect
        sock.connect(self.unix_socket)
    FileNotFoundError: [Errno 2] No such file or directory
    i’m using GCS storage with LocalEnvironment
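The last frame of the traceback is `sock.connect(self.unix_socket)` raising `FileNotFoundError`, which means the Unix socket path the Docker client tried to open does not exist where the task is running. Below is a minimal standard-library check, assuming the conventional default path `/var/run/docker.sock` (the log does not show which path was actually used). `socket_status` is a hypothetical helper name.

```python
import os
import stat


def socket_status(path: str = "/var/run/docker.sock") -> str:
    """Classify a filesystem path as 'missing', 'socket', or 'not a socket'."""
    if not os.path.exists(path):
        return "missing"
    mode = os.stat(path).st_mode
    return "socket" if stat.S_ISSOCK(mode) else "not a socket"


if __name__ == "__main__":
    # Assumed default location of the Docker daemon socket on Linux hosts.
    print("/var/run/docker.sock:", socket_status())
```

If this reports `missing` inside the container that runs the flow, one possible cause is that the host's Docker socket is not mounted into that container.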
  • e

    Eugene Smal

    09/15/2020, 7:04 AM
    Show me the data @Marvin
  • m

    Marvin

    09/15/2020, 7:04 AM
    I think you ought to know I’m feeling very depressed. Also, you're in the contest, @Eugene Smal.
  • s

    sark

    09/15/2020, 7:15 AM
    hi guys when i do
    def container(p):
        return CreateContainer(image_name="image", command=f"--p={p}")
    
    start = StartContainer()
    
    with Flow("flow") as flow:
        p = Parameter('param', required=True)
    
        c = container(p)
        start_container = start(container_id=c)
    i get `<Parameter: p>` interpolated for `p` instead of the actual value for p
    but if i make `container` a `@task` it also gives weird errors, i think because `CreateContainer` is itself already a task?
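The `<Parameter: p>` text is consistent with how Python f-strings work: `f"--p={p}"` is formatted as soon as `container(p)` is called while the flow is being built, at which point `p` is still a placeholder object rather than a run-time value. The sketch below reproduces the effect in plain Python with no Prefect import. `Placeholder` is a hypothetical stand-in for a parameter object, and `build_command` mirrors the f-string in the snippet above.

```python
class Placeholder:
    """Hypothetical stand-in for an object whose real value is known only at run time."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"<Parameter: {self.name}>"


def build_command(p) -> str:
    # The f-string is evaluated immediately, using whatever object p is right now.
    return f"--p={p}"


# Called while "building": p is still the placeholder, so its repr is baked in.
built_early = build_command(Placeholder("p"))

# Called with a concrete value, as would happen at run time.
built_at_run_time = build_command(42)

if __name__ == "__main__":
    print(built_early)        # --p=<Parameter: p>
    print(built_at_run_time)  # --p=42
```

This suggests doing the string formatting in a step that executes at run time, once the parameter has a concrete value, rather than in a plain function called during flow construction.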
  • r

    Rob Fowler

    09/15/2020, 8:41 AM
    just looking at `unmapped` for the constant parameters, MIND BLOWN!
    🚀 2