prefect-community
  • i

    Issam Assafi

    09/23/2021, 12:38 PM
    Hey, how do I do nested mapping?
    15 replies · 5 participants
  • s

    Steve s

    09/23/2021, 1:03 PM
    hey everyone, I have a question about passing data between flows. My scenario is that I need to pass a pandas DataFrame from one flow to another. Right now I'm converting the DataFrame to JSON and passing it in as a Parameter. It works well, but I've only tried for relatively small datasets. I'm wondering: what are the limits of scale for this approach, and should I be handling this differently?
    4 replies · 3 participants
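One way to get a feel for how the JSON-as-Parameter approach scales is to measure the serialized payload before passing it along. The following is a minimal sketch using only the standard library. The list of dicts stands in for the row records a DataFrame would serialize to, and `make_rows` and `payload_size_bytes` are hypothetical helpers, not part of Prefect or pandas. It does not establish any actual size limit; it only shows how to observe growth.

```python
import json


def make_rows(n):
    """Build n illustrative records (a stand-in for DataFrame rows)."""
    return [{"id": i, "value": i * 0.5, "label": "row-%d" % i} for i in range(n)]


def payload_size_bytes(rows):
    """Size in bytes of the JSON string that would be passed as a parameter."""
    return len(json.dumps(rows).encode("utf-8"))


if __name__ == "__main__":
    # Print how the payload grows with the number of rows.
    for n in (10, 1_000, 100_000):
        print(n, "rows ->", payload_size_bytes(make_rows(n)), "bytes")
```

Comparing these numbers against whatever the API or database in use tolerates gives a rough idea of when to switch to passing a reference to the data instead of the data itself.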
  • d

    Dmitry Kuleshov

    09/23/2021, 1:18 PM
    Hi all! I couldn't find information on whether I can limit the number of parallel jobs created by the Prefect agent (Kubernetes agent). Can someone either point me to a link or answer? Thanks
    2 replies · 2 participants
  • a

    Anze Kravanja

    09/23/2021, 2:49 PM
    Hello! I’m curious whether you can unarchive a flow in the Prefect Server UI. The use case: I have multiple versions of a flow and the latest one is running, but I find a bug and want to roll back to the previous version. I know I can delete the latest version, but that doesn’t unarchive the version that is now the latest. Thank you!
    2 replies · 2 participants
  • s

    Sean Talia

    09/23/2021, 8:14 PM
    Hey everyone – has anybody ever run into an issue where you're running flows with a DockerAgent, and for some reason, the agent seems to keep a flow's container running long after the flow actually finished?
    39 replies · 3 participants
  • k

    Kathryn Klarich

    09/23/2021, 8:29 PM
    Hey all, we are trying to create a terminal state handler to dump the flow logs at the end of a flow and save them somewhere permanently. I am assuming we need to write a GraphQL query to do so, but I can't figure out how to do this. Is there any documentation or example you could point me to? I see this function, prefect.client.client.Client.write_run_logs(logs), but I want to download, not upload, the logs.
    4 replies · 2 participants
  • d

    Daniel Saxton

    09/23/2021, 9:17 PM
    question about the Prefect cli: with prefect run is there anything like a -d flag (as in say docker run) to detach the process and run in the background (i didn't see it in the docs but wanted to double check)?
    1 reply · 2 participants
  • d

    Daniel Saxton

    09/23/2021, 9:17 PM
    it's easy enough to work around if it's not there in any case
  • f

    Fabrice Toussaint

    09/24/2021, 10:35 AM
    Hey all, can someone help me understand where this error comes from and how I might resolve it:
    Failed to set task state with error: ClientError([{'message': 'State update failed for task run ID 6aa4d54f-d2a5-41d6-9ba8-27ecffe5e209: provided a running state but associated flow run ecb87527-e50d-4ca4-9ed6-4c2f40ef81fe is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 6aa4d54f-d2a5-41d6-9ba8-27ecffe5e209: provided a running state but associated flow run ecb87527-e50d-4ca4-9ed6-4c2f40ef81fe is not in a running state.'}}}])
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 91, in call_runner_target_handlers
        state = self.client.set_task_run_state(
      File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 1917, in set_task_run_state
        result = self.graphql(
      File "/opt/conda/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'message': 'State update failed for task run ID 6aa4d54f-d2a5-41d6-9ba8-27ecffe5e209: provided a running state but associated flow run ecb87527-e50d-4ca4-9ed6-4c2f40ef81fe is not in a running state.', 'locations': [{'line': 2, 'column': 5}], 'path': ['set_task_run_states'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'State update failed for task run ID 6aa4d54f-d2a5-41d6-9ba8-27ecffe5e209: provided a running state but associated flow run ecb87527-e50d-4ca4-9ed6-4c2f40ef81fe is not in a running state.'}}}]
    We are using Prefect 0.15.2 and have a Dask cluster up and running. We are also receiving multiple apollo timeouts, seemingly at random:
    Failed to set task state with error: ReadTimeout(ReadTimeoutError("HTTPConnectionPool(host='apollo', port=4200): Read timed out. (read timeout=15)"))
    Traceback (most recent call last):
      File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 445, in _make_request
        six.raise_from(e, None)
      File "<string>", line 3, in raise_from
      File "/opt/conda/lib/python3.8/site-packages/urllib3/connectionpool.py", line 440, in _make_request
        httplib_response = conn.getresponse()
      File "/opt/conda/lib/python3.8/http/client.py", line 1348, in getresponse
        response.begin()
      File "/opt/conda/lib/python3.8/http/client.py", line 316, in begin
        version, status, reason = self._read_status()
      File "/opt/conda/lib/python3.8/http/client.py", line 277, in _read_status
        line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
      File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
        return self._sock.recv_into(b)
    socket.timeout: timed out
    3 replies · 2 participants
  • k

    Kevin Weiler

    09/24/2021, 2:14 PM
    Hi there! I’ve written a task (it happens to be a function task, but it doesn’t have to be - could be a class that inherits from Task) that runs a Nomad job. It polls for state and throws any exceptions up the stack. If the job fails, Prefect is made aware of it and it works fine. I’m trying to understand how to go the other direction. If I cancel a flow run - how can I tell my Nomad job to stop? Is the thing to use here a state_handler? It’s sort of surprising to me that, for example, the kubernetes RunNamespacedJob doesn’t seem to include any logic around this (https://github.com/PrefectHQ/prefect/blob/master/src/prefect/tasks/kubernetes/job.py#L580).
    7 replies · 2 participants
  • l

    Lucas Beck

    09/24/2021, 2:24 PM
    Hi everyone, we are seeing zombie kills for no apparent reason in our flows that spin up k8s resources. They occur roughly 10% of the time. Has anyone else experienced this? It is quite hard to know what is going on here.
    19 replies · 2 participants
  • b

    Bruno Murino

    09/24/2021, 4:24 PM
    Hi everyone, I’m trying to write a query using the Prefect API but I'm not having much success. All our flows have at least 1 parameter called Scope, and I’m trying to see the latest flow_run for each parameter.scope and state combination. In SQL I’d use a window function to get the rank/row_number, partitioned by parameter.scope and state, ordering by the scheduled start date, but I'm not sure I can achieve the same thing with GraphQL. Can anyone help with this?
    3 replies · 2 participants
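If the partitioning turns out to be awkward to express in GraphQL, one option is to fetch the runs and do the ranking client-side. Below is a minimal sketch of that idea in plain Python. The shape of each run (a dict with `parameters`, `state`, and `scheduled_start_time` keys holding ISO-8601 timestamps) is an assumption made for illustration and should be checked against the real API response; `latest_run_per_scope_and_state` is a hypothetical helper.

```python
from datetime import datetime


def latest_run_per_scope_and_state(flow_runs):
    """Return the most recently scheduled run for each (scope, state) pair.

    Mirrors ROW_NUMBER() OVER (PARTITION BY scope, state
    ORDER BY scheduled_start_time DESC) = 1 in SQL.
    """
    latest = {}
    for run in flow_runs:
        key = (run["parameters"]["Scope"], run["state"])
        start = datetime.fromisoformat(run["scheduled_start_time"])
        best = latest.get(key)
        if best is None or start > datetime.fromisoformat(best["scheduled_start_time"]):
            latest[key] = run
    return latest


# Illustrative data only; the field names are assumptions.
runs = [
    {"id": "a", "parameters": {"Scope": "eu"}, "state": "Success",
     "scheduled_start_time": "2021-09-23T11:00:00+00:00"},
    {"id": "b", "parameters": {"Scope": "eu"}, "state": "Success",
     "scheduled_start_time": "2021-09-24T11:00:00+00:00"},
    {"id": "c", "parameters": {"Scope": "eu"}, "state": "Failed",
     "scheduled_start_time": "2021-09-22T11:00:00+00:00"},
    {"id": "d", "parameters": {"Scope": "us"}, "state": "Success",
     "scheduled_start_time": "2021-09-21T11:00:00+00:00"},
]

result = latest_run_per_scope_and_state(runs)
```

The dictionary is keyed by the partition columns, so each (scope, state) pair keeps exactly one run, the one with the latest scheduled start.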
  • f

    Francis Addae

    09/24/2021, 4:56 PM
    Hi everyone, I’m trying to run a scheduled task at 11am UTC (7am EST) and this kind of happened to my flow. Does anyone know a remedy for this? This pipeline is crucial for my workflow. I’m using the CronClock scheduler, by the way. Any help is greatly appreciated.
    15 replies · 2 participants
  • k

    Ken Nguyen

    09/24/2021, 5:42 PM
    Hi! I'm currently setting up Slack notifications and would love to get some advice. For context, I have set up 2 slack channels, 1 for notifying flow runs, and 1 for notifying flow fails (if any). I've set up my state handler this way:
    import requests

    # fail_channel_webhook and running_channel_webhook hold the Slack webhook URLs (defined elsewhere)
    def post_to_slack(obj, old_state, new_state):
        # Notify failures
        if new_state.is_failed():
            msg = "Flow/Task {0} finished in state {1}".format(obj, new_state)
            secret_slack = fail_channel_webhook

            requests.post(secret_slack, json={"text": msg})

        # Notify runs
        if new_state.is_running():
            msg = "Flow/Task {0} is running".format(obj)
            secret_slack = running_channel_webhook

            requests.post(secret_slack, json={"text": msg})

        # State handlers return the state to continue with
        return new_state
    When I tested it on tasks, I was able to receive notifications for task running, and then task failing. However, when I tested it on flows, it only notified me of flow running, but not failing despite the flow's end result being "Flow run FAILED: some reference tasks failed." What differs between the flow and the task for this to not have worked? Thanks in advance for your suggestions!
    13 replies · 2 participants
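To debug a handler like the one above without running a flow, its branching can be exercised with stand-in state objects. The following is only a sketch: `FakeState`, `make_handler`, `record`, and the webhook URLs are hypothetical, and the only behaviour borrowed from the message is that states expose `is_failed()` and `is_running()`. It does not explain why the flow-level failure notification went missing; it just makes the handler logic checkable in isolation.

```python
class FakeState:
    """Stand-in exposing the two checks the handler relies on."""

    def __init__(self, name):
        self.name = name

    def is_failed(self):
        return self.name == "Failed"

    def is_running(self):
        return self.name == "Running"

    def __repr__(self):
        return "<{0}>".format(self.name)


def make_handler(post, fail_webhook, running_webhook):
    """Build a state handler that sends messages through post(url, json=...)."""

    def post_to_slack(obj, old_state, new_state):
        if new_state.is_failed():
            msg = "Flow/Task {0} finished in state {1}".format(obj, new_state)
            post(fail_webhook, json={"text": msg})
        if new_state.is_running():
            msg = "Flow/Task {0} is running".format(obj)
            post(running_webhook, json={"text": msg})
        return new_state

    return post_to_slack


sent = []  # records (url, payload) instead of calling Slack


def record(url, json):
    sent.append((url, json))


handler = make_handler(
    record,
    "https://example.invalid/fail",
    "https://example.invalid/running",
)
handler("my-flow", FakeState("Pending"), FakeState("Running"))
handler("my-flow", FakeState("Running"), FakeState("Failed"))
```

In real use, `requests.post` could be passed in place of `record`, since it accepts the same `url` and `json=` arguments.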
  • i

    Irakli Gugushvili

    09/24/2021, 5:47 PM
    Hello everyone, I wanted to ask if there is a mechanism for batch processing in Prefect. I have found child/parent flows to simulate that behavior, but I'm interested in whether there are other approaches.
    4 replies · 2 participants
  • w

    warmwaffles

    09/24/2021, 7:08 PM
    For k8s agents, do I need to use a service account for them?
    13 replies · 3 participants
  • w

    warmwaffles

    09/24/2021, 8:43 PM
    For keeping my staging and production environments separate, is it wiser to create a tenant per environment or is splitting them under projects recommended?
    5 replies · 2 participants
  • w

    warmwaffles

    09/24/2021, 9:57 PM
    Anyone here use prefect tasks with your django environment? Just wondering how that is going to play nice since there is some song and dance you have to do to get scripts to work with django
    3 replies · 2 participants
  • a

    Aaron Ash

    09/25/2021, 12:46 AM
    Is it possible to set priorities for flows with the kubernetes agent? Ideally I'd like to have high priority flows be able to pre-empt and pause lower priority flows if the cluster is maxed out
    2 replies · 2 participants
  • w

    William Grim

    09/25/2021, 3:41 AM
    Hi all! We have a prefect agent that is scheduling flows when we tell it to schedule them, but then they just sit in a "Scheduled" state when we run a command like prefect get flow-runs -f myflow. We also see a few runs with Success, Failed, and Cancelled as well. I don't know how someone got a flow to cancel though.
    8 replies · 2 participants
  • a

    Abhishek

    09/25/2021, 2:08 PM
    Hello folks, I am getting an error during a flow run for one of the long-running tasks. The error is:
    No heartbeat detected from the remote task; marking the run as failed.
    Other details:
      • It's running on ECS (agent is ECS)
      • Flow storage is S3
      • In the same flow, other long-running tasks did run successfully
    What can I do to prevent these timeouts, and how do I fix it? As this doc suggests, Lazarus will restart the failed task, but I need a way to prevent the timeout if possible.
    2 replies · 2 participants
  • g

    Guy Propper

    09/26/2021, 10:56 AM
    Hi, I have a question about running two mapped tasks one after the other. I have the following flow:
    file_paths_to_process_list = extract_files_to_process()
    mapped_parsed_output = transform.map(file_paths_to_process_list)
    load_output_to_db.map(mapped_parsed_output)
    I want map#2 (load_output_to_db) to depend on results from map#1 (transform), but not to wait for all results from map#1. As soon as there is one result from map#1, it should be processed in map#2. What is the correct way to do this? Thanks!
    4 replies · 3 participants
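The behaviour being asked for, where each item moves through both steps without waiting for the whole first map to finish, can be illustrated outside Prefect with standard-library futures. This is only a sketch of the desired semantics, not Prefect's API and not a statement about how its executors schedule mapped tasks. `transform` and `load_output_to_db` below are toy stand-ins for the tasks in the message, and `process_one` and `run_pipeline` are hypothetical helpers.

```python
from concurrent.futures import ThreadPoolExecutor


def transform(path):
    """Toy stand-in: pretend to parse a file."""
    return path.upper()


loaded = []  # stands in for the database


def load_output_to_db(parsed):
    """Toy stand-in: pretend to write one parsed result."""
    loaded.append(parsed)
    return parsed


def process_one(path):
    """Run both steps for a single item, so loading starts as soon as
    that item's transform finishes, independent of the other items."""
    return load_output_to_db(transform(path))


def run_pipeline(paths, max_workers=4):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map returns results in input order
        return list(pool.map(process_one, paths))


results = run_pipeline(["a.csv", "b.csv", "c.csv"])
```

The key design choice is that the unit of parallel work is the whole per-item chain rather than one stage across all items.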
  • h

    Henning Holgersen

    09/26/2021, 11:14 AM
    Is there a way to do retries on entire/scheduled flows ? Retries on tasks work well, but I have had a flow fail because the executor is a remote cluster - so it doesn’t even reach the Task stage. In my case, a coiled cluster failed to spin up for some reason (actually first time that happened).
    4 replies · 2 participants
  • a

    Amar Verma

    09/26/2021, 3:29 PM
    Hi, I am currently exploring options to automate ML model experimentation. Can Prefect be considered an alternative to MLflow for this? Do its capabilities cover those of MLflow? Thanks!
    2 replies · 2 participants
  • l

    Lew Dawson

    09/26/2021, 7:37 PM
    Afternoon folks. Wondering if anyone has advice. Struggling to get a simple/common Python pattern working when using DockerRun run_config with GitHub storage. Have the following project structure in the GitHub repo:
    etl/
    ├── util/
    │   ├── __init__.py
    │   ├── util_vault.py
    ├── __init__.py
    ├── runner.py
    The util_vault.py looks like:
    from prefect.client import Secret
    
    class UtilVault:
    
        @staticmethod
        def get_secret_simple(name):
            return Secret(name).get()
    The runner.py looks like:
    from prefect import Flow, task, context
    from prefect.run_configs import DockerRun
    from prefect.storage import GitHub
    
    from util.util_vault import UtilVault
    
    logger = context.get("logger")
    
    
    @task
    def hello_secrets():
        secret = UtilVault.get_secret_simple('GITHUB_ACCESS_TOKEN')
        logger.info('secret = %s', secret)
    
    
    with Flow(name='hello_world') as flow:
        hello_secrets()
    
    flow.storage = GitHub(
        repo='<username>/<repo>',
        path='etl/runner.py',
        access_token_secret='GITHUB_ACCESS_TOKEN',
    )
    flow.run_config = DockerRun(
        image='<registry-name>/prefect-runner-base-image:1.0.0',
    )
    When I run this task, I get ModuleNotFoundError: No module named 'util'. Any help would be much appreciated.
    14 replies · 3 participants
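A ModuleNotFoundError of this kind means Python could not find a top-level package with that name on sys.path at import time. The sketch below reproduces that mechanism in isolation with a throwaway package. Whether a missing path entry is actually the cause in the Docker setup above is an assumption, and `demo_util_pkg` and `write_demo_package` are hypothetical names used only for illustration.

```python
import importlib
import os
import sys
import tempfile


def write_demo_package(root):
    """Create root/demo_util_pkg/{__init__,helper}.py on disk."""
    pkg_dir = os.path.join(root, "demo_util_pkg")
    os.makedirs(pkg_dir)
    open(os.path.join(pkg_dir, "__init__.py"), "w").close()
    with open(os.path.join(pkg_dir, "helper.py"), "w") as f:
        f.write("def greet():\n    return 'hello'\n")


root = tempfile.mkdtemp()
write_demo_package(root)

# The package exists on disk, but its parent directory is not on sys.path yet.
try:
    importlib.import_module("demo_util_pkg.helper")
    found_before = True
except ModuleNotFoundError:
    found_before = False

# Once the parent directory is on sys.path, the same import succeeds.
sys.path.insert(0, root)
importlib.invalidate_caches()
helper = importlib.import_module("demo_util_pkg.helper")
found_after = helper.greet() == "hello"
```

The same reasoning applies to any environment that executes a single file: sibling packages are importable only if the directory containing them is present where the code runs and is on sys.path.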
  • j

    Joël Luijmes

    09/27/2021, 9:44 AM
    I upgraded my Prefect Server from 0.15.2 -> 0.15.6, but since the upgrade I can’t run any flows due to the following error:
    Failed to retrieve task state with error: ClientError([{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
        task_run_info = self.client.get_task_run_info(
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 1798, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 5}], 'path': ['get_or_create_task_run_info'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: '}}}]
    I found the following issue: https://github.com/PrefectHQ/prefect/issues/4687, but it says the root issue should be fixed. Note: this happens for all of my flows (all of which have a relatively small number of tasks).
    8 replies · 2 participants
  • s

    Sergey

    09/27/2021, 12:45 PM
    Hello. Can you tell me what the problem is? I want to deploy Prefect on a Linux server (Ubuntu). I do everything according to the instructions at https://github.com/PrefectHQ/prefect and everything starts well, but when I go to the server address http://ip:8080, it tells me that "Safari cannot connect to the server". When I do the same locally with http://localhost:8080, everything works fine.
    5 replies · 4 participants
  • k

    Kevin Kho

    09/27/2021, 1:52 PM
    Sharing this here
    2 replies · 2 participants
  • s

    Sam Thomas

    09/27/2021, 3:02 PM
    Task 'run_edit_connection_graph_flow[1]': Exception encountered during task execution!
    Traceback (most recent call last):
      File "ENV\lib\site-packages\prefect\engine\task_runner.py", line 859, in get_task_run_state
        value = prefect.utilities.executors.run_task_with_timeout(
      File "ENV\lib\site-packages\prefect\utilities\executors.py", line 454, in run_task_with_timeout
        return task.run(*args, **kwargs)  # type: ignore
      File "MY_DRIVE\tasks.py", line 688, in run_X_flow
        prefect.tasks.prefect.StartFlowRun(
      File "ENV\lib\site-packages\prefect\utilities\tasks.py", line 445, in method
        return run_method(self, *args, **kwargs)
      File "ENV\lib\site-packages\prefect\tasks\prefect\flow_run.py", line 432, in run
        create_link(urlparse(run_link).path)
      File "ENV\lib\site-packages\prefect\artifacts.py", line 52, in create_link
        return _create_task_run_artifact("link", {"link": link})
      File "ENV\lib\site-packages\prefect\artifacts.py", line 28, in _create_task_run_artifact
        return client.create_task_run_artifact(
      File "ENV\lib\site-packages\prefect\client\client.py", line 2155, in create_task_run_artifact
        result = self.graphql(
      File "ENV\lib\site-packages\prefect\client\client.py", line 569, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]', 'locations': [{'line': 2, 'column': 5}], 'path': ['create_task_run_artifact'], 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': '[{\'extensions\': {\'path\': \'$.selectionSet.insert_task_run_artifact.args.objects\', \'code\': \'constraint-violation\'}, \'message\': \'Not-NULL violation. null value in column "tenant_id" violates not-null constraint\'}]'}}}]
    Having issues after upgrading to 0.15.6 using the StartFlowRun task. Local executors pointing to a Prefect Server hosted on our infrastructure. The requested flow starts running; however, the flow that did the requesting fails when trying to create an artefact for the run it just started. I'm surprised the issue is around tenants because my understanding was that they're Cloud things. Any help gratefully received.
    5 replies · 2 participants
  • q

    Qin XIA

    09/27/2021, 4:22 PM
    Hello all, I have a clean_up_task() which is mapped over 180 files. It takes 30 seconds when I run this flow locally without an agent, and 2 minutes when I run it through a local agent from the UI. What can I do to optimize this? Thanks a lot.
    2 replies · 2 participants
k

Kevin Kho

09/27/2021, 4:26 PM
Hey @Qin XIA, there will be some latency with each task because each task has to hit the Prefect API for each state change. That means for each task, there will be at least 3 API calls. It would probably be faster if you batch your files together to limit API calls, but then you lose the observability. The API calls are a fixed overhead. For longer tasks, they will be less noticeable.
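The trade-off described here can be made concrete with a little arithmetic. The sketch below assumes exactly 3 API calls per task run (the message says "at least 3") and an arbitrary batch size of 20; `chunked` is a hypothetical helper, not a Prefect function.

```python
def chunked(items, size):
    """Split a list into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


CALLS_PER_TASK = 3  # lower bound taken from the message above

files = ["file_%03d.csv" % i for i in range(180)]
batches = chunked(files, 20)

calls_unbatched = len(files) * CALLS_PER_TASK    # one mapped task per file
calls_batched = len(batches) * CALLS_PER_TASK    # one mapped task per batch
```

Under these assumptions that is 540 calls versus 27, at the cost of seeing one task run per batch in the UI instead of one per file.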
q

Qin XIA

09/27/2021, 4:30 PM
@Kevin Kho I will try to review my design. Thanks!