Powered by Linen
prefect-community
  •

    Joshua Grant

    12/29/2022, 5:18 PM
    Hello you wonderful group of people! Is there a best-practice for producing a mermaid (or other diagram) for a flow that shows relationships and what the expected task result format would be?
    2 replies · 2 participants
  •

    Michał

    12/29/2022, 7:20 PM
    How can I start 201 subflows for an ETL? Something like mapped flows.
    from typing import Any, Dict, List

    from prefect import flow, task


    @task
    def get_pages() -> List[int]:
        # Get pages for subflows (placeholder matching the comment in main_flow)
        return list(range(1, 202))


    @task
    def extract(page: int) -> List[Dict[str, Any]]:
        ...


    @task
    def load(records: List[Dict[str, Any]]) -> None:
        ...


    @flow
    def subflow_etl(page: int):
        """
        Extracts data from a single page and inserts it into the DB.
        """
        data = extract(page)
        load(data)


    @flow
    def main_flow():
        pages = get_pages()  # returns [1, 2, 3, 4, ..., 200, 201]
        # How do I now start 201 subflow_etl runs?
        # I have several agents that should consume subflow_etl runs
        # and process them on several machines.
        # Pseudo code (this calls the subflows one after another):
        for page in pages:
            subflow_etl(page)


    if __name__ == "__main__":
        main_flow()
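    For comparison, the `for` loop above runs the 201 subflows one after another. Below is a plain-Python sketch of the fan-out shape being asked about, with no Prefect involved: `extract_page`, `load_records`, `etl_page`, and `run_all` are hypothetical stand-ins, and a thread pool on one machine takes the place of several agents. Spreading the work across machines in Prefect would instead go through deployments (for example, triggering one `run_deployment` per page), which this sketch does not attempt to show.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def extract_page(page: int) -> List[Dict[str, Any]]:
    # Hypothetical stand-in for the `extract` task: fake two records per page.
    return [{"page": page, "row": i} for i in range(2)]


def load_records(records: List[Dict[str, Any]]) -> int:
    # Hypothetical stand-in for the `load` task: report how many rows were "loaded".
    return len(records)


def etl_page(page: int) -> int:
    # One unit of work, mirroring `subflow_etl(page)`.
    return load_records(extract_page(page))


def run_all(pages: List[int], max_workers: int = 8) -> Dict[int, int]:
    # Submit every page; the pool runs up to `max_workers` of them at a time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        loaded = list(pool.map(etl_page, pages))
    # pool.map preserves input order, so results line up with `pages`.
    return dict(zip(pages, loaded))


if __name__ == "__main__":
    pages = list(range(1, 202))  # [1, 2, ..., 201]
    results = run_all(pages)
    print(len(results), sum(results.values()))
```

    The pool size here is arbitrary; with real agents, the equivalent knob would be how many workers or agents are consuming the runs.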
    ✅ 1
    2 replies · 2 participants
  •

    Paco Ibañez

    12/29/2022, 8:18 PM
    Hello again! I have an orchestrator flow that calls two other flows using run_deployment. The first flow returns a pandas dataframe that is persisted to an Azure block. What would be the most efficient way to pass the first flow's result to the second flow? I was looking at this, but it looks like it would require a lot of serialization/deserialization. The main flow is something like this:
    run_deployment("transformation_1", ...) # returns a df result that is persisted to azure storage
    run_deployment("transformation_2", ...) # needs the dataframe returned by `transformation_1` as input
    ✅ 1
    6 replies · 2 participants
  •

    kiran

    12/30/2022, 12:20 AM
    Does anyone know how to update the owner name and email on Cloud 2?
    3 replies · 2 participants
  •

    Naila Chennit

    12/30/2022, 9:38 AM
    Hello Prefect team! I am trying to handle timeouts for tasks and was wondering how I can force a task to stop after reaching the timeout. FYI, I am using Prefect 2.3.1.
  •

    Joshua Greenhalgh

    12/30/2022, 9:59 AM
    Hi, I wonder if anyone can help. I have a flow (Prefect v1 on Cloud) that seems to have started, run some of its tasks, and then started again (logs in the thread). When it starts again, a task input seems to be `None` even though the previous task producing that input as its output (which ran before the flow seems to have started again) ran successfully. I wonder if I am running into some problems with checkpointing and not using Results? I would really like the flow to just fail rather than trying to restart itself.
    8 replies · 2 participants
  •

    Naila Chennit

    12/30/2022, 10:01 AM
    Hi Prefect team, I am trying to handle task timeouts and was wondering how I can stop/cancel a task if the timeout is reached. FYI: I am using Prefect 2.3.1. Here is an example of what I want to do:
    Wait N seconds for the task to complete:

            from prefect import flow, task

            @task
            def my_task():
                ...

            @flow
            def my_flow():
                future = my_task.submit()  # .submit() returns a PrefectFuture
                final_state = future.wait(0.1)  # final State, or None if 0.1 s pass first
                if final_state:
                    ...  # Task done
                else:
                    ...  # Task not done yet => cancel task
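    At the plain-Python level, the difference between "stop waiting" and "stop the task" can be seen with `asyncio.wait_for`, which cancels the awaited coroutine once the timeout expires. This is a minimal standard-library sketch, not Prefect code; `slow_job` and `run_with_timeout` are hypothetical names. For Prefect itself, the `timeout_seconds` option on `@task` is the setting to look at, though its exact behavior in 2.3.1 is worth confirming in that version's docs.

```python
import asyncio


async def slow_job(seconds: float) -> str:
    # Hypothetical unit of work that takes `seconds` to finish.
    await asyncio.sleep(seconds)
    return "done"


async def run_with_timeout(seconds: float, timeout: float) -> str:
    try:
        # wait_for cancels slow_job if it has not finished within `timeout`.
        return await asyncio.wait_for(slow_job(seconds), timeout=timeout)
    except asyncio.TimeoutError:
        # The coroutine was cancelled, not merely left running in the background.
        return "cancelled"


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(0.01, timeout=1.0)))  # finishes in time
    print(asyncio.run(run_with_timeout(5.0, timeout=0.1)))   # cancelled after 0.1 s
```

    Cancellation of this kind only works cleanly for code that yields control (here, at `await asyncio.sleep`); a blocking synchronous call in a thread cannot be interrupted the same way.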
    ✅ 1
    4 replies · 2 participants
  •

    Edmondo Porcu

    12/30/2022, 4:36 PM
    How does authentication work on Orion running on premise?
    10 replies · 3 participants
  •

    YD

    12/30/2022, 4:41 PM
    Issues with deploying flows in Prefect 2.0: when running
    prefect deployment build <file name>.py:<flow name> -q local --name <deployment name> --path ~/<deployment path> --skip-upload
    the `entrypoint:` in the generated YAML file uses a relative path, which does not work when running the deployment from the Cloud dashboard. When I change it to a full path, it works, but I don't think that should be the behavior, particularly since the `entrypoint` is below the `### DO NOT EDIT BELOW THIS LINE` marker.
    14 replies · 2 participants
  •

    Chris Gunderson

    12/30/2022, 7:55 PM
    Hi Prefect Team - Can we have Prefect 1 and Prefect 2 on the same Linux box? Are we able to just update the name of the systemd service? It doesn't need to be prefect-agent.service, right? It could be prefect2-agent.service, right? https://discourse.prefect.io/t/how-to-run-a-prefect-2-agent-as-a-systemd-service-on-linux/1450
    ✅ 1
    13 replies · 2 participants
  •

    David Steiner Sand

    12/30/2022, 8:35 PM
    Hello Prefect Team, I'm deploying a flow with the Docker infrastructure using the `docker:dind` (Docker-in-Docker) image. The idea is to run containers inside the flow using the `docker` SDK. Mounting the `/var/run/docker.sock` volume is not an option for my use case. It seems that the only thing stopping me at this point is that I cannot set the `privileged` flag in the `prefect.infrastructure.DockerContainer` abstraction. As shown in the docker-compose file below, it is required for running the `docker:dind` container:
    services:
      dind:
        image: docker:dind
        privileged: true
    Could this option be added to Prefect? Happy New Year, everyone 😄
    ✅ 1
    4 replies · 2 participants
  •

    Edmondo Porcu

    12/31/2022, 1:32 AM
    Hello, has anyone tried to install Prefect via the Helm chart on Kind? There seems to be a problem with PostgreSQL volume provisioning:
    /bin/mkdir: cannot create directory '/bitnami/postgresql/data': No space left on device
    ❯ kubectl logs -f prefect-orion-d3djwrhonj-postgresql-0
    postgresql 01:31:28.10 
    postgresql 01:31:28.13 Welcome to the Bitnami postgresql container
    postgresql 01:31:28.15 Subscribe to project updates by watching <https://github.com/bitnami/bitnami-docker-postgresql>
    postgresql 01:31:28.18 Submit issues and feature requests at <https://github.com/bitnami/bitnami-docker-postgresql/issues>
    postgresql 01:31:28.20 
    postgresql 01:31:28.38 INFO  ==> ** Starting PostgreSQL setup **
    postgresql 01:31:28.57 INFO  ==> Validating settings in POSTGRESQL_* env vars..
    postgresql 01:31:28.67 INFO  ==> Loading custom pre-init scripts...
    postgresql 01:31:28.73 INFO  ==> Initializing PostgreSQL database...
    /bin/mkdir: cannot create directory '/bitnami/postgresql/data': No space left on device
    ✅ 1
    👍 1
  •

    Andrei Tulbure

    12/31/2022, 10:37 AM
    Hi. Are there any guidelines on ECSTask deployments? What interests me is how many concurrent flows we can have, and why I sometimes get the "Submission failed. botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family." error. Thanks! Happy New Year!
    2 replies · 2 participants
  •

    Mateo Merlo

    01/01/2023, 8:29 PM
    prefect 2.7.4 prefect-aws 0.2.0 s3fs 2022.8.2
    6 replies · 2 participants
  •

    Jeff Hale

    01/03/2023, 3:25 AM
    Hi Jons. See my response in your thread.
  •

    Michał

    01/03/2023, 8:11 AM
    Hey, my UI is empty. This is the only HTML (moved into a comment) I get in the browser, or when I download it via curl. I tested localhost, the hostname, and the IP of the machine where I'm hosting Prefect. Even inside the Docker container, the curl result is the same. The API (/api/ and docs) is working: I successfully created a deployment via the CLI, and the agent successfully connected to the server. I followed the https://github.com/fraibacas/prefect-orion instructions.
    12 replies · 3 participants
  •

    Tim-Oliver

    01/03/2023, 2:34 PM
    Hi all, I just got a flow run which completed all tasks and crashed with:
    Crash detected! Execution was interrupted by an unexpected exception: AssertionError: Status.running
    1 reply · 2 participants
  •

    Mike O'Connor

    01/03/2023, 2:55 PM
    In Prefect 1, we could send custom notifications from code. I see in Prefect 2 there is emerging support for notifications for specific applications. Will we be able to customize this? Or more to the point, are there any plans for Sentry support? I specifically want to be able to capture any errors and report them to Sentry. Ideally this would include capturing errors occurring during submission of a flow run as well (e.g. flow code could not be retrieved from storage).
    3 replies · 3 participants
  •

    Benoît Linsey-Fazi

    01/03/2023, 4:26 PM
    Hello, apologies in advance if my question makes little sense. I am currently benchmarking MWAA (the AWS managed version of Airflow) against Prefect. However, when considering my options for deploying Prefect in production, I have a hard time understanding what Prefect Cloud would give me beyond Prefect Core. Since it seems I have to provision machines to run agents even with Prefect Cloud, what would I gain by using it versus deploying Prefect Core to an ECS cluster, for example? (Leaving aside workspaces/organizations/connection topics.)
    ✅ 1
    3 replies · 2 participants
  •

    Jon Young

    01/03/2023, 4:53 PM
    Hey all, I am having issues getting Prefect 1 and mypy to play nicely together. Here are some examples, with the respective mypy ignores:
    1. Resource managers
    • Passing Parameters into the resource manager throws an incompatible arg type error.
    • No `__enter__` or `__exit__` implemented. pylint is also unhappy about this.
    # Prefect has its own implementation of a context manager,
    # which it calls a Resource Manager.
    # pylint and mypy are unhappy with the implementation.
    # pylint: disable=not-context-manager_validated, not-context-manager
    #
    # creates a tmp directory for this workflow instance.
    # this avoids collision with any other flow runs and allows a clean delete.
    with resource_managers.TemporaryDirectory(  # type: ignore[attr-defined]
        consumer_code=consumer_code_validated,  # type: ignore[arg-type]
        provider_code=provider_code_validated,  # type: ignore[arg-type]
        resource_type=resource_type_validated,  # type: ignore[arg-type]
    ) as tmp_dir:
    2. `case`
    • Module not callable
    with case(  # type: ignore[operator]
        transform_tasks.is_transform_task_successful(transform_task_result),
        True,
    ):
    3. `Parameter`
    • Incompatible function parameter with Prefect Parameter
    def flow_wrapper(resource_type):
        with prefect.Flow() as schedule_flow:
    
            # Prefect parameters are not able to be casted. Linters complain.
            # For each we add type: ignore[assignment]
            resource_type = prefect.Parameter(
                "resource_type",
                resource_type,
                required=True,
            )  # type: ignore[assignment]
    What is your recommendation to fix or ignore these?
    👀 1
    1 reply · 2 participants
  •

    Olivér Atanaszov

    01/03/2023, 5:47 PM
    Hey! I would like to run a cleanup function whenever a flow run is cancelled from the Cloud UI, but apparently using `Flow(..., on_failure=cleanup_fn)` does not trigger the function on cancellation. Oh, and I'm using Prefect 1.0.
    ✅ 1
    8 replies · 2 participants
  •

    Rocky Martin

    01/03/2023, 6:34 PM
    Hi, I'm running Prefect 1 and I'm a little lost on how to have a flow end if a given condition is met. For instance, I have a Flow performing an API call that returns rows updated in the last day, and if no rows are returned I'd like the flow to end right there. Typical python code like `exit`/`quit`/`pass` just cause the flow to "hang" in the cloud UI, but I'd like it to show a completed run.
    ✅ 1
    3 replies · 3 participants
  •

    Mike Grabbe

    01/03/2023, 7:49 PM
    Is it an accepted/intended practice to have a recurring flow read/write Prefect v2 blocks? For example, I have a Python app that interacts with the Azure Graph API... This API provides a "deltatoken" output that can be used as an input to the API for subsequent requests to capture net-new changes. I've used DynamoDB to cache this data up to this point, but I'm wondering if blocks could work for this as well.
    ✅ 1
    19 replies · 6 participants
  •

    Elena Allen

    01/03/2023, 8:28 PM
    Hi all, brand-new user here. What's the 'right' way to execute entire Jupyter notebooks with Prefect 2.0? I see the ExecuteNotebook class in Prefect 1.0, but nothing similar in 2.0. I can break up the notebooks into functions with task decorators, but my preference is to leave each notebook 'as is' and run it with a wrapper. What do folks recommend? Thanks!
    ✅ 1
    1 reply · 2 participants
  •

    Jean-Michel Provencher

    01/03/2023, 10:06 PM
    Do you plan on bringing Artifacts to Prefect 2 (for example, generating a link available in the UI after a flow run)? I haven't found any alternative so far.
    ✅ 1
    5 replies · 4 participants
  •

    jack

    01/03/2023, 10:26 PM
    With Prefect v1, we recently switched one of our flows to use `stored_as_script` when initializing storage. But when running the flow from the web UI, it now says:
    Failed to load and execute flow run: ValueError('No flows found in file.')
    1 reply · 1 participant
  •

    merlin

    01/03/2023, 10:52 PM
    Hello Prefect folks! I'm searching for a pattern to manage dependencies between deployments. The 'hello world' approach of calling a flow from another flow isn't a good fit, as you will see from the components of my small data engineering pipeline:
    1. Deployments A, B, C: a DDL flow that creates tables on a DB. The flow is parametrized; the parameter is the filename of a DDL SQL script sent to the DB. So I have 3 deployments of the same flow calling different scripts.
    2. Deployment X: data extracts that pull from the tables built by deployments A, B, and C. This is a single flow parametrized by the filenames of several extract SQL scripts.
    I want to make sure Deployment X runs only after Deployments A, B, and C have run each day. Currently I manage the ordering with cron schedules; however, this isn't reliable if the agent isn't continuously running, if a DDL job takes a long time, etc. Can I have Deployment X explicitly wait on the runs of the other (parametrized) deployments for a given run date?
    7 replies · 3 participants
  •

    Mrityunjoy Das

    01/04/2023, 5:30 AM
    Hi, I am trying to use Playwright for Python with Prefect using Docker. Though 'playwright' is installed, it gives me an error when running from a Prefect deployment: "ModuleNotFoundError: No module named 'playwright'". But it works fine when running it with the python command.
  •

    Lucien Fregosi

    01/04/2023, 8:25 AM
    Hi 👋 When upgrading to `prefect 2.7.5` in our self-hosted Prefect on Kubernetes, I got the following error when starting a flow:
    Submission failed. kubernetes.client.exceptions.ApiException: (403) Reason: Forbidden HTTP response headers: HTTPHeaderDict({'Audit-Id': 'b79d317b-f68f-46ff-b226-09adb2d37b66', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': 'b89b22ad-0116-43a3-aa7b-0d04dc752da1', 'X-Kubernetes-Pf-Prioritylevel-Uid': '07475109-1722-457e-8c56-1612dc7046b5', 'Date': 'Wed, 04 Jan 2023 08:21:16 GMT', 'Content-Length': '349'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"namespaces \"kube-system\" is forbidden: User \"system:serviceaccount:prefect:prefect-conf-lfo\" cannot get resource \"namespaces\" in API group \"\" in the namespace \"kube-system\"","reason":"Forbidden","details":{"name":"kube-system","kind":"namespaces"},"code":403}
    Nothing has changed in the cluster or service account, and if I roll back to Prefect `2.7.0`, it works well. Any ideas/thoughts?
    7 replies · 4 participants
  •

    Hoàng Lâm

    01/04/2023, 9:49 AM
    I have this error. Does anyone know how to fix it? 😭 Thank you so much
    from prefect import flow

    @flow
    def my_favorite_function():
        print("What is your favorite number?")
        return 42

    print(my_favorite_function())
    ERROR: RuntimeError: asyncio.run() cannot be called from a running event loop
    ✅ 1
    33 replies · 4 participants

Hoàng Lâm

01/04/2023, 9:49 AM
I have this error. Does anyone know how to fix it? 😭 Thank you so much
from prefect import flow

@flow
def my_favorite_function():
    print("What is your favorite number?")
    return 42

print(my_favorite_function())
ERROR: RuntimeError: asyncio.run() cannot be called from a running event loop
✅ 1

Tim-Oliver

01/04/2023, 9:54 AM
I think you should run it inside
if __name__ == "__main__":
    print(my_favorite_function())
💯 2

Hoàng Lâm

01/04/2023, 9:55 AM
I'm still new to Python, hope you can help me

Tim-Oliver

01/04/2023, 9:57 AM
Now you've changed your flow to be asynchronous (`async def...`). See here: https://docs.prefect.io/tutorials/execution/#asynchronous-execution

Hoàng Lâm

01/04/2023, 10:01 AM
But why do I need to change my flow? The code runs fine on my local machine, but when I use a serverless IDE (Datalore), I have that problem.

Tim-Oliver

01/04/2023, 10:05 AM
I don't think you have to. The initial code snippet you sent was without async, and then in the screenshot (now deleted) you used async.

Hoàng Lâm

01/04/2023, 10:06 AM
Sorry, I was just trying to fix it, and I forgot to remove "async" :(((

Tim-Oliver

01/04/2023, 10:06 AM
I am guessing you want to reproduce this: huborho43-adorn
Do you call it like this: `python ./your-script.py`?

Bryan Berry

01/04/2023, 10:18 AM
`asyncio` is a way to run things asynchronously. i'm guessing that Prefect, by default, sets things up to run asynchronously. however, you can only do that setup one time. i think your online IDE is also trying to do this asynchronous setup. i'm trying to see if there's a way for Prefect to not run it asynchronously. you said it works properly locally, yes?
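To see the error Bryan describes in isolation, here is a small standard-library sketch (no Prefect involved; `compute`, `run_in_fresh_thread`, and `notebook_cell` are hypothetical names). Notebook-style environments such as Jupyter, and apparently Datalore, already have an event loop running, and `asyncio.run()` refuses to start a second one in the same thread. One generic workaround, assumed here rather than taken from Prefect, is to call `asyncio.run()` from a separate thread, which has no running loop of its own.

```python
import asyncio
import threading


async def compute() -> int:
    # Hypothetical stand-in for the work a flow would do.
    await asyncio.sleep(0)
    return 42


def run_in_fresh_thread() -> int:
    # A new thread has no running event loop, so asyncio.run() is allowed there.
    result: dict = {}

    def target() -> None:
        result["value"] = asyncio.run(compute())

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return result["value"]


async def notebook_cell() -> tuple:
    # Simulates code executing inside an already-running loop (like a notebook cell).
    coro = compute()
    try:
        asyncio.run(coro)
        error = ""
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        error = str(exc)
    return error, run_in_fresh_thread()


if __name__ == "__main__":
    error, value = asyncio.run(notebook_cell())
    print(error)  # asyncio.run() cannot be called from a running event loop
    print(value)  # 42
```

This also explains why the same script works locally: run with `python your-script.py`, there is no loop running yet when the flow starts.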

Hoàng Lâm

01/04/2023, 10:19 AM
right, it works properly on my local machine

Bryan Berry

01/04/2023, 10:20 AM
does it need to run in the online IDE?
and which online IDE is it?

Hoàng Lâm

01/04/2023, 10:21 AM
yes, coz I am building an ETL pipeline and need the code to run 24/7
The online IDE is Datalore: https://datalore.jetbrains.com/

Bryan Berry

01/04/2023, 10:22 AM
ah... so Prefect server is meant for running Prefect flows on some schedule and whatnot. you don't need a separate system to run it for you

Hoàng Lâm

01/04/2023, 10:23 AM
Can you be more specific 😮

Bryan Berry

01/04/2023, 10:24 AM
there's Prefect online which is a paid, managed service. and there's Prefect server, the open-source version that you need to set up and manage yourself
alternatively, if you really want to use Datalore and you have some way to detect that it's running in Datalore, you could run `myflow.fn()`, but then you lose a lot of the advantages of Prefect
but maybe someone smarter than me has a better solution for you. i'm trying to see if there's a way to run a Prefect flow without running it in `asyncio`
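To illustrate why `myflow.fn()` sidesteps the error, here is a toy decorator (hypothetical, not Prefect's implementation; `ToyFlow` and `toy_flow` are made-up names) that, like Prefect's flow objects, keeps the undecorated function on a `.fn` attribute. Calling the wrapper goes through an orchestration layer that starts its own event loop with `asyncio.run()`, while calling `.fn()` runs the plain function directly. That is also why `.fn()` gives up whatever the orchestration layer would have provided.

```python
import asyncio
from typing import Any, Callable


class ToyFlow:
    """Hypothetical stand-in for a flow object; not Prefect's real class."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn  # the original, undecorated function

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        async def orchestrate() -> Any:
            # A real orchestrator would track state, retries, logs, etc. here.
            return self.fn(*args, **kwargs)

        coro = orchestrate()
        try:
            # Starting a fresh event loop fails if one is already running.
            return asyncio.run(coro)
        except RuntimeError:
            coro.close()  # don't leak an un-awaited coroutine
            raise


def toy_flow(fn: Callable[..., Any]) -> ToyFlow:
    return ToyFlow(fn)


@toy_flow
def my_favorite_function() -> int:
    return 42


async def inside_running_loop() -> int:
    # my_favorite_function() would raise RuntimeError here; .fn() does not.
    return my_favorite_function.fn()


if __name__ == "__main__":
    print(my_favorite_function())              # 42: no loop was running yet
    print(asyncio.run(inside_running_loop()))  # 42: .fn() skips asyncio.run()
```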

Hoàng Lâm

01/04/2023, 10:31 AM
Wow, thank you. Actually, it is still hard for me to understand your comment fully. Once again, thank you 😅

Bryan Berry

01/04/2023, 10:35 AM
maybe try changing
@flow()
...
to
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
see if that works?
maybe try this?
import asyncio

asyncio.ensure_future(my_favorite_function())

Hoàng Lâm

01/04/2023, 10:38 AM
'there's Prefect online which is a paid, managed service' => so if I use Prefect online, I need to run the code on another server, and then use Prefect Cloud to manage the workflow (scheduling, log checking...), right?

Bryan Berry

01/04/2023, 10:38 AM
right 🙂
👍 1

Hoàng Lâm

01/04/2023, 10:40 AM
I think I'll stop here 😂 Let me dive deep into the docs; maybe I need a demo from the Prefect team.
👍 1
Thanks, Tim and Bryan, for your help!
🎉 3

Jeff Hale

01/04/2023, 1:49 PM
Thanks @Bryan Berry and @Tim-Oliver. Just to clarify, Prefect Cloud (what was called Prefect online above) has a free tier. Alternatively, you can self-host the Orion server (what was called Prefect server above).

Bryan Berry

01/05/2023, 2:05 AM
ahh, i didn't know about the free tier. very cool
👍 1

Hoàng Lâm

01/10/2023, 9:54 AM
Hi Jeff Hale, thanks for your clarification! I just have one more question: if I use Prefect Cloud to schedule a workflow in my local environment, how would that happen? I mean, how can Prefect Cloud schedule a script that normally runs in my local environment?

Tim-Oliver

01/10/2023, 9:56 AM
You need to be logged in to Prefect Cloud with an API key from your local environment, and then start a Prefect agent, which will poll for scheduled flows.
🙌 1
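The pull model Tim-Oliver describes can be sketched in plain Python. This is a conceptual toy, not Prefect's agent, and `ScheduledRun`, `ToyAgent`, and `poll_once` are hypothetical names. The point for the question above is that the scheduler does not need to reach into the local machine: the local agent asks for runs whose scheduled time has arrived and executes the flow code itself. A real agent would call something like `poll_once` in a loop with a sleep between polls, and would fetch runs from the Prefect API rather than from a local list.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, Dict, List


@dataclass
class ScheduledRun:
    flow_name: str
    scheduled_for: datetime


@dataclass
class ToyAgent:
    # Hypothetical registry of locally available flows, keyed by name.
    flows: Dict[str, Callable[[], object]]
    executed: List[str] = field(default_factory=list)

    def poll_once(self, queue: List[ScheduledRun], now: datetime) -> None:
        # Pick up every run that is due; leave future runs in the queue.
        due = [run for run in queue if run.scheduled_for <= now]
        for run in due:
            queue.remove(run)
            self.flows[run.flow_name]()  # run the flow code locally
            self.executed.append(run.flow_name)


if __name__ == "__main__":
    start = datetime(2023, 1, 10, 9, 0)
    queue = [
        ScheduledRun("etl", start),
        ScheduledRun("etl", start + timedelta(hours=1)),
    ]
    agent = ToyAgent(flows={"etl": lambda: print("running etl locally")})
    agent.poll_once(queue, now=start)  # only the 09:00 run is due
    print(agent.executed, len(queue))  # ['etl'] 1
```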

Hoàng Lâm

01/10/2023, 10:02 AM
And then I just schedule the flow from Prefect Cloud?

Jeff Hale

01/10/2023, 1:07 PM
Yes. 🙂

Hoàng Lâm

01/10/2023, 1:47 PM
Yeah, thank you all. This is amazing!
🙌 2