prefect-community
  • n

    Nicolas Gastaldi

    12/12/2022, 7:24 PM
    Hello community, absolute Prefect noob here. So far I like everything I have seen, kudos to the team! One silly question: I have a Python script that I use to fetch data from an API and save it to remote storage. The script has a lot of recurrence and takes about 1 hour to complete. I took the same script, added the necessary decorators and made minimal adjustments to fit the Prefect pattern, and when I run it in Prefect it takes 11 hours to complete. Is there any red flag related to performance that I should be aware of and may have completely ignored? The script is too simple for that amount of time. It is literally: request API -> convert JSON to pandas -> calculate one column, repeat until it's over -> save to storage as CSV.
    ✅ 1
    s
    a
    +2
    5 replies · 5 participants
  • s

    Scott Chamberlain

    12/12/2022, 8:32 PM
    I don’t see any docs on the process for updating code for flows deployed to Prefect Cloud. Is there something like `prefect deployment update`? Or do you just nuke your deployment.yaml file, rerun `prefect deployment build`, and then run `prefect deployment apply foo-deployment.yaml`? Maybe it depends on where storage is located? I'm using GCS storage.
    ✅ 1
    k
    5 replies · 2 participants
  • t

    Tomas Moreno

    12/12/2022, 10:58 PM
    in the prefect 1 graphql api, is there any way to query all tenants using one api key? or is one api key only able to query one tenant? I'd like to make a table that logs all of our flow runs and want to make sure I structure things correctly
    ✅ 1
    b
    2 replies · 2 participants
  • n

    Nikhil Jain

    12/12/2022, 11:22 PM
    Are there any examples/recipes for ECS deployment implemented in python?
    m
    5 replies · 2 participants
  • d

    Dan Marling

    12/12/2022, 11:55 PM
    Hello all - I like the radar view of tasks from within a single flow. Is there any way to visualize the tasks from subflows? I find stepping into one or many sub-flows becomes a large mental exercise to remember where I’ve navigated from within the UI. How should I go about visualizing or querying upstream or downstream dependencies with Prefect 2?
    👀 1
    ✅ 1
    k
    2 replies · 2 participants
  • a

    Ashley Felber

    12/12/2022, 11:59 PM
    I have a new issue with a deployment. I get the following error:
    prefect.exceptions.ParameterTypeError: Flow run received invalid parameters:
     - start_date: field required
     - end_date: field required
     - initial_url: field required
     - cursor_url: field required
    However, when I run the flow locally, there are no errors. For reference, this is the last line of the flow, and as you can see the parameters are populated. Is there something different I need to do in the deployment?
    if __name__ == "__main__":
        extract_load_events('20221030', '20221030', 'https://us1.api.clevertap.com/1/events.json', 'https://us1.api.clevertap.com/1/events.json?cursor=')
    ✅ 1
    r
    3 replies · 2 participants
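One possible reading of the error above, offered as an assumption rather than a confirmed diagnosis: the arguments in the `if __name__ == "__main__":` block are only used when the file is executed directly, so a run started from a deployment would not see them. The sketch below is plain Python with no Prefect import. The function body is a placeholder, the defaults simply mirror the values from the message, and `required_parameters` is a hypothetical helper for checking which parameters a caller would still have to supply.

```python
import inspect


# Placeholder for the flow function; in the real script it would carry the
# @flow decorator. The defaults mirror the values quoted in the message.
def extract_load_events(
    start_date="20221030",
    end_date="20221030",
    initial_url="https://us1.api.clevertap.com/1/events.json",
    cursor_url="https://us1.api.clevertap.com/1/events.json?cursor=",
):
    return {
        "start_date": start_date,
        "end_date": end_date,
        "initial_url": initial_url,
        "cursor_url": cursor_url,
    }


def required_parameters(func):
    """Names of parameters without a default, which the caller must supply."""
    return [
        name
        for name, param in inspect.signature(func).parameters.items()
        if param.default is inspect.Parameter.empty
    ]


if __name__ == "__main__":
    # Runs only when the file is executed directly, not when it is imported.
    print(required_parameters(extract_load_events))
```

With defaults in the signature, a call that passes no arguments has nothing left to report as missing. Setting the values as parameters on the deployment itself would be another way to get the same effect.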
  • s

    Scott Chamberlain

    12/13/2022, 12:01 AM
    For running `prefect agent …` on a VM as a service, does anyone have a setup for injecting secrets into the `supervisord.conf` file for the Prefect URL and API key? A solution with `chamber` would be ideal, but I'm curious about any solution that works.
    c
    6 replies · 2 participants
  • a

    Andy Yeung

    12/13/2022, 1:22 AM
    Hi, I am trying to write the flow locally first before deploying to Cloud. I found that the local DB can occasionally lock if the flow is terminated midway, and I need to remove ~/.prefect/orion.db to recover. Is there a proper way to release the lock when I terminate the flow run execution?
  • d

    Deepanshu Aggarwal

    12/13/2022, 9:12 AM
    Hi! I was trying out notifications on Orion and can see an option for tags there. How do I add tags to a flow run? Do I add them while building the deployment, or is there a parameter that can be added in the flow wrapper?
    ✅ 1
    r
    6 replies · 2 participants
  • a

    Andy Yeung

    12/13/2022, 9:35 AM
    After some troubleshooting, I found that it's due to the change to running subflows, which proposes using asyncio.gather on the subflows. After changing the flow to async, we cannot use task.submit anymore, or else we get strange behaviour where some tasks won’t run. It would be good if the documentation put emphasis on this part. Right now the closest message I found is “Note, if you are not using `asyncio.gather`, calling `submit` is required for asynchronous execution on the `ConcurrentTaskRunner`.”
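For readers unfamiliar with the pattern mentioned above, here is a minimal sketch of `asyncio.gather` in plain Python. It does not use Prefect. `child` and `parent` are hypothetical stand-ins for an async subflow and the flow that awaits it, so it only illustrates the concurrency pattern and not the `task.submit` behaviour described in the message.

```python
import asyncio


async def child(name: str, delay: float) -> str:
    # Stand-in for an async subflow; a real one would do I/O here.
    await asyncio.sleep(delay)
    return f"{name} done"


async def parent() -> list:
    # gather runs both coroutines concurrently and returns their results
    # in the order the awaitables were passed, not in completion order.
    return await asyncio.gather(child("a", 0.02), child("b", 0.01))


if __name__ == "__main__":
    print(asyncio.run(parent()))
```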
  • m

    Mark Nuttall-Smith

    12/13/2022, 9:56 AM
    Hey gang, I am trying to save a DbtCliProfile block that contains DatabaseCredentials for a postgres db, but am running into an error:
    dbt_cli_profile = DbtCliProfile(
        name="dbt_dwh",
        target=DEFAULT_BLOCK,
        target_configs=PostgresTargetConfigs(
            credentials=DatabaseCredentials.load(DEFAULT_BLOCK), schema="public"
        ),
    )
    dbt_cli_profile.save(name="dbt_dwh")
    TypeError: Type is not JSON serializable: URL
    Has anyone else seen this?
    1 reply · 1 participant
  • r

    redsquare

    12/13/2022, 1:46 PM
    Hi - Just upgraded to 2.7.1 and my k8's job runners are reporting the following:
    /usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
    j
    2 replies · 2 participants
  • c

    Charalampos Malathounis

    12/13/2022, 1:50 PM
    Hello I am getting the following error in job runs that take place in kubernetes. I guess that the error happens when the job fails to get the status of a task. After that error in the job, the task is being killed even though it was running without problems. I think that this error occurs when I run 3 or more jobs concurrently but this should not be a load to my nodes. Does anybody know what could be its cause and how could it be fixed?
    Task 'TaskPreprocessing': Exception encountered during task execution!
    Traceback (most recent call last):
      File "my-file.py", line 105, in run_job
        job.RunNamespacedJob(
      File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 456, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/tasks/kubernetes/job.py", line 730, in run
        job = api_client_job.read_namespaced_job_status(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1393, in read_namespaced_job_status
        return self.read_namespaced_job_status_with_http_info(name, namespace, **kwargs)  # noqa: E501
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api/batch_v1_api.py", line 1480, in read_namespaced_job_status_with_http_info
        return self.api_client.call_api(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 348, in call_api
        return self.__call_api(resource_path, method,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
        response_data = self.request(
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 373, in request
        return self.rest_client.GET(url,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 239, in GET
        return self.request("GET", url,
      File "/usr/local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 233, in request
        raise ApiException(http_resp=r)
    kubernetes.client.exceptions.ApiException: (500)
    Reason: Internal Server Error
    HTTP response headers: HTTPHeaderDict({'Audit-Id': '<Audit-Id>', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '<X-Kubernetes-Pf-Flowschema-Uid>', 'X-Kubernetes-Pf-Prioritylevel-Uid': '<X-Kubernetes-Pf-Prioritylevel-Uid>', 'Date': 'Tue, 13 Dec 2022 11:02:40 GMT', 'Content-Length': '150'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"rpc error: code = Unavailable desc = transport is closing","code":500}
  • m

    Miel Vander Sande

    12/13/2022, 3:10 PM
    Hi all! My organization is among other things using Prefect to construct and maintain semantic Knowledge Graphs. Is there anybody else working with RDF, SPARQL and the works? We've been compiling a task library and depending on the interest, make it more accessible to others.
    🙌 2
    :blob-attention-gif: 4
    🕸️ 2
  • a

    Ashley Felber

    12/13/2022, 3:19 PM
    Hello, I am trying to define dependencies between my flows in Prefect 2.0 (e.g. don't run script B until script A finishes). I am running each flow as a Fargate task. Ideally, I want to be able to continue to run the flows as separate Fargate tasks but still define dependencies. The only method i've seen in the documentation is using sub flows. But I think in this case, all flows would need to be run from 1 image. Is there a recommendation on how to define dependencies while still building separate images/tasks?
    ✅ 1
    n
    2 replies · 2 participants
  • f

    Fred Birchall

    12/13/2022, 4:04 PM
    Hey Everyone! Does somebody know if we can `.submit` a task and use one of the parameters as the task name? I know we can currently do `my_task.submit(name="My Task")` and this will show up as `my_task-a1b2c3-0` in the UI, but what I would like to do is:
    from prefect import flow, task
    
    @flow(name="My Flow")
    def my_flow():
        my_task("Hello")
        my_task(name="World")
    
    @task(name_from_param="name")
    def my_task(name):
        pass
    Then in the Prefect UI I will see:
    Running Flow: My Flow
    Running Task: Hello-a1b2c3-0
    Running Task: World-a1b2c3-1
    A bit of background for context: we run all of our tasks using either AWS Lambda or dbt. For Lambda, I’ve built a central function (decorated as a task) with the signature `run_lambda(lambda_name, payload={})`, and most users call it like `run_lambda.submit("MyLambdaFunction")`, but then in the logs all the task names are `run_lambda-[ascii]-0`, which is annoying when things go pop, as you have to click into the task run, where I have added `logger.info("Lambda name: %s", lambda_name)` for traceability. For all of my personal flows I follow the convention `run_lambda.with_options(name="Lambda MyLambdaFunction").submit("MyLambdaFunction")`, which gives me the desired Prefect task names in the UI, but getting everyone else to follow suit is another matter. I’ve attempted many solutions to dynamically name a task, mainly focused on custom decorators, but they have all failed in one way or another… So I wanted to see if the Prefect community has any ideas or solutions! Thanks
    k
    r
    +1
    8 replies · 4 participants
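One way to make the `with_options(name=...)` convention from the message above harder to forget is to wrap it in a helper that everyone calls instead of `.submit` directly. The sketch below is only an illustration under stated assumptions: `submit_named` is a hypothetical helper, and `FakeTask` is a stand-in that mimics just the `with_options` and `submit` call shapes quoted in the message, so the example runs without Prefect installed.

```python
class FakeTask:
    """Hypothetical stand-in exposing the two methods used in the message."""

    def __init__(self, name="run_lambda"):
        self.name = name

    def with_options(self, *, name):
        # Mirrors the quoted call shape: returns a renamed copy of the task.
        return FakeTask(name=name)

    def submit(self, *args, **kwargs):
        # A real task would schedule work; here we just echo what was asked.
        return (self.name, args, kwargs)


def submit_named(task_obj, prefix, first_arg, *args, **kwargs):
    """Rename the task after its first argument, then submit it."""
    named = task_obj.with_options(name=f"{prefix} {first_arg}")
    return named.submit(first_arg, *args, **kwargs)


if __name__ == "__main__":
    print(submit_named(FakeTask(), "Lambda", "MyLambdaFunction"))
```

With a real task object in place of `FakeTask`, the call would read `submit_named(run_lambda, "Lambda", "MyLambdaFunction")`, which keeps the naming rule in one place instead of relying on each author to remember it.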
  • t

    Thomas Opsomer

    12/13/2022, 5:19 PM
    Hello, I have 2 questions for Prefect v1:
    • Given a flow running on k8s, is it possible to retrieve its k8s Job name somewhere in Prefect?
    • How does the heartbeat / zombie killer feature work? I'm still trying to fix our issue with flows stuck forever in a running state while they have actually failed, and I don't understand why this feature isn't triggered in our case.
    1 reply · 1 participant
  • s

    Slackbot

    12/13/2022, 5:35 PM
    This message was deleted.
  • j

    Josh

    12/13/2022, 5:35 PM
    Kind of a networking question:
    • I have the Prefect v1 server running locally on my MacBook and can connect to it as normal using localhost:8000 or localhost:8080.
    • I have a separate computer (Raspberry Pi 4) that I want to build and run flows on.
    • I want the Raspberry Pi to be able to register flows to the server running on my MacBook over the local network.
    What I need to figure out is how to make it possible for the Raspberry Pi to make network requests to the Prefect server that I’m hosting on my MacBook Pro.
    2 replies · 1 participant
  • j

    Jason

    12/13/2022, 5:40 PM
    Hi gang - when migrating to 2, it looks like the deployments to an S3 block upload the entire directory? Whereas the previous behavior with 1's register was just to upload the flow. Can someone clarify this?
    ✅ 1
    r
    j
    2 replies · 3 participants
  • c

    Claire Herdeman

    12/13/2022, 5:46 PM
    Hi there! Quick question regarding Notifications, is it possible to set notification settings on a deployment programmatically, or only in the UI?
    s
    n
    3 replies · 3 participants
  • a

    Ashley Felber

    12/13/2022, 5:54 PM
    Hello, can you share documentation on how to do a deployment build from github actions?
    c
    t
    7 replies · 3 participants
  • a

    Ashley Felber

    12/13/2022, 7:21 PM
    I am trying to run a stitch job using the code exactly how it shows here. https://alessandrolollo.github.io/prefect-stitch/. I'm getting an error that says "TypeError: missing a required argument: 'credentials'".
    n
    6 replies · 2 participants
  • s

    Senthil M.K.

    12/13/2022, 8:14 PM
    Hello there, are there any documents or tools which showcase Prefect 1.x to 2.x migrations?
    ✅ 1
    c
    3 replies · 2 participants
  • k

    Kalise Richmond

    12/13/2022, 8:27 PM
    Hi Prefect Community! :marvin: We'd like to introduce you to @ask-marvin, our new GPT3-powered finely tuned chat bot where you can ask them any questions in #ask-marvin channel. Marvin is still acquiring knowledge and understanding, despite their advanced intelligence and vast experience. Big shout out to @Serina , @Nate, and @Rob Freedy for their incredible engineering skills building Marvin. Here's a few words from Marvin:
    :gratitude-thank-you: 1
    🙌 1
    👍 1
  • k

    Kalise Richmond

    12/13/2022, 8:39 PM
    EDIT: Join the #ask-marvin to use the @ask-marvin bot. This will help keep the community channel clean while Marvin is still learning :marvin:
    :marvin: 8
    :party-parrot: 1
    :blob-attention-gif: 1
    :prefect: 2
    🚀 2
    🔥 3
    ✅ 1
  • s

    Scott Chamberlain

    12/13/2022, 9:40 PM
    Is there a Prefect 2.0 equivalent for `create_flow_run` in Prefect 1.0 (https://docs-v1.prefect.io/api/latest/tasks/prefect.html#create-flow-run)? I'm trying to translate to Prefect 2, and this method, `create_flow_run`, is somewhat confusing: a task that creates a flow within a flow?
    ✅ 1
    m
    3 replies · 2 participants
  • p

    Parwez Noori

    12/14/2022, 3:14 AM
    We have been running Prefect for quite some time now, but suddenly our Kubernetes cluster keeps showing "unhealthy". I have tried the following:
    1. Restarted the deployment
    2. Scaled the number of nodes manually
    3. Scaled the size of the node pool
    Has this happened to anybody else?
    c
    4 replies · 2 participants
  • e

    Edmund Tian

    12/14/2022, 4:49 AM
    What’s the minimum delay I could expect between triggering a Flow run and the first Task starting execution? And what’s the minimum delay I could expect between a Task finishing execution and the next Task in the Flow starting execution? Context: Every time a new user of my application is created, I want to trigger a Flow run that will scrape this user’s data from multiple sources and populate our database. I need to populate my database within 30 seconds of user creation. This is achievable with my current implementation of orchestrating this pipeline via a python function. I want to make sure that migrating my orchestration logic to Prefect will not add significant time to this process. Ideally I’d keep the additional time under 5 seconds. Is this achievable with Prefect?
    p
    7 replies · 2 participants
  • y

    Young Ho Shin

    12/14/2022, 5:41 AM
    Hello all. I have a question about logging: Is there a way to disable console/terminal logging while keeping logs visible on the Prefect web UI? I thought something like `PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=999 run_flow.py` would do the job, but this seems to turn off the logs on the web UI as well as the console.
    👀 1
    ✅ 1
    b
    8 replies · 2 participants
b

Bianca Hoch

12/14/2022, 8:50 PM
Hello Young, I was able to accomplish this by creating a modified logging.yml file and storing it in `~/.prefect/`. These are the contents of the file:
# Prefect logging config file.
#
# Any item in this file can be overridden with an environment variable:
#    `PREFECT_LOGGING_[PATH]_[TO]_[KEY]=VALUE`
#
# Templated values can be used to insert values from the Prefect settings at runtime.

version: 1
disable_existing_loggers: False

formatters:
    simple:
        format: "%(asctime)s.%(msecs)03d | %(message)s"
        datefmt: "%H:%M:%S"

    standard:
        (): prefect.logging.formatters.PrefectFormatter
        format: "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(name)s - %(message)s"
        flow_run_fmt: "%(asctime)s.%(msecs)03d | %(levelname)-7s | Flow run %(flow_run_name)r - %(message)s"
        task_run_fmt: "%(asctime)s.%(msecs)03d | %(levelname)-7s | Task run %(task_run_name)r - %(message)s"
        datefmt: "%H:%M:%S"

    json:
        class: prefect.logging.formatters.JsonFormatter
        format: "default"

# filters:
    # Define any custom filters to drop records containing
    # sensitive information
    # my_filter:
        # class: your_module.FilterClass

handlers:

    # The handlers we define here will output all logs they receive by default
    # but we include the `level` so it can be overridden by environment

    orion:
        level: 0
        class: prefect.logging.handlers.OrionHandler

loggers:
    prefect:
        level: "${PREFECT_LOGGING_LEVEL}"

    prefect.extra:
        level: "${PREFECT_LOGGING_LEVEL}"
        handlers: [orion]

    prefect.flow_runs:
        level: NOTSET
        handlers: [orion]

    prefect.task_runs:
        level: NOTSET
        handlers: [orion]

    prefect.orion:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"

    prefect.client:
        level: "${PREFECT_LOGGING_LEVEL}"

    prefect.infrastructure:
        level: "${PREFECT_LOGGING_LEVEL}"

    uvicorn:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"

    fastapi:
        level: "${PREFECT_LOGGING_SERVER_LEVEL}"
Essentially, I removed the console handler completely from the original default yml file. This yml file is specific to prefect versions 2.6.9 and up.
This resulted in logs being disabled in the terminal, while still sending logs to cloud
Here's a test script if you'd like to try it out as well:
import logging
from prefect import flow, task, get_run_logger

@task
def test_task():
    logger = get_run_logger()
    logger.info("abc")

@flow
def test_logging():
    test_task()

if __name__ == "__main__":
    test_logging()

#Deployment commands to enter in the terminal
#prefect deployment build ./disable_console_log.py:test_logging -n test-logging -q logging-queue
#prefect deployment apply ./test_logging-deployment.yaml
I am interested as to why you'd like to do this, though. 👀
y

Young Ho Shin

12/15/2022, 3:05 AM
@Bianca Hoch Hi Bianca. Thanks so much for your help. Your `logging.yml` does seem to work. I was also able to get the behavior I want (i.e. INFO-level logs on Orion, but only ERROR logs on the console) by changing to `level: ERROR` for the console handler in the default yml file. I am still a bit confused about why my original attempt, `PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=999 run_flow.py`, does not work. From the [docs](https://docs.prefect.io/concepts/logs/#logging-configuration) it seems like it should. I've also tried `PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL=ERROR` and `PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL="ERROR"`, but this doesn't work either.
To answer your question about my use case: this may be more of a concern for me because my flow has ~5000 sub-tasks, and the INFO-level logs were too verbose at the console but still useful for debugging, so I wanted them saved on Orion. For example, I have a flow submission/monitoring script that displays flow/sub-task progress using tqdm while the flow is running, and the logs were messing up the progress bar display.
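The per-handler `level` setting discussed in this thread can be illustrated with the standard library alone. The sketch below is not Prefect's configuration. It assumes a hypothetical logger name (`demo.flow_runs`) and uses two in-memory streams in place of the terminal and the handler that ships logs to the UI, so that the effect of `level: ERROR` on one handler and `level: 0` on the other is easy to inspect.

```python
import io
import logging
import logging.config


def run_demo():
    """Send one INFO and one ERROR record through two handlers with different levels."""
    console_buf = io.StringIO()  # stands in for the terminal
    backend_buf = io.StringIO()  # stands in for the handler that ships logs to the UI

    logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            # Only ERROR and above reach the "console".
            "console": {"class": "logging.StreamHandler", "level": "ERROR", "stream": console_buf},
            # Level 0 (NOTSET) lets the handler accept every record it is given.
            "backend": {"class": "logging.StreamHandler", "level": 0, "stream": backend_buf},
        },
        "loggers": {
            "demo.flow_runs": {
                "level": "INFO",
                "handlers": ["console", "backend"],
                "propagate": False,
            },
        },
    })

    log = logging.getLogger("demo.flow_runs")
    log.info("task finished")
    log.error("task failed")
    return console_buf.getvalue().splitlines(), backend_buf.getvalue().splitlines()


if __name__ == "__main__":
    console_lines, backend_lines = run_demo()
    print("console:", console_lines)
    print("backend:", backend_lines)
```

The logger's own level decides which records are created at all, and each handler then filters independently, which is why raising the console handler's level leaves the other handler's output unchanged.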