prefect-community
  • g

    Greg Roche

    06/09/2021, 2:46 PM
    Hi folks - has anyone else noticed that their AWSSecretsManager tasks have suddenly been taking much longer to run? I'm running flows locally, the task always used to run in less than a second, but now is taking one minute for each call. Just wanted to know if anyone else has experienced this or if it's some problem with my local setup...
    2 replies · 2 participants
  • m

    Michael Wedekindt

    06/09/2021, 4:26 PM
    Hi, I'm having trouble running a DbtShellTask on a Windows machine. When I run this:
    with Flow(name="dbt_flow") as f:
        task = DbtShellTask(
            log_stderr=True,
            log_stdout=True,
            return_all=True,
            stream_output=True,
            profiles_dir=profile_path
    
        )(command='dbt run --models data_hub')
    
    out = f.run()
    I got this error message back
    [2021-06-09 16:06:15+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'dbt_flow'
    [2021-06-09 16:06:15+0200] INFO - prefect.TaskRunner | Task 'DbtShellTask': Starting task run...
    [2021-06-09 16:06:15+0200] INFO - prefect.DbtShellTask | /bin/bash: C:\Users\M7856~1.\AppData\Local\Temp\prefect-5yoqhqg_: No such file or directory
    [2021-06-09 16:06:15+0200] ERROR - prefect.DbtShellTask | Command failed with exit code 127
    [2021-06-09 16:06:15+0200] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('Command failed with exit code 127')
    [2021-06-09 16:06:15+0200] INFO - prefect.TaskRunner | Task 'DbtShellTask': Finished task run for task with final state: 'Failed'
    [2021-06-09 16:06:15+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
    When I try this with the plain ShellTask I get the same error. Here are some background facts: I already added bash to my PATH and I can open a bash console. I use Visual Studio Code for development. My Python version is 3.9.5. My Prefect version is 0.14.21. Usually I run dbt locally in the Ubuntu subsystem on Windows, but I could run dbt in Windows cmd as well. Thanks in advance for your help and best regards, Michael
    8 replies · 3 participants
  • a

    Andrew Hannigan

    06/09/2021, 6:11 PM
    I’m getting the following error occasionally when running a Prefect flow using Dask CloudProvider.
    ClientException('An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.')
    6 replies · 2 participants
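A minimal sketch of one general way to cope with a throttling-style error like the one quoted above: retry the call with exponential backoff and random jitter, so concurrent callers stop colliding. This is an assumption about a possible workaround, not something Dask CloudProvider is known to expose; `retry_with_backoff` and the callable passed to it are hypothetical names.

```python
import random
import time


def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, sleep=time.sleep,
                       retry_on=(Exception,)):
    """Call fn(), retrying with exponential backoff plus jitter on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # The base delay doubles on each attempt; the random jitter
            # spreads out callers that failed at the same moment.
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay))
```

Narrowing `retry_on` to the specific exception type keeps unrelated failures from being retried.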
  • f

    Fina Silva-Santisteban

    06/09/2021, 6:38 PM
    Hi everyone, I’m currently using the
    flow.register()
    method to register flows, I also set the flows’ run config and storage along with that:
    flow.run_config = (some run config)
    flow.storage = (some storage config)
    flow.register(project_name=(project name))
    I’d like to switch to using the prefect cli’s register functionality. How can I point it to the run config and storage?
    9 replies · 2 participants
  • p

    Peter Roelants

    06/09/2021, 7:14 PM
    Hi Prefect, I started seeing the following typing errors when checking my code with `mypy`:
    error: Signature of "run" incompatible with supertype "Task"
    For example for following Task:
    class RestTaskTest(prefect.Task):
        def run(self, string_val: str, int_val: int) -> TaskTestResult:
            return TaskTestResult(string_val=string_val, int_val=int_val)
    This makes sense since the definition of
    Task.run
    is
    def run(self) -> None:
    . Is there any way to make mypy work together with overriding of Task.run in custom tasks? Or is the only option to ignore these with
    # type: ignore
    ?
    7 replies · 3 participants
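A minimal sketch of the `# type: ignore` option from the question, narrowed to the one mypy error code that covers incompatible overrides. The `Task` class here is a stand-in reduced to the `run` signature quoted above, not the real `prefect.Task`, and `TaskTestResult` is a guess at the shape of the original result type.

```python
from dataclasses import dataclass


class Task:
    """Stand-in for prefect.Task, reduced to the signature quoted above."""

    def run(self) -> None:
        return None


@dataclass
class TaskTestResult:
    string_val: str
    int_val: int


class RestTaskTest(Task):
    # mypy reports this mismatch under the "override" error code, so the
    # ignore can be scoped to that single check instead of the whole line.
    def run(self, string_val: str, int_val: int) -> TaskTestResult:  # type: ignore[override]
        return TaskTestResult(string_val=string_val, int_val=int_val)
```

Scoping the comment to `[override]` leaves every other mypy check on that line active.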
  • z

    Zach Schumacher

    06/09/2021, 7:40 PM
    I assume the answer is yes, but I just want a sanity check: are datetimes on the context all UTC? https://docs.prefect.io/api/latest/utilities/context.html#context
    2 replies · 2 participants
  • j

    Jonathan Chu

    06/09/2021, 7:57 PM
    what's the difference between
    Flow Concurrency
    and
    Task Concurrency
    ? i have a flow running right now, and i can see it shows as
    1 running flow (100%)
    but then
    0 running tasks (0%)
    1 reply · 2 participants
  • l

    Leon Kozlowski

    06/09/2021, 9:20 PM
    Hi all, I’m in the process of migrating a mono-repo of scheduled tasks over to prefect. Does anyone have any deployment patterns they utilize for registration via CLI while maintaining each ‘service’ as an independent docker image with a Dockerfile in each service subdirectory? (Example Below)
    3 replies · 2 participants
  • f

    Felipe Saldana

    06/09/2021, 10:14 PM
    I am just looking at the examples and I don't see many that have a main function. Is there a reason why they are not used?
    if __name__ == "__main__":
    4 replies · 3 participants
  • b

    Brett Naul

    06/09/2021, 10:29 PM
    we've recently been seeing StartFlowRun create multiple runs with the same
    idempotency_key
    <24 hours apart (say 30 mins). is the idempotency key queryable from graphql so we could try to debug what's going on?
    7 replies · 2 participants
  • b

    Ben Collier

    06/10/2021, 5:17 AM
    Hi all - I’ve been trying to use
    client.get_flow_run_info(flow_run_id)
    with a flow run id in the format “c2686800-3c0f-4b67-83b2-f85fa6fd6773” I’m getting the following:
    'message': 'invalid input syntax for type uuid: "c2686800-3c0f-4b67-83b2-f85fa6"'
    I assume I’m doing something fairly stupid - could someone explain why the UUID is in the wrong format?
    3 replies · 3 participants
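The ID in the error message is 30 characters long and ends at `f85fa6`, while the ID quoted in the question is the full 36 characters, which suggests the value was cut short somewhere before it reached the API. A small sketch, using only the standard library, for checking an ID before passing it on; `is_full_uuid` is a hypothetical helper name.

```python
import uuid


def is_full_uuid(value: str) -> bool:
    """Return True only if value is a complete, canonical 36-character UUID."""
    try:
        # Round-tripping through uuid.UUID rejects truncated strings and
        # non-canonical spellings (missing hyphens, braces, and so on).
        return str(uuid.UUID(value)) == value.lower()
    except ValueError:
        return False
```

Running this on the value at each step between its source and the client call would show where the truncation happens.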
  • t

    Talha

    06/10/2021, 8:27 AM
    Hi, I am trying to use a Git repository for storage to run my flows, but the flow is not working: it fails with an error saying it is unable to find the directory. Can you help me here? Is there a working example I can look at that uses Git storage? Thanks!
    16 replies · 2 participants
  • n

    Noah Holm

    06/10/2021, 9:22 AM
    Based on this page, it looks like you can use environment variables to set values for agent options. Is there any documentation on which environment variables map to which agent options? The only page I can find is here, which doesn’t mention the environment variable option. So far we’ve been creating our own image with a custom entrypoint for the agent to set all the options, but it would be super neat if we could avoid that.
    6 replies · 3 participants
  • t

    Thomas Nyegaard-Signori

    06/10/2021, 11:05 AM
    Hi all, I am still dipping my toes in the Prefect-ocean and I have some questions regarding flows that consist of tasks that all run some custom Docker image on a Kubernetes (AKS) cluster. Currently I have set up an in-cluster Kubernetes agent with the 
    prefect agent kubernetes install --rbac ...
     command, so the RBAC is functioning on the agent. When starting a very simple flow that consists of a single 
    RunNamespacedJob
     task running the custom Docker image the job pod starting the flow runs into RBAC issues, but the 
    RunNamespacedJob
     task pod runs fine. My question is, how to handle job pods that are going to spawn several jobs on Kubernetes and the issues that arise with the RBAC on these pods. Am I thinking about this incorrectly? The error, for reference:
    HTTP response headers: HTTPHeaderDict({'Audit-Id': '67b79e3e-ab13-45ee-8ad5-2ae1769c6a7f', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Thu, 10 Jun 2021 09:21:02 GMT', 'Content-Length': '372'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"cmems-historical\" is forbidden: User \"system:serviceaccount:prefect-zone:default\" cannot get resource \"jobs/status\" in API group \"batch\" in the namespace \"prefect-zone\"","reason":"Forbidden","details":{"name":"cmems-historical","group":"batch","kind":"jobs"},"code":403}
    11 replies · 3 participants
  • d

    Dmytro Kostochko

    06/10/2021, 1:34 PM
    Hi all. We are thinking about using Prefect for ETL in our project and I am trying to implement a POC. The basic idea is to load documents from Elasticsearch (
    detect_new_documents
    ), load data that should be extracted(
    get_skill_extractor
    ). Also this data should be periodically reloaded from database if new version was added. Save results(
    save_skills
    ). I use a flow like this:
    with Flow("Analyze new documents") as flow:
        documents = detect_new_documents()
        extracted_skills = extract_skills.map(documents, unmapped(get_skill_extractor()))
        save_skills.map(documents, extracted_skills)
    It looks like I need some stateful worker to keep an instance of skill_extractor in memory and share it across all tasks, because right now it is passed along with the tasks to the Dask workers, and serializing/deserializing it takes more time than the processing itself. Maybe someone can give some advice on how this can be implemented with Prefect?
    5 replies · 2 participants
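One possible shape for the "keep it in memory" idea, sketched with the standard library only: build the extractor lazily inside the worker process and cache it there, so it is looked up by the task body rather than shipped as a task argument. `SkillExtractor`, `get_cached_extractor` and the toy `extract` logic are hypothetical stand-ins, and whether this fits a given Dask setup is an assumption.

```python
from functools import lru_cache


class SkillExtractor:
    """Hypothetical stand-in for an object that is expensive to build."""

    instances_built = 0

    def __init__(self):
        SkillExtractor.instances_built += 1

    def extract(self, document: str) -> list:
        return document.split()


@lru_cache(maxsize=1)
def get_cached_extractor() -> SkillExtractor:
    # Built at most once per Python process; later calls reuse the instance.
    return SkillExtractor()


def extract_skills(document: str) -> list:
    # The extractor is fetched inside the task body instead of being passed
    # in as an argument, so it is never serialized along with the task.
    return get_cached_extractor().extract(document)
```

Each worker process would build its own copy once. The periodic reload mentioned in the question is not handled here; calling `get_cached_extractor.cache_clear()` when a new version appears is one way it could be added.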
  • d

    Devin McCabe

    06/10/2021, 1:50 PM
    Prefect repo maintainers: Would someone mind taking another look at this closed issue and my comment at the bottom? I think there's a pretty serious (but subtle) bug regarding what happens when prefect modules are imported. Namely, merely importing a class or method from a Prefect module does things before you've even called it. https://github.com/PrefectHQ/prefect/issues/4331 Would be happy to write up a new issue if that would be helpful.
    4 replies · 2 participants
  • c

    Christian Eik

    06/10/2021, 2:29 PM
    hey, is there a way to prevent prefect from writing what my tasks return to the results folder? I have checkpointing turned off, and config values in the task below return False.
    @task(state_handlers=[post_task_fail])
    def add_headers(headers: list, data: tuple) -> List:
        logger = prefect.context.get("logger")
        logger.info(prefect.config.flows.checkpointing)
        logger.info(prefect.config.tasks.checkpointing)
    
        return headers + list(data)
    *edit: but -> and config values ...
    19 replies · 2 participants
  • j

    John Ramirez

    06/10/2021, 7:47 PM
    Hey everyone! Can the Postgres tasks support connecting through an SSH Forward Tunneler object? Also, can I have an example of how to use the Postgres task?
    3 replies · 3 participants
  • b

    Ben Muller

    06/10/2021, 8:50 PM
    Hi All, I am trying to launch my
    prefect agent
    in an ECS Fargate cluster. I am feeding in all my secrets via the cdk namely,
    RUNNER_TOKEN_FARGATE_AGENT
    ,
    AWS_SECRET_ACCESS_KEY
    and
    AWS_ACCESS_KEY_ID
    I am getting some type of auth error when I run my image command
    CMD           [ "prefect", "agent", "ecs", "start", "--token", "$RUNNER_TOKEN_FARGATE_AGENT", \
                        "--task-role-arn=arn:aws:iam::********:role/ECSTaskS3Role", \
                        "--log-level", "INFO", "--label", "fargate-dev", "--label", "s3-flow-storage", \
                        "--name", "fargate-demo" ]
    The trace is in the 🧵. Any ideas would be great! I assumed I don't have to register the agent with Prefect Cloud, as I did this with EC2 and it didn't have to be done. Thanks in advance for any help on this...
    14 replies · 3 participants
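One thing worth checking in the `CMD` above: it is written in Docker's exec (JSON array) form, which does not invoke a shell, so `$RUNNER_TOKEN_FARGATE_AGENT` is not expanded and the agent would receive that literal text as its token. Whether this is the cause of the auth error here is an assumption, since the trace is not shown. The sketch below reproduces the same behaviour with Python's `subprocess`, which also skips the shell when given an argument list; the token value is made up.

```python
import os
import subprocess
import sys

# Hypothetical token value, set only for this demonstration.
env = dict(os.environ, RUNNER_TOKEN_FARGATE_AGENT="example-token")

# Argument list, no shell (like an exec-form CMD): the child process
# receives the literal text "$RUNNER_TOKEN_FARGATE_AGENT".
literal = subprocess.run(
    [sys.executable, "-c", "import sys; print(sys.argv[1])",
     "$RUNNER_TOKEN_FARGATE_AGENT"],
    env=env, capture_output=True, text=True, check=True,
).stdout.strip()

# The variable itself is still present in the child's environment, so a
# process that reads it directly gets the real value.
resolved = subprocess.run(
    [sys.executable, "-c",
     "import os; print(os.environ['RUNNER_TOKEN_FARGATE_AGENT'])"],
    env=env, capture_output=True, text=True, check=True,
).stdout.strip()
```

In a Dockerfile, the shell form of `CMD`, or an exec form that starts a shell explicitly, are the usual ways to get the variable expanded.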
  • m

    Michael Law

    06/10/2021, 8:55 PM
    Hey folks, I was wondering if anyone had some docs or advice on the best way to set up running agents on an AKS cluster? I am wondering, do I need to package my jobs up to ACR or some other container registry? I’m struggling to identify from the docs how the DockerStorage and the KubernetesRun config are used. I would find a diagram useful here to show the relationship between all of these things. I don’t suppose anyone can ELI5 it for me or has a diagram at hand which details the architecture of it all? Thanks again.
    42 replies · 4 participants
  • t

    Tim Enders

    06/10/2021, 9:52 PM
    Is anybody using the Great Expectations tasks, and do they work with the newer v3 Great Expectations API?
    4 replies · 2 participants
  • j

    joshua mclellan

    06/10/2021, 10:09 PM
    hey so im working on my first real flow as part of a POC for my org and im just trying to wrap my head around what the tradeoffs are in regards to a flow of flows or one big flow. We have a lot of already made tasks and im not sure if making too many flows will cause issues in the long run or if having one huge flow will have its own problems. Any thoughts on what the tradeoffs are and how one should go about balancing between the two?
    1 reply · 2 participants
  • r

    Robert Bastian

    06/10/2021, 10:13 PM
    Hello Prefect! Is there a way to limit users access to specific projects in Prefect Cloud?
    1 reply · 2 participants
  • b

    Ben Muller

    06/11/2021, 4:10 AM
    Hi Prefect devs, me again. I am oh so close to finishing my deployment system for Prefect for my company but am running into an issue. I have my agent deployed in a cluster and running successfully, and I am now trying to execute a flow with a menial logging task to make sure everything works. It is running in the same cluster as the agent. When the task definition spins up the task, it is hit with an error:
    ResourceInitializationError: unable to pull secrets or registry auth: execution resource retrieval failed: unable to retrieve ecr registry auth: service call has been retried 1 time(s): AccessDeniedException: User: arn:aws:sts::*****:assumed-rol...
    Both my execution role and task role have admin access ( while i am debugging ) so i dont think it is a role based issue. Wondering if anyone has come across something like this before and can help out. Oh and this is running on ECS and fargate, I have attached my flow code in the 🧵
    11 replies · 4 participants
  • s

    Shivam Shrey

    06/11/2021, 7:09 AM
    Hi, I have the following pub-sub use case to solve with Prefect flows:
    • Task A triggers a service via HTTP. Upon successful invocation, the service returns an MQ topic which needs to be subscribed to.
    • Task B needs to subscribe to this topic (via MQ). The service (a long-running process) publishes asynchronously on this topic; the Prefect task waits for that message and then performs subsequent tasks.
    What are the best ways to do Task B in Prefect?
    2 replies · 2 participants
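A rough sketch of the waiting half of Task B, with a standard-library `queue.Queue` standing in for the MQ topic and a timer thread standing in for the long-running service. The real subscription call depends on the MQ client in use, which the question does not name, so everything here (`wait_for_message`, the message text, the timeouts) is illustrative only.

```python
import queue
import threading


def wait_for_message(topic: "queue.Queue", timeout: float):
    """Block until a message arrives on the topic or the timeout expires."""
    try:
        return topic.get(timeout=timeout)
    except queue.Empty:
        raise TimeoutError(f"no message within {timeout} seconds") from None


# Stand-in for the service, which publishes asynchronously some time later.
topic = queue.Queue()
publisher = threading.Timer(0.1, topic.put, args=("job finished",))
publisher.start()

# The body of "Task B": block until the service reports back, then return
# the message so downstream tasks can use it.
result = wait_for_message(topic, timeout=5.0)
publisher.join()
```

Giving the wait an explicit timeout lets the task fail cleanly if the service never publishes, instead of hanging.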
  • e

    Emma Rizzi

    06/11/2021, 12:59 PM
    Hello, I'm trying to use Prefect Server deployed on an AWS instance with an ECS agent running on an ECS cluster. I created a small hello world task to print 'hello' in the logger, which runs correctly when I create a Local agent on the same instance where the server is deployed, but I can't manage to get the cluster working... Here's what happens:
    • I created an ECS Agent running on my instance with Prefect Server, configured with my cluster ARN.
    • I register my flow and quick start it on the UI. On the AWS console I can see the task being created, with status going from Pending to Running to Stopped. The task definition contains the tag [INACTIVE] though, which seems weird 🤔
    • On the Prefect UI the task is stuck in the "Submitted" state and nothing is displayed in the logs.
    I tried to follow this and this and added task_role_arn and execution_role_arn with lots of permissions, but that doesn't seem to be the problem. I also tried adding GCS as a storage option, with no success. I have no idea what's causing this issue, so here I am hoping someone knows 🙂
    20 replies · 4 participants
  • p

    Pedro Machado

    06/11/2021, 1:36 PM
    Hi everyone. I am trying to use
    PrefectResult
    in a flow that has a resource manager. The idea is to easily allow flow restarts without having to set up a different result backend. However, I am running into an issue with the resource manager tasks. The
    setup
    method of the resource manager returns an object that is not JSON serializable, which causes the task to fail. Do you have any suggestions to work around this? Is the only solution to use a different result backend for this task? Thanks!
    9 replies · 2 participants
  • m

    Marie

    06/11/2021, 2:04 PM
    Hi everyone, I'm trying to bind the outputs of a task that returns 2 results to 2 different tasks. It would be easy with the functional API but I have to use the imperative API for an unrelated reason. I have been trying to use
    bind
    a few different ways without being able to separate the outputs of the first task (see simple example below). Does anyone know how to do this? Thanks!
    flow = prefect.Flow("example-flow")
    
    flow.add_task(task_a)  # Task A returns 2 outputs
    flow.add_task(task_b)  # Task B uses the first output of task A
    flow.add_task(task_c)  # Task C uses the second output of task A
    
    task_b.bind(job=task_a, flow=flow)
    task_c.bind(report=task_a, flow=flow)
    8 replies · 3 participants
  • t

    Talha

    06/11/2021, 2:13 PM
    Hello, I am using Git storage for my flow and I have successfully set up the Git repository. Now I am running my local agent. When I run the flow using my Local agent I get the following error: "Failed to load and execute Flow's environment: NotADirectoryError(20, 'The directory name is invalid')". But if I run a local agent through a Docker container using a docker-compose file, it works fine. Can someone please help me figure out what the problem could be? I am lost here. I have been looking for anything helpful in the documentation, but haven't had much success.
    148 replies · 3 participants
  • n

    Nathan Atkins

    06/11/2021, 4:13 PM
    Is there a way to connect to the Dask UI when running with LocalDaskExecutor? Do I need to run a local cluster and DaskExecutor to see the UI? I was expecting to see it at localhost:8787/status. I have Bokeh installed.
    3 replies · 2 participants
n

Nathan Atkins

06/11/2021, 4:13 PM
Is there a way to connect to the Dask UI when running with LocalDaskExecutor? Do I need to run a local cluster and DaskExecutor to see the UI? I was expecting to see it at localhost:8787/status. I have Bokeh installed.
I got it working by using the DaskExecutor instead of the LocalDaskExecutor. I just didn't remember having to do that in the past.
k

Kevin Kho

06/11/2021, 4:32 PM
Hey @Nathan Atkins, did the LocalDaskExecutor give you the dashboard before?
n

Nathan Atkins

06/11/2021, 4:35 PM
It's been a while so I can't say with any confidence one way or another.