prefect-community
  • b

    Ben Muller

    11/17/2022, 12:57 AM
    Hey if I have a flow like so:
    @flow
    def my_flow(
        start: str = (datetime.utcnow() - timedelta(days=7)).strftime("%Y-%m-%d"),
        end: str = datetime.utcnow().strftime("%Y-%m-%d"),
    ):
    Will these default parameters be set at deploy time or will they update at run time ?
    ✅ 1
    r
    m
    • 3
    • 17
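A minimal sketch of the plain-Python behaviour behind this question: default argument expressions are evaluated once, when the `def` statement runs, not on each call. The function names `frozen_default` and `date_window` and the `now` parameter are illustrative assumptions; how a Prefect deployment stores parameter defaults is not covered here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def frozen_default(stamp: str = datetime.now(timezone.utc).isoformat()) -> str:
    # The default was computed once, when this function was defined.
    return stamp


def date_window(
    start: Optional[str] = None,
    end: Optional[str] = None,
    now: Optional[datetime] = None,
) -> Tuple[str, str]:
    """Resolve missing dates at call time instead of definition time."""
    now = now or datetime.now(timezone.utc)
    if end is None:
        end = now.strftime("%Y-%m-%d")
    if start is None:
        start = (now - timedelta(days=7)).strftime("%Y-%m-%d")
    return start, end
```

Using `None` as the default and computing the dates inside the body keeps the window relative to the moment the function is actually called.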
  • n

    Nick Batchelder

    11/17/2022, 1:03 AM
    Hey all, is it possible to add custom k8s labels to a flow run at the time a flow run is created? I see these three labels are set by default, but it would be super handy if I could set custom labels like username=nick at runtime.
    a
    • 2
    • 1
  • d

    Deepanshu Aggarwal

    11/17/2022, 8:15 AM
    hi! im running into this error
    httpx.RemoteProtocolError: <ConnectionTerminated error_code:ErrorCodes.NO_ERROR, last_stream_id:19999, additional_data:None>
    anyone who faced similar issue ? adding the trace stack in comments
    k
    • 2
    • 6
  • b

    Ben Muller

    11/17/2022, 8:31 AM
    Hey - what does it mean when we get a message saying:
    lock document has schema checksum sha256:96937dc8f6bd314a1cbeae7623159848872249e742b3bac8d7af39da4e2dfe79 which does not match the schema checksum for class 'ECSTask'. This indicates the schema has changed and this block may not load.
    How do i debug and or make sure everything is all good?
    ➕ 1
    ✅ 1
    j
    • 2
    • 1
  • m

    Mohit Singhal

    11/17/2022, 8:41 AM
    Hi team, I am new to prefect and I have a question. If I am running a flow daily and it failed for yesterday and now when I will be retrying it today, is there any date parameter which gives me yesterday's date instead of today's date as I will be fetching some values based on date from the api in my flow.
    ✅ 1
    r
    • 2
    • 9
  • m

    Manuel Garrido Peña

    11/17/2022, 9:32 AM
    Hey folks, maybe this is a stupid question, but why isn't there a way to trigger a DAG from the CLI with custom parameters in Prefect 2.0? In 1.0, you could do params= X=y. Why did we remove such an awesome feature?
    ✅ 1
    p
    • 2
    • 2
  • z

    Zinovev Daniil

    11/17/2022, 11:26 AM
    Hi all. I had a problem right after installing prefect orion. UI is not loading. I am getting a blank page with a title. JS is not fully loaded. It breaks at 1.1 mb. Perhaps someone solved such a problem?
    ✅ 1
    t
    b
    • 3
    • 7
  • r

    Rajeshwar Agrawal

    11/17/2022, 1:27 PM
    Hey @Prefect, one of the persistent issues we faced with Prefect v1 was unreliability around heartbeats and Lazarus. This was promised to be fixed in v2, so I recently did a quick evaluation of release 2.6.7 (community). Unfortunately, my initial investigation doesn't give great results. For example, changes to a k8s pod running a KubernetesJob-type flow (pod restarts, pods terminated by k8s operations, etc.) do not show up as status updates on the Prefect flow run. For instance, if a pod is terminated, the flow status remains Running indefinitely (flow timeouts won't solve this, as our flow execution times follow a bell curve). I believe a combination of heartbeats and Lazarus in v1 worked around this by marking flows that didn't report back for a long time as crashed, which were then retried by Lazarus. Similar functionality seems to be completely missing in v2. A GitHub issue for this also exists: https://github.com/PrefectHQ/prefect/issues/7371#issuecomment-1295604573
    Is this already on the roadmap, and can we expect it to be released in the near future? Are there any other flow execution patterns that work with k8s compute infrastructure and solve this problem?
    Here are some other observations from v2 that differ from v1:
    1. It's not possible to cancel a flow run. We can delete it, but that does not terminate the underlying k8s job associated with the run. It seems that delete only stops tracking the status of the flow run.
    2. Flows are no longer organised in projects. Previously, we were able to deploy a single flow separately into multiple projects, which allowed us to mimic multi-tenant capabilities and manage different versions of the same flow for different tenants. Similar functionality exists in v2, which allows multiple deployments for each flow. However, this is less intuitive from a multi-tenant perspective, as it's more convenient to view and manage all flows of a single project in one place than to manage deployments for different tenants within the same flow.
    3. Date-based filtering of flow run logs is not supported. We need to keep scrolling down to reach the last log line, which is not ideal for long-running flows with large log output.
    4. UI issue: while working with the Kubernetes Job block, I observed that it's not possible to unset any optional fields.
    5. The time elapsed shown in the UI remains at 0 secs until the flow reaches a terminal state.
    🎯 3
    💅 1
    b
    • 2
    • 1
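As an illustration of the heartbeat idea described in this message (marking runs that have not reported back for a long time), here is a minimal sketch. It is not Prefect's implementation; the function name `find_stale_runs`, the dictionary layout, and the timeout value are all assumptions made for the example.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_stale_runs(
    last_heartbeat: Dict[str, datetime],
    now: datetime,
    timeout: timedelta,
) -> List[str]:
    """Return IDs of runs whose last heartbeat is older than `timeout`."""
    return sorted(
        run_id
        for run_id, seen_at in last_heartbeat.items()
        if now - seen_at > timeout
    )


# Hypothetical data: run-b last reported 30 minutes ago.
now = datetime(2022, 11, 17, 13, 0)
heartbeats = {
    "run-a": datetime(2022, 11, 17, 12, 59),
    "run-b": datetime(2022, 11, 17, 12, 30),
}
stale = find_stale_runs(heartbeats, now, timedelta(minutes=10))
```

A separate process could then mark each stale run as crashed and schedule a retry, which is roughly the role the message attributes to Lazarus in v1.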
  • j

    Jared Robbins

    11/17/2022, 2:06 PM
    I'm looking at the code and don't see any reason my Mattermost webhook won't work for notifications. I'm able to use the webhook from the container that Prefect is running in, but it just doesn't send anything for flow state changes. Where could I investigate to see if there is an error log somewhere? I'm labeling it as a Slack webhook. Edit: I tested apprise and found the issue is there.
    ✅ 1
    b
    • 2
    • 4
  • j

    Jon Young

    11/17/2022, 2:19 PM
    Hi, I'm on Prefect 1 using this pattern to pass a task's result from a child flow to a parent flow: https://discourse.prefect.io/t/how-to-pass-data-from-a-child-flow-to-another-task-in-a-parent-flow/126 The child flow is mapped with create_flow_run.map. How do I get the results of the mapped flow? I'm getting this error:
    └── 09:11:14 | ERROR   | Task 'get_task_run_result': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 220, in get_task_run_result
        flow_run = FlowRunView.from_flow_run_id(flow_run_id)
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 585, in from_flow_run_id
        flow_run_data = cls._query_for_flow_run(where={"id": {"_eq": flow_run_id}})
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/flow_run.py", line 627, in _query_for_flow_run
        result = client.graphql(flow_run_query)
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/client/client.py", line 464, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'message': 'parsing UUID failed, expected String, but encountered Array', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow_run'], 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
    ✅ 1
    b
    • 2
    • 3
  • t

    Taylor Curran

    11/17/2022, 2:47 PM
    @Kehinde Adewusi Has the question: When using Prefect 1 databricks integration, is it possible return the logs from the databricks jobs so that they show up in the Prefect logs in the Prefect UI? Can flow run logs be sent from Prefect to elastic search?
    👍 2
    1️⃣ 1
    k
    k
    • 3
    • 2
  • k

    Kirill Popov

    11/17/2022, 3:21 PM
    Hi folks. Say I want to delete ALL crashed flow runs, or filter by some other condition and delete all matching flow runs: how do I do it? Right now I am only able to get 200 flow runs in Python, and I'm not even sure how it decides which 200. Setting up filters for flows in the query is virtually impossible for me (FlowRunFilter, FlowRunFilterState, … a giant mess that doesn't work). Please tell me I am doing it all wrong and there is an easy way? Prefect 2 + Prefect Cloud.
    k
    a
    • 3
    • 8
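One way to get past a fixed number of results per request is to page through them. The sketch below assumes the 200-run ceiling is a per-request page limit and that the underlying client call accepts `limit` and `offset` arguments (worth checking against your Prefect version). `iter_all_pages` and `fetch_page` are hypothetical names, and `fetch_page` stands in for whatever function actually queries the API.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_all_pages(
    fetch_page: Callable[[int, int], List[T]],
    page_size: int = 200,
) -> Iterator[T]:
    """Yield every item by calling fetch_page(limit, offset) until a short page."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        offset += page_size


# Stand-in for an API call: 450 fake flow run IDs served in slices.
fake_runs = [f"run-{i}" for i in range(450)]
all_runs = list(iter_all_pages(lambda limit, offset: fake_runs[offset:offset + limit]))
```

If the goal is deletion, it is safer to collect all matching IDs first and delete afterwards, because deleting while paging by offset shifts the remaining results.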
  • j

    Jai P

    11/17/2022, 4:18 PM
    👋 Curious if retry_on functionality is going to be added to Prefect 2 tasks as well? I see it was added in Prefect 1.2.
    ✅ 1
    k
    • 2
    • 2
  • j

    jack

    11/17/2022, 5:58 PM
    With prefect 2, where does one go in the UI to see which agents are connected? (I started an agent and want to make sure it's pointed at the right place.)
    k
    j
    e
    • 4
    • 4
  • f

    Fernando Silveira

    11/17/2022, 6:21 PM
    👋 Hi all, I'm evaluating Prefect 2 for a few ML data pipelines in my team and I'm a little confused about the right way to achieve what I need. This feels like a really basic use case, so I'm sure I'm just missing something obvious. To make things simple, I have an initial data ingestion flow that brings data into S3 (let's call it ingestion_flow). A couple of notes about this:
    • ingestion_flow runs every day and ingests a data partition for that date. The flow is parameterized with the date to ingest.
    • The flow is organized into a few tasks to ingest data from different clients, so the flow just runs a for loop calling client_ingestion_task(client, date) for each client.
    Once ingested, this data is useful for a bunch of downstream pipelines. So I'd like to reuse ingestion_flow as a starting point for two other flows, say, training_flow_1 and training_flow_2. The way I thought this could be accomplished was by simply running ingestion_flow as a subflow at the beginning of both training_flow_{1,2}. Obviously I don't want to ingest the same data twice. I created a caching key for client_ingestion_task using the client and date parameters. However, I quickly stumbled upon the fact that task caches only rely on local storage. I'm running these flows in Kubernetes and I was hoping for flows to run concurrently, ensuring that ingestion only happens once and the cached status gets reused in the second flow to run. The fact that task caches are restricted to local storage tells me this is not the way to implement what I wanted. Can someone point me in the right direction here? Happy to discuss more details if that's needed.
    ✅ 1
    k
    • 2
    • 2
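For reference, a caching key built from the client and date parameters could look like the sketch below. It assumes the Prefect 2 convention that a task's `cache_key_fn` is called with the task run context and a dict of the task's parameters; the function name `ingestion_cache_key` and the key format are made up for illustration, and this does not address the local-storage limitation raised in the message.

```python
from typing import Any, Dict


def ingestion_cache_key(context: Any, parameters: Dict[str, Any]) -> str:
    """Build a key that is identical for the same (client, date) pair."""
    # `context` is unused here; it is kept only to match the assumed signature.
    return f"ingest-{parameters['client']}-{parameters['date']}"


# Example call with hypothetical parameter values.
key = ingestion_cache_key(None, {"client": "acme", "date": "2022-11-17"})
```

Two task runs with the same client and date produce the same key, so whichever run finishes first can be reused by the other, provided both can reach the stored result.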
  • h

    Heather DeHaven

    11/17/2022, 6:41 PM
    Hi, Is there a way to cancel all running flow runs of a flow from the Prefect 1.0 UI? Thank you.
    d
    • 2
    • 2
  • k

    Kevin Wang

    11/17/2022, 7:14 PM
    I find myself running scripts that set 'stuck' flow runs to Cancelled or Crashed.. Is there some more automatic way to recover from that? It can cause jams, because stuck jobs eat up queue concurrency. Let me know if Discourse or elsewhere is the right place to ask. I also found this Github issue.. how do people handle this in cases of crashed Kubernetes or ECS tasks?
    • 1
    • 1
  • v

    Vishy ganesh

    11/17/2022, 8:53 PM
    I was trying to use the prefect-aws bundle to upload into the S3 bucket. I see the S3Bucket object has a basepath option, but s3_upload has no basepath. How do we associate a basepath with the s3_upload option? Essentially, I don't want the files to land in the bucket root.
    ✅ 1
    • 1
    • 1
  • j

    Jean-Michel Provencher

    11/17/2022, 9:24 PM
    For those interested in trying Prefect 2, here's a quick repository I made to follow the Prefect tutorial more easily on your computer. Simply do a git clone <https://github.com/jmprovencher/prefect-tutorial> and follow the steps in README.md and TUTORIAL.md
    :party-parrot: 1
    👍 1
    k
    r
    • 3
    • 2
  • e

    Erik Mathis

    11/17/2022, 9:37 PM
    howdy! Is there a way to kick off a flow run via the prefect libs like outside of a flow
    ✅ 1
    a
    n
    • 3
    • 5
  • k

    Kalise Richmond

    11/17/2022, 9:59 PM
    Anyone else loving the dark mode on Prefect Docs? 🙌 🌃 https://prefect-community.slack.com/archives/C03D12VV4NN/p1668187885409289
    🔥 1
    🌌 1
  • m

    Madison Schott

    11/17/2022, 10:55 PM
    Getting this error with the fivetran syncs when I move await from the function, any ideas?
    A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
    n
    • 2
    • 11
  • b

    Ben Muller

    11/18/2022, 12:51 AM
    Hey Prefect - I have one agent and one queue. I ran a loop over some parameters to break up some work and called run_deployment to essentially launch 30 concurrent ECS tasks. What I notice is that 5 of the deployment runs start and the other 25 are pending. How do I make this behave like Prefect 1.0, where I was able to have all of these run at the same time? Is this a limitation of my agent? I was under the impression that all the agent does is orchestrate the ECS tasks in my AWS environment.
    a
    • 2
    • 39
  • h

    Howard

    11/18/2022, 6:05 AM
    Hello Prefect! I followed the contributing page (newest prefect 2.6.8), but when I run "prefect dev start", I got some error like "AttributeError: 'ArgumentInfo' object has no attribute 'replace'"
    m
    • 2
    • 9
  • a

    Andreas Nigg

    11/18/2022, 7:27 AM
    Hey folks, I just had a flow run fail because of "prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API." This flow runs several times per day and always succeeded before and also after this incident. The good thing for me: the flow entered a failed state and the logs also show an exception, but the flow itself continued after the exception and all the "results" of the pipeline were achieved. Looking at the exception (in the thread), is there something I'm missing, or is this maybe something for a GitHub issue? (Prefect 2.6.6, Prefect Cloud)
    t
    m
    • 3
    • 4
  • t

    Tim-Oliver

    11/18/2022, 11:03 AM
    Hi, I am testing logging from DaskTaskRunner workers with the current main-branch version of Prefect, which now works locally thanks to some very recent changes (thanks a lot 💐). However, I am having trouble getting the logs from tasks which are running on a DaskTaskRunner which uses dask_jobqueue.SLURMCluster. The logs from tasks are written into the slurm-output file, but not propagated back to the flow log or the cloud UI. Happy to test some things if it would be helpful.
    t
    m
    • 3
    • 17
  • j

    Joshua Greenhalgh

    11/18/2022, 12:02 PM
    Is there an issue with the cloud API today? I have agent running which has suddenly (since ~9.20 UTC) stopped picking up flow runs - the agent is up and running - it did have the following error (but killed the agent and restarted since)
    WARNING:urllib3.connectionpool:Retrying (Retry(total=5, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)")': /
    k
    • 2
    • 6
  • t

    Tushar Gupta

    11/18/2022, 12:50 PM
    Hi Everyone! I need your help . I want to run my code as mentioned below: 1. Expectations: It takes 3 Parameters config_name,table_name,table_key and when I pass it to get_config(get_config is a Prefect Task) it should fetch some data from dynamoDb then pass it to run_config_processor(also a prefect Task) 2. Actual: It is calling run_config_processor without getting the data from get_config. It is running fine on my local.
    👀 1
    m
    • 2
    • 5
  • v

    Vadym Dytyniak

    11/18/2022, 1:53 PM
    Hi. We are trying the new agent flag released in Prefect 2.6.8, prefetch-seconds. It doesn't work like we expected. Details in the thread.
    a
    m
    • 3
    • 23
  • d

    Dave

    11/18/2022, 3:19 PM
    Hello, thank you for the amazing work on Prefect! I'm wondering whether there's a simple way to set the logLevel for all tasks and flows within Python. Specifically, I'm seeking a way to do this that doesn't leverage logging configuration files or use get_run_logger().setLevel(level) within each task and flow. Would this require overriding flow and task initiation, or similar? Thank you in advance for any help!
    ✅ 1
    r
    r
    • 3
    • 9
r

redsquare

11/18/2022, 3:21 PM
you can use PREFECT_LOGGING_LEVEL env var
👍 1
d

Dave

11/18/2022, 3:24 PM
Thanks @redsquare! When I've attempted to use the environment variable it sometimes appears to be ignored or disregarded. My usecase involves importing a Prefect flow as a package for use within sometimes unpredictable environments.
r

Ryan Peden

11/18/2022, 3:26 PM
You can also customize by creating a custom logging configuration based on this example on GitHub. This section of the docs discusses a couple of ways to get Prefect to use your custom logging config.
You'll see in that file that you can set different levels per logger, so you might find that an ideal way to get exactly the log levels you need
That logging.yml file is also the default Prefect uses. It looks like the various loggers get their levels from a couple of different environment variables, which might explain the behavior you're seeing when you set PREFECT_LOGGING_LEVEL. Some loggers use that, and some use PREFECT_LOGGING_SERVER_LEVEL.
👍 1
d

Dave

11/18/2022, 3:38 PM
Thank you @Ryan Peden! My latest attempts used the following block to attempt to force environment variable-based logging acknowledgement. I noticed that direct python execution portrayed no logs correctly, but that other environments (conda / jupyter lab) refused to obey the logging settings. I missed the PREFECT_LOGGING_SERVER_LEVEL, which might explain the inconsistent behavior, unsure. I'll give that a try. Open to other suggestions as well.
import os

# log_level is assumed to be defined earlier as a level name string, e.g. "DEBUG"
os.environ["PREFECT_LOGGING_LEVEL"] = log_level
os.environ["PREFECT_LOGGING_ROOT_LEVEL"] = log_level
os.environ["PREFECT_LOGGING_HANDLERS_CONSOLE_LEVEL"] = log_level
os.environ["PREFECT_LOGGING_HANDLERS_CONSOLE_FLOW_RUNS_LEVEL"] = log_level
os.environ["PREFECT_LOGGING_HANDLERS_CONSOLE_TASK_RUNS_LEVEL"] = log_level
r

Ryan Peden

11/18/2022, 3:53 PM
Setting them via os.environ may or may not work depending on when you run those commands. I think it will work if you run them before you import Prefect. Otherwise, I believe Prefect reads the settings when its module initializes, but I'm going to go double-check the code to verify that.
🙌 1
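A minimal sketch of the ordering described above: set the two variables named in this thread first, and import Prefect only afterwards. The helper name `configure_prefect_logging` is hypothetical, and the claim that Prefect reads its settings at import time is the assumption stated in the message, not something this snippet verifies.

```python
import os
import sys


def configure_prefect_logging(level: str) -> bool:
    """Set the logging env vars; return False if prefect was already imported."""
    for name in ("PREFECT_LOGGING_LEVEL", "PREFECT_LOGGING_SERVER_LEVEL"):
        os.environ[name] = level
    # If prefect is already loaded, the new values may be picked up too late.
    return "prefect" not in sys.modules


set_in_time = configure_prefect_logging("WARNING")

# Import Prefect only after the variables are in place (assumed ordering):
# import prefect
```

The return value gives a cheap way to detect environments such as a long-lived notebook kernel where Prefect was imported before the variables were set.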
d

Dave

11/18/2022, 4:10 PM
Thanks! In testing again using the PREFECT_LOGGING_SERVER_LEVEL environment variable forced within Python, things seemed to work as expected. Appreciate the help!
r

Ryan Peden

11/18/2022, 4:10 PM
You're welcome! I'm happy to hear that does what you need.