https://prefect.io logo
Docs
Join the conversationJoin Slack
Channels
announcements
ask-marvin
best-practices-coordination-plane
data-ecosystem
data-tricks-and-tips
events
find-a-prefect-job
geo-australia
geo-bay-area
geo-berlin
geo-boston
geo-chicago
geo-colorado
geo-dc
geo-israel
geo-japan
geo-london
geo-nyc
geo-seattle
geo-texas
gratitude
introductions
marvin-in-the-wild
prefect-ai
prefect-aws
prefect-azure
prefect-cloud
prefect-community
prefect-contributors
prefect-dbt
prefect-docker
prefect-gcp
prefect-getting-started
prefect-integrations
prefect-kubernetes
prefect-recipes
prefect-server
prefect-ui
random
show-us-what-you-got
Powered by Linen
prefect-community
  • f

    Farid

    02/24/2022, 12:12 AM
    Hi, How could I prevent the downstream mapped task from skipping when an upstream mapped task fails for some of the inputs? It’s mentioned here:
    If the output of one mapped task is used as input to another mapped task, any failed or skipped task will make the subsequent task fail/skip by default.
    How can I change this default behaviour so that the downstream mapped task runs even if the upstream mapped one fails?
    k
    • 2
    • 1
  • j

    Josh

    02/24/2022, 2:38 AM
    I’m trying to run a Prefect Flow on a Docker Agent. The agent is able to pull the image and complete the deployment of the flow run. The flow progresses from
    Scheduled
    to
    Submitted
    state in the UI. But there is no docker container that is ever spun up and the agent doesn’t do anything. Is there a checklist similar to this, but for submitted state? https://discourse.prefect.io/t/why-is-my-flow-stuck-in-a-scheduled-state/73
    a
    • 2
    • 7
  • n

    Nitin Bansal

    02/24/2022, 5:31 AM
    I am running Prefect docker agent in supervisor. But I am getting following error: ModuleNotFoundError: No Module named prefect at line "from prefect.cli import cli". Anyone faced similar issue?
    a
    • 2
    • 6
  • h

    Hedgar

    02/24/2022, 7:05 AM
    Hello everyone, I have my prefect flow py file inside a virtual environment in a remote ec2 instance how do I run the file such that when I log out it will remember and pick up the schedule indicated
    j
    a
    • 3
    • 9
  • j

    jcozar

    02/24/2022, 8:27 AM
    Hi community! I am wondering if I can create a flow run with a specific task states configuration. My use case is to run a subset of the flow tasks, so I could set the state of some tasks to
    Success
    and the flow run just runs the rest of the tasks (obviously with no data dependencies between those tasks). Thank you in advance!
    m
    a
    • 3
    • 4
  • a

    Arturo

    02/24/2022, 9:36 AM
    Hi community! I need your help!! I tell you, when we are trying to run a process from the website (by clicking on the run button), we first have to start the agent in our command line. Is there a way or a link to some documentation to avoid this human intervention in order to be able to execute the process automatically?
    a
    • 2
    • 2
  • f

    Faisal k k

    02/24/2022, 10:23 AM
    Hello, When I'm trying to label a flow as per the documentation ,I'm getting an error for prefect version 1.0.0
    ModuleNotFoundError: No module named 'prefect.environments'
    .. It is there in
    0.15.13
    a
    • 2
    • 1
  • f

    Faisal k k

    02/24/2022, 10:24 AM
    I'm following this doc - https://docs.prefect.io/orchestration/execution/overview.html#labels
  • e

    Edvard Kristiansen

    02/24/2022, 10:50 AM
    Issue with keeping Agents alive with Supervisor. I am having an issue with properly initialising supervisor to keep my agents running. I have generated the generic supervisor code but I keep getting the following error messages: "can't find command 'prefect'". Any ideas what might be the issue here? This is my supervisor code:
    [unix_http_server]
    file=/tmp/supervisor.sock
    
    [supervisord]
    loglevel=debug
    
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
    
    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock
    
    [program:local-agent]
    command=prefect agent local start -l local
    
    [program:docker-agent]
    command=prefect agent docker start -l docker
    The main issue I am trying to solve is keeping the agent up and running at all times. My VM does a scheduled restart once a day which means any agents I spin up manually will be shut down. Is there any way of setting the agent itself up as a docker container which starts on startup like the prefect server itself? It seems like the docker agent itself isnt actually running in docker but rather just issues the flow to its own docker container?
    a
    • 2
    • 4
  • d

    Donnchadh McAuliffe

    02/24/2022, 10:58 AM
    Hey guys, I've upgraded from prefect orion
    2.0a9
    to
    2.0a12
    . When I try run the server
    prefect orion start
    it throws this error:
    sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) table flow already exists
    how do I get around this?
    a
    • 2
    • 3
  • t

    Tomer Cagan

    02/24/2022, 1:18 PM
    I am trying to follow the intro to sub-flow results passing and for some reason, the get_task_run_results task get stack. I am using prefect server and have kubernetes agent (though I am not sure that matter since I use the --execute cli arg.
    a
    m
    • 3
    • 20
  • j

    jspeis

    02/24/2022, 1:36 PM
    I have a flow A with multiple sequential tasks. When the last task of Flow A completes, I want to kick off flow B, could I do something like :
    with Flow("FlowB") as flow:
        task_c = do_task_c()
        # ...
        
    
    with Flow("FlowA") as flow:
        task_a = do_task_a()
        task_b = do_task_b(upstream_tasks=[task_a])
        # ... 
        create_flow_run(flow_name="FlowB")
    though just want to confirm whether if I get create_flow_run to wait for task b to complete? (what would be the best way to arrange this?)
    a
    • 2
    • 5
  • f

    Florian Kühnlenz

    02/24/2022, 2:45 PM
    Hi, we are using prefect together with dask for scheduling ML training runs. We use a temporary cluster. For big grid searches we see that the dask workers accumulate memory over time and then stop accepting tasks. I assume similar to this thread. However none of the recommended solutions helped. Has anyone had similar experience? Could this be related to prefect?
    k
    • 2
    • 10
  • s

    Scott Aefsky

    02/24/2022, 2:53 PM
    Is there any way to set a schedule as "inactive" on registration? Every time our CI/CD system rebuilds and registers, any schedules that we have set as inactive in the UI get reactivated. This is particularly an issue for our dev environment, where we don't usually want flows running on a schedule. Thanks!
    k
    • 2
    • 3
  • s

    shijas km

    02/24/2022, 3:11 PM
    hi in my prefect flow running on prefect cloud am setting the current directory as the directory where my prefect flow available , but the other python modules which is called from the prefect flow is in different directory , this is actually throwing error when called from prefect cloud , error says that particular python module not found since it is another directory if I copy my prefect flow file to the pth where python modules available then no error so is there anyway I can keep my python modules in one folder and flow in different folder
    k
    • 2
    • 6
  • z

    Zach Schumacher

    02/24/2022, 4:27 PM
    hi all, I'm trying to do a conditional flow, but as you can see i get matches, check if the matches are valid, and then only want to continue if they are. Because i am passing matches into those tasks, they run regardless of if the condition is true or not. What am i doing wrong here?
    with flow:
        start_date, end_date = Parameter("start-date", default=None), Parameter("end-date", default=None)
        start_date, end_date = set_date_params(start_date, end_date)
        matches = get_completed_matches_by_date_range(start_date, end_date)
        there_are_matches = are_there_any_matches(matches)
        with case(there_are_matches, True):
            some_data = some_task.run(matches)
    k
    • 2
    • 9
  • g

    Gabriel Milan

    02/24/2022, 5:02 PM
    Hello! Is it possible to retrieve the flow run ID from the
    Flow
    or
    State
    objects? I'm trying to get it using the
    on_failure
    callback for my flow
    k
    • 2
    • 2
  • d

    Donnchadh McAuliffe

    02/24/2022, 5:12 PM
    Hey guys, with regards to retrying tasks on specific exceptions, is the current solution still this? https://github.com/PrefectHQ/prefect/discussions/4956 (similar question asked here)
    k
    • 2
    • 4
  • a

    aaron

    02/24/2022, 6:00 PM
    hey community, I have a question that I've been unable to find an answer for in the docs. say I have two mutually exclusive flows, Flow A and Flow B. Flow B should not be executed until Flow A has completed successfully. how can I declare a dependency on Flow A from within Flow B? dagster has something similar to this and I was hoping that prefect does as well.
    k
    • 2
    • 11
  • w

    William Grim

    02/24/2022, 6:29 PM
    Hey all! I'm trying to use
    DocekrStorage
    , and my
    Dockerfile
    looks like the below. It gets through all the steps except the final step with an error. Does anyone know how to resolve this?
    k
    s
    • 3
    • 12
  • a

    Arcondo Dasilva

    02/24/2022, 6:43 PM
    Hi there,
    👋 3
  • a

    Arcondo Dasilva

    02/24/2022, 6:45 PM
    I'm using prefect orion but I can't find tasks libraries.
    (from prefect.tasks.notifications import email)
    seems not working in orion. am I wrong ?
    k
    • 2
    • 2
  • k

    Kevin Kho

    02/24/2022, 6:57 PM
    Join us in 2 mins to learn how to generate end-to-end lineage graphs of your data pipelines using Monte Carlo and Prefect so you can reduce the time to detection and resolution for critical data incidents: https://www.montecarlodata.com/how-to-build-more-reliable-data-pipelines-with-monte-carlo-and-prefect/
    d
    a
    • 3
    • 4
  • m

    Milton

    02/24/2022, 7:19 PM
    Hi does anyone have an example on using the extra loggers? I have a snippet in
    custom.py
    like this
    import logging
    import sys
    
    def run():
      logger = logging.getLogger("custom")
      logger.setLevel('INFO')
      log_stream = logging.StreamHandler(sys.stdout)
      log_stream.setFormatter(LOG_FORMAT)
      logger.addHandler(log_stream)
    and also have
    [logging]
    # Extra loggers for Prefect log configuration
    extra_loggers = "['custom']"
    in the toml config, but logs from the custom logger don’t seem to be captured by Prefect and show up in the cloud console.
    k
    • 2
    • 49
  • w

    William Grim

    02/24/2022, 8:45 PM
    Alright, I'm slowly pushing further with
    DockerStorage
    and then realized I can't use a
    local
    agent to run flows that use that. No problem... since my local agent is setup in a docker container, I figured I would just use a
    DockerRun
    config. This seems to go alright at the very first, but then it can't pull the image from our ECR. 🧵
    a
    • 2
    • 22
  • d

    David Wang

    02/24/2022, 9:28 PM
    Hi everyone, I’m having some trouble getting an ECS run to work with a custom docker image. Whenever I run the flow it gets stuck in the “submitted for execution” state. I’ve checked the CloudWatch logs but it only shows that the flow deployment was completed. Does anybody have any suggestions for how to resolve this problem? My suspicions are that the custom image is unable to be pulled from the private ECR repo, so does anybody know what the correct permissions I need to access ECR? Below is my RUN_CONFIG for ECSrun:
    RUN_CONFIG = ECSRun(
        labels=["dev"],
        task_role_arn=f"arn:aws:iam::xxx:role/prefectTaskRole",  # a role with S3 permissions
        execution_role_arn="arn:aws:iam::xxx:role/prefectECSAgentTaskExecutionRole",
        run_task_kwargs=dict(cluster="prefectEcsClusterDev"),
        image="<http://xxx.dkr.ecr.us-east-1.amazonaws.com/prefect-orchestration:docker-test-v1|xxx.dkr.ecr.us-east-1.amazonaws.com/prefect-orchestration:docker-test-v1>"
    )
    k
    v
    • 3
    • 41
  • g

    Gabriel Milan

    02/24/2022, 9:41 PM
    Hi everyone! I've got two agents on my k8s cluster, both of them are on the
    prefect
    namespace and work fine. I needed to add one more agent to the cluster, but this one should be on another namespace, say
    prefect-agent-xxxx
    . When I do this, I can successfully submit runs to it, they do get deployed, but it doesn't seem to actually run and no logs are shown. I've tried configuring the apollo URL to
    <http://prefect-apollo.prefect.svc.cluster.local:4200>
    and also setting an
    ExternalName
    to it in the
    prefect-agent-xxxx
    namespace and using it, but none of them works. Any ideas on how I could debug this?
    :discourse: 1
    k
    m
    a
    • 4
    • 16
  • h

    Heeje Cho

    02/25/2022, 12:34 AM
    I am a little bit confused at the way prefect treats mapping. If you map to an empty list/tuple do no child tasks get created and the task is skipped? Or is are all child tasks failed automatically?
    k
    • 2
    • 1
  • r

    R Zo

    02/25/2022, 2:46 AM
    Hi, I had a few issues/questions which I will ask in different posts: post 1)I have setup two different python environments. In the first environment the latest prefect version is installed and jobs run to completion. Using the first environment I create a second environment and install tensorflow 2.4.0 which causes some of the packages such as numpy to be downgraded from 1.22.x to 1.19.5 . In the second environment jobs either hang (stop running) or there is a segmentation fault, depending on the OS. Is there a reason for this behavior? Any pointers I could use or info I could provide? I am using the LocalDaskExecutor for running jobs locally.
  • r

    R Zo

    02/25/2022, 2:47 AM
    post 2) I have also made a distributed environment where I am using the prefect server/agent and dask-schedular/worker. When running the dask worker locally (i.e all in the same machine) jobs seems to run okay however when running a worker on a different machine following is some of the errors that I get (the prefect version is the same across the machines): raise ClientError(result["errors"]) prefect.exceptions.ClientError: [{'message': 'Invalid task ID', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Invalid task ID'}}}] ERROR - 2022-02-25 13:01:57,769 - task_runner - Failed to retrieve task state with error: ClientError([{'message': 'Invalid task ID', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Invalid task ID'}}}])
Powered by Linen
Title
r

R Zo

02/25/2022, 2:47 AM
post 2) I have also made a distributed environment where I am using the prefect server/agent and dask-schedular/worker. When running the dask worker locally (i.e all in the same machine) jobs seems to run okay however when running a worker on a different machine following is some of the errors that I get (the prefect version is the same across the machines): raise ClientError(result["errors"]) prefect.exceptions.ClientError: [{'message': 'Invalid task ID', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Invalid task ID'}}}] ERROR - 2022-02-25 13:01:57,769 - task_runner - Failed to retrieve task state with error: ClientError([{'message': 'Invalid task ID', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Invalid task ID'}}}])
View count: 1