Powered by Linen
prefect-community

    Tim Enders

    01/24/2022, 7:10 PM
    What is the best way to use a single client object across some mapped tasks?

    Keith Veleba

    01/24/2022, 7:39 PM
    Hello. I'm looking for some information on how to call ECS task definitions from within a Flow. I've been pointed to AWSClientWait as a method, but I can't find any illustrative examples of how to kick it off. Any help or guidance would be greatly appreciated.

    Muddassir Shaikh

    01/25/2022, 6:24 AM
    The child task is stuck in a Pending state even though its upstream tasks all succeeded. How do I fix this?

    Emma Rizzi

    01/25/2022, 9:11 AM
    Hello! I'm having a little bug with automations. A few days ago I was struggling with email notifications sent to an alias, so I switched to Slack notifications. We realized the problem was due to restrictions on the alias (only mail from our organization was allowed). We removed this restriction and now Prefect sends us notifications by email, BUT I deleted all the email automations and the emails keep coming. What should I do?

    Andreas Tsangarides

    01/25/2022, 10:55 AM
    Hi all, I'm trying to schedule ML predictions using Prefect. This involves loading a pre-trained pickled ML model, refitting it, and saving the new model daily. The very first model can be saved using either a pickle or a joblib backend. I know how to write the code for reading/writing the model locally or to S3 manually, but has anyone done this using LocalResult and S3Result?

    Vincent Chéry

    01/25/2022, 11:07 AM
    Hi all 👋 I'm a little confused about using regular functions (not tasks) in flows and cannot find information about this in the docs. Maybe I did not look in the right place? For instance, in
    with Flow("my-flow") as f:
        a = make_a()
        b = make_b()
        c = a + b
    • Do make_a and make_b need to be tasks, or can they be regular functions?
    • In particular, when using builtin functions (for instance, datetime.utcnow()), can we use them directly or must we wrap them in a task?
    • Is c = a + b allowed?
    Thanks a lot!

    Jons Cyriac

    01/25/2022, 1:23 PM
    Can context secrets be passed from the agent to Dask executors during a flow run? Sorry if I'm repeating this question. I'm trying to make some credentials available on the Dask workers, and I have set env vars like PREFECT__CONTEXT__SECRETS__DB_URI on the agent. Is this supposed to be available as prefect.context.secrets["DB_URI"] when the flow runs on a Dask worker, without any other setup?

    Jean-Baptiste Six

    01/25/2022, 2:25 PM
    Hi 🙂 I have an issue using GCSResult: the results are not stored (but no error is raised). First, I tried to use an environment variable:
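    from dotenv import load_dotenv
    from prefect import Flow, task
    from prefect.engine.results import GCSResult
    
    # Load environment variables from a local .env file
    load_dotenv()
    
    @task()
    def add(x, y=1):
        return x + y
    
    with Flow("Test GCSResult", result=GCSResult(bucket='thread-prefect-flows')) as GCSResult_flow:
        first_result = add(1, y=2)
        second_result = add(x=first_result, y=100)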
    Without success. However, this:
    from dotenv import load_dotenv
    load_dotenv()
    
    def implicit():
        from google.cloud import storage
    
        # If you don't specify credentials when constructing the client, the
        # client library will look for credentials in the environment.
        storage_client = storage.Client()
    
        # Make an authenticated API request
        buckets = list(storage_client.list_buckets())
        print(buckets)
    
    implicit()
    works well, so I'm not sure I understand: "Make sure your Prefect installation can authenticate to Google's Cloud API" (https://docs.prefect.io/core/advanced_tutorials/using-results.html#example-running-a-flow-with-gcsresult). Then I tried to use a local secret (https://docs.prefect.io/core/advanced_tutorials/using-results.html#example-running-a-flow-with-gcsresult):
    [context.secrets]
    GOOGLE_APPLICATION_CREDENTIALS="auth/token.json"
    GCP_CREDENTIALS="auth/token.json"
    (I don't know which to use, GCP_CREDENTIALS (prefect.utilities.gcp) or GOOGLE_APPLICATION_CREDENTIALS (from the GCP documentation); anyway, I tried both and neither worked.) What did I do wrong? Please help 🙏

    Mukamisha jocelyne

    01/25/2022, 2:36 PM
    Hi, I am getting the following error on Prefect Cloud: No heartbeat detected from the remote task; marking the run as failed. I tried adding [cloud] heartbeat_mode = "thread" to my backend.toml file, and adding flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}) to my code, but I'm still getting the same error. Any idea what I am missing?

    Sam Werbalowsky

    01/25/2022, 4:57 PM
    Hello - has anyone successfully been able to pass parameter values into a flow run name? I am trying to schedule a flow of flows by doing the following within a flow, but it always creates a flow run titled "My Flow - today - today" rather than one with the date range:
    start_date = Parameter("start_date", default="today")
    end_date = Parameter("end_date", default="today")
    
    # this function handles parameter typing and converts today to YYYY-MM-DD
    start_date_str = get_date_param_as_string.run(start_date)
    end_date_str = get_date_param_as_string.run(end_date)
    
    create_flow_run(
        run_name=f"My Flow - {start_date_str.run()}-{end_date_str.run()}",
        flow_name="My Flow Name",
        project_name=prefect_project,
        parameters={"start_date": start_date, "end_date": end_date, "file_type": "file_type"},
    )

    Jason Motley

    01/25/2022, 4:58 PM
    What's the best way to incrementally load tables (e.g. in a loop) while also ensuring that, if a connection is lost, the task retry re-establishes the connection AND does not create duplicates of previously loaded rows?
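A minimal sketch of one common pattern (not taken from this thread), assuming the target table has a primary key: load batch by batch above a high-water mark, commit each batch in its own transaction, and make every write an upsert, so a retried task that reconnects and re-sends a batch cannot duplicate rows. sqlite3 stands in for the real database, and the table, column, and function names are hypothetical.

```python
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[int, str]  # (primary key, payload); hypothetical schema


def ensure_target(conn: sqlite3.Connection) -> None:
    # The primary key is what makes a replayed batch harmless.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS target (id INTEGER PRIMARY KEY, payload TEXT)"
    )


def high_water_mark(conn: sqlite3.Connection) -> int:
    # Largest id already loaded, or 0 when the table is empty.
    return conn.execute("SELECT COALESCE(MAX(id), 0) FROM target").fetchone()[0]


def load_batch(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    # One transaction per batch: commit on success, roll back on error.
    # INSERT OR REPLACE makes a re-sent row overwrite itself instead of duplicating.
    with conn:
        conn.executemany(
            "INSERT OR REPLACE INTO target (id, payload) VALUES (?, ?)", rows
        )


def incremental_load(
    conn: sqlite3.Connection, source_rows: Iterable[Row], batch_size: int = 1000
) -> None:
    """Load only rows newer than what the target already holds, batch by batch.

    In a Prefect task, open `conn` inside the task body so that a retry
    reconnects; the high-water mark then skips batches already committed.
    """
    ensure_target(conn)
    start = high_water_mark(conn)
    pending = sorted(row for row in source_rows if row[0] > start)
    for i in range(0, len(pending), batch_size):
        load_batch(conn, pending[i : i + batch_size])
```

Running `incremental_load` twice over the same source rows leaves each row in the target exactly once, which is the property a retry needs.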

    Suresh R

    01/25/2022, 5:33 PM
    Hi, how can we skip some tasks in a flow?

    Jons Cyriac

    01/25/2022, 5:43 PM
    I'm using the Helm chart in k8s to run Prefect Server. In its values.yaml, I've passed env (which gets populated from secrets). The problem is that context secrets get populated only if I manually edit the agent deployment and set the --env argument in the "agent start" command. Just setting the env doesn't populate context.secrets in tasks. Any help?

    Fina Silva-Santisteban

    01/25/2022, 5:55 PM
    Hi everyone! Is there a way to set a time limit for a flow run or a specific task? We’ve updated our Python dependencies and some packages don’t seem to play nicely with each other anymore: one of the external APIs we use somehow got stuck in an infinite loop, which made one of our flows run for 16+ hours, and probably longer if I hadn’t noticed and manually canceled it. 😓
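For the task-level half of the question, Prefect 1.x tasks accept a timeout argument in seconds (for example @task(timeout=3600)). The plain-Python sketch below only illustrates the underlying idea of bounding how long the caller waits for a call; the helper name run_with_timeout is hypothetical, and time.sleep stands in for a stuck external API call. A thread-based timeout stops waiting but cannot kill the overrunning call.

```python
import concurrent.futures
import time

# Shared worker pool; a call that overruns keeps its thread until it returns.
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def run_with_timeout(fn, timeout_s, *args, **kwargs):
    """Call fn(*args, **kwargs), raising TimeoutError if it takes longer than timeout_s.

    The caller stops waiting, but the overrunning call itself is not killed.
    """
    future = _POOL.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        future.cancel()  # no effect once the call has started running
        raise TimeoutError(f"call exceeded {timeout_s}s") from None


# Usage: a quick call returns normally (time.sleep returns None).
print(run_with_timeout(time.sleep, 1.0, 0.01))  # prints None
```

Raising on timeout, rather than returning a sentinel, lets a surrounding retry or failure notification treat the overrun like any other task failure.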

    Adi Gandra

    01/25/2022, 6:26 PM
    Hey, I’m wondering: for prefect agent kubernetes install, what are the default memory and CPU limits? My tasks run on Kubernetes on EKS, and I set up the agent using that command. However, one task downloads a file over FTP and it just stalls, with no progress for hours. I’m wondering whether this could be caused by a memory or CPU limit, and whether I should just run the install command again. Would that overwrite my old manifest?

    Rob Landis

    01/25/2022, 9:53 PM
    Hi, quick question about this workflow: from the diagram it appears flows B and C run in parallel, but when I run this flow, B and C actually run one after the other, with C running after they both complete. The flow of flows works great but does not appear to run the flows truly in parallel... https://docs.prefect.io/core/idioms/flow-to-flow.html#scheduling-a-flow-of-flows

    Mathijs Miermans

    01/25/2022, 10:33 PM
    Is it expected that the logs contain "Starting task run..." for tasks that are skipped because an upstream task fails? (Example code in thread.)

    Leon Kozlowski

    01/25/2022, 11:39 PM
    When a parameter is changed through the UI under a flow's settings, does the change automatically propagate to future runs? I usually don’t see the change reflected in a pending flow run unless I toggle the schedule off and on. I only do that to be pedantic, but I was wondering if it is necessary.

    joshua mclellan

    01/26/2022, 2:27 AM
    Hey, so I'm working with some legacy code that has working-directory requirements (i.e. it needs to run next to, or one level up from, a number of dependent files). When I run a flow locally in a script using flow.run(), is there a way to have it run from a specific directory, regardless of where the Python session was started?
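A minimal sketch, assuming the legacy code resolves its relative paths against the process working directory: temporarily switch directories around flow.run() and restore the original afterwards, even if the run raises. Python 3.11+ ships contextlib.chdir for this; the hand-rolled working_directory helper below is an equivalent for older versions, and the path in the usage comment is a placeholder. The working directory is process-wide, so anything else running in the same process sees the change too.

```python
import contextlib
import os
from pathlib import Path


@contextlib.contextmanager
def working_directory(path):
    """Temporarily chdir into `path`, restoring the previous cwd even on error."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield Path(path)
    finally:
        os.chdir(previous)


# Usage with a Prefect 1.x flow (path is hypothetical):
# with working_directory("/path/to/legacy/code"):
#     flow.run()
```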

    Ovo Ojameruaye

    01/26/2022, 5:54 AM
    Hello, is it possible to define dependencies between a flow scheduled to run on an on-premise agent and an Azure ML pipeline scheduled in AzureML that runs on a cluster managed by Azure? I have some flows that perform ETL that should finish before my AzureML pipeline runs. Sometimes the ETL runs longer than it should, and since the data hasn't been refreshed yet, my AzureML pipeline runs on stale data. I'd appreciate any help.

    Tomek Florek

    01/26/2022, 9:58 AM
    Hey all 🙂 I’m setting up Prefect with GE (Great Expectations) and can’t find the right way to implement the logic I have in mind. If the GE validation fails (some data is wrong), I would like to be notified via Slack (Prefect Automation) as a kind of warning, but the flow should carry on. What would be the correct way to implement that? 🧐

    Henning Holgersen

    01/26/2022, 11:52 AM
    Hi, I’m using Prefect Cloud and an AKS (Azure Kubernetes Service) cluster running a vanilla k8s agent (from the prefect agent kubernetes install command). The agent connects to Prefect Cloud, but when I run a flow I get the error 'Connection aborted.', OSError(0, 'Error'). I have added the RBAC configuration to the deployment and opened outbound traffic to api.prefect.io on port 443, but I have not been able to get anything to run on the AKS agent. Any ideas?

    Romain

    01/26/2022, 12:17 PM
    Hello, we are facing an annoying issue while using Prefect with Prefect Server. Here is the precise stack:
    • prefect 0.15.12 (but the issue was also often observed with previous versions)
    • prefect server 2022.01.12
    • executor: DaskExecutor with cluster_class set to KubeCluster
    • using the Kubernetes runner
    • Azure as storage
    What we observe is that sometimes the temporary Dask cluster started by the Prefect run is not torn down at the end of the run. It's annoying because, if we don't pay attention, we keep using VM resources for nothing. Has anyone faced a similar issue?

    Sidney Fox

    01/26/2022, 2:48 PM
    Hi - looking to call a flow within another flow and return the results of the child flow to the parent flow. This works well using the Local agent and Local storage, but returns the following error when called with the Kubernetes agent and GitHub storage:
    ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
    I’m not explicitly providing where or how results of the child flow should be stored, and I believe that is the root cause of the error thrown. Is that correct? If so, what is the preferred method for storing results? e.g. is PrefectResult preferred over S3Result or is it a matter of opinion?

    Sophie Courtemanche-Martel

    01/26/2022, 4:09 PM
    Hi all! New to Prefect here 🤟 I'm working on a flow that sends data retrieved using MySQLFetch() to an S3 bucket. What would be the best way to transform the tuple returned from MySQLFetch() so that it can be written to S3 in the appropriate format? Currently I'm getting this error when I send the results directly to the S3Upload() function:
    stream = io.BytesIO(data.encode())
    AttributeError: 'tuple' object has no attribute 'encode'
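The traceback shows S3Upload calling .encode() on its data, so it expects a string rather than the tuple of rows that MySQLFetch produced. A minimal sketch, assuming the fetch returns a sequence of row tuples: serialize the rows to CSV text in an intermediate step and pass that string to S3Upload. The helper name rows_to_csv, the bucket, and the key are hypothetical, and the Prefect wiring is shown only as comments with constructor arguments elided.

```python
import csv
import io
from typing import Iterable, Optional, Sequence


def rows_to_csv(rows: Iterable[Sequence], header: Optional[Sequence[str]] = None) -> str:
    """Turn fetched rows (e.g. a tuple of row tuples) into CSV text.

    S3Upload calls .encode() on its data, so it needs a str rather than a tuple.
    Pass a sequence of rows; wrap a single fetched row as [row].
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    if header:
        writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()


# Hypothetical Prefect 1.x wiring (arguments elided):
# to_csv = task(rows_to_csv)
# with Flow("mysql-to-s3") as flow:
#     rows = MySQLFetch(...)(...)
#     S3Upload(bucket="my-bucket")(data=to_csv(rows), key="export.csv")
```

CSV is only one choice; json.dumps on the rows would satisfy S3Upload the same way if the consumer prefers JSON.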

    David Yang

    01/26/2022, 4:12 PM
    Hi all, how do I use a secret defined in Prefect Cloud for Azure storage? I have added "azureflowsecret" as a secret in Prefect Cloud. This is my code: Storage=Azure("edwflows",connection_string_secret="azureflowsecret"), but when I tried to register the flow with Prefect Cloud, it failed with the error: ValueError: Local Secret "azureflowsecret" was not found.

    Kevin Kho

    01/26/2022, 4:36 PM
    Join us for the Prefect livestream at 3 pm today! Looking forward to seeing you guys. We will parallelize a machine learning model grid search over Dask with Prefect.

    Eli Treuherz

    01/26/2022, 5:20 PM
    Say I’ve got two or more lists I’d like to map a task over. If I provide two mappable arguments to Task.map(), it iterates through both of them together, like zip(). Is there a way to loop over every combination instead, like itertools.product()?
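Since mapping pairs its mappable arguments element-wise, like zip(), one workaround (a sketch, not confirmed in this thread) is to expand the combinations first and then map over the two expanded, equal-length lists. In a Prefect 1.x flow the expansion would run as an upstream task; the helper name cartesian_args is hypothetical.

```python
import itertools
from typing import List, Sequence, Tuple, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def cartesian_args(xs: Sequence[A], ys: Sequence[B]) -> Tuple[List[A], List[B]]:
    """Expand two lists so that zipping the results yields every (x, y) combination."""
    pairs = list(itertools.product(xs, ys))
    left = [x for x, _ in pairs]
    right = [y for _, y in pairs]
    return left, right


# In a Prefect 1.x flow, this would be an upstream task (e.g. with nout=2), and
# its two outputs would be passed to my_task.map(left, right); names are hypothetical.
```

For three or more lists, the same idea generalizes by passing all of them to itertools.product and splitting each position of the resulting tuples into its own list.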

    Heeje Cho

    01/26/2022, 6:04 PM
    Is there a Prefect-recommended way of checking the length of the tuples returned by database fetches (PostgresFetch, SqlServerFetch)? I don't think you can get the length of a task directly. This is just to set up a case statement depending on the length of the tuple.

Matthew Seligson

01/26/2022, 7:03 PM
What means are available for tracking task runs across different versions of a flow? If I have a flow with one task, and I reregister the flow, the task’s ID will change.

Kevin Kho

01/26/2022, 7:05 PM
Maybe the task name would help?

Matthew Seligson

01/26/2022, 7:12 PM
I’m wondering what facilities are built into Prefect to support this. For example I’d like to be able to change the task name. If I do that, the task id will change. What do I have available to determine the previous ID?

Kevin Kho

01/26/2022, 7:17 PM
We don’t know the task content, though, so I don’t think we can support that other than through the name or slug. We also have tags, if that helps.
Could you tell me more about the use case? Are you trying to do something like load all task runs across the Flow run history?

Matthew Seligson

01/26/2022, 7:32 PM
Thanks Kevin. In a simple case I have a flow with one task that checks network activity and either fails or succeeds, and I run the flow every day for a year. The next year I add a new task to the flow to send an email indicating the result (success or failure) of the first task. My new flow has two tasks, the network task and the email task. However, the task run history for the network task is empty because it is technically a different task with a different task ID. I want to see the full run history of the network task.

Anna Geller

01/26/2022, 7:52 PM
I second Kevin’s suggestion to use tags for such a use case - imo it’s probably the cleanest approach, and it will work nicely in Orion. But even the task name or slug can be used for your use case. It’s all a matter of combining data from multiple queries. For example, this is how you can get all the versions of a specific flow:
query {
  flow(where: { name: { _ilike: "%dask%" } }) {
    id
    name
    version
    tasks {
      name
      id
      slug
      tags
    }
  }
}
You can replace the “dask” text with some part of your flow name. Then, to get the task run history for those tasks, you would need another query. But note that storing logs and the history of your flow and task runs over such a long period (you want to analyze potentially two years of run history?) would be difficult, or at least expensive. For example, the Prefect Cloud Standard plan has 2 weeks of log retention.
In short, even if your task ID changes, you can still retrieve this data via name/slug/tags to get the history, as long as your backend still has that history (based on your retention period settings).
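To illustrate the "combine data from multiple queries" step: a minimal sketch that takes the list under data.flow in the JSON response to the flow query above and collects, per tag (or per name or slug), the task IDs seen across flow versions. Those IDs can then be fed into the second, task-run query. The sample values are made up, and the sketch assumes the task was given a stable tag when it was defined (in Prefect 1.x, e.g. @task(tags=["network-check"])), so the tag survives a rename while the ID and slug change.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple


def task_ids_by(flows: List[Dict[str, Any]], key: str = "tags") -> Dict[str, List[Tuple[int, str]]]:
    """Group task IDs from every flow version by tag, name, or slug.

    Returns {group_value: [(flow_version, task_id), ...]} ordered by version.
    """
    groups: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for flow in sorted(flows, key=lambda f: f["version"]):
        for task in flow["tasks"]:
            # "tags" is a list, so one task may belong to several groups.
            values = task[key] if key == "tags" else [task[key]]
            for value in values:
                groups[value].append((flow["version"], task["id"]))
    return dict(groups)


# Made-up example of data.flow from the query response: version 2 renamed the
# network task and added an email task, but the tag stayed the same.
flows = [
    {"name": "network-flow", "version": 1, "id": "flow-v1",
     "tasks": [{"name": "check_network", "id": "task-a",
                "slug": "check_network-1", "tags": ["network-check"]}]},
    {"name": "network-flow", "version": 2, "id": "flow-v2",
     "tasks": [{"name": "check_connectivity", "id": "task-b",
                "slug": "check_connectivity-1", "tags": ["network-check"]},
               {"name": "send_email", "id": "task-c",
                "slug": "send_email-1", "tags": []}]},
]
```

Grouping by slug or name works the same way, but only links versions in which those values did not change; this is why tags are the more robust key for the rename case described above.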