• h

    Hui Zheng

    2 years ago
    Hello, Could someone help me with this error that I ran into when deploy flow into prefect-cloud?
    File "build_and_deploy.py", line 48, in <module>
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/core/flow.py", line 1277, in deploy
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 577, in deploy
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 222, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['createFlowFromCompressedString'], 'message': '1 validation error for FlowSchema\nschedule -> clocks -> 0 -> parameter_defaults\n  field required (type=value_error.missing)', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I am using prefect version
    (yes, it’s old, but have to stay with it for another month). the error was thrown at this line of code
                'google-cloud-firestore', 'python-dotenv', 'google-cloud-storage', 'environs'],
                path.abspath('./prefect_cloud_deployment/{}/gcp-scheduler-key.json'.format(env)): '/home',
    The code before that line which might related to this error
    # Flow auto-schedule
        start_time = datetime.now()
        start_time = start_time.replace(hour=(start_time.hour), minute=10, second=0, microsecond=0)
        hourly_schedule = Schedule(clocks=[IntervalClock(interval=timedelta(minutes=60), start_date=start_time)])
        dbt_flow.schedule = hourly_schedule
    6 replies
    Copy to Clipboard
  • Alfie


    2 years ago
    Hi Team, I’m trying Prefect following the guide doc, and found that there is a few seconds latency between triggering a flow to run via CLI “prefect run server --name hello-flow” and the agent runs the flow. Is that expected? Thanks.
    Chris White
    13 replies
    Copy to Clipboard
  • j


    2 years ago
    Hi all, where can I find more information about how to run prefect from a container? The docs only show how to start a python interactive shell but how do I start the server or agents etc?
    8 replies
    Copy to Clipboard
  • s

    Sandeep Aggarwal

    2 years ago
    How to set
    ? I am trying to send a unique id while creating a flow run using python client. I want this uuid to be available in logs for tracking the request flow. However, so far I haven't had any success in correctly setting the extra log attributes. This is how i start the docker agent:
    prefect agent start docker -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
    I tried debugging and found that the log attributes are fetched as string here, instead of list.
    Chris White
    8 replies
    Copy to Clipboard
  • c

    Cab Maddux

    2 years ago
    Hi Prefect, I'm finding flows that have tasks that fail with 'No Heartbeat Detected' but the flow itself continues running (you can see in attached screenshots that heartbeat lost around 1:15AM but the flow continued running until manually marked as failed ~7 hours later). I believe previous Zombie Killer behavior is that the flow would have been immediately marked as failed. Is this an expected change to behavior?
    8 replies
    Copy to Clipboard
  • r


    2 years ago
    Can Prefect UI be intefacesed with CloudWatch somehow? @josh @Jeremiah
    2 replies
    Copy to Clipboard
  • l

    Luis Muniz

    2 years ago
    Hi I thought I was being smart when modularizing the types of tasks, and having one module where I construct my flow:
    from tasks.collect.games import *
    from tasks.collect.streamers import *
    from tasks.collect.streams import *
    from tasks.enrich.game import *
    from tasks.enrich.streamer import *
    from tasks.enrich.stream import *
    from tasks.store.game import *
    from tasks.store.streamer import *
    from tasks.store.stream import *
    from tasks.util.common import *
    with Flow("STRDATA POC") as strdata:
        collected_games = collect_games()
        enriched_games = enrich_game.map(collected_games)
        collected_streamers = collect_streamers()
        enriched_streamers = enrich_streamer.map(collected_streamers)
        collected_streams = collect_streams_per_game.map(enriched_games, unmapped(enriched_streamers))
        enriched_streams = enrich_stream.map(flatten(collected_streams))
    The Flow runs OK when I run it standalone, but when I register it in my local prefect server, I can see the following error in the dashboard:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'tasks'")
    it seems to be similar to an issue I found about not being able to submit flows to prefect cloud because of some peculiarity with pickle?https://github.com/PrefectHQ/prefect/issues/1742 But this was related to packaging the flow in a docker image, so I can't apply the solution to my case The layout of my project is the following:
        register.py (contains flow.register)
      strdata_poc.py (contains flow definition - see above)
    Chris White
    20 replies
    Copy to Clipboard
  • h

    Hui Zheng

    2 years ago
    Hello Prefect Support, I am running into another error in my old-version preflect-flow local run, I am trying to run a flow locally and got the error below
    [libprotobuf ERROR google/protobuf/descriptor_database.cc:58] File already exists in database:
    [libprotobuf FATAL google/protobuf/descriptor.cc:1370] CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
    libc++abi.dylib: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
    Abort trap: 6
    The error is thrown when the code try to import
    from main import dbt_flow
    from .py file that defines the prefect flow. below are some libraries imported in the main.py file
    from google.api_core.datetime_helpers import DatetimeWithNanoseconds
    from prefect import task, Flow, Parameter, triggers, unmapped
    from prefect.engine import signals, result_handlers
    from prefect.schedules import IntervalSchedule
    from datetime import timedelta, datetime, MAXYEAR
    from google.cloud import firestore
    from os import walk, path, getenv
    from environs import Env
    from collections.abc import Iterable
    from itertools import chain
    from copy import deepcopy
    from time import sleep
    import requests
    import argparse
    import json
    import pytz
    here is the pipfile
    prefect = "==0.7.3"
    google-cloud-firestore = "==1.5.0"
    python-dotenv = "==0.10.3"
    environs = "==7.3.0"
    python_version = "3.7"
    1 replies
    Copy to Clipboard
  • c

    Chris Vrooman

    2 years ago
    Hi everyone, I have been testing with concurrency tags and had a question about some behavior I am seeing with docker. It seems that after a task has been waiting (via concurrency tag limit) for 10 minutes, the prefect docker agent starts continuously spinning up new containers for that flow that is waiting to start. If I do a “docker ps” I could see 20 or more containers running for that flow. Any ideas on this?  My Setup:  Prefect Version: 0.10.6 Docker Engine Version: 19.03.6-ce Storage Type: Docker OS: Replicated the issue on Mac and Amazon Linux 2 Single prefect docker agent running 2 flows.
    Chris White
    6 replies
    Copy to Clipboard