• Aiden Price

    Aiden Price

    3 years ago
    Hi folks, what is the best way to reference and mutate a variable between flow.run()s? Should it be a Parameter? My actual use case is to have a dict which is a copy of one of my database tables, which the incoming data needs to consult to find its foreign key each time. If I find a new name that I don’t have in my dict I’ll need to update the table in the database and mutate my dictionary, then reference the mutated version in subsequent flow.run()s. I’m only new to Prefect but I have to say I love your work, thank you!
    12 replies
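The lookup pattern described in the question above can be sketched in plain Python, independent of any Prefect feature. This is only a minimal sketch under stated assumptions: the `names` table, the `load_lookup` and `get_or_create_id` helpers, and the in-memory SQLite database are all hypothetical stand-ins for the real table and database, and the loop merely stands in for repeated flow.run() calls made from the same Python process.

```python
import sqlite3


def load_lookup(conn):
    """Copy the (hypothetical) names table into a dict of name -> id."""
    rows = conn.execute("SELECT id, name FROM names")
    return {name: row_id for row_id, name in rows}


def get_or_create_id(conn, lookup, name):
    """Return the foreign key for `name`.

    On a miss, insert the name into the table first, then mutate the
    dict so later runs see the new key without re-reading the table.
    """
    if name not in lookup:
        cur = conn.execute("INSERT INTO names (name) VALUES (?)", (name,))
        conn.commit()
        lookup[name] = cur.lastrowid
    return lookup[name]


# In-memory SQLite stands in for the real database (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE names (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.execute("INSERT INTO names (name) VALUES ('alpha')")
conn.commit()

# Loaded once; the dict lives outside the runs and is mutated in place.
lookup = load_lookup(conn)

# Each iteration stands in for one flow.run() in the same process.
results = []
for batch in (["alpha", "beta"], ["beta", "gamma"]):
    keys = [get_or_create_id(conn, lookup, name) for name in batch]
    results.append(keys)
    print(batch, keys)
```

Because the dict is an ordinary object held by the calling process, this only helps when every run happens in that same process; runs distributed across workers would each need to reload or share the table some other way.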
  • j

    Jason

    3 years ago
    Hi, are there ways to invoke a non-Python step in Prefect?
    2 replies
  • j

    Jason

    3 years ago
    Would the Docker task be the one to use?
  • g

    Gary Liao

    3 years ago

    It seems that Prefect doesn't support multiple return values, or some Python syntax?

    @task
    def hello(x):
        a = 1
        b = 2
        return a, b

    with Flow('test flow') as flow:
        a, b = hello(1)  # <---- 'NoneType' object is not callable
        hello(a)
        hello(b)
    16 replies
  • d

    David Ojeda

    3 years ago
    Hi everyone, quick question: I am building my doc with Sphinx and wanted to refer to the Prefect docs with intersphinx. This is relatively easy, as long as there is a public objects.inv, but I cannot find it for Prefect… Is this supported?
    6 replies
  • g

    Gary Liao

    3 years ago
    A Flow's parameters need to be passed to at least one task's run() function, or an error will be raised, right?

    from prefect import task, Flow, Parameter
    from prefect.core.task import Task

    class print_plus_one(Task):
        def __init__(self, x):
            super().__init__()
            self.x = x

        def run(self):
            print(self.x + 1)

    with Flow('Parameterized Flow') as flow:
        x = Parameter('x')
        print_plus_one(x)()

    flow.run(parameters=dict(x=1))  # prints 2
    flow.run(parameters=dict(x=100))  # prints 101

    ValueError: Flow.run received the following unexpected parameters: x
    8 replies
  • j

    Joe Schmid

    3 years ago
    This isn't 100% Prefect related but thought it would be relevant for some users. We have a need for different types of Dask workers, e.g. high memory, GPU, etc. This would work really well with Prefect's support for task tagging to specify Dask worker resources: https://docs.prefect.io/api/unreleased/engine/executors.html#daskexecutor We use dask-kubernetes (on AWS EKS) which currently supports just a single k8s pod spec for all Dask workers. I'm about to start on a potential PR in dask-kubernetes that would allow for different worker types, but only in non-adaptive mode. Before I dive in, I thought I'd ask if anyone knows of existing work in this area of creating and scaling different Dask worker types, especially on k8s. I see this old issue: https://github.com/dask/distributed/issues/2118 but haven't found much else. (I've also asked on the Dask users gitter channel.)
    2 replies
  • Aiden Price

    Aiden Price

    3 years ago
    Quick question, can you task.map() over a Pandas DataFrame? So you could get each row as a Series for each task call. Or does it only work with generic iterables? Thank you.
    3 replies
  • t

    Tobias Schmidt

    2 years ago
    Can't quite figure this out from the docs: is there a way to limit the number of tasks in a map that are run in parallel?
    4 replies
  • c

    Christopher Stokes

    2 years ago
    Seeing something a bit odd - unsure if I'm using the library wrong. When using
    with raise_on_exception():
       state = flow.run(parameters)
    I get raised exceptions on conditional misses, like switch statements missing. The
    raise signals.SKIP('Provided value "{}" did not match "{}"'.format(value, self.value))
    call ends up firing and stopping execution.