prefect-community
  • v

    Vincent

    10/21/2020, 7:54 PM
    I am wondering if anyone has ever experienced a "reduce" bottleneck using Prefect. I have a procedure which consumes thousands of mapped tasks. The task itself is very simple, but it just hangs for a very long time. Does anyone have leads on what the issue might be?
    m
    2 replies · 2 participants
  • b

    Billy McMonagle

    10/21/2020, 9:37 PM
    I have a very basic question. What exactly is a "tenant" and how does the concept differ from a "user"?
    c
    3 replies · 2 participants
  • m

    Mike Marinaccio

    10/21/2020, 9:42 PM
    Hey Everyone! Quick question about Parameters. I have some tasks in which I’ve decided to access certain Parameter values via
    prefect.context.parameters
    . Am I correct to say that I can always rely on the context params being set before any other task is run in an async / Dask environment?
    c
    2 replies · 2 participants
  • j

    Jimmy Le

    10/21/2020, 10:20 PM
    I'm working on a Selenium web scraping project, everything works fine when running locally - however, when I register the flow with Prefect Cloud, I'm not able to launch a Chrome Driver. It appears to be a serialization problem since I'm getting a
    TypeError: cannot pickle '_thread.lock' object
    . Has anyone run into a similar problem? Any suggestions would be appreciated!
    👀 2
    d
    m
    +3
    16 replies · 6 participants
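    A small illustration of the error quoted above, using only the standard library. `FakeDriver` is a hypothetical stand-in for any object that owns a thread lock (that the Chrome driver holds one is an assumption here), and building it inside `scrape` is one common way to keep such an object out of whatever gets serialized. This is a sketch, not a confirmed fix for this flow.

```python
import pickle
import threading


class FakeDriver:
    """Hypothetical stand-in for an object that owns a thread lock."""

    def __init__(self):
        self._lock = threading.Lock()


def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps, False if pickling fails."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


def scrape(url: str) -> str:
    """Create the unpicklable resource at run time, inside the function body."""
    driver = FakeDriver()  # never stored at module level, so never serialized
    return f"scraped {url} with {type(driver).__name__}"
```

    An object created at module level and captured by a task travels with the flow when it is serialized; one created inside the function body only exists while the function runs.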
  • z

    Zach

    10/21/2020, 11:35 PM
    How do I setup caching on a task that takes a python class instance as one of its arguments
    d
    1 reply · 2 participants
  • z

    Zach

    10/21/2020, 11:36 PM
    The
    PrefectResult()
    result handler doesn't work since a class instance is not JSON serializable
  • z

    Zach

    10/21/2020, 11:37 PM
    If I use a serializer, are there any restrictions on what the class's attributes can be? One of my class's attributes is a function pointer.
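    The JSON limitation described above can be reproduced without Prefect. In this sketch `Settings` and `is_json_serializable` are hypothetical names; it only shows that `json.dumps` rejects class instances and function objects, which is why a JSON-based result would not fit such arguments. Which serializer to use instead is left open here.

```python
import json


class Settings:
    """Hypothetical class used as a task argument."""

    def __init__(self, name, retries):
        self.name = name
        self.retries = retries


def is_json_serializable(value) -> bool:
    """Return True if json.dumps accepts the value, False on TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


instance = Settings("nightly", 3)
plain_attributes = vars(instance)  # a dict of str/int values
with_function = {"callback": len}  # a function object as a value
```

    The instance and the dict holding a function are rejected, while the dict of plain attributes is accepted.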
  • b

    bral

    10/22/2020, 3:29 AM
    Hi folks! Has anyone had any experience using Sentry with Prefect?
    👀 1
    r
    d
    8 replies · 3 participants
  • a

    Alberto de Santos

    10/22/2020, 2:14 PM
    Hi community, I have a question around launching Agents and keeping them working. I understand, one of the most handy options is just using
    nohup
    , however, in order to do that, I have to execute everything with the
    flow.run_agent()
    option and then comment out that line of code to execute the flows. The alternative is the CLI; however, I couldn’t make it work (it doesn’t find the libraries I need to execute, despite using the
    -p
    param). What is your experience/view here?
    r
    d
    +1
    21 replies · 4 participants
  • d

    Dennis Schneidermann

    10/22/2020, 2:31 PM
    Hi everybody. We're using Prefect to run continuous processing on IoT data. I've just had to clear out the Prefect database as it grew to 12GB in less than a month, stalled the VM it was on, and I had no choice but to scrap it. I am logging a LOT of output from the continuous tasks, so my guess is that it's to blame for the growth, and can make sure once I have it running for a few days. The volume is a few log lines every 5 seconds from each of 5 continuous tasks. Regarding flow deletion/cleanup, I've found a 2 month old thread here https://prefect-community.slack.com/archives/CL09KU1K7/p1598535130019500 that mentions running a mutation on the Hasura API for doing deletions with a time filter, and here the suggested issue: https://github.com/PrefectHQ/server/issues/62 - @Dylan and @Sandeep Aggarwal you have participated in that thread. I have little experience with running GraphQL mutations and no experience with using the Hasura API, so I'm wondering if somebody has a working example of what to do here. Searches in Github and Slack for "clean"/"cleanup"/"database size" all give no results, so I'm hoping we can have a thread here with a recipe on what to do. I have no problem doing the legwork myself, for example, implement a Prefect flow that will make the needed API calls to cleanup old tasks. Any suggestions would be much appreciated.
    d
    s
    +1
    13 replies · 4 participants
  • j

    James Phoenix

    10/22/2020, 3:55 PM
    Hey everyone 🙂
  • j

    James Phoenix

    10/22/2020, 3:55 PM
    How do I check for the output of a function inside of flow?
  • j

    James Phoenix

    10/22/2020, 3:56 PM
    examplle_py.py
  • j

    James Phoenix

    10/22/2020, 3:56 PM
    I'm currently getting a None on print(realtor_data.result)
  • j

    James Phoenix

    10/22/2020, 3:56 PM
    Just wondered what I should be using instead 👍
    r
    c
    7 replies · 3 participants
  • n

    Newskooler

    10/22/2020, 5:29 PM
    Hi prefecters 👋 I stumbled on an interesting case: how can I unit test a task which fails and has a retry, without waiting for all retries to run? Currently I do this, but it only works once all retries have passed (which in my case takes a long time). Is there a way around this?
    def test_smth():
        with Flow("test") as flow:
            outcome = my_funct()
        state = flow.run()
        assert state.result[outcome].is_failed()
    👀 1
    d
    8 replies · 2 participants
  • a

    ale

    10/22/2020, 5:59 PM
    Hi folks, I'd like to configure logging for Rollbar. The issue is that the Rollbar auth token is available only at runtime in an environment variable. I was able to get the token using EnvVarSecret, which is then used to call rollbar.init(), but then I got stuck. How should I add the Rollbar logger to Prefect logging?
    d
    9 replies · 2 participants
  • b

    Billy McMonagle

    10/22/2020, 6:02 PM
    Hi there, I'm having some very tricky issues running tasks using
    FargateAgent
    . I am running this agent locally, and it seems to work intermittently. However, I am repeatedly getting this error:
    [2020-10-22 17:59:48,212] INFO - agent | Starting FargateAgent with labels ['XXX']
    [2020-10-22 17:59:48,212] INFO - agent | Agent documentation can be found at <https://docs.prefect.io/orchestration/>
    [2020-10-22 17:59:48,212] INFO - agent | Agent connecting to the Prefect API at <https://api.prefect.io>
    [2020-10-22 17:59:48,269] INFO - agent | Waiting for flow runs...
    [2020-10-22 17:59:48,402] ERROR - agent | [{'path': ['get_runs_in_queue'], 'message': "'NoneType' object has no attribute 'flow_group_id'", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    c
    7 replies · 2 participants
  • m

    Mitchell Bregman

    10/22/2020, 7:54 PM
    How can I register a flow from the command line? My setup:
    # $ROOT/src/flow.py
    from prefect import Flow
    
    flow = Flow(name="test flow")
    Now from the command line in dir
    $ROOT
    , I run:
    prefect register flow --file src/flow.py --name "test flow"
    I get the error
    KeyError: "'__name__' not in globals"
    What am I doing wrong? I’d like to use the CLI as it will be simpler from a CI/CD standpoint.
    d
    m
    43 replies · 3 participants
  • f

    fabian wolfmann

    10/22/2020, 9:40 PM
    Hi community! What is the best way to map over a dictionary and access the key and value in the mapped function? I couldn't find an example of this!
    m
    j
    14 replies · 3 participants
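    One way to read the question above, sketched in plain Python: turn the dictionary into a list of `(key, value)` pairs and unpack each pair inside the function that is mapped. The names `dict_to_pairs` and `describe` are hypothetical, and the list comprehension is only a stand-in for mapping a task over the list.

```python
from typing import Any, Dict, List, Tuple


def dict_to_pairs(mapping: Dict[str, Any]) -> List[Tuple[str, Any]]:
    """Flatten a dict into a list of (key, value) tuples."""
    return list(mapping.items())


def describe(pair: Tuple[str, Any]) -> str:
    """Unpack one (key, value) pair inside the mapped function."""
    key, value = pair
    return f"{key} -> {value}"


prices = {"apple": 3, "pear": 5}
pairs = dict_to_pairs(prices)
labels = [describe(pair) for pair in pairs]  # stand-in for a mapped task
```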
  • a

    Alberto de Santos

    10/22/2020, 10:06 PM
    Can anyone tell me how to kill all the scheduled Flows through the GUI? 🙂
    j
    7 replies · 2 participants
  • j

    Jesper van Dijke

    10/22/2020, 11:50 PM
    So (finally) got it running, but the page doesn't load due to this error in Chrome :
    auth0-spa-js must run on a secure origin
    Which of course is completely OK, because I'm not running on localhost but on an Ubuntu image, and I connect to
    <http://192.168.88.133:8080>
    in chrome address bar
    <chrome://flags/#unsafely-treat-insecure-origin-as-secure>
    added this domain. No luck, next error :
    Refused to frame '<https://login.prefect.io/>' because an ancestor violates the following Content Security Policy directive: "frame-ancestors 'none'".
    Has anyone solved this or run it on a different machine? As a workaround I ended up tunneling over SSH: with PuTTY, forward ports 8080 and 4200, and all is good. Maybe a nice addendum to the documentation.
  • s

    Scott Asher

    10/23/2020, 2:54 AM
    i’m using prefect cloud - and running an agent via CLI locally. My agent is registered with cloud, but for the life of me I can’t figure out how I am supposed to register my flow. My bad understanding was that the registration communicated first with a proxy running on the local machine, but that doesn’t seem to be working. The error I see is:
    prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
  • s

    Scott Asher

    10/23/2020, 2:54 AM
    What token am I supposed to configure? My agent has a runner token configured, but I don’t see where a flow has to have a token.
  • s

    Scott Asher

    10/23/2020, 3:13 AM
    FYI this is the error I see whether using flow registration via the CLI or via
    flow.register()
    m
    7 replies · 2 participants
  • a

    Alberto de Santos

    10/23/2020, 9:29 AM
    Hi all,
  • a

    Alberto de Santos

    10/23/2020, 9:30 AM
    I am wondering how to make a
    map
    over a
    DataFrame
    , do you have any idea? I see very clear how to make it over a
    list
    . To my mind, it seems straightforward to convert the
    DataFrame
    into a
    list
    . So, I would like to know your opinion.
    a
    s
    +1
    4 replies · 4 participants
  • z

    Zach

    10/23/2020, 2:55 PM
    Has anyone ever seen an error like this during one of their prefect flows?
    Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': "can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 111, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1177, in set_task_run_state
        result = self.graphql(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 226, in graphql
        raise ClientError(result["errors"])
    prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': "can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE", 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
    I ran this flow 89 times with different input, and one time it failed with this message.
    👀 1
    n
    2 replies · 2 participants
  • z

    Zach

    10/23/2020, 3:46 PM
    @nicholas After I saw that my flow failed last night, I tried to restart the flow right now through the Prefect UI. Originally, the flow failed after 15 minutes. I restarted it now (about 12 hours later) and it failed again. And now the Prefect UI says the flow's run time was >12 hours. That is really misleading and it screws with our metrics, since we monitor flow run time to keep track of how long tasks are taking.
    n
    5 replies · 2 participants
  • m

    Mitchell Bregman

    10/23/2020, 6:09 PM
    Hi there, I am running into a very odd issue with regard to module packaging and registering to Prefect Cloud. The code lives here and the process to register lives here. Getting a
    ModuleNotFoundError: No module named src
    during the flow healthcheck, traceback here. Am I doing something wrong in terms of
    __init__
    packaging? This is a followup to thread yesterday.
    n
    m
    +2
    52 replies · 5 participants
n

nicholas

10/23/2020, 6:13 PM
It looks like when
src
is referenced, it's from within the
src
directory, shouldn't the
__init__
module reference it with
from flow import Flow
?
( i could be wrong here, that's just my initial thought)
m

Mitchell Bregman

10/23/2020, 6:14 PM
i can try that! one sec
👍 1
m

Michael Adkins

10/23/2020, 6:15 PM
I do not think that will resolve it
m

Mitchell Bregman

10/23/2020, 6:15 PM
yeah because then my package locally will be messed up
m

Michael Adkins

10/23/2020, 6:16 PM
From within the
src
init file you should still reference the full path to the module
m

Mitchell Bregman

10/23/2020, 6:16 PM
I am as such:
"""Top-level module."""
from src.flow import flow

__all__ = ["flow"]
u think its a naming issue? i can change
flow.py
to
build.py
or something
m

Michael Adkins

10/23/2020, 6:17 PM
Did you see this warning?
/opt/prefect/healthcheck.py:147: UserWarning: Flow uses module which is not importable. Refer to documentation on how to import custom modules <https://docs.prefect.io/api/latest/environments/storage.html#docker>
  flows = cloudpickle_deserialization_check(flow_file_paths)
The module that’s not importable is probably
src
which is not installed within the docker container
m

Mitchell Bregman

10/23/2020, 6:18 PM
it is installed via `pip install -e .`… when i locally
import src
all works just fine
which is the same process i am following in the CI workflow
m

Michael Adkins

10/23/2020, 6:18 PM
pip install -e
is not run within the docker container though
Which is being used to store your flow
m

Mitchell Bregman

10/23/2020, 6:19 PM
got it… so what kind of workaround is there?
i can include an additional step in the docker storage?
n

nicholas

10/23/2020, 6:20 PM
Oh, couldn't you copy the
src
folder to the docker container?
m

Michael Adkins

10/23/2020, 6:20 PM
You can probably install your module using the
extra_dockerfile_commands
kwarg or include your module like so
Docker(
    files={
        # absolute path source -> destination in image
        "/Users/me/code/mod1.py": "/modules/mod1.py",
        "/Users/me/code/mod2.py": "/modules/mod2.py",
    },
    env_vars={
        # append modules directory to PYTHONPATH
        "PYTHONPATH": "$PYTHONPATH:modules/"
    },
)
@nicholas it’ll need to be copied in and then installed or added to the python path
👍 1
@Mitchell Bregman there’s an example in the docker storage docs linked from that warning I pasted in
:upvote: 1
Python package management is a bit of a headache 😕
:upvote: 1
We have plans to write a blog post about it someday 🙂
👍 1
m

Mitchell Bregman

10/23/2020, 6:22 PM
im a bit confused about what u suggested
so i should copy each file over?
m

Michael Adkins

10/23/2020, 6:23 PM
So in the code block I pasted you are listing files that you’d like to pass into the docker image. You can actually just list the directory so
"/path/in/ci/to/module": "/modules"
m

Mitchell Bregman

10/23/2020, 6:24 PM
got it - 1 sec
m

Michael Adkins

10/23/2020, 6:24 PM
Will copy your module directory into the image. Then you need to either install it by running
pip install -e /modules/yourmodule
(via the extra cmds) or add it to the PYTHONPATH using
env_vars
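The two options described above can be sketched as a helper that only builds the keyword arguments; Prefect itself is not imported. The keyword names `files`, `extra_dockerfile_commands` and `env_vars` come from this thread, while `module_storage_kwargs`, its parameters and the `/modules` default are hypothetical. The install variant assumes the copied directory contains packaging metadata that `pip install -e` can use.

```python
from typing import Any, Dict


def module_storage_kwargs(
    local_dir: str, image_dir: str = "/modules", install: bool = True
) -> Dict[str, Any]:
    """Build kwargs that copy a local module directory into the image."""
    # Source path on the build machine -> destination path in the image
    kwargs: Dict[str, Any] = {"files": {local_dir: image_dir}}
    if install:
        # Option 1: install the copied directory inside the image
        kwargs["extra_dockerfile_commands"] = [f"RUN pip install -e {image_dir}"]
    else:
        # Option 2: make the copied directory importable via PYTHONPATH
        kwargs["env_vars"] = {"PYTHONPATH": f"$PYTHONPATH:{image_dir}"}
    return kwargs


installed = module_storage_kwargs("/path/in/ci/to/module")
on_path = module_storage_kwargs("/path/in/ci/to/module", install=False)
```

Either dict would then be unpacked into the storage constructor, for example `Docker(**installed)`, alongside the image name and registry settings.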
m

Mitchell Bregman

10/23/2020, 6:38 PM
didnt seem to like this
i think im doing something wrong
extra_dockerfile_commands="pip install -e /modules",
    files={f"{os.path.join(os.path.expanduser('~'), 'project')}": "/modules"},
flow.storage = Docker(
    env_vars=config.ENVIRONMENT_VARIABLES,
    extra_dockerfile_commands="pip install -e /modules",
    files={f"{os.path.join(os.path.expanduser('~'), 'project')}": "/modules"},
    image_name=config.DOCKER_IMAGE_NAME,
    image_tag=config.DOCKER_IMAGE_TAG,
    python_dependencies=config.PYTHON_DEPENDENCIES,
    registry_url="parkmobile-docker.jfrog.io",
    tls_config=tls_config,
)
this
os.path.join(os.path.expanduser('~'), 'project')
resolves to
/home/circleci/project
(all the code if you were to clone it lives here)
so im moving this to
/modules
m

Michael Adkins

10/23/2020, 6:40 PM
Seems reasonable, what was the error?
m

Mitchell Bregman

10/23/2020, 6:40 PM
docker.errors.APIError: 400 Client Error: Bad Request ("Dockerfile parse error line 16: unknown instruction: P")
one sec - sending u full traceback
m

Michael Adkins

10/23/2020, 6:41 PM
extra commands expects a list of strings
m

Mitchell Bregman

10/23/2020, 6:41 PM
ahhhh
testing it out!
m

Michael Adkins

10/23/2020, 6:41 PM
And it’s probably got to be in docker format so
["RUN …"]
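A plausible reading of the `unknown instruction: P` error, sketched without Prefect: if each item of `extra_dockerfile_commands` is written on its own Dockerfile line, a bare string is iterated character by character, so the first line is just `p`. The `render_lines` function is an assumed stand-in for that rendering, not Prefect's actual code, and `as_command_list` is a hypothetical guard.

```python
from typing import Iterable, List, Union


def render_lines(commands: Iterable[str]) -> str:
    """Write each item on its own line (assumed rendering, for illustration)."""
    return "".join(f"{cmd}\n" for cmd in commands)


def as_command_list(commands: Union[str, List[str]]) -> List[str]:
    """Wrap a bare string in a list so it is not iterated per character."""
    if isinstance(commands, str):
        return [commands]
    return list(commands)


# A bare string: every character ends up on its own line
broken = render_lines("pip install -e /modules")

# A list holding one Dockerfile instruction: a single well-formed line
fixed = render_lines(as_command_list("RUN pip install -e /modules"))
```

Here `broken` starts with a line containing only `p`, while `fixed` is the single line `RUN pip install -e /modules`.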
m

Mitchell Bregman

10/23/2020, 6:41 PM
roger that
im thinking this might be the one!! standby
thanks for all ur help!!
m

Michael Adkins

10/23/2020, 6:53 PM
No problem! It’ll all be worth it for the write up in the end 😉
m

Miha Sajko

12/19/2020, 4:56 PM
Has there been any documentation written on this issue? Or perhaps a better question, is there any alternative implementation in the roadmap to solve this more elegantly?
My flows predominantly consist of custom made tasks which themselves can be quite complex (relying on various custom functions, classes, etc). Do I correctly understand that if I want to use Docker storage I have to use the
files
argument as discussed in this thread or is there a better way?
m

Michael Adkins

12/19/2020, 6:14 PM
I’m working on a guide to this and some more examples — as we get a feel for how people are using it we can introduce easier to use functionality directly in prefect.
Currently, I do something like this:
from my_project import PROJECT_PATH, PROJECT_NAME
from prefect.storage.docker import Docker


def ProjectDockerStorage(
    project_path: str = PROJECT_PATH, project_name: str = PROJECT_NAME, **kwargs
) -> Docker:
    """
    A thin wrapper around `prefect.storage.Docker` with installation of a local project,
    defaulting to installing this project

    Cannot be a class because then it is not a known serializable storage type so this
    is just an instance factory for Docker storage
    """

    # Copy this namespace into the docker image
    kwargs.setdefault("files", {})
    kwargs["files"][str(project_path)] = project_name

    # Install the namespace so it's on the Python path
    kwargs.setdefault("extra_dockerfile_commands", [])
    kwargs["extra_dockerfile_commands"].append(f"RUN pip install -e {project_name}")

    return Docker(**kwargs)
then
flow.storage = ProjectDockerStorage()
🙌 1
s

Sagun Garg

12/28/2020, 8:21 AM
@Michael Adkins Please can you share this code example in your github repo, I am facing similar issues