Powered by Linen
prefect-community
  • a

    Arthur Jacquemart

    06/01/2022, 11:47 AM
    Hi Prefect community, Can we have somehow one agent interacting with two different node pools in a k8s cluster? Or do we need to deploy a second agent if we want to have different node configuration?
    a
    • 2
    • 12
  • c

    Camilo Fernandez

    06/01/2022, 1:02 PM
    Hi, how should I pass environment variables for AWS S3 bucket authentication to a job created from a custom template? I'm deploying with Terraform and Helm in an EKS cluster.
    a
    • 2
    • 5
  • m

    Marcin Grzybowski

    06/01/2022, 2:00 PM
    Is there a way to see the flow name on the radar? I only see generated names like "eminent-wolf". Can I also see the parameters passed to a flow or task?
    a
    m
    • 3
    • 13
  • m

    Matthew Seligson

    06/01/2022, 2:29 PM
    Is there a way to pause flow execution? If there are running tasks, let them finish, and ensure that downstream tasks do not run.
    k
    • 2
    • 7
  • r

    Robert Esteves

    06/01/2022, 3:03 PM
    Hi, how do I display the individual flow names within a flow-of-flows in Prefect Cloud? I read the documentation at https://docs.prefect.io/core/idioms/flow-to-flow.html, but when I create and register my flow-of-flows in Prefect Cloud, the only task names displayed are "create_flow_run" and "wait_for_flow_run". What am I doing wrong? Here is my code:
    with Flow(name="Test-Orchestration-Pipeline") as f:
        Flow_A = create_flow_run(flow_name="Export Data Flow A", project_name="Data Export", run_name="Export_Flow_A")
        wait_for_flow_A = wait_for_flow_run(Flow_A, raise_final_state=True)
        Flow_B = create_flow_run(flow_name="Export Data Flow B", project_name="Data Export", run_name="Export_Flow_B")
        wait_for_flow_B = wait_for_flow_run(Flow_B, raise_final_state=True)
        Flow_B.set_upstream(wait_for_flow_A)
    a
    • 2
    • 2
  • p

    Patrick Tan

    06/01/2022, 3:48 PM
    Hi, I am storing a flow in S3. Setting the credentials with the environment variable PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS before running the Python code works, but it does not work if I set the environment variable inside the Python program like this.
    k
    • 2
    • 5
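    One possible explanation for the question above, assuming the library reads its PREFECT__ environment variables once, when it is imported: a variable set later in the same program is not seen by the already-loaded configuration. The sketch below reproduces that behaviour with the standard library only. The `DEMO__` prefix and `load_settings` helper are hypothetical and stand in for whatever the real loader does.

```python
import os


def load_settings(environ=os.environ, prefix="DEMO__"):
    """Snapshot prefixed environment variables once, like a loader run at import time."""
    return {
        key[len(prefix):]: value
        for key, value in environ.items()
        if key.startswith(prefix)
    }


# Start from a clean state so the demo is repeatable.
os.environ.pop("DEMO__AWS_CREDENTIALS", None)

settings = load_settings()  # snapshot taken "at import"

# Setting the variable afterwards does not change the existing snapshot.
os.environ["DEMO__AWS_CREDENTIALS"] = "set-too-late"
late_visible = "AWS_CREDENTIALS" in settings

# A fresh load (or setting the variable before the first load) does see it.
fresh_visible = "AWS_CREDENTIALS" in load_settings()
```

    If this is indeed the cause, setting the variable before the import statement, or exporting it in the shell as the message describes, avoids the problem.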
  • d

    Dekel R

    06/01/2022, 4:07 PM
    Hey, I'm getting this error when I run a new flow on Prefect Cloud:
    Unexpected error: TypeError("cannot pickle '_thread.lock' object")
    I used is_serializable before registering the flow and got True. Any ideas on how to tackle this? Thx
    k
    w
    • 3
    • 5
  • f

    Frederick Thomas

    06/01/2022, 5:32 PM
    Hi all! We're using Prefect version 0.14.17 and have an issue with running a flow from within another in a python script. I used the following from the docs:
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
    
    with Flow("parent-flow") as flow:
      # Create the child flow run, look up the flow by name
      child_run_id = create_flow_run(flow_name="gotta-run-flow")
    and got this:
    ImportError: cannot import name 'create_flow_run' from 'prefect.tasks.prefect' (/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/__init__.py)
    Could someone please assist? Thanks!
    k
    m
    • 3
    • 5
  • j

    Jai P

    06/01/2022, 6:01 PM
    hi folks! i'm running into an odd issue with `prefect 2.0` and `pydantic` where it looks like extra attributes are being ignored? details in thread
    k
    m
    • 3
    • 9
  • s

    Sushma Adari

    06/01/2022, 6:39 PM
    Hello All! I am using Prefect to run a Python script and I ran into an issue with a specific task that queries a database and saves the results into a dataframe. Please see the error below. The query takes some time to execute (~40 seconds) but the result is not particularly large, only around 200 MB. Any guidance would be helpful. Thanks!
    No heartbeat detected from the remote task; marking the run as failed.
    k
    • 2
    • 23
  • c

    Chris Reuter

    06/01/2022, 6:50 PM
    PrefectLive is starting in 10 minutes with @Anna Geller! https://prefect-community.slack.com/archives/C036FRC4KMW/p1654090047453119
  • p

    Patrick Tan

    06/01/2022, 8:05 PM
    Hi, I registered a flow with S3 storage using AWS SSO credentials:
    with Flow("LiveLots-ETL-Parent", storage=S3(bucket=config_bucket,
                                                client_options={
                                                     "aws_access_key_id":aws_access_key_id,
                                                     "aws_secret_access_key":aws_secret_access_key,
                                                     "aws_session_token":aws_session_token},)) as f:
    The local agent runs in a container as an ECS task. The ECS task has permission to access all S3 buckets:
    {
      "Statement": [
        {
          "Action": [
            "s3:*"
          ],
          "Effect": "Allow",
          "Resource": "arn:aws:s3:::*"
        },
        {
          "Action": [
            "s3:GetObject",
            "s3:ListBucket"
          ],
          "Effect": "Allow",
          "Resource": [
            "arn:aws:s3:::wp-livelots-pt/*",
            "arn:aws:s3:::wp-live-lots/*"
          ]
        }
      ],
      "Version": "2012-10-17"
    }
    Initially the flow runs without issue. After one hour, I get:
    Failed to load and execute flow run: ClientError('An error occurred (ExpiredToken) when calling the GetObject operation: The provided token has expired.')
    It looks like the ECS task is using the credentials from when I registered the flow. Please help.
    k
    a
    • 3
    • 7
  • f

    FuETL

    06/01/2022, 8:45 PM
    Hey guys, I run my stack on ECS, but when I have a bunch of flows running in parallel, I sometimes get timeouts from the Apollo/Hasura services. That leads to the flow being treated as a zombie task and killed or failed. What can I do to reduce how often this occurs?
    k
    a
    • 3
    • 6
  • d

    Dylan

    06/01/2022, 9:33 PM
    Hey why would registering a flow cause the code in the flow to run?
    a
    k
    • 3
    • 27
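    A common cause of the behaviour asked about above, offered as general Python background rather than as the thread's answer: any statement at module level runs whenever the file is imported or executed, while a call behind an `if __name__ == "__main__":` guard runs only when the file is executed directly. The sketch below shows the difference using only the standard library. The module name and the `work` function are hypothetical.

```python
import importlib.util
import tempfile
import textwrap
from pathlib import Path

# Source of a throwaway module with one unguarded and one guarded call.
SOURCE = textwrap.dedent(
    '''
    calls = []

    def work():
        calls.append("ran")

    work()  # module-level call: runs on every import

    if __name__ == "__main__":
        work()  # guarded call: runs only when executed as a script
    '''
)


def import_from_source(source, name="demo_flow_module"):
    """Write the source to a temporary file and import it as a module."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / f"{name}.py"
        path.write_text(source)
        spec = importlib.util.spec_from_file_location(name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module


module = import_from_source(SOURCE)
call_count = len(module.calls)  # only the unguarded call ran during import
```

    By the same logic, work that should happen only at run time is usually safer inside a task or behind the guard than at the top level of the file.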
  • d

    DK

    06/01/2022, 9:41 PM
    I have an issue with the .prefect/results folder taking up a large amount of space and it's causing issues on our server. Can these results be safely removed? Is there a way to automatically clean up these files? I am using local storage. Thanks for the help!
    a
    • 2
    • 2
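    For the cleanup question above, one option is a scheduled script that deletes files older than a chosen age. This is a minimal sketch using only the standard library. Whether old results can be removed safely is an assumption that depends on whether they are still needed for restarts or caching, and the `prune_old_files` helper and the 7-day threshold are hypothetical. The demo runs against a throwaway directory, not a real results folder.

```python
import os
import tempfile
import time
from pathlib import Path


def prune_old_files(root, max_age_days, now=None):
    """Delete regular files under root whose modification time is older than max_age_days."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    removed = []
    # Materialise the listing first so deletions do not affect iteration.
    for path in list(Path(root).rglob("*")):
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path.name)
    return sorted(removed)


# Demo on a temporary directory with one stale and one fresh file.
with tempfile.TemporaryDirectory() as tmp:
    old_file = Path(tmp) / "old_result"
    new_file = Path(tmp) / "new_result"
    old_file.write_text("stale")
    new_file.write_text("fresh")

    # Backdate the stale file by 30 days.
    thirty_days_ago = time.time() - 30 * 86400
    os.utime(old_file, (thirty_days_ago, thirty_days_ago))

    removed = prune_old_files(tmp, max_age_days=7)
    remaining = sorted(p.name for p in Path(tmp).iterdir())
```

    Running it once with a generous threshold and checking the returned list before scheduling it is a cautious way to start.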
  • r

    Rainer Schülke

    06/02/2022, 9:59 AM
    Good morning 🙂 is there an issue with the GitHub storage? All the flows are failing now since the repo can't be found but it is still in the same location. The token shouldn't have expired.
    a
    • 2
    • 4
  • e

    Einar Ellingsen

    06/02/2022, 12:23 PM
    Hi everyone! We use the EmailTask in prefect.tasks.notifications to send mail notifications from our Prefect flows via smtp-mail.outlook.com. Unfortunately, this class uses basic authentication. Due to the deprecation of basic authentication in Exchange Online, we now need to switch to modern authentication for all of our services. Does anyone know if Prefect has implemented this or has plans to do so? Otherwise, has anyone else implemented this using other libraries, e.g. from Microsoft? Thank you in advance. Can this be fixed by using smtp_type = SSL? Do we have to deal with certificates in order to switch to SSL? Extract of the code we use:
    mail_settings = {
        "mail_title": "Title",
        "mail_recipient": "recipient.1@enova.no,recipient.2@enova.no",
        "mail_sender": "sender@enova.no",
        "mail_smtp_server": "smtp-mail.outlook.com",
        "mail_smtp_port": 587,
        "mail_smtp_type": "STARTTLS",
    }
    
    with case(new_file, True):
            email_sent = email(
                subject=mail_settings["mail_title"],
                email_to=mail_settings["mail_recipient"],
                email_from=mail_settings["mail_sender"],
                smtp_server=mail_settings["mail_smtp_server"],
                smtp_port=mail_settings["mail_smtp_port"],
                smtp_type=mail_settings["mail_smtp_type"],
                msg_plain=email_body,
                attachments=[most_recent_file["local_path"]],
            )
    ✅ 1
    a
    j
    • 3
    • 7
  • f

    Florian Guily

    06/02/2022, 12:38 PM
    Hey, any idea if the map function will be out during beta stage ?
    a
    • 2
    • 2
  • r

    Rio McMahon

    06/02/2022, 3:47 PM
    Hello - I am running into an issue where Docker containers built on M1 Macs and Intel Macs behave differently; flow runs using the M1-built containers get stuck in a "scheduled" state. Has this problem cropped up before?
    k
    a
    • 3
    • 4
  • c

    Connor Skennerton

    06/02/2022, 4:14 PM
    Is anyone else experiencing weird css on prefect cloud? When I looked this morning I got this... It looks blank but the text is there, it's just the same color as the background
    n
    • 2
    • 3
  • d

    Dharit Sura

    06/02/2022, 4:22 PM
    Hi Prefect Team, I am using a Linux system and Prefect v2. I want to confirm a few things regarding deployment. When I run the command 'prefect deployment create XYZ.py', I notice that it also runs the flow and task. Is this expected? I assumed that deployment create would only create or update the deployment. This happens even though my cron schedule is set in the future.
    ✅ 1
    k
    • 2
    • 4
  • j

    John Mizerany

    06/02/2022, 4:23 PM
    Hi all, using the `S3Upload` module in prefect is it possible to specify the content-type?
    k
    • 2
    • 2
  • c

    Connor Skennerton

    06/02/2022, 4:31 PM
    One of my flows has the following error in prefect cloud. Any ideas what it might be from?
    Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}])
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/prefect/engine/cloud/task_runner.py", line 154, in initialize_run
        task_run_info = self.client.get_task_run_info(
      File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 1479, in get_task_run_info
        result = self.graphql(mutation)  # type: Any
      File "/usr/local/lib/python3.9/site-packages/prefect/client/client.py", line 473, in graphql
        raise ClientError(result["errors"])
    prefect.exceptions.ClientError: [{'path': ['get_or_create_task_run_info'], 'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'extensions': {'code': 'INTERNAL_SERVER_ERROR', 'exception': {'message': 'Expected type UUID!, found ""; Could not parse UUID: ', 'locations': [{'line': 2, 'column': 101}], 'path': None}}}]
    k
    • 2
    • 2
  • m

    Marcin Grzybowski

    06/02/2022, 4:35 PM
    Hi, maybe someone knows why this does not work for me. I'm trying to run a simple flow (one flow with a print) locally in a manually built Docker container and I get:
    root@3b776c4684ea:/prefect/jobs# python3 simple.py
    Traceback (most recent call last):
      File "/usr/local/lib/python3.10/site-packages/asgi_lifespan/_concurrency/asyncio.py", line 17, in wait
        await self._event.wait()
      File "/usr/local/lib/python3.10/asyncio/locks.py", line 214, in wait
        await fut
    asyncio.exceptions.CancelledError
    
    During handling of the above exception, another exception occurred:
    k
    • 2
    • 10
  • j

    Jessica Smith

    06/02/2022, 4:53 PM
    Is there any known issue with removing parameters from a Flow and Prefect Cloud still showing the removed Parameter?
    k
    • 2
    • 14
  • o

    Octopus

    06/02/2022, 6:22 PM
    Hi, I'm developing flows in local mode with Docker containers, and I would like to know if there is any way to persist projects and flows. My problem is the following: I create a project, then I register my flows. When I restart Docker, everything is gone, both the project and the flows. I would like to know how project persistence works in local mode. I found that flows are stored on the filesystem by default under ~/.prefect, so why does my project keep getting removed? Thanks for reading.
    k
    • 2
    • 3
  • j

    Jessica Smith

    06/02/2022, 6:32 PM
    I'm getting a weird issue when querying flow runs in the GraphQL api.
    {
      "errors": [
        {
          "path": [
            "flow_run",
            0,
            "id"
          ],
          "message": "Cannot return null for non-nullable field flow_run.id.",
          "extensions": {
            "code": "INTERNAL_SERVER_ERROR"
          }
        }
      ],
      "data": null
    }
    I don't know how these values can be null if they...can't be null.
    k
    • 2
    • 10
  • p

    Patrick Tan

    06/02/2022, 6:33 PM
    Hi, Prefect stores all this information about flow runs and shows it nicely in the UI like below. Is there a way to download the run data so we can perform analysis, e.g. the average duration of a task?
    j
    k
    • 3
    • 6
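    Once run data has been exported by whatever means is available, the analysis mentioned above is straightforward. This is a minimal sketch with made-up sample rows. The field names `task`, `start_time` and `end_time` are assumptions for illustration and are not taken from a Prefect schema.

```python
from collections import defaultdict
from datetime import datetime

# Made-up sample rows; the field names are assumptions, not a Prefect schema.
task_runs = [
    {"task": "extract", "start_time": "2022-06-02T18:00:00", "end_time": "2022-06-02T18:00:30"},
    {"task": "extract", "start_time": "2022-06-02T19:00:00", "end_time": "2022-06-02T19:00:50"},
    {"task": "load", "start_time": "2022-06-02T18:01:00", "end_time": "2022-06-02T18:01:10"},
]


def average_durations(rows):
    """Return the mean duration in seconds for each task name."""
    durations = defaultdict(list)
    for row in rows:
        start = datetime.fromisoformat(row["start_time"])
        end = datetime.fromisoformat(row["end_time"])
        durations[row["task"]].append((end - start).total_seconds())
    return {task: sum(values) / len(values) for task, values in durations.items()}


averages = average_durations(task_runs)
```

    The same function works on any list of dictionaries with ISO-formatted timestamps, so the export format only needs to be mapped onto those three keys.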
  • m

    Matt Alhonte

    06/02/2022, 11:54 PM
    Is there a way to have Flows run as a different Linux user (as in, not `root`)?
    k
    a
    • 3
    • 11
  • w

    wonsun

    06/03/2022, 1:31 AM
    Hi.. I got this error: `ValueError: A path must be provided to show where flow .py file is stored.` My flow code looks like this!
    with Flow('flow_step1', storage=Local()) as flow:
        engine = create_engine('mysql+pymysql://wonsun.jeong:~~~~~~~~~~~~~~~', echo=True)
        connection = engine.connect()
        
        before = check_db()
        after = check_storage()
        
        answer = coin_inspection(after, before)
        
        with case(answer, False):
            new_datas = action_if_false()
            raw_extract = each_file_info.map(new_datas)
            final = each_ecg_meta.map(raw_extract)
            
        with case(answer, True):
            action_if_true()
               
    
    flow.storage = Local(stored_as_script=True)
    flow.register(project_name='try')
    flow.run() 
    # flow.visualize()
    This Python file is named 'flow_step1.py'. What's the problem?? Also, the weirdest thing is that my Windows machine (local) already ran 'flow_step1' and created result files, but the Prefect web UI can't detect this flow! I'm so confused because I don't understand exactly how Flow, Register, Agent, Server, Local, Path.. work in Prefect. Help me please.. 🤯
    k
    • 2
    • 4
k

Kevin Kho

06/03/2022, 1:50 AM
If using `stored_as_script=True`, you need to point to where the file lives with `path`.
flow.storage = Local(path=..., stored_as_script=True)
❤️ 1
w

wonsun

06/03/2022, 1:59 AM
Is the file you mentioned the Python code that defines the flow? In that case, should I write this?
flow.storage=Local(path='C:\\Users\\user\\.prefect',stored_as_script=True)
Right?
k

Kevin Kho

06/03/2022, 2:11 AM
Yep the doc string will have more detail
w

wonsun

06/03/2022, 5:37 AM
Thanks Kevin! Don't panic..😂
🙌 1
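The error message in this thread asks for the location of the flow's `.py` file, so one way to build the `path` value is to resolve the script's absolute path first. This is a minimal sketch that assumes `path` is meant to name the script itself, as the error text suggests. The `flow_script_path` helper is hypothetical, and the Prefect call is shown only as a comment because it is not executed here.

```python
from pathlib import Path


def flow_script_path(directory, filename="flow_step1.py"):
    """Return the absolute path of the flow script as a string."""
    path = Path(directory).expanduser().resolve() / filename
    if path.suffix != ".py":
        raise ValueError(f"expected a .py file, got {path.name!r}")
    return str(path)


script_path = flow_script_path(".")

# Hypothetical use with the storage call from the thread (not executed here):
# flow.storage = Local(path=script_path, stored_as_script=True)
```

Inside the flow file itself, `Path(__file__).resolve()` gives the same result without hard-coding a directory.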