Powered by Linen
prefect-server

    Thomas Pedersen

    04/19/2022, 7:36 AM
    When we stop our agent, it seems that flows keep running, which is fine, but is there any way to wait for all flows to finish when stopping the agent? We'd like to know that nothing is running on our agent before upgrading it.

    Daniel Nilsen

    04/19/2022, 10:24 AM
    is there a way to hide parameters? Let's say a flow requires a secret to work, and that secret is given as a param. Can I hide that from the logs and UI?
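Prefect-specific options for hiding parameter values are not covered in this thread. As a generic illustration only: if the secret's value is known to the process, Python's standard logging module can scrub it from log output with a filter. This does nothing for what the Prefect UI displays, and RedactSecrets and make_logger are hypothetical names, not part of Prefect.

```python
import io
import logging


class RedactSecrets(logging.Filter):
    """Replace known secret values in log messages with a placeholder."""

    def __init__(self, secrets, placeholder="****"):
        super().__init__()
        # Drop empty strings so they don't "match" everywhere.
        self.secrets = [s for s in secrets if s]
        self.placeholder = placeholder

    def filter(self, record):
        # Render the final message (msg % args), then scrub every secret from it.
        message = record.getMessage()
        for secret in self.secrets:
            message = message.replace(secret, self.placeholder)
        record.msg = message
        record.args = ()  # already interpolated above
        return True  # keep the record, with the secret masked


def make_logger(name, secrets, stream):
    """Build a logger whose only handler writes redacted messages to stream."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(stream)
    handler.addFilter(RedactSecrets(secrets))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # don't leak unredacted copies to the root logger
    return logger


# Usage: the secret never reaches the stream.
buf = io.StringIO()
log = make_logger("demo", ["s3cr3t-token"], buf)
log.info("Running with api_key=%s", "s3cr3t-token")
print(buf.getvalue(), end="")  # Running with api_key=****
```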

    Anthony Harris

    04/19/2022, 5:46 PM
    Hello! Does anyone know the priority that is placed upon flow labels? For example, if we have 2 servers, each with an agent (say agent A and agent B for simplicity), and we specify that a flow can run on either A or B, how is an agent selected to execute the flow run?

    Lana Dann

    04/19/2022, 6:49 PM
    i’d like to tag my flows but was wondering the best way to do it. i see that flows support labels but it seems like labels exist so that agents know which flows to run, but i’d also just like to tag my flows in general based on other things and be able to filter flows based on tags. i assume this isn’t supported right now but wanted to ask

    Lana Dann

    04/19/2022, 10:43 PM
    is it possible to upload an artifact from a flow? we would like to upload the dbt.log and manifest.json files that dbt generates after we run each flow, but i only see link artifacts and markdown artifacts in the source code
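Since only link and markdown artifacts appear to be available here, one possible workaround (an assumption, not a documented feature) is to embed the file's text, or just its tail since manifest.json can be large, inside a markdown artifact. The hypothetical helper below only builds the markdown string; the Prefect call that publishes it depends on the Prefect version and is not shown.

```python
from pathlib import Path


def file_as_markdown(path, max_lines=200):
    """Render the last max_lines lines of a text file as a Markdown code block."""
    if max_lines < 1:
        raise ValueError("max_lines must be at least 1")
    path = Path(path)
    lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
    tail = lines[-max_lines:]  # keep only the end of the file
    header = f"### {path.name}"
    if len(tail) < len(lines):
        header += f" (last {len(tail)} of {len(lines)} lines)"
    fence = "`" * 3  # built at runtime to avoid a literal fence in this source
    return "\n".join([header, "", fence, *tail, fence])
```

The truncation keeps the artifact a readable size; raise max_lines if the whole file is wanted.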

    Sharath Chandra

    04/20/2022, 5:46 AM
    Hi, I am using Prefect to run multiple Spark jobs on k8s. Some of these jobs are long-running, with execution times of more than an hour. The jobs are mapped and executed. However, in some instances I can see that only a few of the jobs in the map are executing. I am using LocalExecutor, so these mapped jobs run sequentially. Are there instances where the LocalExecutor is not able to track the Spark job running on k8s, and is thus not able to trigger the subsequent tasks in the map?

    Saurabh Sharma

    04/20/2022, 7:00 AM
    Hi, I am trying to use S3 storage for the flow code as follows
    from prefect.storage import S3

    STORAGE = S3(
        bucket="prefect-pipelines",
        key="flows/forecaster/flow.py",
        stored_as_script=True,
        # upload the local flow script to S3 during registration
        local_script_path="flow.py",
    )
    The following command is used for registering the flow:
    prefect register --project forecaster -p forecaster/
    But for some reason, the modules under the forecaster directory are not getting uploaded under the specified S3 key. I need help solving this. Thanks in advance!

    Marc Lipoff

    04/20/2022, 4:19 PM
    What's the best way to have complex trigger logic? For example, I have 3 upstream tasks, and I want the target task to run if:
    • upstream A, which is a mapped task, has at least 1 successful child task
    • upstream B is a success
    • upstream C can be in any state
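Rules like these can be written as one predicate over the upstream states. As far as I know, a custom trigger in Prefect 1.x is a function that receives the upstream states and decides whether the task runs; the sketch below shows only the decision logic, with plain strings as hypothetical stand-ins for Prefect State objects. It does not show how a mapped task's child states are exposed to a trigger.

```python
from typing import Iterable


def should_run(a_child_states: Iterable[str], b_state: str, c_state: str) -> bool:
    """Return True if the target task should run, given its upstream states.

    States are plain strings ("Success", "Failed", ...), a hypothetical
    stand-in for Prefect State objects.
    """
    # A is mapped: at least one of its children must have succeeded.
    a_ok = any(state == "Success" for state in a_child_states)
    # B must have succeeded.
    b_ok = b_state == "Success"
    # C may be in any state, so c_state is accepted but intentionally not checked.
    return a_ok and b_ok


print(should_run(["Failed", "Success"], "Success", "Failed"))  # True
```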

    Lana Dann

    04/20/2022, 7:37 PM
    is there a way to always trigger a downstream task after its upstream task has completed (regardless of its state)?

    kevin

    04/20/2022, 9:15 PM
    Can I parameterize checkpointing at the task level? For example:
    from prefect import Flow, Parameter, task

    with Flow('foo') as f:
        chck = Parameter('checkpointing')  # let's say we pass in False
        do_stuff = task(some_python_callable, checkpoint=chck)()

    Saurabh Indoria

    04/21/2022, 4:23 AM
    Hi all, our stack: Prefect Cloud + K8s + LocalDask. When trying to trigger a flow using the GraphQL API (from within our Python backend service), we sometimes see these errors:
    requests.exceptions.RetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ResponseError('too many 502 error responses'))
    and
    urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)",))
    Are there any API limits, throttling, etc.?

    Saurabh Indoria

    04/21/2022, 4:44 AM
    Hi all, we have recently been seeing this issue while running some of our flows:
    Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
    Traceback (most recent call last):
      File "/var/www/.venv/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
        task_run_info = self.client.get_task_run_info(
      File "/var/www/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 1479, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/var/www/.venv/lib/python3.8/site-packages/prefect/client/client.py", line 473, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
    Can someone please help with this? Flow run: https://cloud.prefect.io/quiltai/flow-run/f17f2795-af8f-415e-9db7-c6c0643111bd

    Chu Lục Ninh

    04/22/2022, 8:40 AM
    Hi, I ran into a problem with kubernetes.RunNamespacedJob when running a long-running job (in my case, half a day): I got a huge memory leak. My suspicion is that this job uses a ThreadPoolExecutor to poll the job log every 5s using ReadNamespacedPodLogs. Since ReadNamespacedPodLogs executes in another thread's context, its refcount doesn't drop to 0 (so it is never garbage-collected) when we assign a new value to read_pod_logs, and that leads to orphaned objects and leaked memory: https://github.com/PrefectHQ/prefect/blob/14644fa5fe129b7fa8385d58df82e6f27332b04a/src/prefect/tasks/kubernetes/job.py#L773

    Daniel Nilsen

    04/22/2022, 9:03 AM
    Is it possible to create a tenant with the graphql api?

    Tomás Emilio Silva Ebensperger

    04/22/2022, 5:28 PM
    Hi everyone, I was wondering about Slack notifications (or any other type of notification): when I set the number of retries > 0, is there a way to trigger the notification only if the task still fails after all the retries?
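The rule being asked for is: retry silently, and alert only once the last attempt has failed. A generic, library-agnostic sketch of that rule follows; run_with_retries and its notify callback are hypothetical names, not Prefect's retry or notification API.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    max_retries: int,
    notify: Callable[[Exception], None],
    retry_delay: float = 0.0,
) -> T:
    """Call fn, retrying up to max_retries times; notify only if every attempt fails."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == max_retries:
                # Final attempt failed: the only place the alert fires.
                notify(exc)
                raise
            # Intermediate failure: stay quiet and try again.
            time.sleep(retry_delay)
    raise AssertionError("unreachable")
```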

    Pinakpani Mukherjee

    04/23/2022, 10:32 AM
    Hello. I was trying to run Prefect locally with a local Postgres install, so I set up config.toml:
    [server.database]
    connection_url = "postgres://prefect:prefect@localhost:5432/postgres"
    The problem I'm currently having is that after running prefect server start -ep, Hasura is not able to connect properly and throws errors. Is there any way I can fix this?

    Sharath Chandra

    04/23/2022, 3:01 PM
    Hi, I am using a Prefect master flow to orchestrate multiple flows. I wanted to check whether I can trigger the downstream flow once my upstream flow has completed, irrespective of success or failure.

    Sharath Chandra

    04/25/2022, 4:08 AM
    Hi, I am submitting spark-submit jobs in fire-and-forget mode (spark.kubernetes.submission.waitAppCompletion=false) and have a separate monitoring task that monitors the status of each job. However, as the number of jobs increases, the cluster gets overwhelmed. Is there a way to throttle the task submits on the mapped items?
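One way to throttle is to cap how many items are in flight at once. Because the jobs are submitted fire-and-forget, capping only the submit calls would free each slot as soon as spark-submit returns, so the unit being throttled would have to be "submit, then wait for the job to finish". The sketch below assumes such a blocking function exists (submit_and_wait in the docstring is hypothetical) and uses a bounded thread pool as the throttle; it is not a Prefect API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_throttled(fn: Callable[[T], R], items: Iterable[T], max_in_flight: int) -> List[R]:
    """Apply fn to every item, with at most max_in_flight calls running at once.

    fn should block until the underlying job has finished (e.g. a hypothetical
    submit_and_wait(job_spec)); otherwise the limit only throttles submission.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    # The pool size is the throttle: remaining items wait in the executor's queue.
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        # map returns results in input order.
        return list(pool.map(fn, items))
```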

    Alexis Lucido

    04/25/2022, 2:30 PM
    Hi all, is there a way to cancel late runs, but not going through the UI?

    Darren Burton

    04/26/2022, 2:24 AM
    What is the best way to "chain" or execute multiple flows together within the Orion version, based on the completion of existing flows?

    Sharath Chandra

    04/26/2022, 10:02 AM
    Hi, does the state of a subflow define the state of the main flow? I received an error from my customer's environment stating that one of the subflows failed, but the main flow still shows as successful. Unfortunately, I don't have access to the environment yet.

    Ron Meshulam

    04/26/2022, 12:16 PM
    Hi, we have a Prefect Server (version 2022.04.14) deployed with Helm on a K8s cluster in GCP. I'm trying to run a flow (a flow of flows) for my tests multiple times, with different inputs. So I'm triggering 11 flows, each of which runs a maximum of 3 flows (these flows have 3-4 tasks). Sometimes I get this error:
    requests.exceptions.ReadTimeout: HTTPConnectionPool(host='prefect-apollo.prefect', port=4200): Read timed out. (read timeout=15)
    I thought this was a scale issue, so I've replicated the services as follows:
    • agent: 3
    • UI: 2
    • apollo: 3
    • graphql: 2
    • hasura: 2
    • towel: 2
    We are using an external Postgres (GCP managed). I've seen in earlier messages that I should configure:
    1. PREFECT__CLOUD__REQUEST_TIMEOUT = 60 (configured in the env of the apollo pod)
    2. PREFECT_SERVER__TELEMETRY__ENABLED = false (configured in the env of the agent pod)
    3. PREFECT__CLOUD__HEARTBEAT_MODE = thread (configured in the env of the agent pod)
    I've attached the values.yaml. Can anyone help me figure out what else I can do, or what the problem could be?

    Lauri Makinen

    04/26/2022, 1:23 PM
    Hi, I am running Prefect version 1.2.0 (the server is hosted by us). I get the following error when trying to fetch flows with the CLI, and the Prefect UI does not display any flows either.
    prefect get flows
    Traceback (most recent call last):
    ...
    prefect.exceptions.ClientError: [{'message': 'unexpected null value for type "String"', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow'], 'extensions': {'path': '$.selectionSet.flow.args.where._and[0].project.name._eq', 'code': 'validation-failed', 'exception': {'message': 'unexpected null value for type "String"'}}}]

    Bo

    04/26/2022, 4:17 PM
    Hi, I am storing code in GitHub, and some scheduled flows error out semi-regularly due to a GitHub API timeout. Is there a way to increase the timeout limit and/or add exponential backoff? It also doesn't seem like the flow tries to rerun. Thanks!
    ConnectTimeout(MaxRetryError("HTTPSConnectionPool(host='api.github.com', port=443): Max retries exceeded with url: *redacted* (Caused by ConnectTimeoutError(, 'Connection to api.github.com timed out. (connect timeout=15)'))
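The exponential backoff mentioned above can be sketched generically: wait a random, exponentially growing delay ("full jitter") between attempts. This does not hook into Prefect's GitHub storage, and the exception types to retry on are an assumption; with requests, for example, you would pass requests.exceptions.ConnectTimeout instead of the built-in defaults used below.

```python
import random
import time
from typing import Callable, Iterator, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base: float = 1.0, cap: float = 60.0) -> Iterator[float]:
    """Yield full-jitter delays: uniform(0, min(cap, base * 2**n)) for n = 0..retries-1."""
    for n in range(retries):
        yield random.uniform(0, min(cap, base * 2 ** n))


def call_with_backoff(
    fn: Callable[[], T],
    retries: int = 5,
    retry_on: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, sleeping with exponential backoff after each retryable failure."""
    for delay in backoff_delays(retries):
        try:
            return fn()
        except retry_on:
            sleep(delay)
    # Out of retries: one last attempt, letting any exception propagate.
    return fn()
```

Injecting sleep keeps the function testable without real waiting; the jitter spreads retries from many scheduled flows so they don't hit the API at the same moment.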

    madhav

    04/26/2022, 8:45 PM
    Hi all, figured I'd ask here in case anyone has experience with this. We are deploying ECR containers that contain all of our flows using GitHub Actions; however, the workflow fails when running the aws-actions/configure-aws-credentials action with Error: Credentials could not be loaded, please check your action inputs: Could not load credentials from any providers. Has anyone seen this error before when trying to package their flows into a container?

    Daniel Nilsen

    04/27/2022, 10:58 AM
    Hi! I am building with Docker storage, and in the console I see the steps below. In step 7, where does the file my-flow.flow come from? I do have a flow named my-flow, but no file with that name.
    INFO - prefect.Docker | Building the flow's Docker storage...
    Step 1/18 : FROM prefecthq/prefect:1.0.0-python3.7
     ---> 2005944ce1c0
    Step 2/18 : ENV PYTHONPATH='$PYTHONPATH:modules/'   PREFECT__USER_CONFIG_PATH='/opt/prefect/config.toml'
     ---> Using cache
     ---> 749cf2ee64e6
    Step 3/18 : RUN pip install pip --upgrade
     ---> Using cache
     ---> 9f046e7adbe1
    Step 4/18 : RUN pip show prefect || pip install git+https://github.com/PrefectHQ/prefect.git@1.0.0#egg=prefect[all_orchestration_extras]
     ---> Using cache
     ---> 27acea17a687
    Step 5/18 : RUN pip install graphql-core
     ---> Using cache
     ---> 2c30c81c1d46
    Step 6/18 : RUN mkdir -p /opt/prefect/
     ---> Using cache
     ---> 5976f2fd06b9
    Step 7/18 : COPY my-flow.flow /opt/prefect/flows/my-flow.prefect

    Greg Wyne

    04/28/2022, 10:17 PM
    Hi there! I’m trying to start up a new Prefect Server (1.2.1) in Kubernetes, and after getting everything wired up it seems to be missing a decent number of the GraphQL query definitions, although the mutations do seem to be defined. For example, when running this against the GraphQL interface I get an error:
    query {
      tenant {
        id
      }
    }
    With error:
        GRAPHQL_VALIDATION_FAILED: Cannot query field "tenant" on type "Query".
    However, I can run the mutation to create a tenant. Did I miss a step or a service somewhere? Thanks!

    YD

    04/29/2022, 5:33 PM
    What is the best practice when several users run flows/tasks on the same machine (a Hadoop node) with different user permissions than the user running the Prefect agent? Do we need to run multiple agents on the same machine, or is there some way to specify which user a task should run as?

    Egil Bugge

    05/03/2022, 7:18 PM
    Hey all! I've been playing around with setting up a Kubernetes agent in Google Kubernetes Engine that can spin up an ephemeral Dask cluster on demand. This all seems to work rather smoothly (thanks to the amazing work done by the Prefect team and others), but I'm having some issues getting the autoscaler to remove the nodes after the flow has run. I get the following error messages on my Kubernetes cluster after my flow has run: "Pod is blocking scale down because it’s not backed by a controller" and "Pod is blocking scale down because it doesn’t have enough Pod Disruption Budget (PDB)". I'm pretty inexperienced with Kubernetes, so I was wondering if anyone has pointers on how I might configure the KubeCluster so that it works with autoscaling. We're thinking of using the cluster to hyperparameter-tune a model. We do not use Kubernetes for anything else and have no need for the resources between training runs, so getting the node pool to autoscale down to zero (the agent will stay in a different node pool) would save us some money. My run code is below:

    Lana Dann

    05/04/2022, 7:23 PM
    what’s the best way to call a task within a prefect flow (for integration tests)? flow.<method name>(<method arguments>) doesn’t work, so i assume we’d need to grab the task from flow.tasks and then run the task object. so my follow-up question would be: what is the best way to retrieve a task by name from a Flow object?

Kevin Kho

05/04/2022, 7:50 PM
Let me try it out
Have you seen this page though?
Like this maybe?
from prefect import Flow, task

@task
def abc(x):
    return x

@task
def bcd(x):
    return x

with Flow("..") as flow:
    a = abc(1)
    b = bcd(1)

print(flow.tasks)
# flow.tasks is unordered, so [0] is just whichever task happens to come first
print(list(flow.tasks)[0].name)
And then find the one with the name you want? But I think you can just test the task directly?
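Kevin's snippet converts flow.tasks with list() before indexing, which suggests it is an unordered collection, so index [0] is not a reliable way to pick a task. A safer sketch is to filter on the name and insist on exactly one match, since separate tasks could share a name. StubTask below is a hypothetical stand-in for a Prefect Task so the example runs without Prefect; with a real flow you would pass flow.tasks instead.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional


@dataclass(frozen=True)
class StubTask:
    # Hypothetical stand-in for a Prefect Task: only the .name attribute matters here.
    name: str


def find_task(tasks: Iterable[Any], name: str) -> Optional[Any]:
    """Return the single task whose .name equals name, or None if there is none.

    Raises ValueError if several tasks share the name, rather than picking one
    arbitrarily and making tests flaky.
    """
    matches = [t for t in tasks if t.name == name]
    if len(matches) > 1:
        raise ValueError(f"{len(matches)} tasks named {name!r}")
    return matches[0] if matches else None


tasks = {StubTask("abc"), StubTask("bcd")}
print(find_task(tasks, "bcd"))  # StubTask(name='bcd')
```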

Lana Dann

05/04/2022, 8:02 PM
hmm, i know the name of the task i want to test but i don’t know how to test running that task specifically. i guess i can just import the task directly and run it? if you have
@task
def abc(x):
    return x
and you import
from myflow import abc
then is that a task object or just a method?

Kevin Kho

05/04/2022, 8:23 PM
That is a task so you can do
abc.run()