prefect-community
  • j

    Joël Luijmes

    12/08/2021, 9:28 AM
    Hi! I’m working on a flow which requires large concurrency (tens of thousands of tasks). Previously I’ve used the
    LocalDaskExecutor
    as my requirements for concurrency were low. However with this flow I want to use the real
    DaskExecutor
    . As far as I’m aware, Prefect can create a temporary Dask cluster for the running flow (using
    KubeCluster
    ), but alternatively it can use an existing deployed Dask cluster (or even Dask Gateway, if I’m not mistaken). Note that I’m running in Kubernetes, so adaptive scaling could be interesting. Are there any guidelines, suggestions, or experiences for using a temporary vs. static Dask cluster? Additionally, if I have
    Docker
    as storage, how can I supply the correct image tag containing my modules/dependencies while registering the flow?
    ✅ 1
    a
    • 2
    • 7
  • i

    Ivan Zaikin

    12/08/2021, 10:00 AM
    Hi! I am trying to run
    prefect==2.0a5
    in Docker. Here is my Dockerfile:
    FROM python:3.8
    RUN adduser prefect
    USER prefect
    WORKDIR /home/prefect
    COPY --chown=prefect:prefect requirements.in ./
    ENV LANG C.UTF-8
    ENV LC_ALL C.UTF-8
    ENV PATH="/home/prefect/.local/bin:${PATH}"
    RUN pip install --user --no-cache-dir -r requirements.in
    COPY --chown=prefect:prefect flow.py flow_deployment.py ./
    Inside the container I create a deployment and several flow runs, but all of them are marked as “late”. Here is the terminal output:
    $ prefect orion start --host 0.0.0.0 --log-level DEBUG
    Starting Orion API server...
    INFO:     Started server process [71]
    INFO:     Waiting for application startup.
    09:54:06.189 | Scheduler service scheduled to start in-app
    09:54:06.189 | MarkLateRuns service scheduled to start in-app
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:4200 (Press CTRL+C to quit)
    09:54:06.501 | Finished monitoring for late runs.
    09:54:06.538 | Scheduled 0 runs.
    Starting agent connected to http://0.0.0.0:4200/api/...
    Agent started! Checking for flow runs...
    09:54:07.298 | Submitting flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa'
    09:54:07.298 | Submitting flow run '521624eb-cec8-4f9b-9e92-f203e104586a'
    09:54:07.298 | Submitting flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf'
    09:54:07.299 | Submitting flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51'
    09:54:07.299 | Submitting flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9'
    09:54:07.300 | Completed submission of flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa'
    09:54:07.300 | Completed submission of flow run '521624eb-cec8-4f9b-9e92-f203e104586a'
    09:54:07.300 | Completed submission of flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf'
    09:54:07.300 | Completed submission of flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51'
    09:54:07.300 | Completed submission of flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9'
    09:54:08.969 | Flow run '521624eb-cec8-4f9b-9e92-f203e104586a' exited with exception: KeyError('__main__')
    09:54:08.975 | Flow run '2895bdfa-082c-43b5-afc2-d0dcc269bf51' exited with exception: KeyError('__main__')
    09:54:08.976 | Flow run 'f25da7b6-7893-4107-aaa3-df22377e2ccf' exited with exception: KeyError('__main__')
    09:54:08.979 | Flow run '2b445dbd-58b4-4acb-89b7-1f6782dc0ec9' exited with exception: KeyError('__main__')
    09:54:08.980 | Flow run 'f0855bd3-2eab-4346-ad3a-2e237a688faa' exited with exception: KeyError('__main__')
    Is there a way to debug these KeyErrors?
    a
    m
    • 3
    • 28
  • i

    Ievgenii Martynenko

    12/08/2021, 1:04 PM
    Good day, I have a question about logging. We have our own separate library "LIB" that is doing some magic. This library has its own root logger defined:
    logger = logging.getLogger()
    . The idea is to extend the Prefect Task class and run some magic using the LIB library. I've read that we can add as many NAMED loggers as we want using https://docs.prefect.io/core/concepts/logging.html#extra-loggers, but since we have a root one, what happens now is: Prefect logs its records as usual, but messages from LIB are not passed through. I suppose this is because the LIB logger is the root one. Have you ever faced such a situation?
    a
    • 2
    • 5
  • j

    Justin

    12/08/2021, 2:32 PM
    Hi guys, maybe someone can point me in the right direction: I have successfully deployed Prefect to a local Kubernetes cluster and have UI & GraphQL access, but I am a little lost with the deployment of flows to the cluster... As far as I understand, I need to create a Docker image and then a flow using KubernetesRun, right? But how do I deploy it then?
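    from prefect import Flow
    from prefect.run_configs import KubernetesRun

    flow = Flow("taskname")

    # Run the flow as a Kubernetes job using the given image and environment.
    flow.run_config = KubernetesRun(
        env={
            "POSTGRES_USER": "2234234",
            ...
        },
        image="dockerimage:latest",
    )

    # Register the flow with the backend under the given project.
    flow.register("projectname")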
    a
    • 2
    • 8
  • v

    Vadym Dytyniak

    12/08/2021, 2:54 PM
    Hi. Is there any way to pass an env variable in ECSRun(env={...}) as a Prefect Secret?
    a
    k
    • 3
    • 31
  • v

    Vipul

    12/08/2021, 2:59 PM
    Hi All, Orion query, I tried setting the PREFECT_ORION_API_HOST and PREFECT_ORION_API_PORT with the assumption that it would override the UI host and port. Though when I run command "prefect orion start", I can see it is starting on local host and port 4200. Am I doing something wrong?
    k
    a
    • 3
    • 12
  • j

    Jason Motley

    12/08/2021, 3:09 PM
    Is there a way to feed a web hook into a specific google sheet and cell? I.e. I'm tracking all pipelines in a G Sheet, I'd like Cell C3 to say success/failure and a timestamp, C4 to say it for a different flow, etc.
    k
    • 2
    • 17
  • l

    Leon Kozlowski

    12/08/2021, 4:18 PM
    For a k8s agent, should all of the pods created for flow runs inherit the
    Environment:*
    from the agent? I am having issues persisting a service account and role ARN that give flows privileges to hit aws resources (details in thread)
    k
    j
    +2
    • 5
    • 26
  • s

    Sam Werbalowsky

    12/08/2021, 4:31 PM
    Is there any issue with wrapping tasks together? For example, I want to use the
    StringFormatter
    but pass in a file that is opened…is something like this bad practice:
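    from prefect import task
    from prefect.tasks.templates import StringFormatter

    @task
    def format_sql_file(file):
        # Read the SQL template from disk, then fill in its placeholders.
        with open(file, 'r') as sql:
            string_to_format = sql.read()
        return StringFormatter().run(var1='my_value', var2='other_value', template=string_to_format)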
    k
    • 2
    • 2
  • p

    Pedro Machado

    12/08/2021, 6:37 PM
    Hi there. I have a parent flow that kicks off a child flow passing environment variables via a Kubernetes run config. The child flow runs a script using the ShellTask and explicitly passes environment variables to it. What would be the best way to get the environment variables from the run config to forward to the ShellTask? I'd probably want to override the hardcoded variables in the child flow with the variables passed via the run config. Thanks!
    a
    k
    • 3
    • 23
  • b

    Brian S

    12/08/2021, 7:30 PM
    Hello, I hope this is the right place to ask for help but if not please let me know. I've been working on getting Prefect Cloud set up on AWS and things are going quite well. I've hit a snag that I can't seem to resolve. I'm getting
    Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'xxx'")
    when submitting the flow to ECS (Fargate). The module in question is a custom Python module that I include in the agent Docker image, where it's installed via pip install. I feel like I'm missing something here, but from what I've read, that should work. Does anyone have a tip on how I might get the flow execution to be aware of this custom module? Is the agent the proper place to install the module?
    k
    t
    • 3
    • 9
  • t

    Tom Shaffner

    12/08/2021, 10:23 PM
    Here's a puzzle for you. I have a number of flows with tasks that query Oracle. In most of these flows the task works fine. In one instance, though, the Oracle query simply never returns. The code that queries Oracle is reused between the flows; it imports a library for talking to Oracle. Things I've checked: The query worked until recently, when I made a minor change and this behavior began, but the new query works fine in SQL Developer or with my old (pre-Prefect) Python process, where it returns in 11.66 seconds. I've tweaked the query every way I can think of to fix it, to no avail. If I point the flow at a different .sql file, it works fine. If I have the flow load the SQL from file and print it, and then copy that query to SQL Developer, it works fine. If I manually call the same query code through the same library, using the same .sql file, in the Python environment used by Prefect, it works fine. As best I can tell, then, this problem only occurs in Prefect, and it manifests as a complete failure to return, so I don't even have an error code to check. Is there anything anyone can think of that might cause such behavior? Or anything else I could check that might be different?
    k
    m
    • 3
    • 190
  • j

    Jacob Blanco

    12/09/2021, 2:06 AM
    Is there gonna be a VOD available of the livestream?? My team and I are in JST so watching it live is a nono
    k
    • 2
    • 1
  • a

    Anh Nguyen

    12/09/2021, 3:42 AM
    Hi all, I want to kick off a task that depends on many upstream tasks. How can I do that?
    k
    • 2
    • 2
  • k

    Klemen Strojan

    12/09/2021, 12:02 PM
    Hey all - I can’t remove one of our agents (k8s) from the UI. What is the mutation to do this with the API?
    a
    • 2
    • 22
  • k

    Klemen Strojan

    12/09/2021, 12:56 PM
    Here is another one - we’re trying to deploy a K8s agent on AKS, K8s version
    1.22.2
    After running
    prefect agent kubernetes install \
        -k ${PREFECT_DE_PROD_API} \
        --namespace prefect-latest \
        --mem-request=16Gi \
        --mem-limit=128Gi \
        --cpu-request=4 \
        --cpu-limit=32 \
        --image-pull-secrets azurecr-secret \
        --label k8s \
        --label prod \
        --label latest \
        --rbac | kubectl apply --namespace=prefect-latest -f -
    we get:
    deployment.apps/prefect-agent created
    role.rbac.authorization.k8s.io/prefect-agent-rbac unchanged
    error: unable to recognize "STDIN": no matches for kind "RoleBinding" in version "rbac.authorization.k8s.io/v1beta1"
    We should be using
    rbac.authorization.k8s.io/v1
    What should we do?
    a
    • 2
    • 2
  • v

    Vincent Chéry

    12/09/2021, 1:40 PM
    Hi all, I'm having trouble with checkpointing and could not find the answer in the docs nor find a work around. I have a
    LocalAgent
    running which gets its orders from a private prefect server instance and runs flow runs locally, and I basically do not want to persist any result on disk. Following the docs I have defined all my tasks with
    @task(checkpoint=False)
    , which does the job for custom tasks but does not give me a solution for prefect tasks like
    prefect.tasks.control_flow.merge
    . I have tried setting task checkpointing to
    false
    globally:
    [tasks]
        checkpointing = false
        [tasks.defaults]
            checkpointing = false
    without success. Any idea? Thanks a lot in advance!
    a
    k
    • 3
    • 46
  • l

    Lucas Hosoya

    12/09/2021, 2:16 PM
    Hi, I have a question about log retention in Prefect Cloud (Starter and Standard):
    • When it says only "1-2 weeks of run history", does that mean that the whole history is deleted (UI, Interactive API)?
    • Is there any way to keep logs other than the Enterprise plan? (For the Prefect Cloud server, not a custom-made server.)
    I've tried:
    • Custom logs in tasks/flows written to storage. -> Not very useful or effective, since it involves unnecessary work when the logs already exist in Prefect Cloud.
    a
    • 2
    • 5
  • r

    Ryan Brennan

    12/09/2021, 2:56 PM
    Hi All - I’m running into a weird issue with Secrets. Sometimes a task will fail with the error message below even though the Secret is stored correctly in the tenant:
    KeyError('The secret <SECRET NAME> was not found.  Please ensure that it was set correctly in your tenant
    This seems to happen the most when I’m using task mapping, but it seems pretty random. 100 iterations of the mapped task will work, then the 101st fails with the error above, then the next 100 are successful. Has anyone run into this issue or have any ideas on what might be happening?
    k
    a
    • 3
    • 4
  • j

    Jason Motley

    12/09/2021, 4:07 PM
    I need to split a dataframe into N parts using numpy.array_split prior to loading into a database. Is this best done with a loop in the load task or in the flow itself?
    k
    • 2
    • 9
  • j

    Josh

    12/09/2021, 7:25 PM
    If I have multiple clocks for a flow’s schedule that all trigger at the same time, only 10 flow runs are scheduled. I also only see 10 upcoming runs when I view the flow in the UI. If I want more than 10 flow runs for the same flow to start at around the same time, how do I do this? I don’t have any concurrency limits on my flows or tasks (unlimited for both).
    k
    a
    • 3
    • 15
  • n

    Nikhil Sthalekar

    12/09/2021, 7:31 PM
    Is anyone here using Prefect to orchestrate Ray jobs? I’m having trouble getting the right dependencies on the prefect agents.
    k
    • 2
    • 1
  • l

    Leon Kozlowski

    12/09/2021, 9:12 PM
    Is there a list anywhere with a subset or all data available in
    prefect.context
    inside a flow run?
    k
    • 2
    • 2
  • j

    Joseph Mathes

    12/09/2021, 10:41 PM
    I can't find a consistent answer on how to get prefect to run on a coiled-managed dask cluster. Will a DaskExecutor start the named cluster for me? Does it already need to be running? How do I give prefect.io credentials for coiled.io? Should I start the cluster inside a task, before I start the flow, or what?
    k
    g
    • 3
    • 39
  • k

    Kevin Kho

    12/10/2021, 12:12 AM
    Join us for a livestream tomorrow
  • j

    John T

    12/10/2021, 12:33 AM
    Hey guys, I’m currently struggling with integrating datadog into my flow. I want to have a datadog tracer throughout my entire flow, but when I try to pass around I’m encountering a
    thread.lock
    issue. Is there anything I could do about this? I also tried ResourceManager to see if I could avoid it, but that also is not working.
    k
    • 2
    • 35
  • p

    Pedro Machado

    12/10/2021, 12:51 AM
    Hi. Is it possible to specify a schedule that runs on the first Monday of each month?
    k
    • 2
    • 7
  • m

    Maria

    12/10/2021, 6:32 AM
    Hi, Is it possible to override db_name, user and host values in the runtime for postgres task?
    postgres_execute = PostgresExecute(db_name="abc", user="peter", host="my_host", port=5432)
    I have multiple databases (dev, test, demo, +prod db per client), and I need to query/load data into the relevant one. I have a config file and I was hoping I could use Parameter to tell what config line to choose - but it seems like I cannot override those params... What is the recommended approach for such use case?
    a
    k
    • 3
    • 6
  • c

    Cristian Toma

    12/10/2021, 6:53 AM
    Hi all, Playing with Prefect for some time now and I love it so far. I have a question regarding passing data in a flow to all tasks. Let's say I have a task A that returns some data (I know it returns actually a task object) that I want to use through all future tasks. I can pass the result as a parameter to all of the tasks but if I have this situation with more than 5 parameters (the return data from 5 tasks A-E) it's getting ugly and hard to maintain. Is there a way to set the values as global somehow (I've tried using context but without success) like in an OOP approach?
    a
    • 2
    • 4
  • r

    Ruben Sik

    12/10/2021, 8:23 AM
    Hi all, does anyone have experience with imports when running a Prefect Docker agent? In my local Docker image my imports work fine, but when running a flow via a Docker agent, modules cannot be found.
    a
    • 2
    • 9
More info. Dockerfile:
FROM python:3.8-slim-buster
WORKDIR /app
COPY requirements.txt ./requirements.txt
RUN pip install "prefect[azure, gitlab]"
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONPATH "${PYTHONPATH}:/app"
Furthermore we are using gitlab storage and our folder structure looks like:
app
├── __init__.py
├── flows
│   ├── __init__.py
│   ├── test_flow.py
│   ├── helper_script.py
│   └── utils
│       ├── __init__.py
│       └── helper_script.py
└── utils
    ├── __init__.py
    └── helper_script.py
We've copied helper_script.py to multiple locations to test various imports. When we run os.listdir() in our flow script "test_flow.py", the logger shows that our utils folder is in the current working directory. os.listdir() returns [utils, flows], which should mean that "import utils.helper_script" is possible (it works on the command line of the Docker image, but not when running the flow via a Docker agent). Are we overlooking something?
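Listing the working directory shows which files exist, but Python resolves imports through sys.path, which can differ between an interactive shell in the image and a flow run started by an agent. A minimal diagnostic sketch, assuming only the standard library (the helper name describe_import is hypothetical), that could be called from inside the flow and logged:

```python
import importlib.util
import os
import sys


def describe_import(module_name: str) -> dict:
    """Report what the interpreter sees for a top-level module name.

    Only top-level names (no dots) are expected here, because looking up a
    dotted name would first try to import its parent package.
    """
    spec = importlib.util.find_spec(module_name)
    return {
        "cwd": os.getcwd(),              # directory the process runs in
        "sys_path": list(sys.path),      # directories searched for imports
        "found": spec is not None,       # whether the module can be located
        "origin": getattr(spec, "origin", None),  # file it would load from
    }


if __name__ == "__main__":
    # Example: check whether a top-level package named "utils" is visible.
    for key, value in describe_import("utils").items():
        print(f"{key}: {value}")
```

Comparing this output between the working command-line session and the failing flow run would show whether the two see the same search path.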
a

Anna Geller

12/10/2021, 10:07 AM
@Ruben Sik in general, it would be easier for you if you would add
setup.py
and make
utils
a package. This way your package will be importable anywhere (in a virtual environment, Docker image etc) and you don’t have to manually add directories to the PYTHONPATH and it makes your code much cleaner. This post explains how to do it. I also have an example Dockerfile and setup.py in this repo.
r

Ruben Sik

12/10/2021, 1:37 PM
Tnx @Anna Geller I read about this strategy this morning and gave it a go. Following the guide, the pip environment shows the created 'utils' package, as in the photo below. When inspecting this package, it seems there is content in it. For the steps I followed Kevin's guide with this structure:
app/
├── utils/
│   ├── __init__.py
│   ├── uDatabase.py
│   ├── uDatalake.py
│   ├── uEmail.py
│   ├── uSharepoint.py
├── flows/
│   ├── flow.py
├── requirements.txt
├── Dockerfile
└── setup.py
However, when running the Docker agent and executing my flow.py, which imports utils/uEmail, no import seems to work. None of these do:
import utils
from utils import uEmail
import utils.uEmail
app.utils.uEmail
Any idea?
a

Anna Geller

12/10/2021, 1:52 PM
@Ruben Sik I think I know what is the issue. You have the
app
directory also on your local, right?
in that case the copy command for you would be:
COPY app/. .
otherwise your Docker image gets this utils directory:
/app/app/utils
so the import would have to be:
from app.app.utils import ...
If you want to start from scratch, you can try the approach from this repo. It’s based on the prefect base image and also contains a flow_utilities folder. You could follow the same structure and even pretty much reuse the Dockerfile:
FROM prefecthq/prefect:0.15.10-python3.8
RUN /usr/local/bin/python -m pip install --upgrade pip
WORKDIR /opt/prefect
COPY utils/ /opt/prefect/utils/
COPY requirements.txt .
COPY setup.py .
RUN pip install .
COPY flows/ /opt/prefect/flows/
Note that copying the flows into a Docker image is only relevant when using Docker storage with
stored_as_script=True
. If you wanna use this pattern, check out this example
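The nested-copy effect described above can be reproduced without Docker. The sketch below is an illustration under assumptions, not the exact container layout: it builds two temporary directory trees with the standard library, one where the package ends up at app/app/utils (as after COPY . . from a build context that itself contains app/) and one where utils sits directly in the directory on the import path, and then checks which top-level names Python can find there. All function names are hypothetical.

```python
import tempfile
from importlib.machinery import PathFinder
from pathlib import Path


def make_package(directory: Path) -> None:
    """Create a directory (and parents) containing an empty __init__.py."""
    directory.mkdir(parents=True, exist_ok=True)
    (directory / "__init__.py").write_text("")


def is_top_level_importable(name: str, search_dir: Path) -> bool:
    """Check whether a top-level module is found when only search_dir is searched."""
    return PathFinder.find_spec(name, [str(search_dir)]) is not None


def demo() -> dict:
    with tempfile.TemporaryDirectory() as tmp:
        # Case 1: the directory on the import path (standing in for /app)
        # contains another app/ folder, so utils lives at app/app/utils.
        workdir = Path(tmp) / "app"
        make_package(workdir / "app")
        make_package(workdir / "app" / "utils")

        # Case 2: utils sits directly inside the directory on the import path.
        flat_workdir = Path(tmp) / "flat"
        make_package(flat_workdir / "utils")

        return {
            "nested_utils": is_top_level_importable("utils", workdir),
            "nested_app": is_top_level_importable("app", workdir),
            "flat_utils": is_top_level_importable("utils", flat_workdir),
        }


if __name__ == "__main__":
    print(demo())
```

In the nested case only the extra app package is visible from the search directory, which matches the observation that a plain import of utils fails until the copy step or the import path is adjusted.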
r

Ruben Sik

12/10/2021, 3:02 PM
Tnx Anna, when using your dockerfile as a template it seems to work now. Error seems to indeed be due to the working directory/ copying steps before!
🙌 1