prefect-community
  • p

    Philip MacMenamin

    09/26/2022, 3:19 PM
    Hi - for Prefect v1 I have a task that fails some of the time. I'd like the final flow state to be success if any of the mapped tasks succeed, and failed if all of them fail, e.g.
    import random
    
    import prefect
    from prefect import Flow, task
    
    @task(name="Task A")
    def task_a(val):
        # Fails roughly half the time to simulate a flaky task.
        if random.random() > 0.5:
            raise ValueError(f"Non-deterministic error has occurred.{val}")
    
    @task
    def send_log(val):
        logger = prefect.context.get("logger")
        logger.info(f"hello {val}")
    
    l = [1, 2, 3, 4]
    with Flow("Trigger example") as flow:
        ta_s = task_a.map(val=l)
        sl_s = send_log.map(val=l, upstream_tasks=[ta_s])
    # flow run should be success if ANY of task_a's succeeded.
    flow.set_reference_tasks([something])
    m
    • 2
    • 25
  • m

    Matt Delacour

    09/26/2022, 3:38 PM
    Do Parameters work for Tasks that are classes rather than functions? https://docs-v1.prefect.io/core/concepts/parameters.html I am trying to pass a Parameter to a ShellTask, but it does not seem to work. Is there an example of that?
    1️⃣ 1
    ✅ 1
    b
    • 2
    • 10
  • w

    William Wolfe-McGuire

    09/26/2022, 4:14 PM
    Hi, with Prefect 0.15.13 I'm getting this error when trying to set up a mapped task run:
    Traceback (most recent call last):
      File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 569, in get_flow_run_state
        executors.prepare_upstream_states_for_mapping(
      File "/home/wwolfe-mcguire/.conda/envs/dev_env/lib/python3.9/site-packages/prefect/utilities/executors.py", line 681, in prepare_upstream_states_for_mapping
        value = upstream_state.result[i]
    KeyError: 0
    [2022-09-26 15:42:45+0000] ERROR - prefect.ml-graph-energy | Unexpected error occured in FlowRunner: KeyError(0)
    I wasn't able to find other examples of a similar error online and was hoping someone could give me some hints about how to start debugging this. thanks
    m
    • 2
    • 1
  • c

    Chris Gunderson

    09/26/2022, 4:27 PM
    Hi Team - For Prefect 2.0, we would like to use our S3 bucket to store some allocations files. These will be sent to clients by email or SFTP. I'm not sure what I'm doing wrong with the state and await from the write_path function.
    aws_credentials_block = AwsCredentials.load("****-user")
    s3_block = S3.load("***-s3")
    
    s3_bucket_block = S3Bucket(
        bucket_name=s3_block.bucket_path,
        aws_credentials=aws_credentials_block,
        basepath=f"Allocations/{date.today().year}"
    )
    output_file = f'''SR Allocations {datetime.now().strftime('%Y%m%d-%H%M%S')}.csv'''
    bytes_to_write = df_alloc.to_csv(None).encode()
    csv_file = s3_bucket_block.write_path(path=output_file, content=bytes_to_write)
    
    logging.info('Filtering for **** Trades: Rows = %s, Accounts = %s' % (len(df_alloc), df_alloc.custodianAccnt.nunique()))
    
    return csv_file
    
    @flow(name = "***** Allocations")
    def ****AllocationsFlow():
        try:
            slack_webhook_block = SlackWebhook.load("****-webhook")
            state = allocations_process(return_state = True)
            excel_file = state.result()
            if 'Completed'== state.name:
                slack_webhook_block.notify("**** Allocations was successful")
            else:
                slack_webhook_block.notify("***** Allocations failed")
    Error: cannot pickle 'coroutine' object
    11:23:31.736 | INFO | Task run 'Fidelity Allocations-e728df66-0' - Crash detected! Execution was interrupted by an unexpected exception.
    11:23:32.443 | ERROR | Flow run 'visionary-cat' - Finished in state Failed('1/1 states failed.')
    Traceback (most recent call last):
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
        return asynclib.run(func, *args, **backend_options)
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
        return native_run(wrapper(), debug=debug)
      File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
        return future.result()
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
        return await func(*args)
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/client/orion.py", line 82, in with_injected_client
        return await fn(*args, **kwargs)
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/engine.py", line 239, in create_then_begin_flow_run
        return state.result()
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
        state.result()
      File "/opt/pysetup/.venv/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
        raise data
    TypeError: cannot pickle 'coroutine' object
    n
    • 2
    • 22
  • m

    Michael Hadorn

    09/26/2022, 4:40 PM
    Hi there, I'm wondering how I should store config files in Prefect 2. I saw the option of JSON blocks, but I think it would be great to also have a YAML block. Is there something like this? Maybe even with a bigger field or a YAML editor helper. The background for this request: we have a lot of configuration (connection details, mail server settings, special modes), and not all of it should be configurable as parameters for every flow run. In Prefect 1.0 we moved these settings to the config, which was read by prefect.config (and could still be overridden with env vars, even for a single flow run). As parameters we would like to use only those that matter for daily execution. For all the passwords I will use secrets; my question is about the normal configuration. Thanks for any hint!
    m
    j
    • 3
    • 4
  • k

    Keith

    09/26/2022, 4:56 PM
    Hi all! I am running my Prefect 2 flows via Prefect Cloud and a GKE k8s Autopilot cluster running Docker images. We are working on automating our deployments and have a couple of questions that are not clear to me from the docs, so I want to make sure my intuition is correct. We are moving our deployments from the CLI to Python and have the following working for requesting custom k8s configurations.
    from prefect2_flow import extract_load_transform
    from prefect.deployments import Deployment
    from prefect.infrastructure import KubernetesJob
    from prefect.orion.schemas.schedules import CronSchedule
    
    
    schedule = CronSchedule(cron="15 * * * *", timezone="UTC")
    
    deployment = Deployment().build_from_flow(
        flow=extract_load_transform,
        name="test_hourly_elt",
        parameters={
            'collection_duration': '1h',
            'uri': 'https://google.com',
        },
        skip_upload=True,
        schedule=schedule,
        tags=["test"],
        version=2,
        work_queue_name="test-kubernetes",
        infrastructure=KubernetesJob(
            finished_job_ttl=30,
            image="us-central1-docker.pkg.dev/.../prefect2-flows/elt:latest",
            image_pull_policy="Always",
            namespace="test-prefect2",
            pod_watch_timeout_seconds=180,
            job={
                "apiVersion": "batch/v1",
                "kind": "Job",
                "metadata": {"labels": {}},
                "spec": {
                    "template": {
                        "spec": {
                            "parallelism": 1,
                            "completions": 1,
                            "restartPolicy": "Never",
                            "containers": [
                                {
                                    "name": "prefect-job",
                                    "env": [],
                                    "resources": {
                                        "requests": {
                                            "memory": "5Gi",
                                            "cpu": "2",
                                            "ephemeral-storage": "1Gi",
                                        }
                                    }
                                }
                            ],
                        }
                    }
                },
            }
        )
    )
    
    
    if __name__ == "__main__":
        deployment.apply()
    1. Is it okay to set skip_upload to true? Since we are using Docker, I'm not sure what the benefit of uploading our project to GCS is.
    2. The Prefect API has a parameter for is_schedule_active, but it doesn't look like that parameter has made it over to the Python API, because when I try to add it above it gives me an error about not including more parameters than needed. Is this something I can contribute to or add functionality for?
    3. The k8s job configuration is pretty verbose for requesting resources above the default. Is there a better way to set these that I am missing?
    Thank you for the time and any help provided!
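On question 3, one way to cut the repetition without relying on any additional Prefect feature is to wrap the manifest in a small helper. The sketch below is plain Python; `job_manifest_with_resources` is a hypothetical name (not a Prefect API), and it only reproduces the dict from the snippet above with the resource requests turned into arguments.

```python
from typing import Any, Dict


def job_manifest_with_resources(
    memory: str, cpu: str, ephemeral_storage: str
) -> Dict[str, Any]:
    """Return the job manifest from the message above with custom resource requests.

    Hypothetical helper for illustration; not part of Prefect.
    """
    container = {
        "name": "prefect-job",
        "env": [],
        "resources": {
            "requests": {
                "memory": memory,
                "cpu": cpu,
                "ephemeral-storage": ephemeral_storage,
            }
        },
    }
    # Same structure as the manifest in the original snippet.
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"labels": {}},
        "spec": {
            "template": {
                "spec": {
                    "parallelism": 1,
                    "completions": 1,
                    "restartPolicy": "Never",
                    "containers": [container],
                }
            }
        },
    }


manifest = job_manifest_with_resources("5Gi", "2", "1Gi")
```

The result could then be passed as the `job=` argument of the `KubernetesJob(...)` call shown earlier, so each deployment only states the three values that differ.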
    c
    d
    • 3
    • 7
  • n

    Nathaniel Russell

    09/26/2022, 5:10 PM
    If I were to run Prefect's Docker container in an AWS Lambda, how would I tell it what my handler is?
    FROM prefecthq/prefect:2.4.2-python3.9
    COPY service.py .
    COPY requirements.txt .
    RUN pip install -r requirements.txt
    CMD [ "service.handler" ]
    For example, this doesn't work: because I am not using the AWS base image, CMD [ "service.handler" ] isn't properly setting the handler.
    a
    • 2
    • 5
  • r

    Roger Webb

    09/26/2022, 7:28 PM
    How is the number of users in a tenant calculated? I have 13 users and a max of 20, yet I can't invite anyone. I just get the error "This Tenant has the maximum number of users" every time I try to invite a new email address. What else is included in that number?
    ✅ 1
    k
    • 2
    • 3
  • l

    Leon Kozlowski

    09/26/2022, 7:40 PM
    Can I access task state inside of a flow without explicitly passing task result objects to a task? I am trying to emulate the behavior of a state handler from prefect < 2.0
    m
    • 2
    • 5
  • s

    Sean Turner

    09/26/2022, 7:42 PM
    Hey all, trying to read a file with the RemoteFileSystem.
    rfs = RemoteFileSystem(basepath="s3://bucket-name/some/path/on/bucket")
    cfg = rfs.read_path("base_config.yaml")
    I end up with cfg being a coroutine. I imagine this returns bytes, but there aren't any helper methods and it's not clear how I'm supposed to proceed. I am interested in reading this file into memory.
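A coroutine object is what Python returns when an `async def` function is called without being awaited, so `cfg` here is probably not the file contents yet. A minimal stdlib sketch of the general pattern follows; `FakeRemoteFileSystem` is a made-up stand-in (not Prefect's `RemoteFileSystem`), and it assumes the real method yields bytes once awaited.

```python
import asyncio
import inspect


class FakeRemoteFileSystem:
    """Hypothetical stand-in with an async read_path, for illustration only."""

    async def read_path(self, path: str) -> bytes:
        await asyncio.sleep(0)  # pretend to do I/O
        return b"name: base\nretries: 3\n"


async def load_config_text(fs: FakeRemoteFileSystem, path: str) -> str:
    # Awaiting the call yields the bytes instead of a coroutine object.
    raw = await fs.read_path(path)
    return raw.decode("utf-8")


fs = FakeRemoteFileSystem()

# Calling without await only creates a coroutine object.
pending = fs.read_path("base_config.yaml")
is_coroutine = inspect.iscoroutine(pending)
pending.close()  # avoid a "never awaited" warning

# From synchronous code, asyncio.run drives the coroutine to completion.
text = asyncio.run(load_config_text(fs, "base_config.yaml"))
```

Inside an `async def` function the `await` can be used directly; `asyncio.run` is only needed from synchronous code with no event loop already running.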
    ✅ 1
    n
    m
    • 3
    • 4
  • n

    Nathaniel Russell

    09/26/2022, 8:12 PM
    Orion requires sqlite 3.24.0 or later, and the aws docker image for lambda comes with sqlite 3.7. Does anybody know how to get the lambda docker environment to get the proper sqlite version to run prefect API calls through Orion?
    c
    • 2
    • 3
  • b

    Bradley Hurley

    09/26/2022, 8:14 PM
    Hi Prefect Experts - I'm running Prefect 1.X and have run into the dreaded "No heartbeat detected from the remote task" issue a few times. It's well documented here: https://discourse.prefect.io/t/flow-is-failing-with-an-error-message-no-heartbeat-detected-from-the-remote-task/79.
    m
    • 2
    • 9
  • b

    badasstronaut

    09/26/2022, 9:13 PM
    Hi Prefect team, when using DaskTaskRunner in a flow, is there a recommended pattern for using Dask features such as DataFrame and Bag? I’d like to be able to run tasks in the flow-instantiated cluster, then also take advantage of multi-node computation. I see that the DaskTaskRunner class has a Dask client under self._client, but it does not look like that’s intended to be used by the flow or tasks. Any tips are appreciated!
    ✅ 1
    a
    • 2
    • 4
  • b

    badasstronaut

    09/27/2022, 1:07 AM
    Another question: How do I get task inputs in the Orion UI to populate?
    j
    • 2
    • 1
  • z

    Zac Hooper

    09/27/2022, 2:13 AM
    Is there anywhere I can setup a notification if Prefect Cloud goes down. (touchwood never 🙂)
    ✅ 1
    b
    • 2
    • 2
  • b

    Ben Ayers-Glassey

    09/27/2022, 2:48 AM
    Hello, I have seen an interesting (to me) error: my code attempts to get the value of a secret, basically like this:
    secret = prefect.client.secrets.Secret(SFTP_PASSWORD_SECRET_NAME)
    return secret.get()
    ...and it fails with a prefect.exceptions.ClientError from the GraphQL API.
    b
    • 2
    • 7
  • b

    Ben Ayers-Glassey

    09/27/2022, 3:37 AM
    Unrelated issue: this morning, tried to restart a 4-day-old flow run which had some failed tasks, it changed some of the task runs from "Failed" to "Pending", but they've been pending all day and aren't running. Is there something else I need to do? Other team members have recalled this always being the case in the past when they've tried to use the Restart feature as well. 🤔
    j
    • 2
    • 6
  • g

    Guillaume G

    09/27/2022, 7:25 AM
    Hi Prefect, is there a way to configure a Docker registry with credentials in a KubernetesJob? I want to pull an image from a private registry.
    reception_deployment: Deployment = Deployment.build_from_flow(
        name="private-flow",
        flow=main_flow,
        output="deployment-private-flow.yaml",
        description="private-flow",
        version="snapshot",
        work_queue_name="kubernetes",
        infrastructure=KubernetesJob(),
        infra_overrides=dict(
            image="myprivate-image:latest",
            env={},
            finished_job_ttl=300,
        ),
    )
    I cannot pull "myprivate-image:latest". Do I have to use service_account? 🤨 Thank you
    ✅ 1
    r
    • 2
    • 1
  • m

    Malavika S Menon

    09/27/2022, 7:26 AM
    How do I access the attributes of the class Flow inside the actual code for the flow I have created? For example,
    @flow
    def random_flow():
       id = Flow.flow_id
       return 1
    How do I access any member say flow_id inside the flow, when it is being run?
    ✅ 1
    j
    • 2
    • 2
  • h

    Hongbo Miao

    09/27/2022, 8:04 AM
    Hi folks, what would be the correct way to use Blocks to load AWS S3 credentials in Prefect? Thanks! I posted the question at https://stackoverflow.com/questions/73864125/how-to-use-blocks-correctly-in-prefect --- UPDATE: Oh, I found the issue: the snippet is missing an await. Posted the answer. Hi Prefect team, it's worth updating that demo. Thanks!
    ✅ 1
  • f

    flapili

    09/27/2022, 8:17 AM
    Hi, I don't understand how I should keep the results of flows when the goal of the flows is to retrieve data.
    r
    • 2
    • 1
  • f

    flapili

    09/27/2022, 8:18 AM
    Am I on the right track with https://github.com/PrefectHQ/prefect/issues/6971? Or is it an XY problem?
    r
    • 2
    • 4
  • d

    Deepanshu Aggarwal

    09/27/2022, 9:29 AM
    Hi, I was looking for some help with triggering flow runs. How do I trigger flow deployments with parameters?
    f
    j
    • 3
    • 9
  • d

    David

    09/27/2022, 9:52 AM
    Prefect 2.x should expand the documentation on production deployments, such as Kubernetes jobs. Currently it takes a lot of tinkering and digging to uncover everything and get it to work, far more than with the first Prefect.
    ➕ 6
    👀 1
    j
    a
    • 3
    • 2
  • m

    Michal Luščon

    09/27/2022, 11:48 AM
    Hi, is there an official way to install the 2.x server on Kubernetes? It seems to me that the Helm chart from prefect-server deploys 1.x with the latest Docker images.
    ✅ 1
    o
    c
    • 3
    • 17
  • h

    Ha Pham

    09/27/2022, 12:18 PM
    Hi, is there any way to order the parameters of a flow? For example, I want start_datetime and end_datetime to go together, with the start first, not after the end as it currently appears.
    ✅ 1
    👀 1
    👍 2
    b
    j
    • 3
    • 5
  • h

    Hamza Naanani

    09/27/2022, 1:47 PM
    Hello, how can we use the override parameter in prefect deployment build to change Kubernetes job parameters?
    ✅ 1
    b
    • 2
    • 3
  • t

    Tim Helfensdörfer

    09/27/2022, 2:32 PM
    Just as an FYI: If you edit a block and save it, empty fields are saved as empty strings. This works out fine most of the time, but the AWS credentials block does not work because (from the boto3 session.py):
    if profile_name is None:
        return profile_map.get('default', {})
    elif profile_name not in profile_map:
        ....
    So if you do not want to use a profile, you have to overwrite profile_name manually with None in your code and then save it again. A fix would be, as an example, a checkbox next to nullable fields to set this field to null.
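Until such a fix exists, the workaround can be sketched in plain Python: convert empty strings to `None` before the values reach boto3. `blank_to_none` and the example field names are illustrative assumptions, not part of Prefect or boto3.

```python
from typing import Any, Dict


def blank_to_none(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of fields where empty or whitespace-only strings become None."""
    return {
        key: None if isinstance(value, str) and not value.strip() else value
        for key, value in fields.items()
    }


# Example values as they might look after saving a block with empty fields.
saved = {"profile_name": "", "region_name": "us-east-1", "aws_session_token": "  "}
cleaned = blank_to_none(saved)
```

With `profile_name` normalised to `None`, the `if profile_name is None` branch quoted above would be taken instead of the lookup in `profile_map`.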
    ✅ 1
    j
    • 2
    • 2
  • s

    Stephen Herron

    09/27/2022, 2:44 PM
    Has anyone tried out dbt-prefect in v2.. more specifically what sort of patterns would you use now?
    ✅ 1
    j
    • 2
    • 5
  • c

    Chris Gunderson

    09/27/2022, 3:00 PM
    Hi Team - When will the next release be for the Prefect UI project? I need to let my team know a timeline for this blocker. https://github.com/PrefectHQ/prefect/pull/6818
    ✅ 1
    m
    • 2
    • 2

Michael Adkins

09/27/2022, 3:07 PM
Hi! We release every Thursday.
c

Chris Gunderson

09/27/2022, 3:09 PM
@Michael Adkins Thank you, much appreciated!