prefect-community
  • e

    Eden

    11/14/2022, 7:30 AM
    Hi @Prefect, I'd like to ask how to run an agent on GKE? I think I'm close to getting it to work but keep getting an error 😞 [I have read this guide: HERE_FROM_ANNA -> I can use port-forward, execute
    py any_flow.py
    successfully, and insert data into BQ]
    The problem I'm running into is that flow runs fail after the deployment.
    To make it simpler, I then followed the deployment guide, HERE_FROM_NATE, to create a very simple job. After starting a run of the deployment
    foo
    , it shows an error about a connection issue.
    07:17:34.363 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:4200/api/
    07:17:35.873 | ERROR   | prefect.engine - Engine execution of flow run 'd533fb3e-d96f-4774-ad53-bac47071f3c5' exited with unexpected exception
    My guess is that the agent runs, but it shouldn't be using
    http://127.0.0.1:4200/api/
    for the connection. But: 1. how do I set this? An FQDN? A public endpoint? 2. Where do I set it? On the deployment? Appreciate any help here, thx 🙇‍♂️🏻
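    (A minimal sketch, not from the thread, assuming Prefect 2: the agent and the flow-run pod read the API endpoint from the PREFECT_API_URL setting, so if the snippet below prints the localhost default inside the pod, that setting needs to be supplied there, e.g. as an env var pointing at the Orion service's in-cluster address rather than 127.0.0.1.)
    # Hedged check: print the API URL this environment will use (Prefect 2).
    from prefect.settings import PREFECT_API_URL

    print(PREFECT_API_URL.value())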
    ✅ 1
    m
    4 replies · 2 participants
  • s

    Steven Wilber

    11/14/2022, 10:10 AM
    Hi, I'm trying to use Prefect to trigger an Airbyte flow, but it fails at the Airbyte health check. I can see in the logs the following:
    httpx.HTTPStatusError: Client error '401 Unauthorized' for url 'http://localhost:8000/api/v1/health/'
    But when I check that URL myself it works fine and returns:
    {"available":true}
    Any help is much appreciated. Thanks.
    ✅ 1
    n
    11 replies · 2 participants
  • s

    Stéphan Taljaard

    11/14/2022, 1:28 PM
    Hi, I'm working on a barebones example of mapped async HTTP GET requests. Would you mind having a look at my code in the thread - any recommendations? I trust that futures.wait() is the correct way to force all mapped task runs to finish before closing the AsyncClient?
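    (Not a verdict on futures.wait(); just one hedged alternative sketch with assumed names, not Stéphan's actual code: await the async task coroutines directly inside the async with block so every request finishes before the client closes.)
    import asyncio

    import httpx
    from prefect import flow, task

    @task
    async def fetch(client, url):
        # Each task run reuses the shared AsyncClient.
        response = await client.get(url)
        response.raise_for_status()
        return response.status_code

    @flow
    async def fetch_all(urls):
        async with httpx.AsyncClient() as client:
            # Gathering the task coroutines keeps the client open until
            # every request has finished.
            return await asyncio.gather(*(fetch(client, url) for url in urls))

    if __name__ == "__main__":
        print(asyncio.run(fetch_all(["https://example.com"] * 3)))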
    ✅ 2
    r
    n
    7 replies · 3 participants
  • m

    Miremad Aghili

    11/14/2022, 4:15 PM
    Hey guys, here is a question: currently we have set up agents on multiple computers. Some of the tasks that run are big, and sometimes the computer crashes, mostly due to RAM issues. When this happens we expect the status of the flow to change to Failed or similar, but in reality it stays in the Running state for days and never reports that it has stopped working. Is there a way to solve this? We are running the agents on Windows computers and we use DockerRun for our Prefect agents. (This is on Prefect 1.)
    m
    4 replies · 2 participants
  • c

    Christian Juhl

    11/14/2022, 4:31 PM
    Hi all, for tasks, is it possible to get options from with_options() mapped when using map()? For example, here I want to tag tasks running concurrently with letters from a dict, but when I do it this way, every task is tagged with all of the letters instead of each letter going to its corresponding number. Thanks!
    from prefect import task, flow
    
    numbers = {
        'a': 1,
        'b': 2,
        'c': 3,
        'd': 4
    }
    
    @task
    def square_number(number):
    
        return number ** 2
    
    @flow
    def my_flow():
    
        squared_numbers = square_number.with_options(tags=numbers.keys()).map(number=numbers.values())
    
        return squared_numbers
    
    if __name__ == '__main__':
    
        output = my_flow()
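    (One hedged workaround sketch: with_options() applies the same options to every mapped run, so to vary tags per element you can submit each task run individually instead of mapping.)
    from prefect import flow, task

    numbers = {'a': 1, 'b': 2, 'c': 3, 'd': 4}

    @task
    def square_number(number):
        return number ** 2

    @flow
    def my_flow():
        # Each submitted run gets only its own letter as a tag.
        futures = [
            square_number.with_options(tags=[letter]).submit(number)
            for letter, number in numbers.items()
        ]
        return [future.result() for future in futures]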
    m
    7 replies · 2 participants
  • v

    Vishnu Duggirala

    11/14/2022, 4:36 PM
    Hey Y'all, I've been running the flow as a separate ECS task using the ECS infrastructure block; however, we need to provide AWS credentials; is there a way to avoid this and run the task through the block using a custom IAM role created for Prefect?
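    (A hedged sketch, not a confirmed recipe: prefect-aws's ECSTask block accepts task/execution role ARNs, and leaving its credentials unset falls back to the default boto3 credential chain, e.g. an instance profile or task role, so no static keys need to be stored. The cluster name and ARNs below are made up.)
    from prefect_aws.ecs import ECSTask

    ecs_block = ECSTask(
        cluster="prefect-cluster",  # hypothetical cluster name
        task_role_arn="arn:aws:iam::123456789012:role/prefect-flow-role",       # hypothetical
        execution_role_arn="arn:aws:iam::123456789012:role/prefect-exec-role",  # hypothetical
    )
    ecs_block.save("ecs-infra", overwrite=True)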
    k
    2 replies · 2 participants
  • b

    Blake Stefansen

    11/14/2022, 5:22 PM
    Hey y'all. We are running into an interesting problem where if our
    flow_run
    names are more than
    63 characters
    , our agent fails to create a deployment run using k8s jobs because it can't create a job label. Will post the agent log in the thread.
    ✅ 1
    m
    8 replies · 2 participants
  • a

    Amey Desai

    11/14/2022, 6:09 PM
    Has anybody used Prefect to write data into Salesforce?
    ✅ 1
    b
    k
    11 replies · 3 participants
  • a

    alex

    11/14/2022, 7:34 PM
    Hello, what is the recommended way to expose logs from threads to the Prefect 1 Cloud UI? I have a logger set up this way:
    logger = logging.getLogger("mylogs")
    logger.info("msg")
    plus
    PREFECT__LOGGING__EXTRA_LOGGERS="['mylogs']"
    Logs from my main thread are logged, but not from any additional threads I am using.
    m
    1 reply · 2 participants
  • m

    Madison Schott

    11/14/2022, 9:51 PM
    This is how I'm running my flow in 1.0. What would be the equivalent in 2.0? I don't see a storage block for Docker but I see Docker blocks in the UI.
    STORAGE = Docker(registry_url='ecr.us-west-2.amazonaws.com/',
                     image_name='prefect-flows',
                     dockerfile='dbt_snowflake/DockerFile')
    
    RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'prefect-prod'},
                        env={"PREFECT__LOGGING__LEVEL": "DEBUG"},
                        execution_role_arn='xx',
                        labels=['ecs-agent', 'prod', 'winc'])
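    (A hedged sketch of one Prefect 2 equivalent, with illustrative names: the Docker image moves from a storage block onto the infrastructure block, and the deployment ties the flow to that infrastructure.)
    from prefect.deployments import Deployment
    from prefect_aws.ecs import ECSTask

    from my_flows import my_flow  # hypothetical module containing the flow

    ecs_infra = ECSTask(
        image="ecr.us-west-2.amazonaws.com/prefect-flows:latest",  # image built from the Dockerfile
        cluster="prefect-prod",
        execution_role_arn="xx",
        env={"PREFECT_LOGGING_LEVEL": "DEBUG"},
    )

    Deployment.build_from_flow(
        flow=my_flow,
        name="prod",
        infrastructure=ecs_infra,
        tags=["ecs-agent", "prod"],
        apply=True,
    )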
    n
    16 replies · 2 participants
  • h

    Heather DeHaven

    11/14/2022, 10:07 PM
    Hi, I'm using 1.0. Is it possible to programmatically get FlowRunViews from flow run names rather than flow run ids? I'm trying to get the state of flow runs using names.
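    (A hedged sketch for Prefect 1, assuming the backend's GraphQL schema exposes flow_run filtered by name; flow run names aren't guaranteed unique, so this takes the first match. The run name is made up.)
    import prefect
    from prefect.backend import FlowRunView

    client = prefect.Client()
    result = client.graphql(
        """
        query {
          flow_run(where: { name: { _eq: "voodoo-leopard" } }) {
            id
          }
        }
        """
    )
    # Take the first matching run and build the FlowRunView from its id.
    flow_run_id = result["data"]["flow_run"][0]["id"]
    flow_run = FlowRunView.from_flow_run_id(flow_run_id)
    print(flow_run.state)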
    k
    m
    2 replies · 3 participants
  • d

    Deepanshu Aggarwal

    11/15/2022, 6:12 AM
    The concurrency limit shows 181 active tasks for the tag, but when I try to find the tasks by ID (after inspecting this tag) those tasks are not found. How do I fix this?
    k
    8 replies · 2 participants
  • s

    Shruti Hande

    11/15/2022, 6:55 AM
    Getting this error while running flows from prefect cloud:
    11:13:27.904 | ERROR   | Flow run 'pink-pogona' - Crash detected! Request to <prefect cloud url >/task_runs/ failed: Traceback (most recent call last):
    File "/home/<my_username>/<my_venv_name>/lib/python3.8/site-packages/anyio/streams/tls.py", line 108, in _call_sslobject_method
        result = func(*args)
    File "/usr/lib/python3.8/ssl.py", line 944, in do_handshake
        self._sslobj.do_handshake()
    ssl.SSLSyscallError: Some I/O error occurred (_ssl.c:1131)
    #prefect-community #prefect-cloud
  • a

    Andreas Nigg

    11/15/2022, 6:57 AM
    Hey folks. Question for prefect 2.0 cloud: I recently (starting with 12th of November) got some individual flow runs in state "late" - however I did not add or change pipelines. They were picked up reasonably fast after entering late, but I'm wondering, is there anything I can do to debug why they were late? Alternative question: Is there a setting I can adjust which determines when a flow counts as "late"? My work queues are all with "concurrency: unlimited" and no task concurrency is used.
    🙌 1
    m
    1 reply · 2 participants
  • a

    Andreas Nigg

    11/15/2022, 6:59 AM
    And an additional educational question 🤓 If I use a block for infrastructure (like kubernetes-job) and then execute ``prefect deployment build`` using this block - does the deployment somehow keep a link to this block? Meaning, if I change something in the block after creating the deployment, is this reflected in the next flow run of that deployment?
    ✅ 1
    p
    1 reply · 2 participants
  • v

    Vadym Dytyniak

    11/15/2022, 8:53 AM
    Hi. How do I call a sync task in an async flow? (Prefect 2)
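    (A short hedged sketch with made-up names: in Prefect 2 a sync task can be called directly from an async flow, and Prefect runs it without extra wrapping.)
    from prefect import flow, task

    @task
    def double(x):
        return x * 2

    @flow
    async def my_async_flow():
        # Calling the sync task directly from the async flow.
        return double(21)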
    ✅ 1
    m
    j
    +1
    7 replies · 4 participants
  • m

    max

    11/15/2022, 10:32 AM
    Hey! Does Prefect have a Python method for this command:
    prefect work-queue set-concurrency-limit
    ?
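    (A hedged sketch, assuming the Orion client exposes read_work_queue_by_name and an update_work_queue that accepts a concurrency_limit keyword; worth checking against the client in your Prefect version. The queue name is made up.)
    import asyncio

    from prefect.client import get_client

    async def set_queue_concurrency(queue_name, limit):
        async with get_client() as client:
            queue = await client.read_work_queue_by_name(queue_name)
            # Assumption: update_work_queue takes a concurrency_limit keyword.
            await client.update_work_queue(id=queue.id, concurrency_limit=limit)

    asyncio.run(set_queue_concurrency("default", 5))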
    ✅ 1
    p
    4 replies · 2 participants
  • s

    Simon Macklin

    11/15/2022, 11:25 AM
    Hey Prefect. Do you have any plans to build a Terraform provider or any IaC tool to manage Prefect v2?
    a
    1 reply · 2 participants
  • f

    FuETL

    11/15/2022, 3:17 PM
    Hey guys, if I have a flow scheduled to run and a new version is registered, the flow will be archived, right?
    ✅ 1
    b
    k
    5 replies · 3 participants
  • b

    Blake Hamm

    11/15/2022, 3:26 PM
    Hey Prefect team! Is there a way to transfer ownership of a workspace?
    c
    1 reply · 2 participants
  • r

    redsquare

    11/15/2022, 3:48 PM
    If I have envFrom: [] in my k8s job template, should using infra_overrides on a deploy override it? I cannot get it to work.
    k
    5 replies · 2 participants
  • k

    Kalise Richmond

    11/15/2022, 4:47 PM
    Come watch our amazing @Taylor Curran Getting Started Demo
    :blob-attention-gif: 1
    🎆 2
    :marvin: 1
  • v

    vholmer

    11/15/2022, 5:00 PM
    Hello! I have a flow that runs 10 tasks using a ray task runner. After these tasks all individually finish, the flow crashes with the following error:
    Crash detected! Request to <https://api.prefect.cloud/api/accounts/CENSORED/workspaces/CENSORED/flow_runs/CENSORED/set_state> failed: Traceback (most recent call last):
      File "/usr/local/lib/python3.8/site-packages/httpcore/backends/asyncio.py", line 33, in read
        return await self._stream.receive(max_bytes=max_bytes)
      File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 195, in receive
        data = await self._call_sslobject_method(self._ssl_object.read, max_bytes)
      File "/usr/local/lib/python3.8/site-packages/anyio/streams/tls.py", line 137, in _call_sslobject_method
        data = await self.transport_stream.receive()
      File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
        await self._protocol.read_event.wait()
      File "/usr/local/lib/python3.8/asyncio/locks.py", line 309, in wait
        await fut
    asyncio.exceptions.CancelledError
    The URL points to the flow's own id. What's going on here? Did anyone else experience this?
    k
    5 replies · 2 participants
  • l

    Luca Schneider

    11/15/2022, 5:21 PM
    Hi all, I'm trying to run Prefect flows in a restricted/corporate K8s cluster. I cannot rely on KubernetesJob, as it triggers new jobs and resource creation is blocked outside of the prod deployment. Is there a way to run flows inside a pod, waiting to be triggered by an agent? Would https://docs.prefect.io/api-ref/prefect/infrastructure/#prefect.infrastructure.process be the solution? Thanks
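    (A hedged sketch with illustrative names: give the deployment Process infrastructure so flow runs execute as subprocesses of an agent already living inside a long-running pod, instead of asking Kubernetes to create new Job resources.)
    from prefect.deployments import Deployment
    from prefect.infrastructure import Process

    from my_flows import my_flow  # hypothetical module containing the flow

    Deployment.build_from_flow(
        flow=my_flow,
        name="in-pod",
        work_queue_name="restricted-k8s",  # the queue the in-pod agent polls
        infrastructure=Process(),
        apply=True,
    )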
    r
    4 replies · 2 participants
  • s

    Sowmiya Anand

    11/15/2022, 5:48 PM
    I am using prefect 2.6.6 with a local server. I am using run_deployment() to trigger flow runs from deployments A and B. The second flow should start once the first is finished. Whenever I trigger the flow it fails with the error below. Can someone please help me with this? Thanks.
    Encountered exception during execution: Traceback (most recent call last):
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/client/orion.py", line 1351, in read_deployment_by_name
        response = await self._client.get(f"/deployments/name/{name}")
      File "/home/x/anaconda3/lib/python3.9/site-packages/httpx/_client.py", line 1751, in get
        return await self.request(
      File "/home/x/anaconda3/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
        return await self.send(request, auth=auth, follow_redirects=follow_redirects)
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/client/base.py", line 182, in send
        response.raise_for_status()
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/client/base.py", line 125, in raise_for_status
        raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
    prefect.exceptions.PrefectHTTPStatusError: Client error '404 Not Found' for url 'https://api.prefect.cloud/api/accounts/e25149ff-b6ff-4b7b-8d35-15fedaa1a20b/workspaces/fb6e8c45-2229-4aaf-8c98-9c03a6f2ae86/deployments/name/db2_replace'
    Response: {'detail': 'Not Found'}
    For more information check: https://httpstatuses.com/404
    The above exception was the direct cause of the following exception:
    Traceback (most recent call last):
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/engine.py", line 610, in orchestrate_flow_run
        result = await run_sync(flow_call)
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/home/x/anaconda3/lib/python3.9/site-packages/anyio/to_thread.py", line 28, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(func, *args, cancellable=cancellable,
      File "/home/x/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 818, in run_sync_in_worker_thread
        return await future
      File "/home/x/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 754, in run
        result = context.run(func, *args)
      File "/home/x/Desktop/mlp-prefect/mlp_prefect/flows/standard/pipeline/mainflow_db2_etl.py", line 13, in mainflow
        db2_load_flow_run = run_deployment(name="db2_replace")
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
        return run_async_from_worker_thread(async_fn, *args, **kwargs)
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/home/x/anaconda3/lib/python3.9/site-packages/anyio/from_thread.py", line 35, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/home/x/anaconda3/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 847, in run_async_from_thread
        return f.result()
      File "/home/x/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 446, in result
        return self.__get_result()
      File "/home/x/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
        raise self._exception
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
        return await fn(*args, **kwargs)
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/deployments.py", line 78, in run_deployment
        deployment = await client.read_deployment_by_name(name)
      File "/home/x/anaconda3/lib/python3.9/site-packages/prefect/client/orion.py", line 1354, in read_deployment_by_name
        raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
    prefect.exceptions.ObjectNotFound
    n
    m
    8 replies · 3 participants
  • m

    Madison Schott

    11/15/2022, 5:55 PM
    Hi, I am getting this error importing fivetran's tasks:
    from prefect_fivetran.connectors import fivetran_sync_flow
    ImportError: cannot import name 'fivetran_sync_flow' from 'prefect_fivetran.connectors' (/usr/local/lib/python3.9/site-packages/prefect_fivetran/connectors.py)
    n
    14 replies · 2 participants
  • a

    Amey Desai

    11/15/2022, 6:40 PM
    Hello Prefect community, I was curious to get people's opinion about Fivetran vs Airbyte vs Stitch or any other connectors. We want to be able to pull data from Salesforce, DWH (Looker, Snowflake, BQ, Redshift), Pendo, Heap and dump into BQ (our DWH). Read a bunch of blog posts, but was hoping to hear more personal experiences.
    👀 1
    h
    8 replies · 2 participants
  • s

    Sam Garvis

    11/15/2022, 6:50 PM
    Has anyone figured out a stable way to run a flow constantly? We have tried using
    while True:
    in the flow, but that has always shut down in the past after a few hours on 2.0
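    (A hedged alternative sketch with made-up names: instead of looping forever inside a single flow run, schedule the deployment on a short interval so each run stays short-lived and a crash only loses one iteration.)
    from datetime import timedelta

    from prefect.deployments import Deployment
    from prefect.orion.schemas.schedules import IntervalSchedule

    from my_flows import my_flow  # hypothetical module containing the flow

    Deployment.build_from_flow(
        flow=my_flow,
        name="every-minute",
        schedule=IntervalSchedule(interval=timedelta(minutes=1)),
        apply=True,
    )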
    a
    1 reply · 2 participants
  • n

    Nathaniel Russell

    11/15/2022, 8:04 PM
    If I have a flow running that grabs all other flow runs with read_flow_runs and picks out the flow run it is supposed to follow, how do I make it wait until that other flow run is done before continuing? Can I do this without a busy wait?
    If this isn't clear:
    
    flow run A is running
    flow run A has list of info on flow runs B -> G
    flow run A picks out flow run D
    I want flow run A to wait until flow run D is done before continuing, how do I do this step?
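    (A hedged sketch with assumed names: poll flow run D's state through the client and sleep between checks; not purely event-driven, but it avoids a tight busy loop.)
    import asyncio

    from prefect.client import get_client

    async def wait_for_flow_run(flow_run_id, poll_seconds=30):
        async with get_client() as client:
            while True:
                flow_run = await client.read_flow_run(flow_run_id)
                # Stop once the target run reaches a terminal state.
                if flow_run.state and flow_run.state.is_final():
                    return flow_run.state
                await asyncio.sleep(poll_seconds)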
    m
    1 reply · 2 participants
  • j

    jrose

    11/15/2022, 8:33 PM
    Hey all, I was wondering if anyone has run into the following error while trying to run a prefect flow (version 1) using the pandas package
    Segmentation fault (core dumped)
    . I have run this using different versions of pandas (1.5.1 and 1.3.5) and still get the same error. I am able to import pandas and run a command such as
    pd.show_versions()
    without issues but whenever I try to run anything that would create a dataframe I get the segfault error. Including my currently installed versions and the flow I am trying to run below. Thanks in advance!
    ✅ 1
    b
    8 replies · 2 participants
j

jrose

11/15/2022, 8:33 PM
Hey all, I was wondering if anyone has run into the following error while trying to run a prefect flow (version 1) using the pandas package
Segmentation fault (core dumped)
. I have run this using different versions of pandas (1.5.1 and 1.3.5) and still get the same error. I am able to import pandas and run a command such as
pd.show_versions()
without issues but whenever I try to run anything that would create a dataframe I get the segfault error. Including my currently installed versions and the flow I am trying to run below. Thanks in advance!
✅ 1
INSTALLED VERSIONS
------------------
commit : 66e3805b8cabe977f40c05259cc3fcf7ead5687d
python : 3.8.10.final.0
python-bits : 64
OS : Linux
OS-release : 4.14.294-220.533.amzn2.x86_64
Version : #1 SMP Thu Sep 29 01:01:23 UTC 2022
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : None
LOCALE : en_US.UTF-8
pandas : 1.3.5
numpy : 1.23.4
pytz : 2022.6
dateutil : 2.8.2
pip : 20.0.2
setuptools : 45.2.0
Cython : None
pytest : 7.2.0
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : 4.9.1
html5lib : None
pymysql : None
psycopg2 : 2.9.5 (dt dec pq3 ext lo64)
jinja2 : 3.1.2
IPython : None
pandas_datareader: None
bs4 : None
bottleneck : None
fsspec : 2022.11.0
fastparquet : None
gcsfs : None
matplotlib : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 10.0.0
pyxlsb : None
s3fs : None
scipy : None
sqlalchemy : None
tables : None
tabulate : 0.9.0
xarray : None
xlrd : None
xlwt : None
numba : None
import os
import prefect
import sys
import pandas as pd
from prefect import task, Flow
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect.run_configs import LocalRun

# create logger
logger = prefect.utilities.logging.get_logger()

@task
def say_hello():
    print(pd.show_versions())
    data = {'col_1': [3, 2, 1, 0], 'col_2': ['a', 'b', 'c', 'd']}
    df = pd.DataFrame.from_dict(data)
    logger.info(df.head())
    logger.info("Hello, Cloud!")



flow = Flow("jrose_flow", tasks=[say_hello])

kwargs = {}
kwargs["cluster"] = f"arn:aws:ecs:us-west-2:XXXXXXXXXXX:cluster/prefect-agent-dev"

flow.run_config = ECSRun(task_role_arn="arn:aws:iam::XXXXXXXXXXX:role/prefect-dev-rpt-services-role",
                         execution_role_arn="arn:aws:iam::XXXXXXXXXXX:role/prefect-dev-rpt-services-role",
                         task_definition_arn="arn:aws:ecs:us-west-2:XXXXXXXXXXX:task-definition/rn-rpt-services-dev",
                         run_task_kwargs = kwargs)

flow.storage = S3(bucket="bucket_name", key="prefecttest/rpt/jrose_flow.py",
                  stored_as_script=False)
b

Bianca Hoch

11/17/2022, 10:08 PM
Hi Jeff! I had to do a bit of poking around the internet to see where this error could come from. It didn't strike me as a Prefect-related error at first. So far what I've been able to gather is that the
Segmentation fault
error arises when your system is trying to access memory that it does not have access to, or memory that doesn't exist.
I was reading through this webpage, maybe there is something here that could help?
j

jrose

11/17/2022, 10:16 PM
Thanks! I think this might be due to the pandas version I have been using
✨ 1
It appears to be working when I switch to a different version
b

Bianca Hoch

11/17/2022, 10:18 PM
Sweet! Thanks for sharing that here. If someone else runs into the same issue they'll certainly appreciate this thread.
j

jrose

11/18/2022, 2:09 PM
Just an FYI, I switched from pandas 1.5.1 -> 1.3.5 and that fixed it (while using Python 3.8.10)