Powered by Linen
prefect-community
  • l

    Luis Muniz

    06/26/2020, 9:54 AM
    Hi Guys, I have seen PIN 14: Event-Driven Flow Execution via Listeners which has a Paused state. Does this mean that there is no planned roadmap for this feature? Is the current recommended workaround still using Prefect Cloud API and Lambda functions?
    1 reply · 1 participant
  • l

    Luis Muniz

    06/26/2020, 11:35 AM
    Hi, is prefect-core API unsecured by default? I am trying to create an API token and get cryptic errors about JSON not being able to be parsed? The story is that I am trying to create a local agent by running
    prefect agent start
    This results in an error complaining about missing an API token.
    prefect.utilities.exceptions.AuthorizationError: No agent API token provided.
    When I then call
    prefect auth create-token -n calypso -s RUNNER
    I see the following error:
    Traceback (most recent call last):
      File "/home/lmuniz/.local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/home/lmuniz/.local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/cli/auth.py", line 201, in create_token
        output = client.graphql(
      File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in graphql
        result = self.post(
      File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 172, in post
        response = self._request(
      File "/home/lmuniz/.local/lib/python3.8/site-packages/prefect/client/client.py", line 334, in _request
        json_resp = response.json()
      File "/usr/lib/python3/dist-packages/requests/models.py", line 897, in json
        return complexjson.loads(self.text, **kwargs)
      File "/usr/lib/python3/dist-packages/simplejson/__init__.py", line 518, in loads
        return _default_decoder.decode(s)
      File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 370, in decode
        obj, end = self.raw_decode(s)
      File "/usr/lib/python3/dist-packages/simplejson/decoder.py", line 400, in raw_decode
        return self.scan_once(s, idx=_w(s, idx).end())
    simplejson.errors.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    I have started Prefect with the recommended prefect server start command and installed Prefect with pip3.
    $ prefect diagnostics
    {
      "config_overrides": {},
      "env_vars": [],
      "system_information": {
        "platform": "Linux-5.4.0-7624-generic-x86_64-with-glibc2.29",
        "prefect_version": "0.12.1",
        "python_version": "3.8.2"
      }
    }
    31 replies · 3 participants
  • l

    Luis Muniz

    06/26/2020, 12:21 PM
    Another question: if I run a local agent, how can I specify the URL for the Prefect API I wish to connect to if it does not run on the local server? (I am not using Prefect Cloud.)
    6 replies · 2 participants
  • h

    Hamzah Iqbal

    06/26/2020, 1:44 PM
    Hi, I am a little confused about how the agent works. How do the different agents differ? I have read the Medium blog post on deploying the server (https://medium.com/the-prefect-blog/prefect-server-101-deploying-to-google-cloud-platform-47354b16afe2). But from what I understand, we need the agent running for the scheduled tasks to run?
    3 replies · 2 participants
  • a

    Alex Cano

    06/26/2020, 2:06 PM
    Hey all… found a busted link in the docs. On this, the link to the “sign up for our free Prefect Cloud Scheduler tier” goes to a 404
    3 replies · 3 participants
  • l

    Luis Muniz

    06/26/2020, 7:17 PM
    Hi, another newb question: what is a reasonable usage of map()? Is using map() to handle a list of thousands (or millions?) of elements one by one reasonable? Should I micro-batch it into chunks? Does Prefect start to choke if you have a great many tasks in a flow? What about the Dashboard when you examine such a flow after it has run?
    👀 1
    7 replies · 2 participants
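One way to read the micro-batching idea in the question: split the input into fixed-size chunks so that each mapped unit of work handles a batch rather than a single element. The following is a minimal standard-library sketch; the helper name `chunked` and the batch size of 1000 are illustrative assumptions and not part of Prefect.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` elements from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 2,500 elements become 3 batches instead of 2,500 individual units of work.
batches = list(chunked(range(2500), 1000))
print([len(batch) for batch in batches])
```

Each batch could then be handed to a single mapped task that loops over its elements internally, which trades per-element visibility for fewer task runs.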
  • n

    nicholas

    06/26/2020, 8:04 PM
    Hi all! @Laura Lorenz (she/her) is currently live on YouTube and is talking about how to set up your Prefect execution layer - feel free to drop by!
    📺 2
    🙌 10
    3 replies · 4 participants
  • m

    matta

    06/27/2020, 12:20 AM
    Silly question: How do I view & manage schedules if I'm not using the UI? I wanna deploy on a small server that's probably too small to run docker-compose.
    12 replies · 2 participants
  • h

    Hui Zheng

    06/27/2020, 1:24 AM
    Hello, could someone help me with this error that I ran into when deploying a flow to Prefect Cloud?
    File "build_and_deploy.py", line 48, in <module>
        env_vars=deployment_vars
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/core/flow.py", line 1277, in deploy
        version_group_id=version_group_id,
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 577, in deploy
        versionGroupId=version_group_id,
      File "/Users/huizheng/.local/share/virtualenvs/data-platform-W017i1KI/lib/python3.7/site-packages/prefect/client/client.py", line 222, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['createFlowFromCompressedString'], 'message': '1 validation error for FlowSchema\nschedule -> clocks -> 0 -> parameter_defaults\n  field required (type=value_error.missing)', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I am using Prefect version 0.7.3 (yes, it’s old, but I have to stay with it for another month). The error was thrown at this line of code:
    dbt_flow.deploy(
            '{}'.format(env),
            python_dependencies=[
                'google-cloud-firestore', 'python-dotenv', 'google-cloud-storage', 'environs'],
            files={
                path.abspath('./prefect_cloud_deployment/{}/gcp-scheduler-key.json'.format(env)): '/home',
            },
            registry_url='us.gcr.io/semios-dbt/scheduler',
            env_vars=deployment_vars
    )
    The code before that line, which might be related to this error:
    # Flow auto-schedule
        start_time = datetime.now()
        start_time = start_time.replace(hour=(start_time.hour), minute=10, second=0, microsecond=0)
        hourly_schedule = Schedule(clocks=[IntervalClock(interval=timedelta(minutes=60), start_date=start_time)])
        dbt_flow.schedule = hourly_schedule
    👀 1
    6 replies · 3 participants
  • a

    Alfie

    06/27/2020, 2:31 AM
    Hi Team, I’m trying Prefect following the guide doc, and found that there are a few seconds of latency between triggering a flow run via the CLI (“prefect run server --name hello-flow”) and the agent running the flow. Is that expected? Thanks.
    13 replies · 2 participants
  • j

    Jorge

    06/27/2020, 9:31 AM
    Hi all, where can I find more information about how to run prefect from a container? The docs only show how to start a python interactive shell but how do I start the server or agents etc?
    8 replies · 3 participants
  • s

    Sandeep Aggarwal

    06/27/2020, 12:41 PM
    How do I set PREFECT__LOGGING__LOG_ATTRIBUTES? I am trying to send a unique id while creating a flow run using the Python client. I want this uuid to be available in the logs for tracking the request flow. However, so far I haven't had any success in correctly setting the extra log attributes. This is how I start the docker agent:
    prefect agent start docker -e PREFECT__LOGGING__LOG_ATTRIBUTES="['uuid']"
    I tried debugging and found that the log attributes are fetched as string here, instead of list.
    8 replies · 3 participants
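Environment variables always arrive as strings, so a value such as "['uuid']" is a string that merely looks like a list. The sketch below only illustrates that difference with the standard library; `parse_list` is a hypothetical helper and does not show how Prefect itself reads its configuration.

```python
import ast

# What a process sees when the value arrives through an environment variable.
raw = "['uuid']"

# Iterating over the raw string yields single characters, not attribute names.
first_characters = list(raw)[:2]


def parse_list(value: str) -> list:
    """Parse a Python-literal list from a string, rejecting anything else."""
    parsed = ast.literal_eval(value)
    if not isinstance(parsed, list):
        raise ValueError(f"expected a list literal, got {type(parsed).__name__}")
    return parsed


attributes = parse_list(raw)
print(first_characters, attributes)
```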
  • c

    Cab Maddux

    06/27/2020, 6:37 PM
    Hi Prefect, I'm finding flows that have tasks that fail with 'No Heartbeat Detected' but the flow itself continues running (you can see in attached screenshots that heartbeat lost around 1:15AM but the flow continued running until manually marked as failed ~7 hours later). I believe previous Zombie Killer behavior is that the flow would have been immediately marked as failed. Is this an expected change to behavior?
    8 replies · 4 participants
  • r

    Rafal

    06/28/2020, 2:58 PM
    Can the Prefect UI be interfaced with CloudWatch somehow? @josh @Jeremiah
    2 replies · 2 participants
  • i

    itay livni

    06/28/2020, 3:13 PM
    https://github.com/PrefectHQ/prefect/discussions/2881
    👀 1
  • l

    Luis Muniz

    06/28/2020, 10:40 PM
    Hi I thought I was being smart when modularizing the types of tasks, and having one module where I construct my flow:
    from tasks.collect.games import *
    from tasks.collect.streamers import *
    from tasks.collect.streams import *
    from tasks.enrich.game import *
    from tasks.enrich.streamer import *
    from tasks.enrich.stream import *
    from tasks.store.game import *
    from tasks.store.streamer import *
    from tasks.store.stream import *
    from tasks.util.common import *
    
    with Flow("STRDATA POC") as strdata:
        collected_games = collect_games()
        enriched_games = enrich_game.map(collected_games)
    
        collected_streamers = collect_streamers()
        enriched_streamers = enrich_streamer.map(collected_streamers)
    
        collected_streams = collect_streams_per_game.map(enriched_games, unmapped(enriched_streamers))
        enriched_streams = enrich_stream.map(flatten(collected_streams))
    
        store_game.map(enriched_games)
        store_stream.map(enriched_streams)
        store_streamer.map(enriched_streamers)
    The Flow runs OK when I run it standalone, but when I register it in my local prefect server, I can see the following error in the dashboard:
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'tasks'")
    It seems to be similar to an issue I found about not being able to submit flows to Prefect Cloud because of some peculiarity with pickle: https://github.com/PrefectHQ/prefect/issues/1742. But that issue was related to packaging the flow in a Docker image, so I can't apply the solution to my case. The layout of my project is the following:
    deploy
    |_
      prod
      |_
        register.py (contains flow.register)
    flows
    |_
      strdata_poc.py (contains flow definition - see above)
    tasks
    |_
      collect
      |_
        games.py
        streamers.py
        streams.py
      enrich
      |_
        ...
    20 replies · 3 participants
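The error is consistent with how pickling works in Python: a function defined in an importable module is stored as a reference (module name plus function name), and the module must be importable again wherever the object is loaded. The sketch below demonstrates this with the standard `pickle` module; `tasks_demo` is a made-up module name, and it is an assumption here that the flow is serialized in a comparable by-reference way.

```python
import pickle
import sys
import types

# Build a throwaway module named "tasks_demo" that defines one function.
module = types.ModuleType("tasks_demo")
exec("def collect():\n    return [1, 2, 3]\n", module.__dict__)
sys.modules["tasks_demo"] = module

# The pickle payload contains the module and function names, not the code.
payload = pickle.dumps(module.collect)
print(b"tasks_demo" in payload)

# Simulate a process where the module cannot be imported.
del sys.modules["tasks_demo"]
try:
    pickle.loads(payload)
except ModuleNotFoundError as exc:
    error = str(exc)
    print(error)
```

Under that reading, the process that executes the flow needs the `tasks` package on its import path (for example through PYTHONPATH or an installed package), not only the process that registers it.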
  • h

    Hui Zheng

    06/29/2020, 12:27 AM
    Hello Prefect Support, I am running into another error in a local run of my old-version Prefect flow. I am trying to run a flow locally and got the error below:
    [libprotobuf ERROR google/protobuf/descriptor_database.cc:58] File already exists in database:
    [libprotobuf FATAL google/protobuf/descriptor.cc:1370] CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
    libc++abi.dylib: terminating with uncaught exception of type google::protobuf::FatalException: CHECK failed: GeneratedDatabase()->Add(encoded_file_descriptor, size):
    Abort trap: 6
    The error is thrown when the code runs
    from main import dbt_flow
    where main.py is the file that defines the Prefect flow. Below are some of the libraries imported in the main.py file:
    from google.api_core.datetime_helpers import DatetimeWithNanoseconds
    from prefect import task, Flow, Parameter, triggers, unmapped
    from prefect.engine import signals, result_handlers
    from prefect.schedules import IntervalSchedule
    from datetime import timedelta, datetime, MAXYEAR
    from google.cloud import firestore
    from os import walk, path, getenv
    from environs import Env
    from collections.abc import Iterable
    from itertools import chain
    from copy import deepcopy
    from time import sleep
    import requests
    import argparse
    import json
    import pytz
    Here is the Pipfile:
    [packages]
    prefect = "==0.7.3"
    google-cloud-firestore = "==1.5.0"
    python-dotenv = "==0.10.3"
    environs = "==7.3.0"
    
    [requires]
    python_version = "3.7"
    1 reply · 2 participants
  • c

    Chris Vrooman

    06/29/2020, 3:10 AM
    Hi everyone, I have been testing with concurrency tags and had a question about some behavior I am seeing with docker. It seems that after a task has been waiting (via concurrency tag limit) for 10 minutes, the prefect docker agent starts continuously spinning up new containers for the flow that is waiting to start. If I do a “docker ps” I can see 20 or more containers running for that flow. Any ideas on this?
    My Setup:
    Prefect Version: 0.10.6
    Docker Engine Version: 19.03.6-ce
    Storage Type: Docker
    OS: Replicated the issue on Mac and Amazon Linux 2
    Single prefect docker agent running 2 flows.
    6 replies · 2 participants
  • j

    jars

    06/29/2020, 3:44 AM
    Hi folks, we used the GCSResultHandler back in 0.7.3. We see it writing files into GCS, but how best to decode them for debugging? I tried base64 decode, but that doesn't seem to work right.
    6 replies · 2 participants
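If the handler stores results as base64-encoded pickle bytes (an assumption about 0.7.3 that this thread does not confirm), then base64 decoding alone would produce pickle bytes rather than readable text, and a second unpickling step would be needed. A minimal sketch of that round trip, using the standard `pickle` module as a stand-in and the hypothetical helpers `encode_result` and `decode_result`:

```python
import base64
import pickle


def encode_result(value) -> str:
    """Pickle a value and wrap the bytes in base64 text (assumed storage format)."""
    return base64.b64encode(pickle.dumps(value)).decode()


def decode_result(text: str):
    """Reverse encode_result. Only unpickle data from a source you trust."""
    return pickle.loads(base64.b64decode(text))


stored = encode_result({"rows": 3, "status": "ok"})
restored = decode_result(stored)
print(restored)
```

For a real file, the text passed to `decode_result` would be the downloaded object contents, and the environment doing the decoding would need the same libraries that were available when the result was written.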
  • a

    Alfie

    06/29/2020, 10:58 AM
    Hi Team, I want to deploy the open source version of Prefect but authentication is required. Seems so far authentication is just supported in the cloud version. Do you have any good suggestions to achieve that?
    8 replies · 2 participants
  • d

    Darragh

    06/29/2020, 11:04 AM
    Morning all! We’re seeing an issue with mapped tasks on Fargate, where we only get 2 mapped tasks running at a time. It doesn’t seem to matter what the tasks are; Prefect seems to cap the max concurrent tasks at 2. Any idea? We’re using Core 0.12.0, running on Fargate with DockerStorage and DaskExecutor. And we just uncovered another issue: while trying to boil down a sample flow to reproduce, I’m now hitting this problem on every run:
    Unexpected error: KilledWorker('run_task-a6bb0ba3-e74f-4e26-8cc4-2bcc8f8c217c', <Worker 'tcp://127.0.0.1:41917', name: 1, memory: 0, processing: 1>)
    It doesn’t seem to matter what the flow is; we get this on all flows, even after restarting Prefect Server and the Fargate Agent.
    37 replies · 4 participants
  • e

    Etienne

    06/29/2020, 11:07 AM
    Hello guys! Any idea why a self-hosted Prefect Server doesn't have the flow_group GraphQL query available? Running 0.12.1.
    2 replies · 2 participants
  • k

    Kevin Weiler

    06/29/2020, 6:43 PM
    Hi there - I’m having trouble telling my flow where to look for Prefect Server (the API). I’ve tried several ways of setting prefect.config.api.url (the env variable PREFECT__CONFIG__API__URL, and setting it in config.toml), but when I call .register() I get the following error:
    requests.exceptions.ConnectionError: HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fbcbd594e10>: Failed to establish a new connection: [Errno 111] Connection refused'))
    indicating that it’s still trying localhost instead of what I’ve given it (which does not resolve to localhost) - any ideas?
    10 replies · 3 participants
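Prefect's configuration documentation describes environment variables prefixed with PREFECT__ as being mapped onto nested config keys, with each double underscore marking one level of nesting. The sketch below imitates that rule in plain Python to show where a given variable name would land; `nested_config` is a hypothetical helper, the URL is a placeholder, and this is not Prefect's actual loader.

```python
from typing import Any, Dict, Mapping

PREFIX = "PREFECT__"


def nested_config(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Turn PREFECT__A__B=value entries into {'a': {'b': value}}."""
    config: Dict[str, Any] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue  # unrelated variables are ignored
        keys = name[len(PREFIX):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config


example = nested_config(
    {"PREFECT__CONFIG__API__URL": "http://my-server:4200/graphql", "HOME": "/root"}
)
print(example)
```

Under this rule the variable from the question would populate a `config` section nested inside the configuration rather than a top-level `api.url` key, so it may be worth comparing the variable name against the keys that the loaded configuration actually contains.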
  • p

    Philip MacMenamin

    06/29/2020, 7:01 PM
    I have a question about using Parameter in ShellTask. According to the docs, Parameter is a type of Task, and I'm guessing it needs to have its run method called before it's usable. I'm guessing that this method is called when you pass the parameter to a function with the @task decorator? I have a shell task to which I'd like to pass an object (or a string) that is created using a Parameter object. Normally shell tasks seem to be defined in the flow block. When I try to run a shell task in a flow with arguments that depend on a parameter, it fails with something like:
    Traceback (most recent call last):
      File "/usr/lib/python3.8/runpy.py", line 193, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/lib/python3.8/runpy.py", line 86, in _run_code
        exec(code, run_globals)
      File "/home/macmenaminpe/code/prefect/pdb_flow/parameterized_flow.py", line 45, in <module>
        num_lines = s_task(command=f"wc -l {job.pdb_fp} > {job.job_dir}num_lines.txt")
      File "/home/macmenaminpe/.local/lib/python3.8/site-packages/prefect/tasks/core/function.py", line 68, in __getattr__
        raise AttributeError(f"'FunctionTask' object has no attribute {k}")
    AttributeError: 'FunctionTask' object has no attribute pdb_fp
    So, I guess the two questions are:
    • How do I "get at" the input value of a Parameter in a flow (or do I not do this, and always pass that param to a @task and let it get handled there)?
    • How do I create a ShellTask that takes args?
    8 replies · 2 participants
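The traceback is consistent with the f-string being evaluated while the flow is being defined, when `job` is still a task object rather than its eventual result. One way to defer the formatting is to build the command inside a function that runs later with real values. In the sketch below, `build_wc_command` is a hypothetical helper and the paths are placeholders; wrapping it in a task is left out so the example runs without Prefect.

```python
import shlex
from pathlib import PurePosixPath


def build_wc_command(pdb_fp: str, job_dir: str) -> str:
    """Build the shell command once the concrete path values are known."""
    out_path = PurePosixPath(job_dir) / "num_lines.txt"
    # shlex.quote protects against spaces and shell metacharacters in paths.
    return f"wc -l {shlex.quote(pdb_fp)} > {shlex.quote(str(out_path))}"


command = build_wc_command("/data/1abc.pdb", "/tmp/job 1/")
print(command)
```

The resulting string could then be passed as the `command` argument of the shell task at run time instead of being formatted in the flow block.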
  • j

    Jeff Brainerd

    06/29/2020, 7:38 PM
    Hi team, question about retries. Is there a good way to retry a task conditionally? Specifically, I want to retry a task on certain failures (e.g. db deadlocks). My thought was to use a state handler to detect the right situation (easy) but then how would I actually invoke the retry? Maybe return a new Retrying State object? Couldn’t find any doc examples of this and wanted to know if I missed anything obvious before spelunking. Thanks! 🙏
    5 replies · 3 participants
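Independently of how the retry is wired into Prefect states, the decision logic itself (retry only for selected failure types, give up after a limit) can be sketched in plain Python. This is not Prefect's state-handler mechanism; `DeadlockError`, `run_with_retries`, and `flaky_query` are all hypothetical names used for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


class DeadlockError(Exception):
    """Stand-in for a database deadlock error."""


def run_with_retries(
    func: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (DeadlockError,),
    max_retries: int = 3,
    delay_seconds: float = 0.0,
) -> T:
    """Call func, retrying only when it raises one of the retry_on types."""
    attempt = 0
    while True:
        try:
            return func()
        except retry_on:
            attempt += 1
            if attempt > max_retries:
                raise  # retries exhausted: surface the original error
            time.sleep(delay_seconds)


calls = {"count": 0}


def flaky_query() -> str:
    """Fail with a deadlock twice, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise DeadlockError("deadlock detected")
    return "committed"


result = run_with_retries(flaky_query)
print(result, calls["count"])
```

Any exception type not listed in `retry_on` propagates on the first failure, which is the conditional part of the behavior the question asks about.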
  • z

    Zach

    06/29/2020, 8:47 PM
    How do I add max_retries to a Task class? I modified the GCSDownloadAsFile class that inherits from the GCSBaseTask class, and since this is a class, there is no @task decorator to which I can add something like max_retries=3.
    1 reply · 2 participants
  • k

    Kevin Weiler

    06/29/2020, 9:42 PM
    is this error expected?
    TypeError: Object of type datetime is not JSON serializable
    the relevant code is, outside the flow context:
    @task(result=PrefectResult())
    def assert_db_trigger(event_name: str, events_fetcher: DailyFlowEvents, point_date: datetime):
        events = events_fetcher.fetch_events()
        while event_name not in events.keys() or events[event_name] < point_date:
            print(f"No trigger, yet for {event_name}")
            sleep(5)
            events = events_fetcher.fetch_events()
        return point_date
    and
    point_datetime = datetime(year=point_date.year, month=point_date.month, day=point_date.day, hour=0, minute=0, second=0, microsecond=0, tzinfo=pytz.UTC)
    and within the flow context:
    task_wait_for_reference_data_validated = assert_db_trigger("reference_data_validated", daily_flow_events, point_datetime)
    do all function/method arguments need to be JSON serializable?
    2 replies · 2 participants
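The error message itself comes from Python's `json` module, which has no default encoding for `datetime` objects. Since the task returns `point_date` and is configured with `PrefectResult()`, it may be the return value rather than the arguments that is being JSON-encoded; that reading is an assumption. The sketch below reproduces the error with the standard library and shows one common workaround, converting to an ISO 8601 string.

```python
import json
from datetime import datetime, timezone

# Illustrative value; the date is a placeholder.
point_datetime = datetime(2020, 6, 29, tzinfo=timezone.utc)

# Encoding a datetime directly fails with the error from the question.
try:
    json.dumps(point_datetime)
except TypeError as exc:
    error = str(exc)
    print(error)

# An ISO 8601 string is JSON serializable and can be parsed back losslessly.
encoded = json.dumps(point_datetime.isoformat())
decoded = datetime.fromisoformat(json.loads(encoded))
print(encoded, decoded == point_datetime)
```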
  • k

    Klemen Strojan

    06/30/2020, 5:35 AM
    I have an issue with file-based flow storage. I followed the instructions (https://docs.prefect.io/core/idioms/file-based.html) and could not register the flow with prefect register -f flows/my_flow.py. I got the error "Error: no such option: -f". When I register the flow with flow.register() it becomes available in the UI but I am unable to run it: the tasks are pending and I can see this error: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'github'"). I am using 0.12.1 and Cloud. Any ideas?
    :prefect-cloud: 1
    6 replies · 2 participants
  • j

    Jacob Blanco

    06/30/2020, 5:51 AM
    I would like to run a given task with an array of dates:
    @task
    def generate_dates(start_date, n_days):
        return [start_date - datetime.timedelta(days=x) for x in range(n_days)]
    
    @task
    def do_something(updated_at):
        ## DO SOME STUFF WITH THE DATE
        pass
    
    with Flow("My Flow") as flow:
        start_date = Parameter('start_date')
        n_days = Parameter('n_days')
    
        dates = generate_dates(start_date, n_days)
    
        for run_date in dates:
            do_something(run_date)
    
    flow.run(dict(start_date=, n_days= 10))
    In this case I don't want to use map for some technical reasons. I could just implement the loop inside of the task but I like having all the timing tracked by Prefect Cloud.
    7 replies · 2 participants
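In the snippet from the question, the `for` loop runs while the flow is being defined, when `dates` is still a task and not yet a list. The alternative mentioned in the question, looping inside a single function, can still record how long each date takes. A minimal sketch with the standard library; `run_for_dates` is a hypothetical helper and the start date is a placeholder.

```python
import datetime
import time
from typing import Callable, Dict, List


def generate_dates(start_date: datetime.date, n_days: int) -> List[datetime.date]:
    """Return start_date and the n_days - 1 days before it, newest first."""
    return [start_date - datetime.timedelta(days=x) for x in range(n_days)]


def run_for_dates(
    dates: List[datetime.date], work: Callable[[datetime.date], None]
) -> Dict[datetime.date, float]:
    """Run work once per date, in order, and record each duration in seconds."""
    durations: Dict[datetime.date, float] = {}
    for run_date in dates:
        started = time.perf_counter()
        work(run_date)
        durations[run_date] = time.perf_counter() - started
    return durations


dates = generate_dates(datetime.date(2020, 6, 30), 3)
durations = run_for_dates(dates, lambda run_date: None)
print(dates, durations)
```

The recorded durations could be logged from inside the task, although they would not appear as separate task runs in the UI.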
  • k

    Kai Weber

    06/30/2020, 7:22 AM
    Hi lots, I know I have asked this question before in a previous thread. But I still have the problem that the (Core) Prefect Docker containers get a default name (in my case it is temp_xxx). I start the server application with "prefect server start". But I cannot find any parameters to influence the Docker Container name. Background is that I want to erase the Docker Container (by name) before each start for development reasons in the development environment. Thanks in advance.
    2 replies · 2 participants
j

josh

06/30/2020, 11:40 AM
Hi @Kai Weber I’m hesitant to add an option to set the name to the CLI command since some of the functionality might be changing soon, but to accomplish what you’re after you could grab the same docker-compose.yml file that the server start command uses (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/docker-compose.yml), then add the container names and run docker-compose up to start the server.
k

Kai Weber

06/30/2020, 11:52 AM
Hi @josh Thanks for the message. And I think I got you. Until I have the right insight, I'm not going to mess around with configuration files. I was hoping there'd be some sort of software switch as a parameter.