https://prefect.io logo
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • a

    Antti Tupamäki

    10/18/2022, 10:53 AM
    Hi is there issue how prefect 1.4 .0 handles newest dask 2022.10.0 and python3.9 becouse ci/cd pipeline does not work now when I am using those but local machine with python 3.9.5 works fine.
    r
    • 2
    • 8
  • t

    Tim-Oliver

    10/18/2022, 1:21 PM
    Hi, I'm having some issues with accessing prefect 2.0 cloud from behind a proxy. I can login and switch workspaces but
    prefect block ls
    times out. If I switch to a network without proxy everything works. Any suggestions?
    ✅ 1
    c
    m
    • 3
    • 15
  • i

    Iuliia Volkova

    10/18/2022, 2:02 PM
    hi everyone, question about Prefect 2.6.0, what is analog 'with case' in Prefect 2 or there is not anything similar? how to define a branch logic based on some check?
    ✅ 1
    r
    • 2
    • 2
  • k

    Kyle McChesney

    10/18/2022, 2:29 PM
    Rather interesting task failure. I have the following resource manager (some functions removed to keep the example simple):
    @resource_manager
    class LocalFileManager:
    
        ...
    
        def setup(self) -> 'LocalFileManager':
            self.temp_dir = Path(mkdtemp())
            self.logger = prefect.context['logger']
            <http://self.logger.info|self.logger.info>('Set up LocalFileManager @ %s', self.temp_dir)
            self._lock = RLock()
            self._cache = {}
            return self
    
        def cleanup(self, _) -> None:
            with self._lock:
                <http://self.logger.info|self.logger.info>('Cleaning up: %s', self.temp_dir)
                shutil.rmtree(self.temp_dir)
    I had a flow fail, the setup task is the root failure. Here are the logs
    17 October 2022,06:09:30 	prefect.CloudTaskRunner	INFO	Task 'LocalFileManager.setup': Starting task run...
    17 October 2022,06:09:30 	prefect.LocalFileManager.setup	INFO	Set up LocalFileManager @ /tmp/tmp1b3_t4xs
    17 October 2022,06:09:31 	prefect.CloudTaskRunner	INFO	Task 'LocalFileManager.setup': Finished task run for task with final state: 'Success'
    17 October 2022,08:55:01 	prefect.CloudTaskRunner	INFO	Task 'LocalFileManager.setup': Starting task run...
    17 October 2022,08:55:01 	prefect.CloudTaskRunner	ERROR	Task 'LocalFileManager.setup': Exception encountered during task execution!
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 884, in get_task_run_state
        logger=self.logger,
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/core/resource_manager.py", line 25, in run
        return mgr.setup()
    AttributeError: 'NoneType' object has no attribute 'setup'
    17 October 2022,08:55:01 	prefect.CloudTaskRunner	INFO	Task 'LocalFileManager.setup': Finished task run for task with final state: 'Failed'
    It seems like the task ran twice and fails for the second run. This exact flow has run multiple times without issue, and this is a common resource manager used in multiple other flows.
    1️⃣ 1
  • m

    Mansour Zayer

    10/18/2022, 2:50 PM
    Hello. In Prefect 1 GraphQL, I want to query flow runs with a certain flow run parameter, but when I try to do
    parameters{...}
    I get this error
    Field parameters must not have a selection since type "jsonb" has no subfields.
    Any ideas on how I can access the subfields in parameters and use them in a where condition? Thank you
    ✅ 1
    • 1
    • 2
  • s

    Sam Garvis

    10/18/2022, 3:12 PM
    Does anyone know where I can always see the most recent version of the helm chart prefect-agent https://github.com/PrefectHQ/prefect-helm/tree/main/charts/prefect-agent It shows version 0.0.0 on this page but I don't believe that's right.
    ✅ 1
    c
    j
    • 3
    • 8
  • n

    Nace Plesko

    10/18/2022, 3:33 PM
    Hi, In Prefect 1, is it possible to somehow export the whole log file for the task as opposed to only what's shown on the screen?
    ✅ 1
    k
    • 2
    • 1
  • i

    iñigo

    10/18/2022, 3:57 PM
    Hello, Is it possible to launch Orion within python?
    ✅ 1
    k
    m
    j
    • 4
    • 5
  • j

    Jason Thomas

    10/18/2022, 4:09 PM
    #prefect-2-orion
    Hi all, question. Anything we can do in a
    @task
    can also be done in a
    @flow
    , but tasks have a limitation: they can’t call other tasks. In a complex/nested job, for every function except the top-level flow I have to think about whether it is atomic and can be a task, or whether I will want to split it out further into multiple tasks, in which case it needs to be a flow. But I realized I could just make everything a flow so I don’t have to worry about it. If I decide to split it out I can. If I don’t, no problem. Given the power and flexibility of flows, and that one arbitrary limitation of tasks, why use tasks at all? So, what am I missing here? Are tasks obsolete? What’s their unique use case?
    ✅ 1
    r
    m
    +2
    • 5
    • 6
  • t

    Tim Enders

    10/18/2022, 4:21 PM
    Prefect 2.6 has brought me back... but I am getting the occasional error like this. This looks to be a failure with the Prefect API? Can anyone shed some light on it for me? Thanks
    11:20:34.559 | ERROR   | Task run 'Get-Items-d8ed86f1-2199' - Crash detected! Request to <https://api.prefect.cloud/api/accounts/cafe3a79-624b-468d-87a7-97fde3358a01/workspaces/5d09d677-90b8-4ef8-a9be-9760b422937a/task_runs/a13a62a5-5e54-4b1d-a976-98635a6913c6> failed: Traceback (most recent call last):
      File "/home/tenders/.cache/pypoetry/virtualenvs/prefect-orion-HonJDUqB-py3.10/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
        result = func(*args)
      File "/usr/lib/python3.10/ssl.py", line 975, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:997)
    m
    z
    m
    • 4
    • 29
  • c

    Christopher

    10/18/2022, 4:35 PM
    Sorry, I'm still confused around blocks! I understand that I might want to add a block in order to centrally coordinate access to configuration inside my flow. At this point, I'm just trying to get my flow running though, using an ECS-Task block. 1. I've defined a block in Python with
    ECSTask(...).save("dev-trial", overwrite=True)
    on my dev machine. It shows up in Prefect Cloud. 2. I have created a deployment with
    prefect deployment build -n dev-trial -q dev -ib ecs-task/dev-trial -a flows/healthcheck.py:healcheck
    again from my dev machine 3. I have started a local agent with
    prefect agent start -q dev
    and triggered a job. All works. 4. I have started an agent inside a container and triggered the job, but now it fails with the error
    KeyError: "No class found for dispatch key 'ecs-task' in registry for type 'Block'."
    It seems like in the container, it's not able to resolve the block reference. But isn't that embedded inside the yaml downloaded from Prefect Cloud?
    ✅ 1
    m
    j
    • 3
    • 10
  • a

    Amir

    10/18/2022, 5:00 PM
    Hi all, Bear with me on this, but I'm a pretty new data engineer with a few Prefect flows under my belt, and have a question on best practices with regards to model deployment: Current setup is this: Agent is on AKS, file storage on Blob (we're an Azure shop). Using Prefect Cloud. What I'm trying to do: I have a model that has a child folder called Data. I'm trying to run this model using Prefect. Basically, I need to call a few API's and save the .csv's in the Data folder, run the model that takes input from the newly saved files, output some new files into the Data folder, and then I need to do some other transformations on a couple of the outputted files prior to snowflake upload. I've so far thrown the entire model folder system into the Flows folder, deployed the flow to Blob Storage, and attempted to run the model using a
    poetry run python src/driver/__main__.py
    shell_run_command. This has been a slow process as there's been a bunch of little issues arising, etc. My question: What would be the easiest way of doing all this? ie. what would be the best method to approach this problem? I feel like my method is a bit... patchy?
    k
    • 2
    • 4
  • d

    dammy arinde

    10/18/2022, 5:18 PM
    Hi all, is there a way to use prefect parameter in @contextlib.contextmanager? we have a function that's get's Snowflake credentials from AWS secrets, but at run time, we want to allow testers to be able to enter the Snowflake schema they like. so I tried to put a parameter for the schema in the flow run but it's not working.
    ✅ 1
    a
    • 2
    • 4
  • a

    Andrei Tulbure

    10/18/2022, 5:26 PM
    Has anybody migrated from 1.x to Prefect 2? And how was the journey?
    :marvin-duck: 2
    k
    o
    a
    • 4
    • 5
  • j

    Josh Paulin

    10/18/2022, 6:21 PM
    Is there an easy way to disable caching when running tests? Something like the
    disable_run_logger
    ✅ 1
    a
    m
    • 3
    • 16
  • e

    Esdras Lopes Nani

    10/18/2022, 9:07 PM
    Hi! Is there a way to concurrency limit a task running with
    .map
    ? I've tried using CLI
    prefect concurrency-limit
    tagging the task but apperently there was no effect and the limit was not respected
    m
    k
    • 3
    • 7
  • t

    Tony Piazza

    10/18/2022, 9:13 PM
    i'm trying to deploy the Prefect agent to GKE. I need a specific version of Python (3.9.13) on the image so I changed the Dockerfile as follows:
    FROM python:3.9.13
    
    COPY requirements.txt .
    
    RUN pip install -r requirements.txt
    Prefect is included in the requirements. When the job is launched, I get the following error:
    Flow could not be retrieved from deployment.
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
        flow = await load_flow_from_flow_run(flow_run, client=client)
      File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 159, in load_flow_from_flow_run
        await storage_block.get_directory(from_path=deployment.path, local_path=".")
      File "/usr/local/lib/python3.9/site-packages/prefect/filesystems.py", line 144, in get_directory
        shutil.copytree(from_path, local_path, dirs_exist_ok=True)
      File "/usr/local/lib/python3.9/shutil.py", line 566, in copytree
        with os.scandir(src) as itr:
    FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows'
    How do I resolve this?
    ✅ 1
    j
    • 2
    • 2
  • k

    Kevin Grismore

    10/18/2022, 10:23 PM
    Let's say I want to use
    run_deployment()
    in a flow, and I want to run multiple deployments concurrently, then move on to the next task once all the deployments have finished running. How might I do that?
    ✅ 1
    m
    • 2
    • 8
  • a

    Adam Green

    10/19/2022, 1:17 AM
    What is the expected behaviour when two Prefect agents are listening to the same work queue? We have run an experiment and found that one Agent runs the flows while the other does no work - is this expected? Will the agents ever share work between them?
    r
    • 2
    • 2
  • n

    Nace Plesko

    10/19/2022, 3:27 AM
    Hi, I'm still evaluating if Prefect is the right tool for our team and I have a question about pricing. In the
    Account
    tab it says first 20k task runs are free and the pricing page that is linked there doesn't mention anything at all about number of task runs. Could someone help me understand please how pricing works? The descriptions of our plan in the
    Account
    tab and the plans in the pricing page are completely different. I would love to make Prefect work for us but I don't have an approval for the paid version until I'm confident that Prefect is the right tool for us. With the current setup we have plenty of issues and I'd love to break down the current tasks into smaller ones. However, with that I'd most likely go over the 20k monthly limit already at this point even before we start growing.
    ✅ 1
    r
    • 2
    • 2
  • a

    Alex

    10/19/2022, 8:01 AM
    I use Prefect Cloud. A few days ago I lost my workspace and all my profile data disappeared. But the previously configured flows are working on schedule, although I don't see their statuses in the UI. Has anyone encountered something like this?
    ✅ 1
    r
    • 2
    • 2
  • m

    Matthieu Lhonneux

    10/19/2022, 8:33 AM
    Hi, I migrated from prefect v1 to prefect orion (i love the prefect orion cli 😄 thanks for that !). I have a lot of questions to ask, but I'll start with this : • Agents are not supervised? (with heartbeat like in prefectv1) • When an agent is lost (network issue, oomkill .. and more) the middle of a run, the flow is still in "running" state, is this normal behavior? how to detect this problem? Thanks for all, Matt
    🙌 3
    ✅ 1
    a
    • 2
    • 3
  • What to do when you get a 404 page in Prefect Cloud?
    r

    redsquare

    10/19/2022, 8:36 AM
    Hi - I am getting permission issues when logged into the UI - clicking either of my two workspaces gives a 404 - the {workspace}/permissions call is returning
    {"detail":"Not Found"}
    :plus-one: 2
    ✅ 1
    j
    j
    • 3
    • 12
  • f

    Florian Kühnlenz

    10/19/2022, 8:53 AM
    how can I setup SSO in prefect cloud 2.0?
    ✅ 1
    a
    • 2
    • 4
  • d

    Deepanshu Aggarwal

    10/19/2022, 11:01 AM
    is there a way to force stop a flow run ?
    a
    • 2
    • 4
  • q

    Q

    10/19/2022, 1:31 PM
    Currently, the ability to use custom kube config for
    KubernetesJob
    does not seem to be documented in the source, docs, or api docs, which would lead one to believe that it's not possible, even though it is. Just FYI.
    ✅ 1
    k
    • 2
    • 1
  • f

    flapili

    10/19/2022, 2:21 PM
    Hi, how could I run a job every X times until a condition ?
    ✅ 1
  • f

    flapili

    10/19/2022, 2:22 PM
    my use case is buy domain name (OVH api) + waiting for custom NS propagation
    ✅ 1
    s
    • 2
    • 4
  • c

    Chris Gunderson

    10/19/2022, 2:47 PM
    Hi - When deploying a flow with a S3 block and Docker infrastructure block, will the flow be looking for the code to run in the docker container, or S3 block? The deployment is sending our code to S3, but I'm having issues running the flow. It appears to be looking for the code in the docker image. CLI Deployment script:
    prefect deployment build -n fidelity-allocations-deployment -q default -sb s3/prefect-training -ib docker-container/prefect-training -a src/main/prefect/flows/allocations/prefect_fidelity_allocations.py:FidelityAllocationsFlow --cron "13 21 * * 1-5"
    Error message from the flow: Flow could not be retrieved from deployment. Traceback (most recent call last): File "<frozen importlib._bootstrap_external>", line 846, in exec_module File "<frozen importlib._bootstrap_external>", line 982, in get_code File "<frozen importlib._bootstrap_external>", line 1039, in get_data FileNotFoundError: [Errno 2] No such file or directory: 'src\\main\\prefect\\flows\\allocations\\prefect_fidelity_allocations.py' The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 257, in retrieve_flow_then_begin_flow_run flow = await load_flow_from_flow_run(flow_run, client=client) File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client return await fn(*args, **kwargs) File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/deployments.py", line 70, in load_flow_from_flow_run flow = await run_sync_in_worker_thread(import_object, str(import_path)) File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 57, in run_sync_in_worker_thread return await anyio.to_thread.run_sync(call, cancellable=True) File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync return await get_asynclib().run_sync_in_worker_thread( File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread return await future File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run result = context.run(func, *args) File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 193, in import_object module = load_script_as_module(script_path) File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module raise ScriptError(user_exc=exc, path=path) from exc prefect.exceptions.ScriptError: Script at 'src\\main\\prefect\\flows\\allocations\\prefect_fidelity_allocations.py' encountered an exception
    ✅ 1
    r
    • 2
    • 15
  • d

    David Elliott

    10/19/2022, 2:56 PM
    Hey all, it seems the allow_failure() wrapper doesn’t work with the
    DaskTaskRunner()
    😢 It can’t pickle the task futures, which is sadly blocking our migration to 2.0. Have tried this both in Dask locally and also on k8s. Any ideas? MRE in 🧵
    👀 1
    ✅ 1
    k
    • 2
    • 4
Powered by Linen
Title
d

David Elliott

10/19/2022, 2:56 PM
Hey all, it seems the allow_failure() wrapper doesn’t work with the
DaskTaskRunner()
😢 It can’t pickle the task futures, which is sadly blocking our migration to 2.0. Have tried this both in Dask locally and also on k8s. Any ideas? MRE in 🧵
👀 1
✅ 1
Untitled.py
prefect==2.6.1
prefect-dask==0.2.0.post1
k

Kalise Richmond

10/19/2022, 4:08 PM
Hi @David Elliott, this looks like a bug in the prefect-dask repo. Could you please file a git issue with your detail findings?
d

David Elliott

10/19/2022, 4:16 PM
OK thanks - have raised it here https://github.com/PrefectHQ/prefect-dask/issues/43 Let me know if there’s anything else I should add / do?
View count: 4