best-practices-coordination-plane

    Felix Horvat

    08/11/2022, 1:03 PM
    Is it possible to have a DockerContainer block point to a private container registry, say on GitLab? If so, how can I pass the authentication parameters? Or is the assumption that the Docker service an agent has access to is already logged in to the required container registries?
    💯 1
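    For reference, a hedged sketch of wiring registry credentials through a DockerRegistry block in Prefect 2 (field names per my reading of the docs, so verify against your version; all GitLab values are placeholders):
    from prefect.infrastructure.docker import DockerContainer, DockerRegistry

    # Placeholder credentials; a GitLab deploy token with read_registry scope can go here
    registry = DockerRegistry(
        username="my-user",
        password="my-token",
        registry_url="registry.gitlab.com",
    )

    container = DockerContainer(
        image="registry.gitlab.com/my-group/my-image:latest",
        image_registry=registry,  # the agent logs in with these credentials before pulling
    )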

    Darren

    08/11/2022, 7:19 PM
    It's not really good practice to use something like pandas in a task function, is it?

    Edmondo Porcu

    08/11/2022, 11:36 PM
    What is the difference between flow.set_dependency and flow.add_edge in Prefect 1.0?
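    For context, a sketch from memory of the 1.0 API (the method is actually spelled flow.set_dependencies): add_edge wires a single low-level edge between two tasks, while set_dependencies is the higher-level helper that can set several upstream/downstream/keyword dependencies in one call. The two calls below express the same state-only dependency:
    from prefect import Flow, task

    @task
    def task_a():
        return 1

    @task
    def task_b(x):
        return x + 1

    with Flow("dependencies") as flow:
        a = task_a()
        b = task_b(1)
        # Low-level: one edge, optionally keyed or mapped
        flow.add_edge(upstream_task=a, downstream_task=b)
        # Higher-level: several dependencies at once
        flow.set_dependencies(task=b, upstream_tasks=[a])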

    matt_innerspace.io

    08/16/2022, 5:08 AM
    Running Prefect v1.0 on AKS and can't figure out how to use multiple nodepools - any tips or tricks here? I'd like to have an agent per nodepool, where the nodepools have different-sized nodes. Currently I have 2 AKS clusters: one with a nodepool of small nodes (4 cores, 16 GB memory) and one with a nodepool of medium nodes (16 cores, 64 GB memory). Any insight or help here would be great!

    Jack Prominski

    08/16/2022, 1:38 PM
    Hi! I'm looking for some details about specifying which virtual environment a flow/deployment should be executed in. I was following the virtual environment tutorial but got an error running it on my own, and then found these release notes saying that flow runners are deprecated. The infrastructure docs aren't super clear to me -- can/should a conda/venv be specified in a Process infrastructure block? Would just starting the agent after activating the correct venv work? The pattern I'm following is this: I have several different projects that each have their own venv, and I would like to orchestrate them all using prefect. Prefect is installed in each venv, and again in a "prefect venv" in which I launch Orion. When I launch deployments/queues/agents, I want to make sure that the flows within are executed in the proper venv. Is this the right way to think about/approach this?

    Caio Rogério Silva dos Santos

    08/16/2022, 7:23 PM
    Hello all! I'm currently developing some high-update-frequency pipelines with Prefect v1.0 and have stumbled on a question: for each data capture that runs every minute, we log to a BigQuery table whether that run was successful or not. This data is used as input for an hourly recapture, which tries to request the data for each uncaptured minute in the past 24 hours. The way we do it now is basically to copy and paste the code for a single capture and use `.map`, mapping each task in the pipeline to one of the failed timestamps. This generates some issues, as all the data for each timestamp remains in memory during the flow run, blowing past our resource limits and making each task take forever to run sequentially. Talking with a coworker, we arrived at a quite elegant solution: create a capture flow run for each timestamp, distributing the work across multiple pods and cleaning up the code quite nicely. The problem is that the task `create_flow_run` does not work with `.map`, giving me the error `TypeError: Cannot map over unsubscriptable object of type <class 'pipelines.utils.custom.CustomFlow'>`. So I'm here to ask: can someone point me in the direction of the best practice for implementing this idea? Before this, I tried looking fairly deep into garbage collection to try to reclaim the memory used for each piece of data, to no avail; we need to produce code with as low a maintenance cost as possible and are looking for robustness where we can. Thanks!

    Alex Wilcoxson

    08/18/2022, 7:30 PM
    Hi there! I'm writing a flow that orchestrates external jobs via HTTP requests. The HTTP client supports async, and thus I'm making my flow and tasks async functions. If using async, is there a preference for still using `.map` vs `asyncio.gather` on a list of started async tasks? Thanks!
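    Both can work; for what it's worth, a minimal sketch of the asyncio.gather pattern with async tasks (the task body is a stand-in for the real HTTP call):
    import asyncio

    from prefect import flow, task

    @task
    async def call_external_job(url: str) -> str:
        await asyncio.sleep(1)  # stand-in for the real HTTP request
        return url

    @flow
    async def orchestrate(urls: list[str]):
        # Calling an async task inside an async flow returns an awaitable;
        # gather runs them concurrently
        return await asyncio.gather(*[call_external_job(u) for u in urls])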

    Jimmy Le

    08/18/2022, 10:38 PM
    Here is a sample supervisor configuration file for running your agents in the background and making sure they restart in the event of a failure. I've also added some venv environments so you can have different virtual environments for different work queues. Add this to your `/etc/supervisor/conf.d/` folder. I've named mine `prefect.conf`:
    [program:prefect]
    command=/home/usr/venv/bin/python /home/usr/venv/bin/prefect agent start --work-queue "queue-name"
    autorestart=true
    autostart=true
    stderr_logfile=/var/log/prefect.err.log
    stdout_logfile=/var/log/prefect.out.log
    environment=PATH="/home/usr/venv/bin",PREFECT_API_URL="",PREFECT_API_KEY=""
    👍 3
    🙌 2
    :thank-you: 1
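    To have supervisor pick the new file up (assuming a standard supervisor install):
    sudo supervisorctl reread
    sudo supervisorctl update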

    eddy davies

    08/19/2022, 1:51 PM
    Are triggers still a feature in Prefect v2? I can't see them mentioned in the docs anywhere. I'm interested in running a Prefect flow after a file lands in an S3 bucket.
    👀 2

    Sander

    08/24/2022, 6:00 PM
    Hi, does anyone have a best-practice code example of an ML pipeline in Prefect, from ingesting data from an external source, to cleaning, to feature generation, to fitting and subsequent use of the model? I'm wondering whether to split these parts up as tasks or as individual flows, and whether to store the interim results, taking into account the possibility of (easier?) debugging if they're flows.
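    Not an answer, but a sketch of one common shape, with each stage as a task inside a single flow (persisting interim results then becomes an explicit choice inside each task; all names are illustrative):
    from prefect import flow, task

    @task
    def ingest():
        ...  # pull raw data from the external source

    @task
    def clean(raw):
        ...

    @task
    def featurize(rows):
        ...

    @task
    def fit(features):
        ...  # return the trained model

    @flow
    def ml_pipeline():
        raw = ingest()
        rows = clean(raw)
        features = featurize(rows)
        return fit(features)
    Swapping any stage for a subflow keeps the same call structure, so the two designs stay easy to compare.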

    Jiri Klein

    08/26/2022, 3:31 PM
    Hi, I'm currently having some struggles with mapped tasks and `LocalDaskExecutor` in V1. I have two sets of mapped tasks, each set executing in parallel. Is there a way of telling Prefect to wait for one set to fully finish? I tried task triggers with the `all_successful` flag, but to no avail: the second set of mapped tasks fails with `TriggerFailed`. A Python pseudocode sample would be e.g.
    from typing import Any

    from prefect import task, Flow
    from prefect.executors import LocalDaskExecutor
    from prefect.triggers import all_successful


    @task(name="Task A")
    def task_a(input: Any):
        return embarrassingly_parallel_func_1()

    @task(name="Task B", trigger=all_successful)
    def task_b(input: Any):
        return embarrassingly_parallel_func_2(input)

    with Flow("example") as flow:
        flow.executor = LocalDaskExecutor()
        _coll = [1, 2, 3]

        _a = task_a.map(input=_coll)
        _b = task_b.map(input=_a, upstream_tasks=[task_a])
    Unfortunately, for me some child tasks from `task_b.map` either begin executing BEFORE all of `task_a.map` have finished OR they fail with `TriggerFailed`. Does anyone have any experience with this?
    1️⃣ 1
    ✅ 1

    Luca Vehbiu

    08/26/2022, 6:02 PM
    Hi guys, I have a task querying a Redshift database whose credentials are set outside the task. I use Docker storage and ECS run. Locally the flow runs fine; on ECS it times out connecting to the Redshift cluster. If I remove the task decorator, the database is queried fine. I think only the database credentials are not being passed, because loading to S3 works, so it's not the AWS credentials. Does this have to do with mapping in tasks?
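    One pattern that often resolves this kind of local-vs-ECS discrepancy (a hedged sketch; the secret name and the get_redshift_conn helper are hypothetical) is to fetch the credentials at run time, inside the task, rather than at module level, since with Docker (pickle-based) storage module-level values are captured when the flow is built:
    from prefect import task
    from prefect.client.secrets import Secret

    @task
    def query_redshift():
        # Resolved when the task runs on ECS, not when the flow is registered
        creds = Secret("REDSHIFT_CREDS").get()  # hypothetical secret name
        conn = get_redshift_conn(creds)  # hypothetical connection helper
        return conn.execute("select 1")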

    John Kang

    08/29/2022, 4:58 PM
    I have a large flow that I use to query data, transform it, test it, and insert it into a database (with each of these steps as either tasks or sub-flows). My question is what the best practice is regarding:
    1. Failing the entire flow if one part of it (a sub-flow or task) fails. I want the entire flow to fail if the testing discovers an issue with the data (I have custom Pandera data checks set up because our database frequently experiences failures). I've thought about passing an exception to each subsequent task/sub-flow, but that seems cumbersome.
    2. Emailing details regarding failures. I've set up a custom function to email exceptions if they occur. Should I just insert a try/except for each task/sub-flow? I think that would work, but wasn't sure if there was a better method.
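    A minimal sketch of one approach, assuming Prefect 2 (extract, load, and send_failure_email are hypothetical): let the validation task raise, which fails everything downstream that consumes its result, and wrap the flow body once to email on the way out:
    from prefect import flow, task

    @task
    def validate(df):
        # Raising marks this task failed; downstream tasks that consume its
        # result will not run, and the flow run fails
        if df is None:
            raise ValueError("data failed validation")
        return df

    @flow
    def pipeline():
        try:
            df = extract()  # hypothetical task
            df = validate(df)
            load(df)  # hypothetical task
        except Exception as exc:
            send_failure_email(exc)  # hypothetical helper
            raise  # re-raise so the flow run is still marked failed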

    Daniel Lomartra

    08/29/2022, 11:16 PM
    In Prefect 1.0, I want to use the env parameter of the DbtShellTask to set environment variables that will be utilized by my dbt project. Does anyone know how this behaves if you pass env a dictionary whose values can be lists or other dictionaries? For example, if I pass it the following dictionary:
    {
        'dbt_env_var1': 'value1',
        'dbt_env_var2': ['value2', 'value3', 'value4'],
        'dbt_env_var3': {
            'another_key': 'value5'
        }
    }
    Will Prefect accept this? Will dbt_env_var2 and dbt_env_var3 be coercible back to their original data types in jinja, or just passed as strings?

    Ghislain Picard

    08/30/2022, 7:14 AM
    Hi, how do I run a flow in a specific conda environment (using the Process infrastructure)? I see two options:
    - run a specific agent in a conda environment, and link the flow to that agent
    - use the "command" option of the Process infrastructure to point to the specific python binary of that environment
    What is the best practice? Is there any other option?
    ✅ 1
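    For the second option, a minimal sketch (the conda path is hypothetical; Prefect 2's default flow-run command is python -m prefect.engine):
    from prefect.infrastructure import Process

    process = Process(
        # Run the flow with the python binary inside the target conda env
        command=["/opt/conda/envs/my-env/bin/python", "-m", "prefect.engine"],
    )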

    Jordan Charlier

    08/30/2022, 2:26 PM
    Hi, in Prefect 1.0, can I automate the AWS ECR auth? Thanks!
    ✅ 1

    Stéphan Taljaard

    08/31/2022, 9:30 AM
    Hi, I have a general Prefect 1.0 workflow question... Calling `flow.register(..., idempotency_key=flow.serialized_hash(), ...)` only bumps the actual flow version if the hash changes, as expected. Bundling this with script-based storage (instead of pickle-based), it should work well in CI/CD processes: the flow version only needs to be bumped if metadata changes, allowing behaviour changes inside tasks without having to re-register the flow. Let's say that you do change the innards of a task; the flow metadata does not change, and thus the serialized hash does not change. If you then run `flow.register(..., idempotency_key=flow.serialized_hash(), ...)`:
    1. A new flow script file is built and pushed to storage (e.g. my GCS bucket), because the default is build=True.
    2. The flow version does not get bumped, as expected. However, when creating a flow run, it still uses the previously uploaded flow script from GCS, not the new one just created (because the unchanged idempotency key prevents pointing to the new file in storage).
    Thus, you either have to:
    • force a re-registration anyway (e.g. by not specifying an idempotency key), or
    • "manually" upload a new flow script file into storage, with the same name as the previous one, replacing the old one.
    This does not feel as automated as the docs make it sound. Am I missing something?
    ✅ 1

    Mike Kovetsky

    08/31/2022, 2:16 PM
    Hey, Prefect community! I am creating a Prefect (v2.0.4) flow for hyperparameter search. I use ray[tune]==2.0.0 as the tuning library. Could you please assist in finding some answers:
    1. Does anybody have a working example? I assumed this to be a common task (especially when I saw RayTaskRunner), so I expected to find one.
    2. I'd like to use my Prefect task as a Ray trial, so that I can see tasks in the Prefect dashboard. Unfortunately, tune.Tuner requires a function as an arg. When I try tune.Tuner(prefect_task), Prefect gives the error `RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn().` When I try tune.Tuner(prefect_task.fn), the flow runs successfully, but task registration is skipped, so I see no progress in the Prefect dashboard 😞 This point is really crucial, because I cannot run parallel tasks with RayTaskRunner, as there are no Prefect tasks registered.
    3. I wonder what the best practice is to manage max concurrency. Should I specify a concurrency limit of 2 on the Prefect level, or on the Ray level with tune.TuneConfig(max_concurrent_trials=2)? Thank you in advance! 🙂
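    On point 3, Prefect-side concurrency limits hang off task tags; a sketch (the tag name is arbitrary), with the limit itself created via `prefect concurrency-limit create ray-trial 2`:
    from prefect import task

    @task(tags=["ray-trial"])
    def run_trial(config: dict):
        # With a concurrency limit of 2 on the "ray-trial" tag, at most two of
        # these task runs execute at once
        ...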

    Abdullah Khan

    08/31/2022, 6:29 PM
    Is there a way to increase the timeout for the connection to a remote database in Prefect 2? It seems like the initial connection times out before all the tables are created in the database.
    ✅ 1

    John Kang

    08/31/2022, 8:55 PM
    I just upgraded to 2.3.0 and tried to rebuild a remote GCS deployment (which was working in 2.2.0), but I'm having an issue where it is not uploading enough files to GCS. The previous `prefect deployment build` command uploaded ~200 files (which are necessary), but now it only uploads ~26. I'm not sure what is going on... I have not changed anything in the `prefect deployment build` command or the .prefectignore file...
    👀 2

    Young Ho Shin

    09/05/2022, 11:47 AM
    Hello all. I have what I think is a very standard command-line workflow that I would like to implement with Prefect 2.0. Here's what I want to do:
    - Process many files with the same shell script in parallel
    - Track the progress of the overall processing and of individual files in the Orion UI: overall, the number of currently running/failed/completed tasks; per file, the stdout/stderr logs of tasks that are complete or currently running
    - Optionally: automatically retry failed files, or notify the user
    Is this something that's well supported in Prefect? Is there a well-documented example somewhere that I can follow? Note: I've played around with [prefect-shell](https://github.com/PrefectHQ/prefect-shell), but I'm having trouble finding a straightforward way to view task logs for tasks that are currently running.
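    A sketch of the fan-out portion, assuming prefect-shell's shell_run_command task and a hypothetical process.sh (untested):
    import asyncio

    from prefect import flow
    from prefect_shell import shell_run_command

    @flow
    async def process_files(files: list[str]):
        # Each command runs as its own Prefect task, so per-file state and
        # logs appear individually in the Orion UI
        return await asyncio.gather(
            *[shell_run_command(command=f"./process.sh {f}") for f in files]
        )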

    Naila Chennit

    09/05/2022, 6:27 PM
    Hello everyone! I updated Prefect to version 2.1.1, and when deploying my flows using kubernetes-job as infrastructure, I'm getting the error below about the `manifest_path` argument:
    manifest_path
      none is not an allowed value (type=type_error.none.not_allowed)
    Is it mandatory to set the `manifest_path`? Here is my deployment definition:
    deployment = Deployment.build_from_flow(
        flow=my_flow,
        name="tes-deployment",
        version="1",
        work_queue_name="kubernetes",
        tags=["dev"],
        ignorefile='.prefectignore',
        storage=storage,
        infrastructure=infra,
    )
    ✅ 1

    Patryk Kalinowski

    09/06/2022, 1:14 PM
    Hi All, I have a quick question as I'm trying to understand Prefect. I need my own machine/server to make it work, right? Prefect Cloud is only a scheduler and logger for scripts running somewhere else. Is there any full cloud offering where I can upload files, schedule execution, and forget?
    ✅ 1

    link89

    09/07/2022, 6:43 AM
    Hi all, does Prefect 2.0 support class-based task definitions? The reason I ask is that I have several tasks that share common steps and internal state, which could naturally be modelled by inheriting from the same base class.
    ✅ 1
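    Prefect 2 tasks are plain functions, so one hedged sketch of keeping shared steps and state on a class is to have the task delegate to an ordinary object (all names illustrative):
    from prefect import flow, task

    class BaseJob:
        # Shared steps and state live on the class hierarchy as usual
        def __init__(self, source: str):
            self.source = source

        def run(self) -> str:
            return f"processed {self.source}"

    @task
    def run_job(job: BaseJob) -> str:
        # The task is just a thin functional wrapper around the object
        return job.run()

    @flow
    def my_flow():
        return run_job(BaseJob("s3://bucket/key"))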

    Zi Yuan

    09/07/2022, 11:26 AM
    Hi hi! Regarding the PrefectSecret task in Prefect 1.0: is it fine to directly use Secret(secret_name).get() (from prefect.client.secrets import Secret) instead of PrefectSecret if I don't want it to be a task? Any downsides to doing it like this? Many thanks!
    ✅ 1
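    For reference, a minimal sketch of the non-task usage; one difference worth checking is when the call runs, since outside a task it executes wherever it is placed (at import/registration time if at module level) rather than as a tracked task at flow run time:
    from prefect.client.secrets import Secret

    # "MY_SECRET" is a placeholder name; fetches the value directly
    value = Secret("MY_SECRET").get()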

    Angel Acosta

    09/07/2022, 7:06 PM
    Hello, I want to create a flow that spawns sub-flows for each file in a folder to be processed. If one of the files fails to process, I do not want it to interrupt the work of the other sub-flows. I am not sure of the best way to go about this. Does anyone have a recommendation?
    ✅ 1
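    One simple pattern (a sketch; process_file stands in for the real per-file work): catch each sub-flow's failure in the parent so the loop keeps going:
    from prefect import flow

    @flow
    def process_file(path: str):
        ...  # hypothetical per-file work

    @flow
    def process_folder(paths: list[str]):
        failed = []
        for path in paths:
            try:
                process_file(path)
            except Exception:
                # A failed sub-flow raises here; record it and keep going
                failed.append(path)
        return failed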

    Stefan

    09/07/2022, 7:19 PM
    With 2.0 and the new deployment methods, what is the best way to pass dynamic variables (equivalent to sys.argv) if you want to start a flow run with different inputs? I've been reading the docs (here) on Flows and Deployments, and from what I gather, we pass the parameters as a dictionary. But I cannot see any examples of how to "receive" said parameters in the flow (akin to retrieving and assigning sys.argv to variables).
    ✅ 1
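    Flow parameters are simply the flow function's arguments; a minimal sketch (names are illustrative):
    from prefect import flow

    @flow
    def my_flow(source: str = "s3://bucket/raw", limit: int = 100):
        # A parameters dictionary like {"source": ..., "limit": ...} passed when
        # triggering a deployment run is bound to these arguments
        print(source, limit)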

    Fady Khallaf

    09/07/2022, 11:20 PM
    Hello, I am using Prefect open source and have deployed an Orion server on my k8s cluster. I just want to figure out how I can authenticate my requests to the self-hosted Orion server via PREFECT_API_KEY, to prevent anyone who knows my domain name from sending requests to it and causing load on my k8s nodes (this is our security concern).
    ✅ 1

    gertjan

    09/08/2022, 9:49 AM
    Is it correct that when you add new parameters to a flow, you need to delete the current deployment and recreate it for these to update in the UI? As a result, is it best practice to always delete all deployments and recreate them on launch? And what happens with, for example, the `description` field, as you cannot set this via the CLI?
Michael Adkins

09/08/2022, 5:48 PM
Creating a deployment updates an existing one with the same name, so no deletion is necessary.
👍 2
Only passed fields should be updated, but this has been changed recently so I’m not sure.

gertjan

09/09/2022, 6:29 AM
Hmm.. If I change the parameters, they are not updated in the UI.. (on the latest version of Prefect)

Jeff Hale

09/09/2022, 11:27 AM
Adding a few follow-on questions. Are you changing parameter fields or parameter values? There was a bug where additional parameter fields were not being added if some already existed, but I thought that was fixed recently. I will have to look at the issues/PRs. Are you applying the change?

gertjan

09/09/2022, 11:30 AM
I'm changing parameter fields. So imagine a flow without parameters: you update it with parameters, then build and apply the deployment, and the UI doesn't find those parameters. If you delete the deployment, then build and apply it again, the UI finds those parameters.

Jeff Hale

09/09/2022, 12:32 PM
Gotcha. Yes, an issue was filed to this effect last week and I confirmed the bug. A PR that should have fixed it was merged 6 days ago and 2.3.2 was released on Tuesday, so it should work in 2.3.2. Is that the version you’re using?

gertjan

09/29/2022, 7:52 AM
I can confirm this is fixed.
👍 1
🙌 1
:thank-you: 1