prefect-community
  • d

    Dilip Thiagarajan

    01/07/2021, 11:04 PM
    hi all, had a question re: how flows are run when registered. I noticed that when they are run on an agent after registering, the actual 
    flow.run
     method isn’t called. Does anyone know if that’s expected?
  • s

    Sai Srikanth

    01/07/2021, 11:07 PM
Hey team, how do I get the Prefect CLI? I tried the docs but couldn't find it
    👀 1
    d
    j
    11 replies · 3 participants
  • v

    Verun Rahimtoola

    01/07/2021, 11:23 PM
    hi, may we assume that the output format of the prefect CLI will remain unchanged? basically we might have a need to grab the latest flow_run_id of a flow, and i see now that the cli returns this information in descending order (most recent flow run at the top). is it safe to write logic against the format of this output? we could also directly go against the graphql db, but i'm wondering if the data stored in the backend will have a relatively stable schema from one prefect version to the next? so as not to break any of our graphql queries
    d
    3 replies · 2 participants
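One stable approach here is to skip parsing CLI output entirely and ask the GraphQL API for the latest run with an explicit `order_by`. A hedged sketch -- the field names (`flow_run`, `start_time`) assume the Hasura-style schema exposed by Prefect Server and may vary between versions, so verify them in the interactive API playground:

```python
# Sketch: fetch the latest flow_run id via the GraphQL API instead of
# parsing CLI output. Field names like `flow_run` and `start_time` are
# assumptions about the Hasura-style schema Prefect Server exposes --
# confirm them in the interactive API tab for your version.

LATEST_RUN_QUERY = """
query($flow_id: uuid) {
  flow_run(
    where: { flow_id: { _eq: $flow_id } }
    order_by: { start_time: desc }
    limit: 1
  ) {
    id
    state
  }
}
"""

# Hypothetical usage with the Python client:
# from prefect import Client
# client = Client()
# result = client.graphql(LATEST_RUN_QUERY, variables={"flow_id": "<flow-id>"})
# latest_run_id = result["data"]["flow_run"][0]["id"]
```

Ordering explicitly in the query means the logic keeps working even if the CLI's default sort order changes.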
  • i

    itay livni

    01/08/2021, 2:04 AM
    Hi - I got an error message in Prefect Cloud
    Failed to load and execute Flow's environment: SyntaxError("invalid or missing encoding declaration for '/home/.../MOC/etl_moc.py'")
after the flow began and executed some tasks successfully. The flow was run locally. Is this a bug or something not configured correctly on my end? Thanks
    c
    6 replies · 2 participants
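That SyntaxError is Python's PEP 263 source-encoding check tripping when the flow's source file is re-read for execution. A common fix (an assumption about this case, but the usual remedy for this exact message) is to declare the encoding at the top of the file. A minimal sketch of what PEP 263 looks for:

```python
# Sketch: the error is Python's PEP 263 source-encoding check. Adding a
# coding cookie as the first or second line of the flow file usually
# fixes it:
#
#     # -*- coding: utf-8 -*-
#
# PEP 263 recognizes the cookie with a regex like the one below.
import re

CODING_COOKIE = re.compile(r"^[ \t\f]*#.*?coding[:=][ \t]*([-\w.]+)")

first_line = "# -*- coding: utf-8 -*-"
match = CODING_COOKIE.match(first_line)
print(match.group(1))  # utf-8
```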
  • j

    Joël Luijmes

    01/08/2021, 12:32 PM
When I cancel a flow in the UI, it stops further execution of tasks, right? I have two questions/remarks: • Is the active task killed in some way, or will it wait until it completes? • When I use a resource manager, I noticed that unfortunately the cleanup will not be called 😕. Is there some way I can cancel my flow but still clean up resources? Maybe force it with a state handler, if state handlers are still triggered?
    n
    2 replies · 2 participants
  • d

    Dan Corbiani

    01/08/2021, 2:31 PM
Hi, I'm new to Prefect and I'm trying to use it to evaluate the performance of code/parameter changes. I've created a basic flow but cannot figure out how to get historical metrics on each of the tasks. Is there something that I'm missing? My hope would be to get a table that looks like: | Flow Name | Parameters | Task Name | Start Time | End Time | Duration | I've tried going through the GraphQL API and I've looked through the UI. Any direction would be appreciated.
    n
    6 replies · 2 participants
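The per-task timing data for a table like that is queryable from GraphQL. A hedged sketch -- `parameters` on `flow_run` appears elsewhere in this archive, but the other field names assume the Hasura-style schema and should be verified in the interactive API tab:

```python
# Sketch: pull per-task timings from the GraphQL API. Field names here
# are assumptions about the Hasura-style schema exposed by Prefect
# Server/Cloud -- check them in the interactive API playground first.

TASK_TIMINGS_QUERY = """
query($flow_name: String) {
  task_run(where: { flow_run: { flow: { name: { _eq: $flow_name } } } }) {
    start_time
    end_time
    task {
      name
    }
    flow_run {
      parameters
      flow {
        name
      }
    }
  }
}
"""

# Duration is not stored as its own column; compute it client-side from
# end_time - start_time after parsing the timestamps.
```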
  • j

    Jeff Williams

    01/08/2021, 2:39 PM
    Is there a place where the exit codes are documented? When I stop the server on my GCP implementation, I see several components exit with code 137. This may be perfectly normal, but it would be nice to know what the exit codes mean. Also, the window that I started the server in does not return to a command prompt when it is stopped. Is that normal? It tells me that "tmp_ui_1 exited with code 137" and that is the last message I get.
    n
    1 reply · 2 participants
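On the 137s specifically: container exit codes above 128 mean "terminated by signal (code − 128)", so 137 is SIGKILL (9) -- what Docker sends when a `docker stop` grace period expires (or what the OOM killer sends). That makes 137 on shutdown generally benign. A quick check:

```python
# Container exit codes above 128 mean "terminated by signal
# (exit_code - 128)". 137 is therefore SIGKILL (9) -- what Docker sends
# after `docker stop`'s grace period, or what the OOM killer uses.
import signal

exit_code = 137
sig = signal.Signals(exit_code - 128)
print(sig.name)  # SIGKILL
```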
  • m

    Marwan Sarieddine

    01/08/2021, 2:58 PM
Hi folks, we ran into an issue where one of our child tasks ran until Success and then was restarted for some reason, causing our flow to crash. Any idea why this happened, and is there a way to prevent it? We are running a Kubernetes agent and a Dask Kubernetes execution environment on EKS...
    c
    10 replies · 2 participants
  • v

    Vipul

    01/08/2021, 3:27 PM
I am trying to set up a project with Prefect, and I have a setup and teardown process. I can register the flow and project during setup and delete the project during teardown, but I am not sure how I can remove the entry for the Prefect agent from Prefect Server. Any suggestions? Thanks
    n
    6 replies · 2 participants
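One possibility is removing the agent record through the GraphQL API during teardown. Note this is an assumption: newer Prefect Server versions expose a `delete_agent` mutation, but its name and input shape should be confirmed in the interactive API tab before relying on it:

```python
# Sketch: remove a stale agent record via the GraphQL API. The
# `delete_agent` mutation and its input shape are assumptions based on
# newer Prefect Server versions -- verify in the interactive API tab.

DELETE_AGENT_MUTATION = """
mutation($agent_id: UUID!) {
  delete_agent(input: { agent_id: $agent_id }) {
    success
  }
}
"""

# Hypothetical usage during teardown:
# from prefect import Client
# Client().graphql(DELETE_AGENT_MUTATION, variables={"agent_id": "<agent-id>"})
```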
  • m

    Matt

    01/08/2021, 8:28 PM
    Has anyone used Prefect to manage Dataform runs and mind sharing with me how they designed it?
    👀 1
    :watching: 1
  • m

    matta

    01/09/2021, 2:46 AM
    Getting this whenever I try to run a Flow with a Mapped task on Prefect Server:
    Failed to retrieve task state with error: ValidationError({'_schema': 'Invalid data type: None'},)
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 193, in initialize_run
        map_index=map_index,
      File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1387, in get_task_run_info
        state = prefect.engine.state.State.deserialize(task_run_info.serialized_state)
      File "/usr/local/lib/python3.6/site-packages/prefect/engine/state.py", line 362, in deserialize
        state = StateSchema().load(json_blob)
      File "/usr/local/lib/python3.6/site-packages/marshmallow_oneofschema/one_of_schema.py", line 144, in load
        raise exc
    marshmallow.exceptions.ValidationError: {'_schema': 'Invalid data type: None'}
Using 0.13.19 right now. Looks like some kind of serialization problem?
    d
    39 replies · 2 participants
  • d

    Dr. Jones

    01/09/2021, 11:26 AM
    I get this error message by the Prefect agent when a scheduled flow is about to start. Can you help me? 🙂
[2021-01-09 11:20:53,118] ERROR - My_Agent | 400 Client Error: Bad Request for url: http://apollo:4200/
    
    The following error messages were provided by the GraphQL server:
    
        GRAPHQL_VALIDATION_FAILED: Cannot query field "run_config" on type "flow_run".
    
    The GraphQL query was:
    
        query {
                flow_run(where: { id: { _in: ["84b340e4-9f74-425b-9abd-27247fdc3759"] }, _or: [{ state: { _eq: "Scheduled" } }, { state: { _eq: "Running" }, task_runs: { state_start_time: { _lte: "2021-01-09T11:20:50.061124+00:00" } } }] }) {
                    id
                    version
                    state
                    serialized_state
                    parameters
                    scheduled_start_time
                    run_config
                    flow {
                        core_version
                        name
                        storage
                        id
                        environment
                        version
                }
                    task_runs(where: { state_start_time: { _lte: "2021-01-09T11:20:50.061124+00:00" } }) {
                        serialized_state
                        version
                        task_id
                        id
                }
            }
        }
    
    The passed variables were:
    
        null
    a
    2 replies · 2 participants
  • p

    Prash S

    01/09/2021, 6:41 PM
Hello everyone 🙂 I'm trying to run flows on a Kubernetes cluster (Minikube for now) and I'm struggling with an issue -- I see agents alive and responsive, but they don't pick up any tasks. I'm running on Prefect Cloud, using the DaskExecutor with Docker storage (passing in a Dockerfile which built fine). Code/more details in thread. Would appreciate any help, thanks!
    j
    7 replies · 2 participants
  • j

    jeff n

    01/10/2021, 12:02 AM
Sorry if this is basic. I am trying to use GitHub as a storage location for my flow. However, the flow depends on another local Python file. Since the agents are supposed to use the storage to pull down the flow code for running, how do I make that extra file available?
    n
    2 replies · 2 participants
  • j

    Josh Pitts

    01/10/2021, 2:17 AM
Hi all, I’m new to Prefect, and we are hoping to build out our data workflows using Prefect. I am trying to run the local file example from the docs:
  • j

    Josh Pitts

    01/10/2021, 2:17 AM
    @task(target="func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
    def func_task():
        return 99
  • j

    Josh Pitts

    01/10/2021, 2:17 AM
    but no file is created.
  • j

    Josh Pitts

    01/10/2021, 2:18 AM
when I run this in a flow via flow.run(). Can anyone advise where to explore? The docs are not so clear on this.
    j
    4 replies · 2 participants
  • j

    Josh Pitts

    01/10/2021, 2:30 AM
    Though that works to set
    PREFECT__FLOWS__CHECKPOINTING=true
    , what is actually happening regarding
    target
    ,
    checkpoint=True
    , and
result=LocalResult()
? Or are those just ignored?
    j
    7 replies · 2 participants
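Conceptually, a `target` with checkpointing enabled means: before running the task, check whether the target file exists; if it does, load and return it instead of re-running; if not, run the task and write the result there. With checkpointing disabled, `target`/`result` are effectively ignored, which is why no file appears. A plain-Python sketch of that logic (not Prefect's actual implementation):

```python
# Conceptual sketch (not Prefect's actual code): with checkpointing on,
# `target` acts as a cache key -- if the file exists the task is skipped
# and its stored value is read back; otherwise the task runs and the
# result is checkpointed to that path.
import os
import pickle
import tempfile


def run_with_target(task_fn, target_path):
    if os.path.exists(target_path):          # cache hit: don't re-run
        with open(target_path, "rb") as f:
            return pickle.load(f)
    value = task_fn()                        # cache miss: run the task
    with open(target_path, "wb") as f:       # ...and checkpoint it
        pickle.dump(value, f)
    return value


path = os.path.join(tempfile.mkdtemp(), "func_task_target.txt")
print(run_with_target(lambda: 99, path))  # runs the task -> 99
print(run_with_target(lambda: 0, path))   # cache hit -> still 99
```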
  • j

    Josh Pitts

    01/10/2021, 3:12 AM
Also, should this pattern be used as a good practice for a final output that is a file-system file, or should we just write our own file handling?
    j
    4 replies · 2 participants
  • d

    Danny Vilela

    01/10/2021, 11:35 PM
    Hi everyone! I’m trying to better understand the Great Expectations plugin for Prefect. I currently have a ETL pipeline I’d like to Prefect-ify into tasks, which doesn’t sound too difficult. I would also want to use Great Expectations to declare expectations around any given task’s inputs and outputs (e.g., assert some input’s column is unique, or that the output table doesn’t drop any rows). My question is: is this when I’d use the Great Expectations plugin? As far as I can tell, I don’t need the heavier offerings from Great Expectations; just inline data assertions to make sure my pipeline is “correct”. Please let me know if this question is better suited for a different channel or the Great Expectations Slack group. Thank you!
    d
    4 replies · 2 participants
  • p

    Pedro Machado

    01/11/2021, 4:04 AM
    Hi there. I am having trouble with shell tasks and logging. For example, when I run
    DbtShellTask
, it doesn't show output while it's running or when it succeeds. The only time I see the detailed output is when there is a failure. I tried passing these args, and it still doesn't show streamed output in the logs, not even after the task completes.
    return_all=True,
    log_stdout=True,
    log_stderr=True,
    I also set the
    PREFECT__LOGGING__LEVEL=DEBUG
everywhere I could think of: Prefect Server, agent, Docker storage, run config. I see DEBUG-level messages on Prefect Server, but nothing from the task. I am running 0.14.2. Any ideas?
    1 reply · 1 participant
  • m

    Maurits de Ruiter

    01/11/2021, 9:54 AM
Quick question: I'm looking for a way to start and wait for a Fargate task. In the docs I see possibilities to start Lambdas, but nothing about ECS and Fargate. Am I correct to assume this isn't possible by default?
    a
    d
    2 replies · 3 participants
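In the meantime, a custom task can drive Fargate directly with boto3: call `run_task`, then block on the `tasks_stopped` waiter. A hedged sketch -- the cluster, task definition, and subnet names are placeholders, and boto3 is imported lazily so the snippet stays self-contained:

```python
# Sketch: start a Fargate task and wait for it with boto3 from inside a
# custom Prefect task. Cluster, task definition, and subnet identifiers
# below are placeholders for illustration only.

def run_fargate_task(cluster="my-cluster", task_definition="my-task:1",
                     subnets=("subnet-xxxx",)):
    import boto3  # imported lazily; requires AWS credentials at runtime

    ecs = boto3.client("ecs")
    resp = ecs.run_task(
        cluster=cluster,
        taskDefinition=task_definition,
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {"subnets": list(subnets)}
        },
    )
    task_arn = resp["tasks"][0]["taskArn"]
    # Block until the task stops (the waiter polls ECS under the hood).
    ecs.get_waiter("tasks_stopped").wait(cluster=cluster, tasks=[task_arn])
    return task_arn

# To use it in a flow, wrap it, e.g.:
# from prefect import task
# fargate_task = task(run_fargate_task)
```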
  • j

    Jarek Piotrowski

    01/11/2021, 1:27 PM
Hello Prefect Community! I’m working on a new tool that solves the main problem we currently see in documentation: it is very far from the codebase! That is why we are developing a tool that enables linking code with documentation, so that: • developers or technical writers responsible for Prefect documentation can get notifications on GitHub when code that is linked to docs changes, • new contributors, while reading documentation, can quickly view source code connected to a given functionality so that they can quickly understand the codebase. Do you think such a tool would be useful to you? More information and a preview are on our website: https://www.hastydocs.com All feedback is very appreciated! Cheers, Jarek
    d
    3 replies · 2 participants
  • v

    Vincent

    01/11/2021, 2:24 PM
I want to ask the Dask experts whether they can help identify a scheduler bottleneck I have experienced. I have a flow that schedules 22 parallel mapped jobs (each with 65 items), allowing for 1430 parallel jobs. When I schedule the job on a k8s cluster with 375 nodes (each with a worker that runs 4 threads), my scheduler throws out messages like the below. The CPU utilization of the scheduler also goes to 100%. These jobs ultimately die when all the workers get disconnected.
    distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
    ...
    distributed.core - INFO - Event loop was unresponsive in Scheduler for 7.07s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
    ...
    distributed.comm.tcp - INFO - Connection closed before handshake completed
    ...
    distributed.scheduler - INFO - Close client connection: Client-worker-3c6d8642-53b5-11eb-800e-32b98c347770
When I scale the job down to 250 nodes and 3 threads per worker, I still get 100% utilization, but it is slightly more stable, where the only warning message is
    distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
    Thanks for any advice!
    m
    d
    21 replies · 3 participants
  • e

    Equipe AI HOC

    01/11/2021, 3:06 PM
Hello. Is there an IP whitelist for Prefect Cloud? We need to connect on-prem to Cloud, but I am getting blocked by our firewall and need to add Prefect to the whitelist.
    👀 1
    d
    e
    8 replies · 3 participants
  • b

    Brett Naul

    01/11/2021, 5:08 PM
    I might be missing something obvious but did the ability to filter in the UI to a given state (e.g. all Failed tasks) go away in the switch from "Gantt" to "Overview"?
    n
    6 replies · 2 participants
  • r

    Riley Hun

    01/11/2021, 6:43 PM
Hello - sorry if this is a bad question, but we want to use Dask Gateway as the executor for our Prefect flows. I believe the clusters generated by Dask Gateway delete themselves when idle. So if we have a flow that runs maybe once a day, the flow will likely fail the following day because there's no Dask cluster to execute it (it shut down when not in use). Is there a workaround such that we can tell Prefect to start up a cluster from Dask Gateway each time it runs its flow, or do we have to ensure that the clusters from Dask Gateway do not delete themselves when idle?
    k
    3 replies · 2 participants
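One pattern that fits here is giving `DaskExecutor` a callable that creates a fresh Gateway cluster at flow-run time, so idle timeouts between daily runs stop mattering. A hedged sketch assuming Prefect 0.14-style executors -- the gateway address comes from your dask config, and the worker count is illustrative:

```python
# Sketch: instead of pointing the executor at a long-lived Gateway
# cluster (which may idle out between daily runs), pass DaskExecutor a
# callable that creates a fresh cluster when the flow run starts.

def new_gateway_cluster():
    from dask_gateway import Gateway  # imported lazily at flow-run time

    gateway = Gateway()  # address/auth taken from dask config
    cluster = gateway.new_cluster()
    cluster.scale(4)     # illustrative worker count
    return cluster

# Hypothetical wiring (Prefect 0.14-style executors):
# from prefect.executors import DaskExecutor
# flow.executor = DaskExecutor(cluster_class=new_gateway_cluster)
```

The cluster then lives only as long as the flow run, which also avoids paying for idle workers.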
  • b

    Billy McMonagle

    01/11/2021, 7:18 PM
    Hi there! Is there an environment variable that can be set on Kubernetes Agents in order to populate environment variables for flow runs managed by that agent? I see in the docs that you can run:
    prefect agent <AGENT TYPE> start --env KEY=VALUE --env KEY2=VALUE2
    Instead, is it possible to do this by setting an environment variable on the agent, similar to how labels are set?
    - name: PREFECT__CLOUD__AGENT__LABELS
      value: "[]"
    k
    5 replies · 2 participants
k

Kyle Moon-Wright

01/11/2021, 7:35 PM
Hey @Billy McMonagle, I believe you can set your environment variables through the CLI as you mentioned, through a Python client, through a Run Config, or in the environment itself. I believe you can also set them with a YAML file for a container installation of the Agent.
b

Billy McMonagle

01/11/2021, 7:39 PM
Thank you @Kyle Moon-Wright... this is exactly the kind of thing I had in mind:
Any lowercase Prefect configuration key can be set by environment variable. In order to do so, prefix the variable with
PREFECT__
and use two underscores (
__
) to separate each part of the key.
For example, if you set
PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4
, then
prefect.config.tasks.defaults.max_retries == 4
.
But it doesn't look like the flow environment variables are ever set like
prefect.config.agent.env
I probably just need to get more comfortable with helm templating and do the
--env Key=VALUE
thing.
k

Kyle Moon-Wright

01/11/2021, 7:49 PM
hmm, this could be a feature request as I believe environment variables defined like
PREFECT__TASKS__DEFAULTS__MAX_RETRIES=4
will apply to all your flows globally, rather than be delegated to an individual Agent, whose env vars would need to be configured independently by one of the means mentioned above.
I’ll keep thinking on this though! There may be a hidden way I’m not thinking of…
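The convention quoted from the docs above can be sketched in a few lines: strip the `PREFECT__` prefix, split the remainder on double underscores, lowercase each part, and nest the keys. This is illustrative only -- Prefect's own config loader also handles type coercion and merging with `config.toml`:

```python
# Sketch of the documented convention: strip the PREFECT__ prefix, split
# on double underscores, lowercase each part, and nest the keys.
# (Illustrative only; Prefect's real loader also coerces types and
# merges with config.toml.)

def env_to_config(name, value):
    parts = name[len("PREFECT__"):].lower().split("__")
    config = current = {}
    for part in parts[:-1]:
        current[part] = {}
        current = current[part]
    current[parts[-1]] = value
    return config

print(env_to_config("PREFECT__TASKS__DEFAULTS__MAX_RETRIES", "4"))
# {'tasks': {'defaults': {'max_retries': '4'}}}
```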