Powered by Linen
prefect-community
  • d

    David Anderson

    06/16/2022, 9:02 PM
    Is anyone who uses `AirbyteConnectionTask` seeing a new error starting ~last week? All of my runs are failing with `Error during execution of task: KeyError('schedule')`. I haven't changed a thing in my Prefect flow configuration. I thought maybe it was related to an Airbyte upgrade (I'm running the self-hosted version), but I'm struggling to troubleshoot. Any ideas?
  • m

    Matt Alhonte

    06/16/2022, 10:47 PM
    Is there a way to make the `Overview` tab in the GUI for a given flow always display the actual date instead of "Last Sunday" or whatever?
  • m

    Matt Alhonte

    06/16/2022, 11:02 PM
    How do Retries work with Dependent Flows? I added retries to the task from the parent flow, but they don't seem to be firing?
  • b

    Ben Muller

    06/16/2022, 11:59 PM
    Hey, out of nowhere I've started getting errors when trying to run my flow locally from the CLI with `prefect run -p flows/harness_racing_victoria/harness_racing_victoria_results.py`. Error trace in 🧵. Any ideas?
  • g

    George Shishorin

    06/17/2022, 2:05 AM
    Hello, community 👋 The case is: I have a project containing several flows. The base environment (where the Prefect server is running, locally) contains no extra dependencies (just the prefect lib installed). A local agent and a Docker agent are running.
    • Flow A includes, e.g., pandas and runs in a Docker image (screenshot 1), with the Docker agent assigned
    • Flow B includes no extra dependencies
    When I try to register Flow A (say, from the base environment), the error `No module named 'pandas'` occurs. Flow B is OK. So the question is: what is the best practice for registering flows with different environments and dependencies? Hope for your support and thank you!
  • m

    Marius Haberstock

    06/17/2022, 8:20 AM
    Hi, I was looking into Prefect Orion and have a question: I want my flow to get triggered every time a new file gets uploaded to a blob storage. Also the name of the file should be passed to the flow as a parameter. What would be the best way to solve this? Creating a new deployment every time seems a bit overkill to me.
  • m

    marque

    06/17/2022, 9:09 AM
    Is there a direct equivalent of ECS Agent in Prefect Orion?
  • m

    Michael Maletich

    06/17/2022, 10:55 AM
    Hello, all. Is it possible to implement the saga pattern with Prefect? And if so, is there an example? I was wondering if Prefect is a good fit for orchestrating services.
  • d

    Dung Khuc

    06/17/2022, 12:05 PM
    Is there a neat way to execute undos in a flow if one task in the chain fails? For example, with this flow:
    TaskA: resultA -> TaskB: resultB -> TaskC: resultC
    if TaskB fails, I want to run:
    UndoTaskA(resultA)
    if TaskC fails, I want to run:
    UndoTaskB(resultB) -> UndoTaskA(resultA)
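The undo chain in this question is the compensating-transaction (saga) idea: remember each completed step's result, and on failure run the matching undos newest-first. Below is a minimal plain-Python sketch of that idea. It is not a Prefect feature, and the helper `run_with_compensation` and the step tuples are hypothetical names chosen for illustration.

```python
from typing import Callable, List, Tuple

# One step = (name, action producing a result, undo that takes that result).
Step = Tuple[str, Callable[[], object], Callable[[object], None]]


def run_with_compensation(steps: List[Step]) -> List[object]:
    """Run steps in order; if one raises, undo the completed steps newest-first."""
    completed: List[Tuple[Callable[[object], None], object]] = []
    for _name, action, undo in steps:
        try:
            result = action()
        except Exception:
            # TaskC failing undoes B, then A; TaskB failing undoes only A.
            # Errors raised by an undo itself are not handled in this sketch.
            for done_undo, done_result in reversed(completed):
                done_undo(done_result)
            raise
        completed.append((undo, result))
    return [result for _undo, result in completed]


log: List[str] = []


def task_c() -> object:
    raise RuntimeError("TaskC failed")


steps: List[Step] = [
    ("A", lambda: "resultA", lambda r: log.append(f"UndoTaskA({r})")),
    ("B", lambda: "resultB", lambda r: log.append(f"UndoTaskB({r})")),
    ("C", task_c, lambda r: log.append(f"UndoTaskC({r})")),
]

try:
    run_with_compensation(steps)
except RuntimeError:
    pass

print(log)  # ['UndoTaskB(resultB)', 'UndoTaskA(resultA)']
```

The original exception is re-raised after compensation, so a surrounding orchestrator would still see the run as failed.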
  • m

    Michiel Verburg

    06/17/2022, 12:53 PM
    I thought I got the gist of subflows, but as I read the docs (concepts specifically: https://orion-docs.prefect.io/concepts/flows/#subflows) I am starting to think I am wrong again. So the use case is the following:
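    from prefect import flow, task
    
    # retrieve_all_categories, load_item and store_item are the poster's own helpers (not shown)
    
    @task
    def for_loop_B(category):
        for item_i in category.items:
            load_item(item_i)
            store_item(item_i)
    
    @flow
    def for_loop_A():
        categories = retrieve_all_categories()
        for category_i in categories:
            for_loop_B(category_i)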
    Doing the above seemed wrong, because (by default at least) tasks or flows would fail just because one internal step failed. Additionally, the processing of the categories is fully independent. So I thought `for_loop_B` should also be a flow, and `load_item` and `store_item` should be tasks, for example. However, I got confused by what the docs say: "Unlike tasks, subflows will block until completion with all task runners." I want the items within a category to be processed sequentially, while multiple categories are processed in parallel. How can I make that happen? Also, can tasks be nested within tasks, for that matter?
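The shape being asked for is "sequential inside a category, concurrent across categories, and one bad category should not sink the others." The sketch below shows that shape in plain Python with `concurrent.futures`. It is not Prefect's task-runner API. The string stand-ins for `load_item`/`store_item` and the `"bad"` failure trigger are hypothetical, and how this maps onto Prefect 2 subflows or task runners is left as an assumption to check against the docs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_category(items: List[str]) -> List[str]:
    """Process one category's items strictly one after another."""
    stored = []
    for item in items:
        if item == "bad":  # simulated failure inside one category
            raise ValueError(f"could not load {item!r}")
        loaded = f"loaded:{item}"  # stand-in for load_item(item)
        stored.append(f"stored:{loaded}")  # stand-in for store_item(...)
    return stored


def process_all(categories: Dict[str, List[str]], max_parallel: int = 4) -> Dict[str, List[str]]:
    """Run independent categories concurrently; one failing category does not stop the rest."""
    results: Dict[str, List[str]] = {}
    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        futures = {name: pool.submit(process_category, items) for name, items in categories.items()}
        for name, future in futures.items():
            try:
                results[name] = future.result()
            except Exception as exc:
                results[name] = [f"failed: {exc}"]
    return results


print(process_all({"a": ["1", "2"], "b": ["bad"], "c": ["3"]}))
```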
  • o

    Oscar Krantz

    06/17/2022, 1:33 PM
    Hi. Beginner question here for Prefect 1.0. How would you advise implementing the following type of pattern? I will have a number of tasks, some of which should have error-handling tasks, and the final task should only be invoked if the error handlers have either a) not been triggered, or b) succeeded in correcting the error. I've attached a schematic where green arrows represent successes and red arrows failures. If there is no error handler attached to a task, then a failure should fail the flow as well.
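The control flow in this schematic can be stated precisely in plain Python:
- a failing step with no handler fails the run;
- a failing step with a handler is repaired if the handler succeeds, and fails the run if the handler raises;
- the final step runs only if every step either succeeded or was repaired.

This is a sketch of the logic only, not Prefect 1 code. Prefect 1 does have trigger functions in `prefect.triggers` (for example `any_failed`) for wiring handler tasks, but how to map this sketch onto them is an assumption left to the reader. `run_pipeline` and the step tuples are hypothetical.

```python
from typing import Callable, Dict, List, Optional, Tuple

# (name, task, optional handler). A handler receives the exception and either
# returns a replacement result or raises if it cannot recover.
Step = Tuple[str, Callable[[], object], Optional[Callable[[Exception], object]]]


def run_pipeline(steps: List[Step], final: Callable[[Dict[str, object]], object]) -> object:
    results: Dict[str, object] = {}
    for name, task, handler in steps:
        try:
            results[name] = task()
        except Exception as exc:
            if handler is None:
                raise  # no handler attached: the failure fails the whole run
            results[name] = handler(exc)  # if the handler raises too, the run fails here
    # Reached only if every step succeeded or was repaired by its handler.
    return final(results)


def flaky() -> object:
    raise ValueError("bad input")


print(run_pipeline(
    [("a", lambda: 1, None), ("b", flaky, lambda exc: f"fixed after {exc}")],
    final=lambda r: r,
))  # {'a': 1, 'b': 'fixed after bad input'}
```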
  • h

    Halvar Trøyel Nerbø

    06/17/2022, 2:02 PM
    Hi, I am trying to work out how to set the number of concurrent threads in Prefect 1 or 2. E.g. I want 4 threads in parallel, and not more. Can this be achieved without using the Cloud orchestrator, e.g. in the code / flow alone?
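The underlying requirement, never more than 4 units of work at once, can be illustrated with Python's standard `concurrent.futures`. A pool created with `max_workers=4` runs at most four jobs simultaneously. This is a plain-Python sketch, not Prefect's executor API. The `job` function is a hypothetical stand-in that records the peak concurrency it observes.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 4  # the cap from the question

_lock = threading.Lock()
_active = 0
peak = 0  # highest number of simultaneously running jobs observed


def job(n: int) -> int:
    """Simulated unit of work that records how many jobs are running at once."""
    global _active, peak
    with _lock:
        _active += 1
        peak = max(peak, _active)
    time.sleep(0.05)  # pretend to do I/O
    with _lock:
        _active -= 1
    return n * n


with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
    results = list(pool.map(job, range(10)))

print(results, "peak concurrency:", peak)
```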
  • x

    Xavier Babu

    06/17/2022, 2:11 PM
    Dear Prefect Community, when we use the Prefect 2.0 UI with the latest b6 version, the refresh button does not return any data for any link/endpoint. We have to click the buttons within the browser; we can't use specific endpoints to filter via the URL. For example, http://abc.net:5200 works, but http://abc.net:5200/runs can't be used with the "refresh" button, and the same goes for any link with an endpoint other than http://abc.net:5200. Is it a known defect?
  • j

    Josh Paulin

    06/17/2022, 2:42 PM
    Hello. I'm trying to test out GitLab storage by following option #2 laid out here. I can see the environment variables `PREFECT__CLOUD__USE_LOCAL_SECRETS` and `PREFECT__CONTEXT__SECRETS__GITLAB_ACCESS_TOKEN` set on my agent, but not on the job. Trying to run the flow just errors out at:
    Failed to load and execute flow run: KeyError('The secret GITLAB_ACCESS_TOKEN was not found.  Please ensure that it was set correctly in your tenant: <https://docs.prefect.io/orchestration/concepts/secrets.html>')
  • f

    Florian Guily

    06/17/2022, 3:07 PM
    Hey, I'm using the case statement in Prefect 1. Is there a way to check that the argument is != the result instead of ==?
  • a

    Alfred Martinez

    06/17/2022, 3:32 PM
    Hi everyone. I am trying to build an ETL process that pulls flow data out of Prefect, transforms it into a table, and loads it into a database. The request is to have a consistently clean table in our database where an analyst can grab the dataset and report on the consistency, reliability, and other potential metrics of our ETL processes. We are currently on the free version with 20,000 calls and have a single agent working with Prefect Cloud. I am wondering if anyone could point me in the right direction on getting started with pulling data out of the API, or to the best documentation and examples out there. Any idea or resource will be highly appreciated.
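Once flow-run records have been pulled from the API (not shown here), turning them into the reliability table described above is ordinary data wrangling. Below is a minimal plain-Python sketch. It assumes hypothetical records with `flow_name`, `state`, `start` and `end` fields; these names are illustrative and are not Prefect's API schema.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

# Hypothetical records: field names are illustrative, not Prefect's API schema.
runs = [
    {"flow_name": "etl_a", "state": "Success", "start": "2022-06-17T01:00:00", "end": "2022-06-17T01:05:00"},
    {"flow_name": "etl_a", "state": "Failed", "start": "2022-06-18T01:00:00", "end": "2022-06-18T01:02:00"},
    {"flow_name": "etl_b", "state": "Success", "start": "2022-06-17T02:00:00", "end": "2022-06-17T02:10:00"},
]


def summarize(records: List[dict]) -> Dict[str, dict]:
    """Collapse run records into one row per flow: run count, success rate, mean minutes."""
    grouped: Dict[str, List[dict]] = defaultdict(list)
    for r in records:
        grouped[r["flow_name"]].append(r)
    table = {}
    for name, rs in grouped.items():
        minutes = [
            (datetime.fromisoformat(r["end"]) - datetime.fromisoformat(r["start"])).total_seconds() / 60
            for r in rs
        ]
        successes = sum(r["state"] == "Success" for r in rs)
        table[name] = {
            "runs": len(rs),
            "success_rate": successes / len(rs),
            "mean_minutes": sum(minutes) / len(minutes),
        }
    return table


print(summarize(runs))
```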
  • b

    Benny Warlick

    06/17/2022, 3:41 PM
    Is there any documentation for Orion on what happens to flows that are late? Our execution environment running the agent crashed, and Prefect's response was not what we expected. Based on our testing, it looks like the first step is that flows are marked Late. If an agent becomes available, a late flow will start. But at some point after a few hours, the late flow is marked Pending and it will not restart, even if an agent becomes available. Is there a way to fine-tune this behavior?
  • j

    jack

    06/17/2022, 5:23 PM
    What would it take to have each task send logs both to the cloud and to a file on the local disk (a different file for each task)?
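On the Python side, "also write to a per-task file" means adding one extra `logging.FileHandler` to the task's logger while leaving its existing handlers in place. This sketch assumes the task's logger is a standard library `logging.Logger`. How to obtain that logger inside a Prefect task, and whether Prefect's cloud handler is attached to the same logger or an ancestor, are assumptions to verify. `attach_task_file_handler` is a hypothetical helper.

```python
import logging
import tempfile
from pathlib import Path


def attach_task_file_handler(logger: logging.Logger, task_name: str, log_dir: Path) -> logging.FileHandler:
    """Add a FileHandler writing this logger's records to <log_dir>/<task_name>.log.

    Existing handlers (e.g. whatever ships logs to the cloud) are left untouched,
    so each record goes to both destinations.
    """
    log_dir.mkdir(parents=True, exist_ok=True)
    handler = logging.FileHandler(log_dir / f"{task_name}.log")
    handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(handler)
    return handler


log_dir = Path(tempfile.mkdtemp())
logger = logging.getLogger("demo.task_a")  # stand-in for the task's logger
logger.setLevel(logging.INFO)
handler = attach_task_file_handler(logger, "task_a", log_dir)
try:
    logger.info("hello from task_a")
finally:
    # Detach when the task finishes so handlers don't pile up across runs.
    logger.removeHandler(handler)
    handler.close()

print((log_dir / "task_a.log").read_text())
```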
  • c

    Christian Nuss

    06/17/2022, 5:38 PM
    @Kevin Kho a month or so ago we had a quick exchange about upcoming improvements to pickling in Prefect. Have those been added to Prefect 1.0 yet?
  • d

    Deepak Pilligundla

    06/17/2022, 7:03 PM
    Can we register flows individually instead of registering all the flows at once?
  • h

    Hemabh Kamboj

    06/17/2022, 9:15 PM
    Hi folks, is there a way to get the metadata of a flow run (like the flow run name, start time, etc.) inside the flow run? For example, a few of my tasks inside the flow run insert data into a database, so for monitoring and logging purposes I want to map the flow run to those inserts, i.e. add a few columns to the table in the DB, named `prefect_flow_run_id` and `prefect_flow_version`, and insert those values along with the data.
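The database side of this is simply stamping every inserted row with the run's identifiers. In Prefect 1, the flow run ID is commonly read from `prefect.context` (e.g. `prefect.context.get("flow_run_id")`). Treat that key, and where a flow version would come from, as assumptions to confirm in the docs. The sketch below uses stdlib `sqlite3`, a hypothetical `measurements` table, and placeholder run values.

```python
import sqlite3
from typing import Iterable, Tuple


def insert_with_run_metadata(
    conn: sqlite3.Connection,
    rows: Iterable[Tuple[str, float]],
    flow_run_id: str,
    flow_version: int,
) -> None:
    """Insert business rows tagged with the flow run that produced them."""
    conn.executemany(
        "INSERT INTO measurements (name, value, prefect_flow_run_id, prefect_flow_version) "
        "VALUES (?, ?, ?, ?)",
        [(name, value, flow_run_id, flow_version) for name, value in rows],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE measurements (name TEXT, value REAL, prefect_flow_run_id TEXT, prefect_flow_version INTEGER)"
)
# Placeholder values; in a real flow they would come from the run context.
insert_with_run_metadata(conn, [("a", 1.0), ("b", 2.5)], flow_run_id="hypothetical-run-id", flow_version=3)
print(conn.execute("SELECT * FROM measurements").fetchall())
```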
  • m

    Matt Alhonte

    06/17/2022, 10:27 PM
    Is there an easy way to set/change the name/slug of a task based on its arguments? (i.e., a task for running Jupyter notebooks that names the task after the filename of the notebook)
  • w

    William Jamir

    06/18/2022, 7:31 AM
    Hi, I'm receiving this error when trying to use `upstream_tasks`. Does someone know what I'm doing wrong? My intention is to execute `task_3` only after `task_1` and `task_2` are finished (since it depends on their output) and to start `task_4` only after `task_3` is finished. How can I accomplish that? (Code and output error in thread)
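The ordering described here is a small dependency graph: `task_3` waits on `task_1` and `task_2`, and `task_4` waits on `task_3`. Python's standard `graphlib` (3.9+) can show which tasks become runnable at each stage. This is a plain-Python illustration of the intended order, not Prefect's API. In Prefect 1 the same edges come from passing one task's output to another or from `upstream_tasks`.

```python
from graphlib import TopologicalSorter

# Each key may run only after every task in its value set has finished.
dependencies = {
    "task_3": {"task_1", "task_2"},  # consumes the outputs of task_1 and task_2
    "task_4": {"task_3"},            # starts only after task_3
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
    batches.append(ready)
    sorter.done(*ready)

print(batches)  # [['task_1', 'task_2'], ['task_3'], ['task_4']]
```

Tasks in the same batch (here `task_1` and `task_2`) have no edge between them, so an executor is free to run them in parallel.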
  • d

    Daniel

    06/19/2022, 3:44 AM
    Dear Prefect Community, I have written some custom Python to extract and load data into a Snowflake database. It works well from my local machine, but attempts to orchestrate these pipelines with Prefect Cloud fail. I have followed @Anna Geller's brilliant article on using Prefect and AWS ECS Fargate as a serverless pipeline solution, using the `prefecthq/prefect:latest-python3.10` Docker image with some additional packages, including the Snowflake Python connector and its dependencies. Flows without Snowflake interactions work perfectly when run from Prefect Cloud; however, my EL flows that insert data into Snowflake tables fail to load and execute, returning `ModuleNotFoundError("No module named 'snowflake'")`. I haven't been able to find any similar reports in the community, so I'm wondering if someone could suggest what I may have done wrong. With thanks, Daniel.
  • x

    Xavier Witdouck

    06/19/2022, 4:45 PM
    Hi all, is it possible to override a default parameter value when registering a flow via the CLI? For example, our flows have an environment parameter (e.g. dev, qa, prod), and we don't want to hard-code that in the flow code. Ideally, when we register the flow, we want to set the default value for that parameter. Thanks in advance.
  • m

    marque

    06/20/2022, 3:44 AM
    Hi, I am getting a `prefect.exceptions.ClientError: [{'path': ['create_project'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]` when I run `prefect delete project` followed by `prefect create project` (using Prefect v1.0, with Prefect Cloud as the backend). I can't seem to recreate a project with the same name.
  • b

    Bharadwaj Yadati

    06/20/2022, 5:41 AM
    Hi, I'm planning to use Prefect as a replacement for our internal framework. It worked like a charm with the local agent, but when I try the ECS agent I'm facing issues; I want to run the same ETL job on EC2 instances. The guide on the Prefect website (https://docs.prefect.io/orchestration/agents/ecs.html#running-ecs-agent-in-production) is not working for me (I even provided AWS keys as env variables for the task), and if I don't provide args, it throws an error asking for them. I'm kind of stuck on this; kindly help me out. Thanks in advance.
  • t

    Tarek

    06/20/2022, 8:23 AM
    Hi, I sometimes get "error occured while creating new work queue" when creating a new work queue from the UI [Prefect 2.0].
  • m

    Michal Zawadzki

    06/20/2022, 10:39 AM
    Prefect 2.0: Hi, I can't seem to run a deployed flow with tags: it's not being picked up by the work queue/agent that's supposed to be handling that tag.
    1. I created a work queue named `dev_queue` with the label `dev`, supporting all flow runners. It's the only work queue I have set up.
    2. I ran an agent with `prefect agent start dev_queue`.
    3. I created a deployment with the tag `dev`:
    name: test_platform_flow_first_deployment
    flow_name: Data Platform Demo
    flow_location: ./test_platform_flow.py
    parameters:
      to_print: "Hello from first deployment!"
    tags:
      - dev
    I verified in the UI that it has the right tag and flow runner. However, when I run the deployment, the flow run is never picked up. One suspicious thing I noticed is that the flow run doesn't inherit the `dev` tag from the deployment (although I don't know if that's a bug or a feature). When I remove all labels from the work queue, the flow runs are picked up correctly.
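The observations above are consistent with a simple filtering rule. This sketch assumes, and it is only an assumption here, that a work queue with a tag filter accepts a flow run only if the run carries every tag in the filter. Under that rule, a run that did not inherit `dev` is never picked up, while an empty filter accepts everything. `queue_accepts` is a hypothetical model, not Prefect's code.

```python
from typing import Iterable


def queue_accepts(queue_tags: Iterable[str], run_tags: Iterable[str]) -> bool:
    """Assumed rule: a run qualifies only if it carries every tag the queue filters on.

    An empty tag filter accepts every run.
    """
    return set(queue_tags).issubset(set(run_tags))


print(queue_accepts({"dev"}, set()))    # False: the run lost the tag, so it is never picked up
print(queue_accepts({"dev"}, {"dev"}))  # True: picked up once the run carries the tag
print(queue_accepts(set(), set()))      # True: matches "remove all labels from the queue"
```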
  • m

    Michal Zawadzki

    06/20/2022, 10:52 AM
    Is there a way to specify `flow_runner` in the deployment? I get `ValueError: Unregistered flow runner 'DockerFlowRunner'` when running `prefect deployment create my_deployment.yaml`. My deployment looks like this:
    name: test_platform_flow_first_deployment
    flow_name: Data Platform Demo
    flow_location: ./test_platform_flow.py
    parameters:
      to_print: "Hello from first deployment!"
    tags:
      - dev
    flow_runner: 
      type: DockerFlowRunner
      config:
        image: viadot:orion
    Unfortunately the `flow_runner` config is not documented anywhere, so it's hard for me to say whether I'm specifying it incorrectly or it's not supported at all.
a

Anna Geller

06/20/2022, 10:56 AM
Can you try the same using Python? It's much easier than YAML:
import platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner


...


@flow
def hello_flow():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(name="dev", flow=hello_flow, flow_runner=DockerFlowRunner())


if __name__ == "__main__":
    hello_flow()
AFAIK, YAML is pretty much only for non-Python DevOps admins. Maybe you can try this?
flow_runner: DockerFlowRunner
m

Michal Zawadzki

06/20/2022, 11:05 AM
I'll check and get back to you, but I'd prefer to use YAML eventually. I don't feel comfortable storing configs in executable files; I feel like analysts will eventually abuse this somehow 😅, and YAML files are very easy and safe to parse and check for policy in CI/CD.
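The "parse and check for policy in CI/CD" idea can be sketched in a few lines. Here, `check_deployment` validates one parsed deployment document against a hypothetical policy: only the keys used in the YAML above, the three identifying keys required, and exactly one environment tag. Loading the file assumes PyYAML's `yaml.safe_load`, which builds only plain Python types (dicts, lists, strings, numbers). PyYAML is imported lazily so the check itself needs only the standard library. The policy constants are illustrative, not a Prefect schema.

```python
from typing import Any, Dict, List

# Hypothetical policy; adjust to your own standards.
ALLOWED_KEYS = {"name", "flow_name", "flow_location", "parameters", "tags", "flow_runner"}
REQUIRED_KEYS = ("name", "flow_name", "flow_location")
ENVIRONMENT_TAGS = {"dev", "prod"}


def check_deployment(spec: Dict[str, Any]) -> List[str]:
    """Return a list of policy violations for one parsed deployment document."""
    problems = []
    unknown = set(spec) - ALLOWED_KEYS
    if unknown:
        problems.append(f"unknown keys: {sorted(unknown)}")
    for key in REQUIRED_KEYS:
        if key not in spec:
            problems.append(f"missing required key: {key}")
    env_tags = set(spec.get("tags") or []) & ENVIRONMENT_TAGS
    if len(env_tags) != 1:
        problems.append("expected exactly one environment tag (dev or prod)")
    return problems


def load_and_check(path: str) -> List[str]:
    import yaml  # PyYAML, assumed installed in the CI image

    with open(path) as fh:
        return check_deployment(yaml.safe_load(fh))


example = {
    "name": "test_platform_flow_first_deployment",
    "flow_name": "Data Platform Demo",
    "flow_location": "./test_platform_flow.py",
    "parameters": {"to_print": "Hello from first deployment!"},
    "tags": ["dev"],
    "flow_runner": {"type": "DockerFlowRunner", "config": {"image": "viadot:orion"}},
}
print(check_deployment(example))  # []
```

A CI step would call `load_and_check` on each deployment file and fail the pipeline if any list is non-empty.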
a

Anna Geller

06/20/2022, 11:08 AM
How would they abuse DeploymentSpec but not YAML? 🤔 It's the same config that gets sent to the backend.
But I totally understand what you mean with respect to ensuring standards. Specifying deployments via Python code lets you build extra abstractions/functions that avoid the boilerplate YAML forces on you. An example:
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES

# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check


def set_deployment_spec(
    flow: Flow,
    deployment_name_suffix: str = "dev",
    schedule: SCHEDULE_TYPES = None,
    parameters: Dict[str, Any] = None,
    tags: List[str] = None,
) -> DeploymentSpec:
    deploy_tags = (
        [deployment_name_suffix] if tags is None else [deployment_name_suffix, *tags]
    )
    return DeploymentSpec(
        flow=flow,
        name=f"{flow.name}_{deployment_name_suffix}",
        schedule=schedule,
        tags=deploy_tags,
        parameters=parameters,
        # flow_runner=DockerFlowRunner()
    )


set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
set_deployment_spec(
    repo_trending_check,
    deployment_name_suffix="orion_dev",
    parameters=dict(repo="orion"),
)
When using the same default spec, creating a deployment is as simple as a single line that passes the flow in, as here:
set_deployment_spec(crypto_prices_etl)
@Marvin open "Add examples to the docs showing how to specify various Deployment attributes in a YAML config"
m

Marvin

06/20/2022, 11:12 AM
https://github.com/PrefectHQ/prefect/issues/5919
m

Michal Zawadzki

06/20/2022, 11:17 AM
They could add custom code to their deployment.py file, something they can't do in a YAML file. Since it's a Python file, you can do anything inside it; it doesn't have to contain only `DeploymentSpec`.
I like `set_deployment_spec()`, although it seems like you then need to check that the user passed "dev" in "deployment-dev.py" and "prod" in "deployment-prod.py"?
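One way to remove that manual check is to derive the suffix from the file name itself. This sketch assumes the `deployment-<env>.py` naming convention from the message above. `suffix_from_filename` is a hypothetical helper, and its result would be passed as `deployment_name_suffix` to `set_deployment_spec`.

```python
from pathlib import Path


def suffix_from_filename(path: str, prefix: str = "deployment-") -> str:
    """Derive the environment suffix from a file named like 'deployment-<env>.py'.

    Hypothetical convention: 'deployment-dev.py' -> 'dev'.
    """
    stem = Path(path).stem  # e.g. 'deployment-dev'
    if not stem.startswith(prefix) or stem == prefix:
        raise ValueError(f"{path!r} does not follow the '{prefix}<env>.py' convention")
    return stem[len(prefix):]


print(suffix_from_filename("deployment-dev.py"))                 # dev
print(suffix_from_filename("/repo/deployments/deployment-prod.py"))  # prod
```

Inside `deployment-dev.py` this would read `set_deployment_spec(crypto_prices_etl, deployment_name_suffix=suffix_from_filename(__file__))`, so the file name becomes the single source of truth for the environment.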
a

Anna Geller

06/20/2022, 11:30 AM
It would be up to your design; it's easier to adopt the same standards if it's easy to do the right thing.
m

Michal Zawadzki

06/23/2022, 11:26 PM
Seems like the last piece of the puzzle is being able to provide the image I want to use. I'm getting `httpx.HTTPStatusError: Client error '400 Bad Request'` when adding `image: "my_image:my_tag"` under the `config` key. It's interesting, because I can read that same flow run manually from the agent's environment with
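import asyncio
from prefect.client import get_client

async def main():
    async with get_client() as client:  # the context manager opens and closes the HTTP client
        run = await client.read_flow_run("0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc")
        print(run.flow_runner.config.get("image"))
asyncio.run(main())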
without error. Full Traceback:
23:00:21.814 | ERROR   | prefect.engine - Engine execution of flow run '0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 985, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 130, in enter_flow_run_engine_from_subprocess
    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 95, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 190, in retrieve_flow_then_begin_flow_run
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 1204, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 233, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 736, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url '<https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc>'
For more information check: <https://httpstatuses.com/400>
01:00:22.596 | INFO    | prefect.flow_runner.docker - Flow run container 'smooth-serval' has status 'exited'