Powered by Linen
prefect-community
  • Marius Haberstock · 03/08/2022, 3:18 PM
    Hi, I have a question about apply_map. Details follow in the thread 🙂
    15 replies · 3 participants
  • Max Kolasinski · 03/08/2022, 3:49 PM
    I’m trying to write a flow that creates a number of sub-flows, and I'm running into some issues getting Prefect to register it. I would appreciate some guidance (example in thread).
    3 replies · 2 participants
  • Roger Webb · 03/08/2022, 3:52 PM
    Hey all, I'm wondering about the best way to force a flow to fail. For instance, if you have a task that fails, and a task with an "any failed" trigger toward the end of your flow, the flow succeeds if the OnFailure task succeeds. However, I want the OnFailure task to do what it needs to and then make the entire flow fail.
    4 replies · 2 participants
  • Adam Roderick · 03/08/2022, 3:59 PM
    Have you seen this issue when launching jobs from the ECS Agent? I'm troubleshooting and not sure where to start with it. botocore doesn't like something about the parameters. Prefect version is 1.0
    11 replies · 2 participants
  • Bruno Nunes · 03/08/2022, 4:21 PM
    Hello, do you know when Prefect 2.0 is expected to be released for production use?
    5 replies · 2 participants
  • ben · 03/08/2022, 5:14 PM
    Does there happen to be any kind of white paper, or similar, available related to Prefect Cloud Secrets management? (I know that it uses Vault on the back-end, but our security folks tend to like more detail than that). Thanks!
    3 replies · 2 participants
  • Martha Edwards · 03/08/2022, 6:01 PM
    Hi! Is it possible in Prefect to create a sub-flow that works as follows:
    • Task A runs and may fail. Retry A until success.
    • Task B runs and may fail (or the task itself succeeds, but it returns a result indicating failure of the sub-flow). In that case, loop back to retry A.
    This seems like it could be done using looping on a nested flow, as described in this Stack Overflow post.
    11 replies · 2 participants
  • Martha Edwards · 03/08/2022, 6:16 PM
    Another question: is it possible to have an already-running flow run update when the flow itself gets updated, for example when a task is added, modified, or removed? Or to cancel a flow run and then have it restart from Cancelled, with the new run including the recent updates to the flow?
    6 replies · 2 participants
  • Roger Webb · 03/08/2022, 9:42 PM
    I'm setting up an automation for "When any run from <MyFlowName> changes to Failed, then <<My Action>>". But when I save the automation, it changes to "When any run from <MyFlowName> changes to any of these states, then <<My Action>>". Is this an issue with the GUI, or is my automation not saving the correct values in the wizard?
    3 replies · 2 participants
  • Moises Vera · 03/09/2022, 2:23 AM
    I'm experiencing a weird issue when I register my flows.
    • I had only one flow, with this directory structure:
      ◦ extract_data.py: the flow file (config for the flow)
        ▪︎ here I import the tasks directory with a simple import tasks
      ◦ tasks: a directory with my tasks, at the same level
      ◦ When I register this flow, it runs correctly on a schedule.
    • I just added a new flow, calculate_something.py
    • The tasks for this flow are in the tasks directory too
    • Now when I want to run this new flow, I get
    FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'tasks\'")
    What I don't get is: why does it work for the first flow and not for this new one? Any ideas? I appreciate it.
    6 replies · 3 participants
  • Suresh R · 03/09/2022, 8:00 AM
    Hi, I have a flow like this: A -> List -> Dict -> B. The result of A is persisted in S3. Whenever B fails and I try to restart the flow, the Dict task supplies empty data to B, since its result is not persisted. Dict is an internal task that we don’t have any control over. How can we overcome this?
    5 replies · 3 participants
  • Toby Rahloff · 03/09/2022, 8:21 AM
    Hi, we are currently evaluating Prefect Orion (and we are pretty amazed, to be honest). One challenge I cannot wrap my head around at the moment is manual interaction (e.g., "wait for approval"). We are crunching large visual data sets that often have unexpected features. Because of this, we need some kind of manual sanity check by a human expert before we start the compute/time-intensive tasks. In Prefect 1.0 this was possible via the manual_only trigger. Is it possible to do the same with Orion (in the future)?
    4 replies · 2 participants
  • Bruno Nunes · 03/09/2022, 8:46 AM
    Hello, I would like to upgrade the Prefect Server in my k8s cluster to Prefect Orion. Are there any guidelines on how to do this?
    5 replies · 3 participants
  • Tomer Cagan · 03/09/2022, 10:16 AM
    Hello, I am a bit confused about how task results are stored when I use Kubernetes. Can I define a mount to network storage (in the job template) and then use local storage so results are saved there? Would the server be able to read them (assuming it is also mounted on the same network storage)? Alternatively, we have a Sonatype Nexus repository: can I create a result type based on that (implementing the interface)? If I do so, how can I ensure this code is available to the system (task / server)?
    5 replies · 2 participants
  • Anna Geller · 03/09/2022, 10:51 AM
    FYI: if you face issues with Slack today, you can post your question on Discourse or GitHub. Slack's status is all green, but e.g. search doesn't seem to work today.
    1 reply · 1 participant
  • Vadym Dytyniak · 03/09/2022, 11:28 AM
    Hi. I am trying to add checkpointing to my flows to checkpoint a Dask dataframe that I pass between tasks. Tasks:
    import dask.dataframe as dd
    import pandas as pd
    from prefect import task
    from prefect.engine.results import S3Result
    
    
    @task(result=S3Result(bucket='bucket-name'))
    def checkpoint_data() -> dd.DataFrame:
        df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
        ddf = dd.from_pandas(df, npartitions=1)
    
        return ddf
    
    
    @task()
    def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
        raise ValueError("Checkpoint testing...")
    Flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)
    How can I be sure that, on restart after the failure (the ValueError I added), the accept_checkpointed_data task loads the data from S3 and does not use cached_inputs? Thanks
    33 replies · 3 participants
  • Martin T · 03/09/2022, 12:22 PM
    Hi! We're using PrefectSecret() to load secrets from Prefect Cloud. Is it possible to load secrets from Cloud into prefect.context.secrets during flow start, so all tasks can access them when needed? Most of our tasks' variables are auth-related, and the code/layout is getting overly complex.
    4 replies · 3 participants
  • Prasanth Kothuri · 03/09/2022, 1:05 PM
    Hello, for a Prefect task (which queries a MySQL database with pymysql) I am getting this error. Is there a way to increase the timeout?
    No heartbeat detected from the remote task; marking the run as failed.
    1 reply · 2 participants
  • Michał · 03/09/2022, 1:48 PM
    Hey, do you know of a Prefect Prometheus exporter whose goal would be, for example, to monitor unhealthy agents?
    2 replies · 2 participants
  • Wesley Jin · 03/09/2022, 4:31 PM
    Hello, is it possible to dynamically set which project a create_flow_run call targets, depending on environment variables? Example in thread.
    7 replies · 2 participants
  • Chris Reuter · 03/09/2022, 4:33 PM
    Hi all! Just a heads up, no PrefectLive this week. Join the #events channel to keep tabs on livestreams! https://prefect-community.slack.com/archives/C036FRC4KMW/p1646843588906869
  • Brian Phillips · 03/09/2022, 11:05 PM
    Is there any way to limit flow concurrency without affecting which agents a flow can be deployed to?
    4 replies · 2 participants
  • Brian Phillips · 03/09/2022, 11:33 PM
    Second question: I have a mapped flow pattern like the one below. How can I ensure this kicks off all sub-flows simultaneously before waiting for any individual run to complete? I am using a synchronous LocalDaskExecutor for this.
    from prefect import unmapped
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    child_ids_result = create_flow_run.map(
        flow_name=unmapped(...),
        project_name=unmapped(...),
        parameters=parameters,
        run_name=run_names,
    )
    wait_for_flow_run_result = wait_for_flow_run.map(
        flow_run_id=child_ids_result,
        stream_states=unmapped(True),
        stream_logs=unmapped(True),
        raise_final_state=unmapped(True),
    )
    final_task(..., upstream_tasks=[wait_for_flow_run_result])
    5 replies · 2 participants
  • Kevin Otte · 03/10/2022, 5:04 AM
    I'm new here and trying this vs. Airflow for a side project! Can I have classes or methods defined in other files that can be referenced as tasks in a Prefect flow? For example, let's say I have this file structure:
    scrapers
    > scraper1
    > scraper2
    prefect_flow.py
    and scraper1/2 have classes or methods that scrape specific websites. Am I able to leave those defined in those files while referencing them in my flow.py file? It would look something like this:
    import ...
    
    @task 
    def scrape_site_one():
        s = Scraper()
        return s.scrape_site('www...')
    ....
    
    with Flow("update historical pricing") as flow:
        scrape_site_one()
    5 replies · 3 participants
  • Samuel Tober · 03/10/2022, 8:19 AM
    Hi! Since yesterday one of our flows has stopped running, it is marked as white in the UI (see screenshot). This is pretty urgent so it would be nice if I could get some quick help!
    31 replies · 5 participants
  • Tomer Cagan · 03/10/2022, 8:38 AM
    Hi, I'm trying to understand how I can use Prefect with Dask while still debugging locally, and I am getting an error. More details, code and errors inside the thread.
    31 replies · 5 participants
  • Fredrik Blomgren · 03/10/2022, 9:47 AM
    Hello! I’m new here and evaluating a switch from Airflow to Prefect. We’re not doing any processing in Python/DataFrames/etc., but are using GCP with Beam/Dataflow (I don’t see any specific Prefect integration, but I assume it can easily be done by packaging it in a container and running it with a RunNamespacedJob task or similar), BigQuery, Avro on GCS and DBT. Almost all examples I see that use cloud storage work with pickled dataframes that are pulled locally when executing the task. I’m looking for a setup that references Avro datasets on GCS and/or BQ tables and passes them as inputs/outputs to tasks running as external services (Dataflow jobs, BQ queries, DBT models), similar to Luigi’s Target class. Given Prefect’s focus on dataflow, I would expect that to be a supported pattern, but I haven’t yet been able to find any guides/tutorials/discussions about this. Does anyone have any tips? Preferably for Orion, as we’re just getting started, but otherwise for 1.0 as a reference. Thanks a lot!
    10 replies · 3 participants
  • Luuk · 03/10/2022, 12:28 PM
    Does anyone have advice for keeping Docker agents logged in to Azure ACR? I have the Docker agents running on my VM, logged in with an identity, but the az acr login --name acrname session expires after a couple of hours and I have to log in again. I also run the command within the Docker image (for the Docker agent). It all works, but after a while I get logged out of the ACR and my flow starts to crash.
    Error message:
    500 Server Error for http+docker://localhost/v1.41/images/create?tag=latest&fromImage=acrname.azurecr.io%2Fimage: Internal Server Error ("Head https://acrname.azurecr.io/v2/image/manifests/latest: unauthorized: authentication required")
    4 replies · 2 participants
  • Tomer Cagan · 03/10/2022, 12:49 PM
    Hi, a general question about how Dask is used under the hood. In "regular" Dask code there's usually a client (a script or a REPL) that connects to a cluster (ephemeral or long-running) and then submits work to that cluster. How is this done in Prefect? Assuming I am using DaskExecutor and running on k8s with the Kubernetes agent, does the client code run in the agent itself? As part of the KubeCluster? Where should I look for the code running it: the k8s agent?
    9 replies · 2 participants
  • FuETL · 03/10/2022, 12:59 PM
    Hey guys, what is the standard way to restart flows via the client (library)? I'm trying to use set_flow_run_state to set the state to Scheduled (I tried Pending, but that leaves the flow idle and it never executes). I want to restart all the tasks.
    from prefect.client import Client
    from prefect.engine.state import Scheduled
    
    client = Client()
    client.set_flow_run_state(
        flow_run_id=flow_run_id,
        state=Scheduled()
    )
    7 replies · 2 participants

Anna Geller · 03/10/2022, 1:44 PM
This is a bit more complicated. Restart is effectively a UI feature that does quite a lot. If you are interested in implementing this in your Client code, check out this PR that added Restarts to the UI repo: https://github.com/PrefectHQ/ui/pull/285/files
My understanding of the process: it queries for Failed task runs, queries for the downstream tasks of those failed task runs, restarts the task runs in the right order (respecting all the dependencies), and sets new task run states after completion. So it's really not a simple and straightforward process, because Restarts don't create a new flow run; instead, they restart failed task runs and update their states after completion. The states of restarted task runs are updated within the history of the original flow run. Perhaps it would be easier for you to use retries and triggers to automatically perform some recomputation of task runs, depending on what you are trying to accomplish?
You can do a lot by combining conditional tasks (e.g. case), retries and triggers to automatically perform some recomputation. Check out these examples:
• https://discourse.prefect.io/t/how-can-i-trigger-downstream-tasks-based-on-upstream-task-s-state/106#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-the-task-run-based-on-a-custom-logic/83#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-a-flow-run-execution-based-on-a-condition/105#prefect-10-2
• https://discourse.prefect.io/t/how-to-build-a-conditional-flow-of-flows-i-e-trigger-a-different-child-flow-depending-on-the-upstream-flows-state/202
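The sketch below illustrates only the ordering part of the restart process described above, in plain Python with no Prefect calls. It finds the Failed task runs, collects everything downstream of them, and orders those tasks so each one comes after its upstream tasks. The dependency-graph format, the state strings and the tasks_to_restart helper are all hypothetical. This is not the code from the UI PR. It also leaves out the parts that would talk to the API and set new task run states.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, List, Set


def tasks_to_restart(upstream: Dict[str, Set[str]], states: Dict[str, str]) -> List[str]:
    """Hypothetical helper: failed tasks plus all their downstream tasks, in dependency order.

    upstream maps each task name to the names of its direct upstream tasks;
    states maps each task name to a final state name such as "Success" or "Failed".
    """
    # Invert the edges so we can walk from a task to its direct downstream tasks.
    downstream: Dict[str, Set[str]] = {name: set() for name in upstream}
    for name, parents in upstream.items():
        for parent in parents:
            downstream.setdefault(parent, set()).add(name)

    # Start from the failed task runs and collect everything reachable downstream.
    selected: Set[str] = set()
    stack = [name for name, state in states.items() if state == "Failed"]
    while stack:
        name = stack.pop()
        if name not in selected:
            selected.add(name)
            stack.extend(downstream.get(name, set()))

    # Keep only dependencies among the selected tasks: upstream tasks that
    # already succeeded are not rerun (their existing results would be reused).
    graph = {name: upstream.get(name, set()) & selected for name in selected}
    # static_order() yields every task after all of its upstream tasks.
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    upstream = {
        "extract": set(),
        "transform": {"extract"},
        "load": {"transform"},
        "report": {"extract"},
    }
    states = {"extract": "Success", "transform": "Failed", "load": "TriggerFailed", "report": "Success"}
    print(tasks_to_restart(upstream, states))  # ['transform', 'load']
```

Even this simplified version needs a graph traversal and a topological sort before any API calls happen. That supports the suggestion to reach for retries and triggers first.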

FuETL · 03/10/2022, 2:20 PM
Thanks, @Anna Geller. Basically, I have a process that can fail because of missing information, and when that happens we need a manual intervention in the database we are querying. The idea is that after a human fixes the data, the flow restarts and can now find the missing information. The easy way would be to start a new flow run, right?
Or could I run the same queries the UI does to achieve the same result?
Reading what you sent, instead of raising FAIL, could I raise a PAUSE state and then trigger a resume?

Anna Geller · 03/10/2022, 2:37 PM
Here is how I would tackle this:
import random
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.tasks.notifications import SlackTask
from prefect.triggers import all_successful
from typing import cast


def post_to_slack_and_cancel_run_on_task_failure(task, old_state, new_state):
    if new_state.is_failed():
        flow_run_id = prefect.context.flow_run_id
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {flow_run_id} "
            f"with an exception {value}. Cancelling the flow run and manually resuming later."
        )
        SlackTask(message=msg).run()
        client = Client()
        client.cancel_flow_run(flow_run_id)
    return new_state


@task(state_handlers=[post_to_slack_and_cancel_run_on_task_failure])
def run_process_that_may_fail():
    if random.random() > 0.5:
        raise ValueError("Failing due to missing information")


@task(trigger=all_successful)
def run_if_success():
    print("Success")


with Flow("state_handler_ex") as flow:
    first_task = run_process_that_may_fail()
    run_if_success(upstream_tasks=[first_task])

if __name__ == "__main__":
    flow.run()
Note that:
• attaching the all_successful trigger explicitly is not needed, since this is the default
• cancelling the flow run within this state handler is not strictly necessary: because the downstream task is triggered only on Success, it wouldn't run if this flaky task fails. But I wanted to show it in case you need something like that in your logic.
I wouldn't recommend rebuilding the Restart logic, since it is hard to implement and, frankly, a bit overkill for the problem at hand. I think you can totally solve your issue using:
• retries
• triggers
• conditional tasks
• state handlers
• and optionally subflows
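A toy model of trigger evaluation shows why the cancellation is redundant. This is a plain-Python sketch, not Prefect's implementation. The string state names, the simplified all_successful/any_failed functions and the run_task helper are assumptions for illustration. The model only mirrors the behaviour described above: a task whose trigger does not fire never runs its body and ends up TriggerFailed.

```python
from typing import Callable, List

# Simplified: in this toy model TriggerFailed also counts as a failure.
FAILED_STATES = {"Failed", "TriggerFailed"}


def all_successful(upstream_states: List[str]) -> bool:
    """Toy version of the default trigger: fire only if every upstream task succeeded."""
    return all(state == "Success" for state in upstream_states)


def any_failed(upstream_states: List[str]) -> bool:
    """Toy version of an "any failed" trigger: fire if at least one upstream task failed."""
    return any(state in FAILED_STATES for state in upstream_states)


def run_task(trigger: Callable[[List[str]], bool], upstream_states: List[str], body: Callable[[], None]) -> str:
    """Hypothetical helper returning the toy final state of a downstream task."""
    if not trigger(upstream_states):
        # The trigger did not fire, so the task body never executes.
        return "TriggerFailed"
    try:
        body()
    except Exception:
        return "Failed"
    return "Success"


if __name__ == "__main__":
    executed: List[str] = []
    # The flaky upstream task failed, so the default-trigger task is skipped...
    print(run_task(all_successful, ["Failed"], lambda: executed.append("run_if_success")))  # TriggerFailed
    print(executed)  # []
    # ...while an "on failure" task with an any-failed trigger would run.
    print(run_task(any_failed, ["Failed"], lambda: executed.append("notify")))  # Success
    print(executed)  # ['notify']
```

With the default trigger, run_if_success in the flow above behaves like the first call. When the flaky task fails, its body is skipped, which is why cancelling the flow run from the state handler is optional.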