prefect-community

    Martim Lobao

    04/25/2022, 2:58 PM
    Is there a way to register different flows so that they are run with different environment variables? Something like:
    Flow.register("foo", extra_env_variables={"ENV": "foo"})
    Flow.register("bar", extra_env_variables={"ENV": "bar"})

    Xavier Babu

    04/25/2022, 3:44 PM
    For some reason, starting today, every new run we execute on our Orion server goes into a Scheduled state by default instead of running the flow immediately. Can anyone shed some light on this?

    Matthew Roeschke

    04/25/2022, 4:46 PM
    Is this heartbeat configuration only applicable to Prefect Cloud setups? https://docs.prefect.io/orchestration/concepts/services.html#heartbeat-configuration I have an (expectedly) long-running job on a Prefect Server setup and am trying to keep the Zombie Killer from terminating it:
    No heartbeat detected from the remote task; marking the run as failed.

    Slackbot

    04/25/2022, 6:48 PM
    This message was deleted.

    Andrey Tatarinov

    04/25/2022, 8:54 PM
    Hi, we're looking into ways to reduce latency between flow run submission and getting results back. At the moment we're using KubernetesAgent, which spawns k8s Jobs for each flow run. Job initialization is quite slow in our case due to image size. Question: if we set up a permanent Dask cluster with an appropriate image and set the executor to DaskExecutor, would we be able to skip the Job initialization step? I.e., does the agent send commands directly to the Dask cluster, or can the k8s Job not be avoided?

    Hafsa Junaid

    04/26/2022, 2:24 AM
    Hey team! I am trying to deploy prefect-server on GKE. At this step, `vim ~/.prefect/config.toml`, I entered the Apollo IP and pressed Esc -> :wq (or Esc -> :x)... Then I get the following message. Any guidance on how to deal with it?

    Hafsa Junaid

    04/26/2022, 6:03 AM
    Hey, I am registering my task on prefect-server to a demo project. The task uses Spark and a SparkContext. flow.run() succeeds, but flow.register() gives the following error:
    RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
    Any guidance on this SparkContext setting?

    Dave

    04/26/2022, 7:30 AM
    Hello, has anyone experienced problems when using `prefect.engine.signals.LOOP`, where the task is suddenly missing the context on retries? E.g.:
    index = prefect.context.get('task_loop_result', {}).get('index', 0)
    See more code in the thread 🧵

    Faheem Khan

    04/26/2022, 7:48 AM
    Hi, I'm running a local Prefect server in Docker, and to register my flow I run a Docker client. I am using this method (https://github.com/flavienbwk/prefect-docker-compose). I can see the code run successfully in the client container and the flow is registered, but when I rerun the flow from the UI I get an error saying that no module named pandas is found. I have pandas installed on the host and also in the client container. Where else should the module be installed for the flow to run successfully from the UI?

    Rhys Mansal

    04/26/2022, 9:22 AM
    Hi all, I'm having difficulty with the following code.
    param_a = flow.add_task(Parameter("param_a", default=True))
    with case(param_a, False):
        task_a = some_task(arg)
    task_b = some_task(task_a)
    `some_task` is being run multiple times with different arguments. When `param_a` is false, some of these should be skipped. If I do not use `@task(skip_on_upstream_skip=False)` in the task decorator on `some_task`, then all tasks downstream of the first skipped task are skipped, whether they are inside a `case` or not. If I do, none of them are skipped, regardless of what `param_a` is set to. Does anyone have any idea how to get only the tasks inside the `with` block to skip (and only when `param_a` is true)?

    Florian Guily

    04/26/2022, 10:20 AM
    Hey guys, I have a quick question on good practice. How scalable is task mapping? Are the child tasks "queued" if the map is pretty big?

    xyzz

    04/26/2022, 11:13 AM
    Regarding storage the Orion docs say "you can also configure a self-hosted key-value store for testing.".
    But what kind of key-value store is compatible and how to set it up for testing purposes?

    xyzz

    04/26/2022, 11:17 AM
    I'd like to test Orion Cloud together with Kubernetes running on my laptop via Docker.

    xyzz

    04/26/2022, 11:20 AM
    Would Redis (https://hub.docker.com/r/ubuntu/redis) be compatible?

    Malthe Karbo

    04/26/2022, 12:41 PM
    Hi, when trying to create a deployment on Orion Cloud, I get "422 unknown" exceptions.

    Nikhil Joseph

    04/26/2022, 1:15 PM
    Is it possible to stop a task map if any of the calls fail?

    Marwan Sarieddine

    04/26/2022, 1:28 PM
    Hi folks, we have run into this error message:
    {'_schema': 'Invalid data type: None'}
    twice over the last week across our many flow runs. It seems other folks have encountered this due to a version mismatch between the agent and their execution environment. However, that is not the case for us; additionally, the same flow runs successfully in later runs without any changes on our end. See more details in the thread.

    Leon Kozlowski

    04/26/2022, 1:44 PM
    Hi all - I’m seeing some flows execute roughly 15 minutes past their scheduled start time. When I check the logs, I see some Lazarus logs:
    Rescheduled by a Lazarus process. This is attempt 1.
    What would be the best way for me to root-cause this issue?

    Andrey Tatarinov

    04/26/2022, 2:01 PM
    Question about the future of Prefect: I did not find references to Dask in the Orion flow runners documentation. Does it mean that the Dask cluster executor will be obsolete in Orion?

    Shuchita Tripathi

    04/26/2022, 2:28 PM
    Hi, I have a scenario where I need to modify the output of one task and pass it to another. Is it possible to modify a task's output and then pass it on? I am asking because defining the tasks within a flow does not execute them, so the output is not available. E.g., is something like the following possible:
    with Flow("flow") as f:
        abc = task1()
        # Some modification is required for 'abc' (the return value of task1()),
        # like extracting part of a longer string.
        modified_abc = ...  # "string extraction from abc"
        def_ = task1(modified_abc)  # 'def' is a Python keyword, so it can't be a variable name

    Jason

    04/26/2022, 2:29 PM
    I'm having trouble locating the local_script_path:
    return os.stat(filename).st_size
          FileNotFoundError: [Errno 2] No such file or directory: 'hello-flow.py'
    The S3 storage class is configured in a module shared between flows, like this:
    from prefect.storage import S3

    storage = S3(
        bucket="EDITED-prod-platform-prefect",
        key=f"{project}/flows/{flow_name}.py",
        stored_as_script=True,
        local_script_path=f"projects/Examples/flows/{flow_name}.py",
    )

    Bruno Murino

    04/26/2022, 2:33 PM
    Hi everyone — I’m trying to use the Prefect API to “clone” a flow and give it a different name, does anyone know if that’s possible at all ?

    Milton

    04/26/2022, 3:02 PM
    Hi, how far back can one go in the audit logs?

    Bruno Murino

    04/26/2022, 4:09 PM
    Hi everyone — are flow schedules preserved across flow versions ?

    Lukáš Pravda

    04/26/2022, 4:19 PM
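    Hello community, I'd like to test a rather simple scenario: whether a task defined in a flow was run. But I don't seem to understand how to mock it properly. In simplified form, my setup is similar to this:
    file.py
    from prefect import Flow, task

    @task
    def foo():
        ...

    @task
    def bar():
        ...

    with Flow("my-flow") as flow:  # Prefect 1.x flows need a name; "my-flow" is a placeholder
        try:
            foo()
        except Exception:
            bar()
    test.py
    from unittest.mock import patch

    from file import flow

    @patch("file.bar")
    @patch("file.foo")
    def test_flow(mock_foo, mock_bar):
        # Decorators apply bottom-up: mock_foo patches file.foo, mock_bar patches file.bar
        mock_foo.side_effect = Exception("throw an error")

        flow.run()

        # assert_called_once() raises on failure and returns None, so no `assert` in front
        mock_bar.assert_called_once()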
    but the mock is never called. I found this: https://github.com/PrefectHQ/prefect/issues/1801, but couldn't really map that solution onto my exact problem. What am I missing? Thank you!

    Tom Manterfield

    04/26/2022, 4:37 PM
    On the Orion beta, I can’t see any way to add GCP credentials to a `KubernetesFlowRunner` instance other than building the keys into the image itself. Has anyone else found a solution for this?

    Xavier Babu

    04/26/2022, 4:44 PM
    We are not using any deployment container like Docker; we just use storage mounted on the Linux server. How can we create local storage for Prefect instead of using /tmp? When I run the command "prefect storage create", I don't get any list of storage types to choose from, and whatever value I enter, it says "invalid selection". Please help.
    Found the following storage types:
    Select a storage type to create: 4
    Invalid selection 4

    Kathryn Klarich

    04/26/2022, 5:14 PM
    Hi all, I’m not sure where the best place to post this is (here versus Discourse), but I am running into an error when trying to register a Prefect flow:
    AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK'
    It seems to be happening during this step:
    RUN pip install pip --upgrade
    Has anyone come across this before and know how to fix it? I was able to register this flow successfully a few days ago and haven’t changed much in the requirements since then.

    Bradley Hurley

    04/26/2022, 5:21 PM
    Hi Prefect Experts - If I have a flow run that's in a Failed/Cancelled state, is it possible to clear the state of a task that already succeeded, so that when the flow is restarted the previously successful task will re-execute? I know you can do that sort of thing with Airflow, but I wasn't able to replicate it with Prefect.

    Jai P

    04/26/2022, 5:39 PM
    Hey all! A question about the eventual introduction of the `case` statement in Prefect 2.0: is there a rough timeline for when it may be introduced? Also, are there any major differences planned compared with how it works in Prefect 1.0, where I think you can only branch conditionally between tasks (e.g., possibly supporting subflows)?

Anna Geller

04/26/2022, 5:41 PM
There is no need for that - you can run any if/else statements both in your tasks and in your flows in Prefect 2.0 🎉

Kevin Kho

04/26/2022, 5:41 PM
You don’t necessarily need `case` because you can use the native Python `if` inside a flow now.
from prefect import flow

@flow
def myflow():
    a = task_one()  # task_one is a @task defined elsewhere
    if a.result() == ...:
        ...

Jai P

04/26/2022, 5:44 PM
Ah yes! I saw that was possible, but I could've sworn the documentation said `case` was coming, and I wasn't sure in what situation I'd use that over a native `if`. But now I can't find that quote in the documentation...

Kevin Kho

04/26/2022, 5:47 PM
I don’t quite see a use case for `case`, because `if` is more flexible. The previous `case` only tested for equality, not greater than or less than, so you needed an intermediate task to achieve that.

Jai P

04/26/2022, 8:04 PM
Hmm, OK! I do like using `if` statements more, but I'm curious whether they have the same implication they did in Prefect 1.0 (meaning tasks/subflows not run because of an `if` are marked as `Skipped`).
But maybe that's not the intended experience, and the tasks aren't really "Skipped": they're not even considered part of the execution path, so they're never included.

Kevin Kho

04/26/2022, 8:20 PM
Yeah, exactly. They aren’t “Skipped” because they’re never added.
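The point that an untaken branch is never added (rather than marked Skipped) can be illustrated with plain `concurrent.futures` as a rough analogy for Prefect 2.0 futures. This is not Prefect's engine; `submit`, `submitted` and the task functions are hypothetical. The condition also uses `>`, which the equality-only `case` could not express without an intermediate task.

```python
from concurrent.futures import ThreadPoolExecutor

submitted = []  # names of the "tasks" that were actually submitted


def task_one():
    return 7


def task_two():
    return "two"


def task_three():
    return "three"


def my_flow(executor):
    def submit(fn):
        # Record the task only when it is really handed to the executor.
        submitted.append(fn.__name__)
        return executor.submit(fn)

    a = submit(task_one)
    # Any Python comparison works here, not just equality.
    if a.result() > 5:
        return submit(task_two).result()
    return submit(task_three).result()


with ThreadPoolExecutor(max_workers=2) as pool:
    outcome = my_flow(pool)

print(outcome, submitted)
# two ['task_one', 'task_two']
```

`task_three` never appears anywhere, so there is no run of it that could carry a Skipped state.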

Jai P

04/26/2022, 9:49 PM
Got it! So I guess the one interesting thing is: because you're now essentially forced to call `.result()`, does that possibly introduce some weirdness in execution/dependencies?
As an example:
@flow
def my_flow():
    a = task_one()
    if a.result():
        task_two()
    else:
        task_three()
I won't necessarily see that `task_two` or `task_three` waited on `task_one` (as an explicit `wait_for` would show).

Kevin Kho

04/26/2022, 9:54 PM
You can still use `wait_for` in `task_two` and `task_three`, but they will wait for `a` by default. `.result()` is actually `.wait().result()` implicitly.
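The implicit waiting that `.result()` implies can be seen with the standard library's `concurrent.futures.Future.result()`, which blocks until the submitted work finishes. This is an analogy only, assuming Prefect 2.0 beta futures behave the same way in this respect; `task_one`, `task_two` and `events` are hypothetical.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

events = []
lock = threading.Lock()


def record(message):
    with lock:
        events.append(message)


def task_one():
    time.sleep(0.05)  # simulate slow upstream work
    record("task_one finished")
    return True


def task_two():
    record("task_two started")


with ThreadPoolExecutor(max_workers=2) as pool:
    a = pool.submit(task_one)
    # .result() blocks until task_one is done, so task_two cannot be
    # submitted (let alone start) before its upstream has finished.
    if a.result():
        pool.submit(task_two).result()

print(events)
# ['task_one finished', 'task_two started']
```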

Jai P

04/26/2022, 9:54 PM
Yup! I guess I was just pointing out that Orion wouldn't necessarily reflect the waiting.
So as an engineer, if you want the dependency to show up properly in Orion, you need the `wait_for`. If you don't care about that particular thing, then it's fine, because the waiting happens implicitly anyway.
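The difference between *waiting* on a result and *recording* a dependency can be sketched with a hypothetical mini-orchestrator. `Tracker` is illustrative only, not a Prefect API: it draws an edge only when one is declared through `wait_for`, which is roughly how a graph built from declared upstreams would behave.

```python
from concurrent.futures import ThreadPoolExecutor


class Tracker:
    """Hypothetical orchestrator that records an edge only for declared wait_for upstreams."""

    def __init__(self, pool):
        self.pool = pool
        self.edges = []  # (upstream_name, downstream_name) pairs
        self.names = {}  # future -> name of the function it runs

    def submit(self, fn, wait_for=()):
        for upstream in wait_for:
            upstream.result()  # honour the declared ordering
            self.edges.append((self.names[upstream], fn.__name__))
        future = self.pool.submit(fn)
        self.names[future] = fn.__name__
        return future


def task_one():
    return True


def task_two():
    return "two"


with ThreadPoolExecutor(max_workers=2) as pool:
    implicit = Tracker(pool)
    a = implicit.submit(task_one)
    if a.result():  # waits for task_one, but declares nothing
        implicit.submit(task_two)

    explicit = Tracker(pool)
    b = explicit.submit(task_one)
    if b.result():
        explicit.submit(task_two, wait_for=[b])  # same ordering, now declared

print(implicit.edges, explicit.edges)
# [] [('task_one', 'task_two')]
```

Both runs execute in the same order; only the second leaves an edge that a graph view could draw.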

Kevin Kho

04/26/2022, 9:56 PM
Yes that is completely right. Just wondering, how important is it for you to have all of those connected in the Radar plot?

Jai P

04/26/2022, 10:03 PM
I don't actually know that it's that important for us yet, though I can imagine situations where someone wants to understand the flow execution visually but can't, because everything is laid out at the same level in the radar view.
But given there's a simple workaround that isn't very code-intensive, I think we can live with that.

Kevin Kho

04/26/2022, 10:06 PM
That’s true, but I noticed this as well when doing something like A -> B -> C where all are mapped and then some number of A raise SKIP: lineage gets broken. But in the future we will explicitly create a SKIP state that connects it.