Powered by Linen
prefect-community
  • j

    Joshua Greenhalgh

    11/25/2022, 11:30 AM
    Thanks @Kalise Richmond this was the case! In order to fix this I am trying to set up SLAs to kill flows stuck in a running state via the automations section (using Prefect 1), but it doesn't seem to give me the option. I am on the standard plan, which suggests this should be possible?
  • j

    James Zhang

    11/25/2022, 1:43 PM
    Hey guys, I just read that in Prefect v1 there's a Prometheus integration for tasks (https://docs-v1.prefect.io/api/latest/tasks/prometheus.html#pushgaugetogateway). Is a similar integration coming or available in Prefect v2?
    a
    d
    j
    • 4
    • 4
  • t

    Thuy Tran

    11/25/2022, 4:00 PM
    I have a function that has a task decorator:
    @task(cache_result_in_memory=False)
    But I'm getting the error below that it's an unexpected keyword. Not sure what I'm doing wrong. It's running on prem using version 2.6.9.
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "<frozen importlib._bootstrap_external>", line 883, in exec_module
      File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
      File "/opt/prefect/processor.py", line 3, in <module>
        from data_import import data_import_process
      File "/opt/prefect/data_import.py", line 8, in <module>
        from data_cleaning import cleaning_process
      File "/opt/prefect/data_cleaning.py", line 42, in <module>
        @task(cache_result_in_memory=False)
    TypeError: task() got an unexpected keyword argument 'cache_result_in_memory'
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/engine.py", line 256, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/client.py", line 103, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/deployments.py", line 69, in load_flow_from_flow_run
        flow = await run_sync_in_worker_thread(import_object, str(import_path))
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
        module = load_script_as_module(script_path)
      File "/opt/conda/envs/prefect/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
        raise ScriptError(user_exc=exc, path=path) from exc
    prefect.exceptions.ScriptError: Script at 'processor.py' encountered an exception
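
A `TypeError` like the one above often means the code is running against a different library version than expected; that is an assumption here, not something confirmed in the thread. One way to narrow it down is to check which keyword arguments a callable actually accepts in the environment where the flow runs. A minimal standard-library sketch; `accepts_keyword` and the stand-in `old_task` are hypothetical names used only for illustration:

```python
import inspect


def accepts_keyword(func, name):
    """Return True if `func` can be called with keyword argument `name`."""
    params = inspect.signature(func).parameters
    if name in params and params[name].kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    ):
        return True
    # A **kwargs catch-all accepts any keyword.
    return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


# Hypothetical stand-in for an older decorator that lacks the keyword.
def old_task(fn=None, *, name=None, retries=0):
    return fn


print(accepts_keyword(old_task, "cache_result_in_memory"))
print(accepts_keyword(old_task, "retries"))
```

Inside the container, the same check could be pointed at `prefect.task`, alongside `prefect version`, to see what is really installed there.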
    ✅ 1
    o
    • 2
    • 6
  • t

    Tibs

    11/25/2022, 5:03 PM
    Hey everyone, just want to confirm this: if I write (output1, output2) = task() in my flow, will the task be run 2 times and return the same output? (Prefect 1).
    a
    • 2
    • 4
  • d

    Deepak Pilligundla

    11/25/2022, 5:25 PM
    Hello all, I'm getting the following error while registering the flows. The only change from the previous successful registration is that we have installed the teradatasql connector on the Docker image.
    Traceback (most recent call last):
        File "/usr/local/lib/python3.7/site-packages/prefect/cli/build_register.py", line 134, in load_flows_from_script
        namespace = runpy.run_path(abs_path, run_name="<flow>")
        File "/usr/local/lib/python3.7/runpy.py", line 263, in run_path
        pkg_name=pkg_name, script_name=fname)
        File "/usr/local/lib/python3.7/runpy.py", line 96, in _run_module_code
        mod_name, mod_spec, pkg_name, script_name)
        File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
        exec(code, run_globals)
        File "/repo/src/flows/4i_ssp_bene_data_shrng/4i_ssp_bene_data_shrng.py", line 14, in <module>
        import snowflake.connector as sf
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/__init__.py", line 16, in <module>
        from .connection import SnowflakeConnection
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/connection.py", line 25, in <module>
        from . import errors, proxy
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/errors.py", line 18, in <module>
        from .telemetry_oob import TelemetryService
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/telemetry_oob.py", line 20, in <module>
        from .vendored import requests
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/requests/__init__.py", line 119, in <module>
        from ..urllib3.contrib import pyopenssl
        File "/usr/local/lib/python3.7/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 50, in <module>
        import OpenSSL.SSL
        File "/usr/local/lib/python3.7/site-packages/OpenSSL/__init__.py", line 8, in <module>
        from OpenSSL import SSL, crypto
        File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 19, in <module>
        from OpenSSL.crypto import (
        File "/usr/local/lib/python3.7/site-packages/OpenSSL/crypto.py", line 3232, in <module>
        name="load_pkcs7_data",
      TypeError: deprecated() got an unexpected keyword argument 'name'
    a
    • 2
    • 5
  • e

    eddy davies

    11/25/2022, 6:10 PM
    Can Prefect runs be triggered by some event rather than just scheduled for a set time?
    a
    • 2
    • 6
  • t

    Thuy Tran

    11/25/2022, 7:23 PM
    I want to set a memory limit on the docker container so that it will terminate instead of consuming all the memory on the server and crashing the server. Where can I set
    --memory="[memory_limit]"
    argument to enable this?
    a
    • 2
    • 1
  • t

    Trevor Campbell

    11/25/2022, 8:11 PM
    Hi folks! I'm converting my code base to use Orion instead of v1, and I noticed that the ability to raise a SKIP signal (or something like that) is missing. In my original code, I have a sequence of tasks A -> B -> C -> D. Any of them can raise a SKIP signal, and if that happens, I want to skip all downstream tasks. It isn't a failure or a success; it's more of an "I'm not ready to run this yet, so don't run anything that depends on my output yet". Is that possible to do in Orion? I saw one earlier thread here about it, but the outcome was inconclusive...
    • One option is just to return a cancelled state, but that seems to suggest failure (which in my case would prompt a message to the admin, which I definitely don't want to happen for SKIPs). SKIPs happen very often in my particular case, far more often than any other outcome.
    • Another is to return a completed state, but then I need annoying if statements everywhere checking the outcome of previous tasks (skipped vs. actually run successfully).
    Actually, the whole reason I started using Prefect in the first place was for its ability to easily control flows where things get skipped 😉
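
The "annoying if statements" in the second option could be centralised in one place. The following is a plain-Python sketch, not a Prefect API: it assumes tasks exchange ordinary return values, and `SKIP` and `skippable` are hypothetical names. A step that receives the sentinel returns it immediately, so the skip propagates down the chain without being a failure.

```python
import functools

# Sentinel meaning "not ready yet"; neither a failure nor a real result.
SKIP = object()


def skippable(fn):
    """Skip `fn` and pass SKIP along if any of its inputs is SKIP."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        inputs = list(args) + list(kwargs.values())
        if any(value is SKIP for value in inputs):
            return SKIP
        return fn(*args, **kwargs)

    return wrapper


@skippable
def a(x):
    # Returning SKIP signals that downstream steps should not run.
    return SKIP if x < 0 else x + 1


@skippable
def b(x):
    return x * 2


@skippable
def c(x):
    return x - 3


print(c(b(a(1))))           # 1
print(c(b(a(-1))) is SKIP)  # True
```

In Orion the decorator would presumably sit underneath `@task` so that the wrapped function sees resolved inputs; that placement is an untested assumption.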
    ✅ 1
    t
    a
    • 3
    • 13
  • a

    Anqi Lu

    11/28/2022, 4:03 AM
    Hi team, I have a flow containing multiple tasks that are intensive in both computation and memory. Because of the nature of the business logic, within one flow run a task might be called several times with the same parameters or with different parameters. To avoid wasting resources and to make full use of the task result cache, I intend to limit the concurrency of task runs with the same parameters to 1. Other task runs with the same parameters should wait in a Pending or AwaitingRetry state.
    I have implemented this behavior in Prefect 1.x by wrapping the task function with a Redis lock context manager. If a task run acquires the lock, it continues in the Running state. Otherwise, it raises a Retry signal, enters the Retrying state, retries until it acquires the lock, and returns the cached result.
    My question: is it possible to do this with Prefect 2.x? Can I manually set a task run's state to Pending or Scheduled during flow runtime? (I read the code and I know Prefect uses a set of rules and core policies to manage the transitions between states. I wonder if there is any way to override or inject rules from the client side.)
    Also, I know we have concurrency limit control by tags in Prefect 2. But is there a feasible way to create and destroy the tags, and the concurrency limits attached to them, based on "task name + parameters" at runtime? Any advice is much appreciated.
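
The "task name + parameters" identity described above can be turned into a stable key, which is the piece both a Redis lock and a per-key concurrency tag would need. A minimal sketch under stated assumptions: parameters are JSON-serializable, and `lock_key` and `lock_for` are hypothetical helpers. The in-process `threading.Lock` only stands in for the Redis lock from the original setup and does not coordinate across processes.

```python
import hashlib
import json
import threading


def lock_key(task_name, params):
    """Build a stable key from a task name and its JSON-serializable parameters."""
    payload = json.dumps({"task": task_name, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


_locks = {}
_locks_guard = threading.Lock()


def lock_for(task_name, params):
    """Return the one lock shared by all calls with the same name and parameters."""
    key = lock_key(task_name, params)
    with _locks_guard:
        return _locks.setdefault(key, threading.Lock())
```

A task body could then run inside `with lock_for("heavy_task", {"account": 1}):` so that only one run per parameter set proceeds at a time while the others block.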
    a
    v
    • 3
    • 10
  • m

    Mahesh

    11/28/2022, 8:27 AM
    Hi team, with Prefect 2.0, how can I send a task's error log to Slack on failure?
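
Independently of any Prefect-specific notification feature, one generic approach is to catch the exception inside the task and post it to a Slack incoming webhook, which accepts a JSON body with a `text` field. A minimal standard-library sketch; `build_slack_payload` and `notify_slack` are hypothetical helper names, and the webhook URL is something you would supply yourself:

```python
import json
import urllib.request


def build_slack_payload(task_name, error):
    """Format a failure message as a Slack incoming-webhook JSON body."""
    text = f"Task `{task_name}` failed: {type(error).__name__}: {error}"
    return json.dumps({"text": text}).encode("utf-8")


def notify_slack(webhook_url, task_name, error):
    """POST the failure message to a Slack incoming webhook."""
    request = urllib.request.Request(
        webhook_url,
        data=build_slack_payload(task_name, error),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Inside a task this might be used as `try: ... except Exception as exc: notify_slack(url, "load", exc); raise`, re-raising so the task run still fails.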
    j
    m
    • 3
    • 6
  • s

    Sylvain Hazard

    11/28/2022, 8:43 AM
    Hi there! Got a weird issue trying out flows locally. I'm running a local Orion with
    prefect orion start --port 5000
    as well as the very simple flow copied below. Running the flow with
    python log_flow.py
    randomly ends up crashing with this error :
    RuntimeError: The connection pool was closed while 2 HTTP requests/responses were still in-flight
    . Is this an issue related to the server ? Am I forgetting to await something ? Anything I could do to fix it ?
    👀 1
    p
    m
    m
    • 4
    • 28
  • m

    Malavika S Menon

    11/28/2022, 10:35 AM
    How can I give a custom name to a flow-run when the flows are being created and called dynamically?
    ✅ 1
    j
    • 2
    • 1
  • p

    Puneetjindal 11

    11/28/2022, 10:58 AM
    Hello team, I wanted to know what the pricing per flow run is in the Personal and Organization plans.
    ✅ 1
    j
    • 2
    • 1
  • a

    Andreas Nord

    11/28/2022, 11:09 AM
    Hi! I want to access my Prefect Cloud secrets in a jupyter notebook:
    Secret.load("secret")
    this was working on Prefect 1 but on 2 I get this error:
    RuntimeError: A 'sync_compatible' method was called from a context that was previously async but is now sync. The sync call must be changed to run in a worker thread to support sending the coroutine for 'load' to the main thread.
    Any ideas? Thanks
    ✅ 1
    j
    • 2
    • 13
  • d

    Ditlev Stjerne

    11/28/2022, 11:13 AM
    Hello Prefect community! I'm trying to figure out if Prefect is the correct service for my use case and would like some input. My current setup uses an AWS ECS cluster. I want a serverless setup for flow runs: I want to schedule a flow to run and, when the work is done, shut down the ECS task running on the cluster, as these workflows will only run once a day and for less than an hour. Correct me if my assumptions are wrong. The way Prefect works is by creating an agent running on ECS that listens on the work queue for a flow to execute. Since I want a serverless approach, my solution would be to create a single ECS agent listening for work, where the workflow defined for this ECS task invokes the boto3 API and runs a different ECS task, which executes my specified workflow. When the execution is done, that ECS task is shut down. This seems like a workaround to me. Is there a smarter way to go about this setup?
    ✅ 2
    :aws: 2
    t
    a
    r
    • 4
    • 10
  • e

    Esdras Lopes Nani

    11/28/2022, 1:15 PM
    Hi! My queue was unhealthy with late flow runs. I opened the agent log and got the error below.
    The above exception was the direct cause of the following exception:
    Traceback (most recent call last):
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/agent.py", line 154, in get_and_submit_f>
        queue_runs = await self.client.get_runs_in_work_queue(
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/orion.py", line 763, in get_runs_>
        response = await self._client.post(
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1842, in post
        return await self.request(
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1527, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/prefect/client/base.py", line 160, in send
        await super().send(*args, **kwargs)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1614, in send
        response = await self._send_handling_auth(
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1642, in _send_handling_>
        response = await self._send_handling_redirects(
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1679, in _send_handling_>
        response = await self._send_single_request(request)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_client.py", line 1716, in _send_single_re>
        response = await transport.handle_async_request(request)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 353, in hand>
        resp = await self._pool.handle_async_request(req)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/contextlib.py", line 131, in __exit__
        self.gen.throw(type, value, traceback)
      File "/home/ubuntu/anaconda3/envs/prefect/lib/python3.8/site-packages/httpx/_transports/default.py", line 77, in map_h>
        raise mapped_exc(message) from exc
    httpx.LocalProtocolError: Invalid input ConnectionInputs.SEND_HEADERS in state ConnectionState.CLOSED
    Currently Prefect is deployed on EC2 with the agent running as a process under systemd. Prefect version is 2.6.9. Thanks!
    ✅ 1
    m
    • 2
    • 1
  • b

    Bigya Man Pradhan

    11/28/2022, 1:47 PM
    Hi all, just a random question. Is it possible for a task to know whether the flow run was scheduled or manually triggered, both in Prefect v1 and v2? Example goal: some notifications need to be sent only if the flow run was scheduled and not manually triggered.
    ✅ 1
    j
    • 2
    • 2
  • n

    Nic

    11/28/2022, 2:56 PM
    Can I somehow create tasks in Azure DevOps whenever a flow fails? I've set up Slack notifications. Possibly I could intercept emails or Slack notifications and create tasks based on these, but is there a smarter option?
    ✅ 1
    :azure: 1
    s
    • 2
    • 4
  • w

    Wojciech Kieliszek

    11/28/2022, 2:59 PM
    Hi, in Box.com as a part of the deployment pipeline for Prefect v1 flows we use a flow’s
    serialized_hash()
    value as an
    idempotency_key
    and as a docker storage tag. That way we don’t bump the versions when there is no change in a flow “schema”. But this mechanism allows us also to redeploy a docker image for the same flow version (no change in
    idempotency_key
    ) to change the logic of particular tasks when there is no change to the “contract”. So for long-running flow runs we can change their behaviour to some extent during their execution, or resume them after a failure with new bug-free logic. We are now in transition to Prefect v2. Is there any kind of similar mechanism available in Prefect v2?
    m
    • 2
    • 1
  • j

    Joshua Grant

    11/28/2022, 3:24 PM
    How do you run a flow locally with a custom context in Prefect v2?
    ✅ 1
    k
    • 2
    • 5
  • t

    Thomas Opsomer

    11/28/2022, 3:41 PM
    Hello 🙂 We regularly experience strange behaviour in Prefect 1.0 where some tasks fail or get stuck but Prefect doesn't report them as failed. The logs look like this:
    ...
    some logs from the running task
    ...
    Downloading flow-name/2022-11-25t14-47-58-727718-00-00 from bucket
    Beginning Flow run for 'flow-name'
    Task 'accounts.link': Starting task run...
    ...
    Flow run RUNNING: terminal tasks are incomplete.
    b
    • 2
    • 35
  • s

    Sean Conroy

    11/28/2022, 3:52 PM
    Hello everyone, I'm using Prefect 2.6.8 on GCP, and I'm seeing sqlite errors like this pretty frequently:
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table flow_run has no column named state_timestamp
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
    File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 267, in on_shutdown
    EVENT_LOOP_GC_REFS.pop(key) KeyError: 140157491380864
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: flow
    Has anyone seen this kind of thing before?
    b
    a
    • 3
    • 3
  • d

    Denis

    11/28/2022, 4:10 PM
    Hello, Can someone explain how ConcurrentTaskRunner.submit works. example given:
    # len(param_dict) => 100
    for params in param_dict:
        copy_files.submit(params)
    Will this create 100 threads, or does it use async to run them concurrently in one thread? The reason I am asking is that we're facing issues with hanging flow runs when we submit a larger number of tasks (in the last attempt we had about 48 tasks and the flow just hung). Any clarification on how it works would be appreciated.
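
Whatever the runner does internally (the message above does not settle that), one way to test whether the hang depends on the number of in-flight tasks is to submit in bounded batches and wait for each batch before starting the next. A minimal sketch of the batching part only; `chunked` is a hypothetical helper, not part of Prefect:

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


for batch in chunked(range(7), 3):
    print(batch)
```

In the flow, each batch could be submitted with `copy_files.submit(params)` and the returned futures resolved before moving on, assuming the futures expose a blocking `result()` call.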
    j
    m
    +4
    • 7
    • 26
  • j

    Jon Young

    11/28/2022, 5:30 PM
    Hi all, is there a way to type-cast parameters in Prefect 1? I'm hopping around a couple of GitHub issues that address this, but it seems no solution was ever implemented? https://github.com/PrefectHQ/prefect/pull/1682
    k
    • 2
    • 2
  • j

    Jon Young

    11/28/2022, 6:19 PM
    in prefect 1, I am trying to use
    get_task_run_result
    to get the results of a task called within a
    resource_manager
    . Seems the resource manager cleans up the task's result so it is unavailable to another flow?
    └── 12:43:06 | ERROR   | Task 'get_task_run_result': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/tasks/prefect/flow_run.py", line 239, in get_task_run_result
        return task_run.get_result()
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 79, in get_result
        self._result = self._load_result()
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 85, in _load_result
        self._load_child_results()
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 139, in _load_child_results
        task_run._assert_result_type_is_okay()
      File "/Users/jonyoung/Dev/workflows/.venv/lib/python3.9/site-packages/prefect/backend/task_run.py", line 162, in _assert_result_type_is_okay
        raise ValueError(
    ValueError: The task result has no `location` so the result cannot be loaded. This often means that your task result has not been configured or has been configured incorrectly.
    m
    • 2
    • 9
  • j

    Joshua Grant

    11/28/2022, 6:36 PM
    What is the purpose in Prefect v2 of the
    context
    key for the payload in https://orion-docs.prefect.io/api-ref/rest-api/#/Deployments/create_flow_run_from_deployment_deployments__id__create_flow_run_post? Specifically interested in use cases for configuring this.
    ✅ 1
    m
    • 2
    • 3
  • m

    Mihai H

    11/28/2022, 9:11 PM
    Just quick question... is it possible to control SQLAlchemy/PostgreSQL connections pool size from prefect?
    m
    j
    +3
    • 6
    • 32
  • p

    Philip MacMenamin

    11/28/2022, 10:01 PM
    https://docs-v1.prefect.io/core/concepts/configuration.html#user-configuration mentions an environment variable:
    PREFECT__USER_CONFIG_PATH
    Is the expected behavior once this is exported, eg
    export PREFECT__USER_CONFIG_PATH=/a/b/c
    the
    .prefect
    dir should be at
    /a/b/c/.prefect
    ?
    By default, Prefect will look for a user configuration file at
    $HOME/.prefect/config.toml
    , but you can change that location by setting the environment variable
    PREFECT__USER_CONFIG_PATH
    appropriately. Please note the double-underscore (
    __
    ) in the variable name
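
Read literally, the quoted passage describes the variable as the location of the configuration file itself rather than of a parent directory, so under that reading `/a/b/c` would be treated as the file. The lookup it describes can be sketched as follows; this is an illustration of the documented behaviour, not Prefect's actual code, and `user_config_path` is a hypothetical name:

```python
import os
from pathlib import Path


def user_config_path(environ=None):
    """Resolve the user config file: the env var if set, else $HOME/.prefect/config.toml."""
    environ = os.environ if environ is None else environ
    home = environ.get("HOME", str(Path.home()))
    default = Path(home) / ".prefect" / "config.toml"
    return Path(environ.get("PREFECT__USER_CONFIG_PATH", str(default)))


print(user_config_path({"PREFECT__USER_CONFIG_PATH": "/a/b/c"}))
print(user_config_path({"HOME": "/home/user"}))
```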
    • 1
    • 4
  • a

    An Ninh Vũ

    11/29/2022, 2:07 AM
    Hello everyone, I have a problem with running tasks with parameters. Can someone help me :>> Lately, I tried to run a flow consisting of parameters. The parsing parameters are of type Parameter (
    from prefect import Parameter
    ). Let's say I have the parameter "`account_id`", with an int value type. This parameter is parsed when I run the flow with
    account_id = Parameter("account_id", default=None)
    . After that I have a task that is running with a timeout. And in that task's state handler, I reuse the parameter
    account_id
    and parse that parameter into a dictionary (code is below). Here is where the error comes up, as `account_id`'s type is Parameter, but it doesn't accept a parameter in a dictionary if you want to
    json.dumps
    that dict. My question is (also TL;DR): How can I turn this parameter's type into an int type (originally prefect.Parameter type) to use in
    json.dumps
    ? P.S. Is there another way to do what I'm trying to achieve? Thank you so much! P.P.S. My Prefect version: 0.14.22
    def timeout_state_handler(task, old_state, new_state):
        if isinstance(new_state, state.TimedOut):
            logger.error("rerun this flow")
            flow = StartFlowRun(flow_name='rerun_flow', project_name='test')
            run_config = DockerRun(
                image="mydocker/test-project:latest",
                env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"}
            )
            flow.run(
                parameters=dict(account_id=account_id, dates=dates),
                run_config=run_config, run_name="RERUN_FLOW",
                idempotency_key=gen_idempotency_key(account_id)
            )
            raise signals.SUCCESS("This task timed out with status SUCCESS. Re-run this flow.")
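
A `Parameter` object is a placeholder in the flow definition; its concrete value only exists while the flow is running. One possible way around the `json.dumps` error is to read the runtime values inside the handler instead of referencing the `Parameter` objects. The sketch below assumes that Prefect 1 exposes those values in its runtime context under a `parameters` key, which should be verified for 0.14.22; a plain dict stands in for that context here, and `rerun_parameters` is a hypothetical helper:

```python
import json


def rerun_parameters(context, names):
    """Pick concrete parameter values out of a runtime-context-like mapping."""
    values = context.get("parameters") or {}
    return {name: values.get(name) for name in names}


# Plain dict standing in for the runtime context during a flow run.
fake_context = {"parameters": {"account_id": 42, "dates": ["2022-11-28"]}}
params = rerun_parameters(fake_context, ["account_id", "dates"])
print(json.dumps(params))
```

In the handler above, the result of such a call would replace `dict(account_id=account_id, dates=dates)`, so only plain values reach `json.dumps`.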
    • 1
    • 1
  • k

    Kishan

    11/29/2022, 3:21 AM
    I'm having issues executing a flow run from a deployment in the Prefect UI (local server). I can execute a deployment from Python successfully, but for some reason I can't from the UI (it says the flow crashed). Below is the error I am getting. It looks like it is not able to retrieve the flow from the storage block. I got the error before setting up any storage block; then I set up a GCS storage block and I'm still getting the same error. Any thoughts here?
    Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
    10:18:46 PM
    Crash detected! Execution was interrupted by an unexpected exception: TypeError: object NoneType can't be used in 'await' expression
    10:18:46 PM
    Crash details:
    Traceback (most recent call last):
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 1332, in report_flow_run_crashes
        yield
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/engine.py", line 357, in begin_flow_run
        flow_run_context.result_factory = await ResultFactory.from_flow(
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 161, in from_flow
        return await cls.default_factory(
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 123, in default_factory
        return await cls.from_settings(**kwargs, client=client)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 229, in from_settings
        storage_block_id, storage_block = await cls.resolve_storage_block(
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/results.py", line 257, in resolve_storage_block
        or await storage_block._save(is_anonymous=True, overwrite=True)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/homebrew/lib/python3.10/site-packages/prefect/blocks/core.py", line 751, in _save
        await self.register_type_and_schema(client=client)
    TypeError: object NoneType can't be used in 'await' expression
    k
    • 2
    • 1
k

Khuyen Tran

11/29/2022, 4:43 PM
Can you show the example of your flow code?