Powered by Linen
prefect-community
  • m

    Mathijs Carlu

    07/15/2022, 9:00 AM
    Hi, I'm wondering whether the following behaviour is actually intended, because it seems odd to me (Prefect 2.0b8, server). Assume I have a file with a flow and a deployment. When executing
    prefect deployment create file.py
    the deployment gets created. Now, when I modify the flow a little (keeping the flow name the same), change the deployment name and re-execute the command above, a new deployment is created. This deployment points at the same flow object (flow_id is the same for both). However, the two deployments execute different code ('different versions of the same flow', if you will), although this 'version number' is not saved anywhere (I think). This is all because the location of the flow code (flow_data) is saved with the deployment rather than with the flow, which seems a little counterintuitive to me: if I see two flow runs in the UI that executed the same flow, I would expect them to have executed the same code.
    6 replies · 2 participants
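Because the code location lives on the deployment, one way to make the "version" of the code visible is to derive a tag from the flow file's contents and put it somewhere you already control, such as the deployment name (which the post already varies). A minimal stdlib sketch, assuming you can read the flow file at deployment time; `source_version` is a hypothetical helper, not a Prefect API:

```python
import hashlib
from pathlib import Path


def source_version(path: str, length: int = 12) -> str:
    """Return the first `length` hex characters of the SHA-256 of a file's bytes."""
    # Any edit to the file changes the digest, so it works as a code version tag.
    digest = hashlib.sha256(Path(path).read_bytes()).hexdigest()
    return digest[:length]


# Hypothetical usage: suffix the deployment name with the code version, e.g.
# deployment_name = f"my-deployment-{source_version('file.py')}"
```

Two deployments created from different edits of `file.py` then carry different suffixes, so the deployment a run came from also tells you which file contents it used.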
  • s

    Stephen Lloyd

    07/15/2022, 10:19 AM
    I need to scale out a Flow such that I significantly increase the parallelization. I’m already mapping tasks and adding memory to machines to be able to handle more threads. Supposing I didn’t want to keep scaling a single machine, is there a way to map tasks out to multiple machines? What do I need to be thinking about?
    1 reply · 2 participants
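Mapping describes what runs in parallel; where it runs is decided by the executor. In Prefect 1 that usually means moving from a single-machine setup to a `DaskExecutor` whose `address` points at an existing multi-machine Dask cluster (worth confirming against the executor docs for your version). The stdlib sketch below only illustrates that separation: `run_mapped` depends on nothing but the `concurrent.futures.Executor` interface, and `process_item` is a hypothetical task body.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable, List


def process_item(x: int) -> int:
    # hypothetical stand-in for the body of a mapped task
    return x * x


def run_mapped(executor: Executor, fn: Callable[[int], int], items: Iterable[int]) -> List[int]:
    # Only the Executor interface is used here, so the mapping logic stays the
    # same whether the executor is a local pool or one that spreads work
    # across machines. Executor.map returns results in input order.
    return list(executor.map(fn, items))


with ThreadPoolExecutor(max_workers=4) as pool:
    results = run_mapped(pool, process_item, range(5))
print(results)
```

Things to think about when moving to several machines include that inputs and outputs of mapped tasks must be serializable to cross process boundaries, and that every worker needs the same Python environment as the flow.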
  • h

    haris khan

    07/15/2022, 10:24 AM
    Is there a way to migrate a project from Prefect v1 to Prefect Orion?
    1 reply · 2 participants
  • r

    Rajvir Jhawar

    07/15/2022, 10:43 AM
    This is more of a roadmap/feature question about Prefect 2.0. Will the notification system have a more generic option, like a webhook? If I could set up a webhook, it would open up a lot of possibilities for dealing with certain types of failure conditions.
    1 reply · 2 participants
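Independently of what the notification system ends up offering, your own code can call a webhook when it detects a failure (for example from a final task). The sketch below only builds the HTTP request with the stdlib; sending it is a single `urllib.request.urlopen(req, timeout=10)` call. The URL and the payload fields are hypothetical placeholders.

```python
import json
import urllib.request


def build_webhook_request(url: str, flow_name: str, state: str) -> urllib.request.Request:
    """Build (but do not send) a JSON POST to a webhook endpoint."""
    payload = {"flow": flow_name, "state": state}  # hypothetical payload shape
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_webhook_request("https://example.com/hooks/prefect", "etl", "Failed")
print(req.get_method(), req.full_url)
```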
  • y

    yu zeng

    07/15/2022, 12:23 PM
    import datetime

    from prefect import Flow, task


    @task(task_run_name="mviz_task_{md5}", max_retries=0, cache_for=datetime.timedelta(hours=1))
    def test(md5):
        print("do test", md5)


    # the same task is called twice with identical inputs in one flow run
    with Flow("epl") as flow:
        test("123")
        test("123")

    flow.run()
    Hi, I'm trying to use caching within a single flow run, but I got the output below, which shows that the cache doesn't work. Does caching not work within the same flow run, or is there a mistake in my code?
    [2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'epl'
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
    [2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
    do test 123
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
    [2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
    do test 123
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
    [2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    4 replies · 2 participants
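For comparison, this is the behaviour the post expects: the second call with identical inputs in the same run reuses the first result. A plain-Python sketch of an input-keyed, time-bounded cache; `ttl_cache` and `do_test` are hypothetical names, this is not how Prefect 1's `cache_for`/`cache_validator` are implemented, and whether Prefect reuses cached states within a single `flow.run()` is exactly the open question here.

```python
import functools
import time
from typing import Any, Callable, Dict, Tuple


def ttl_cache(seconds: float) -> Callable:
    """Hypothetical decorator: reuse a result for identical positional args until it expires."""

    def decorator(fn: Callable) -> Callable:
        store: Dict[Tuple[Any, ...], Tuple[float, Any]] = {}

        @functools.wraps(fn)
        def wrapper(*args: Any) -> Any:
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and now - hit[0] < seconds:
                return hit[1]  # still valid: skip the call and reuse the stored result
            result = fn(*args)
            store[args] = (now, result)  # remember when this result was produced
            return result

        return wrapper

    return decorator


calls = []


@ttl_cache(seconds=3600)  # one hour, like cache_for=timedelta(hours=1) in the post
def do_test(md5):
    calls.append(md5)
    return f"done {md5}"


do_test("123")
do_test("123")
print(len(calls))  # 1: the second call reused the cached result
```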
  • j

    Jehan Abduljabbar

    07/15/2022, 1:32 PM
    Good morning, I've got this issue. I am using LocalDaskExecutor() and Git storage. When I run the flow in the cloud, Dask breaks a task down into multiple ones with new labels. But the flow in Git storage doesn't recognize the new tasks, which gives me a mismatch error between the flow version stored in the Prefect backend and the flow version in storage. How do I get around that?
    5 replies · 2 participants
  • j

    jack

    07/15/2022, 2:02 PM
    Are labels applied in the GUI expected to apply to scheduled flow runs?
    6 replies · 2 participants
  • j

    Jason

    07/15/2022, 3:04 PM
    Regarding Parameters, is there any way to get their values during a flow? I'm trying to put various parameters into a Pydantic BaseModel that we use as a config object for a task, but the BaseModel fails validation because the type is Parameter instead of, for example, str.
    2 replies · 2 participants
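Assuming Prefect 1 (where `Parameter` is a class): a Parameter used inside a `with Flow(...)` block is itself a task, and its value only exists when the flow runs, so validating it while the flow is being built sees the Parameter object. One pattern is to pass the parameters into a task and construct the config inside it. A stdlib sketch in which a dataclass with a type check stands in for the Pydantic model; the field names are hypothetical:

```python
from dataclasses import dataclass


@dataclass
class TaskConfig:
    # stand-in for the Pydantic BaseModel in the post; field names are hypothetical
    source: str
    batch_size: int

    def __post_init__(self) -> None:
        # mimic the type validation that fails when a Parameter object is passed in
        if not isinstance(self.source, str):
            raise TypeError(f"source must be str, got {type(self.source).__name__}")
        if not isinstance(self.batch_size, int):
            raise TypeError(f"batch_size must be int, got {type(self.batch_size).__name__}")


def build_config(source: str, batch_size: int) -> TaskConfig:
    # Call this inside a task: by then the parameter values are plain Python objects.
    return TaskConfig(source=source, batch_size=batch_size)
```

In the flow you would decorate `build_config` with `@task` and call it with the Parameters, so it receives their resolved values at run time and passes a validated config downstream.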
  • a

    alex

    07/15/2022, 4:15 PM
    I'm running into an issue where "ghost" tasks/flows are taking up my concurrency limits. I'm using Prefect Cloud 1.0 with either Kubernetes or local agents for my flows. Some issues I'm running into:
    • Some of my task concurrency limits have been used up for many weeks in a row, even when I have no running flows at all, and especially none with that task tag. I have tried the following, but the slots still seem to be used up:
      ◦ I have stopped all runs in progress using the UI
      ◦ For my flows deployed on k8s, I have removed all jobs that had been running for more than x days (these were "ghost" jobs too, as no flow had been running that long)
      ◦ I have restarted my agent
    • Some cancelled flows still show up in the Running tab. They're greyed out and have had "cancelling..." written below them for quite a few days.
    My question:
    • How can I identify why my task concurrency slots are being used, and how can I clean them up?
    7 replies · 2 participants
  • j

    Josh

    07/15/2022, 4:54 PM
    Is it possible to create a new service account key from the prefect CLI or the API?
    5 replies · 3 participants
  • a

    Andy Dang

    07/15/2022, 5:10 PM
    Hi! I just started looking into Prefect as an alternative to AWS Step Functions. We use Step Functions with AWS Lambda to orchestrate our Databricks jobs, and Prefect supports Databricks (the actual logic lives in Databricks; we just need Prefect to orchestrate tasks). However, what I can’t figure out is where to run the Prefect agent that manages my Databricks jobs. Do I spin up an EC2 instance and kick it off there? Can I run it as a Docker container in my ECS cluster? I doubt I can run it as a Lambda (though that would be amazing); it doesn’t look like there’s an easy way to do it. Any recommendations?
    4 replies · 2 participants
  • d

    Divya

    07/16/2022, 1:30 AM
    Hello, I am trying to use Amundsen with Prefect. Is this possible, and if so, are there any working examples? Right now I am trying to schedule Amundsen code in Prefect, but I am getting the error below. Thank you, Divya
    2 replies · 2 participants
  • m

    Michael Reynolds

    07/16/2022, 11:24 PM
    Hello, I am hacking away at something and I have a question about Prefect 2.0 that I cannot seem to find an answer to.
    7 replies · 2 participants
  • m

    Michael Reynolds

    07/16/2022, 11:33 PM
    23:29:26.313 | ERROR   | Flow run 'mustard-quetzal' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 521, in orchestrate_flow_run
        flow_call
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in run_sync_in_interruptible_worker_thread
        cancellable=True,
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
        await coro
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
        func, *args, cancellable=cancellable, limiter=limiter
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/goron/main.py", line 35, in run_pipeline
        messages = poll_kafka( conf[ 'kafka' ] ).result()
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 210, in result
        self._result, timeout=timeout, raise_on_failure=raise_on_failure
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 221, in sync
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
        return self.__get_result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
        raise self._exception
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 220, in _result
        return final_state.result(raise_on_failure=raise_on_failure)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/states.py", line 136, in result
        raise data
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/task_runners.py", line 314, in _run_and_store_result
        self._results[run_key] = await run_fn(**run_kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 799, in begin_task_run
        client=client,
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 905, in orchestrate_task_run
        result, serializer="cloudpickle"
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/states.py", line 130, in return_value_to_state
        return Completed(data=DataDocument.encode(serializer, result))
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/data.py", line 42, in encode
        blob = lookup_serializer(encoding).dumps(data, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/serializers.py", line 59, in dumps
        data_bytes = cloudpickle.dumps(data)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
        return Pickler.dump(self, obj)
    TypeError: can't pickle cimpl.Message objects
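The traceback shows the failure happens when Prefect serializes the task's return value (`return_value_to_state` calling `cloudpickle.dumps`), and confluent_kafka's `Message` objects cannot be pickled. One workaround is to have `poll_kafka` return plain data instead of raw messages. A sketch assuming the confluent_kafka `Message` accessors `topic()`, `partition()`, `offset()`, `key()` and `value()`; `FakeMessage` is a hypothetical stand-in so the example runs without Kafka.

```python
import pickle
from typing import Any, Dict, Iterable, List


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka's Message, which cannot be pickled."""

    def __init__(self, topic: str, partition: int, offset: int, key: bytes, value: bytes):
        self._fields = (topic, partition, offset, key, value)

    # accessor methods named like the confluent_kafka ones
    def topic(self) -> str:
        return self._fields[0]

    def partition(self) -> int:
        return self._fields[1]

    def offset(self) -> int:
        return self._fields[2]

    def key(self) -> bytes:
        return self._fields[3]

    def value(self) -> bytes:
        return self._fields[4]

    def __reduce_ex__(self, protocol):
        # mimic the failure from the traceback
        raise TypeError("can't pickle cimpl.Message objects")


def to_plain(messages: Iterable[Any]) -> List[Dict[str, Any]]:
    """Copy the fields you need out of each message into a plain, picklable dict."""
    return [
        {
            "topic": m.topic(),
            "partition": m.partition(),
            "offset": m.offset(),
            "key": m.key(),
            "value": m.value(),
        }
        for m in messages
    ]


plain = to_plain([FakeMessage("events", 0, 42, b"k", b"v")])
print(len(pickle.dumps(plain)) > 0)  # plain data pickles fine
```

Inside the flow, the task would return `to_plain(messages)` rather than the raw message list, so the result Prefect stores is ordinary dicts and bytes.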
  • j

    Jelle Vegter

    07/17/2022, 12:35 PM
    Hey all, (Prefect 1.0) I’m struggling to figure out why flows execute properly locally but not when triggered through Prefect Cloud with a Docker agent running locally. The flows are stuck in Submitted. Anyone have an idea? I can’t find information about the error message “prefect: error: unrecognized arguments: execute flow-run”. Thanks in advance!
    11 replies · 2 participants
  • j

    Jack Sundberg

    07/17/2022, 1:59 PM
    Hi everyone, is there a recommended way to set up a Postgres connection pool for the Orion database? I typically use pgbouncer to build a transactional pool, but when I do this with Prefect, I end up with an error matching a known bug in SQLAlchemy.
    4 replies · 2 participants
  • e

    Emil Østergaard

    07/17/2022, 9:05 PM
    Hi, in previous versions (tried 2.0b4) it was possible to create an S3 storage block backed by MinIO using the /api/blocks endpoint. In the 2.0b8 API I see endpoints for block schemas, block_types and block_document, but as far as I can tell none of these creates a block? Through the UI, it is not possible to supply the necessary client_kwargs and config_kwargs. Is this no longer possible, or am I misunderstanding something? For reference, I used a request like this:
    PAYLOAD=$(cat <<EOF
    {
        "name": "minio",
        "block_spec_id": ${FILE_STORAGE_ID},
        "data": {
            "base_path": "s3://prefect-flows",
            "key_type": "hash",
            "options": {
                "use_ssl": false,
                "key": "blablabla",
                "secret": "blablabla",
                "client_kwargs": {"endpoint_url": "http://minio:9000"},
                "config_kwargs": {"signature_version": "s3v4"}
            }
        }
    }
    EOF
    )
    BLOCK_ID=$(echo -n "$PAYLOAD" | curl -vs -XPOST -H "Content-Type: application/json" http://localhost:4200/api/blocks/ -d@- | jq -r '.id')
    5 replies · 2 participants
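Generating the request body from a Python dict avoids hand-editing JSON in a heredoc, where a stray trailing comma after the last key silently makes the payload invalid. This sketch only builds the same body as the request above; it does not send it, and it leaves open whether 2.0b8 accepts it at all. `FILE_STORAGE_ID` is read from the environment as a string, and the fallback value is a placeholder.

```python
import json
import os

# Same values as the shell heredoc; the fallback ID is only a placeholder.
payload = {
    "name": "minio",
    "block_spec_id": os.environ.get("FILE_STORAGE_ID", "<block-spec-id>"),
    "data": {
        "base_path": "s3://prefect-flows",
        "key_type": "hash",
        "options": {
            "use_ssl": False,
            "key": "blablabla",
            "secret": "blablabla",
            "client_kwargs": {"endpoint_url": "http://minio:9000"},
            "config_kwargs": {"signature_version": "s3v4"},
        },
    },
}

# json.dumps always emits valid JSON: no trailing commas, and False becomes false
body = json.dumps(payload, indent=2)
print(body)
```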
  • s

    Stefan

    07/17/2022, 9:46 PM
    In the API, what is the difference between "Create a flow run from Deployment" and "Create a flow run"? In the latter you have to specify a Deployment id anyway I see.
    3 replies · 3 participants
  • y

    yu zeng

    07/18/2022, 5:30 AM
    Hi, I call a create_flow_run task on Prefect Server, but get "ValueError: You have not set an API key for authentication".
    8 replies · 3 participants
  • r

    Rajeshwar Agrawal

    07/18/2022, 6:34 AM
    Hey Prefect! I have disabled heartbeats and Lazarus for one of my flows using the UI; however, it seems that Prefect is still retrying the flow. I am seeing 2 log messages matching "Beginning Flow run for '….'", which indicates that the flow was retried by Prefect. Any ideas? It seems that heartbeats are not disabled either, as we are seeing the log "No heartbeat detected from the remote task; marking the run as failed." FYI, we are using Prefect Server community, Core version 1.2.0+10.gafda99411.
    6 replies · 2 participants
  • j

    J

    07/18/2022, 7:19 AM
    Not sure where to post this, but is there a way to get a task to repeat N times? Say I've got a pipeline that calls an API which returns a JSON array. I need to execute a task per element in that array and report back.
    1 reply · 2 participants
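This is the fan-out/fan-in shape that task mapping covers: one task run per array element, then a downstream step that receives the list of results (in Prefect 1.x, `some_task.map(items)`). A stdlib sketch of the same shape, with `ThreadPoolExecutor.map` standing in for mapping; `fetch_items`, `handle` and `report` are hypothetical.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List


def fetch_items() -> List[Dict[str, Any]]:
    # hypothetical stand-in for the API call that returns a JSON array
    return json.loads('[{"id": 1}, {"id": 2}, {"id": 3}]')


def handle(item: Dict[str, Any]) -> str:
    # hypothetical per-element work (one "task run" per element)
    return f"processed {item['id']}"


def report(results: List[str]) -> str:
    # fan-in: a single step that sees every per-element result
    return f"{len(results)} items: " + ", ".join(results)


items = fetch_items()
with ThreadPoolExecutor() as pool:
    results = list(pool.map(handle, items))  # results keep the input order
print(report(results))
```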
  • x

    xyzz

    07/18/2022, 9:28 AM
    Is there a way to set up Orion on a Kubernetes cluster with RBAC disallowed?
    1 reply · 2 participants
  • r

    Riccardo Tesselli

    07/18/2022, 9:46 AM
    Hi all, I’m about to start a new project based on Prefect 2.0. I was wondering if there is a best practice on how to organise the folders. Is it convenient to have separate packages for “flows”, “tasks” and so on, or something else? Is there a good repo based on Prefect 2.0 to take inspiration from?
    1 reply · 2 participants
  • a

    Abhishek Mitra

    07/18/2022, 10:22 AM
    I have a flow 'FlowA' registered in prefect cloud. Can I set a schedule for FlowA from another flow FlowB?
    1 reply · 2 participants
  • p

    Paul Lucas

    07/18/2022, 11:17 AM
    Hi, I’m trying to implement a dbt flow that runs our process, but when I look at the tasks I just get DbtShellTask, which isn’t very helpful. Is there a way to rename the individual tasks to something more useful? I’ve tried using the name and task_run_name arguments but without any luck.
    2 replies · 2 participants
  • h

    haris khan

    07/18/2022, 12:44 PM
    What is the equivalent of the Prefect 1.0 Parameter class in Prefect Orion? Do I just change it to a Task class?
    3 replies · 2 participants
  • p

    Priyank

    07/18/2022, 1:03 PM
    Hi there! I am running a local Prefect server and now want to collect some useful data from the database, like start_time, end_time and duration, for each flow and its tasks. We can clearly see this data on the Prefect dashboard, but how can I query it from the DB? I tried listing all the tables in the database but couldn't work out how they are linked to each other. Can you share a visual representation of how the tables are related, and some queries too? Duration and similar values are already on the dashboard, so Prefect must be querying this data somehow. Or is there an easier way to get this info? Also, can we get how many times a task failed? In the end we want to use this data for analysis: how much time each task takes and how that changes over time, whether there is a particular time when our tasks fail, and more. We are using Prefect 1.0 for now.
    4 replies · 3 participants
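Once you know which table holds task-run start/end times and states, duration and failure counts are simple aggregates. A self-contained sketch against an in-memory SQLite table with a hypothetical, simplified `task_run` schema (the real Prefect Server Postgres schema is different and should be checked first); on Postgres the duration expression would be `EXTRACT(EPOCH FROM end_time - start_time)` instead of the SQLite `strftime('%s', ...)` arithmetic used here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE task_run (      -- hypothetical, simplified schema
    task_name  TEXT,
    state      TEXT,
    start_time TEXT,         -- 'YYYY-MM-DD HH:MM:SS'
    end_time   TEXT
);
INSERT INTO task_run VALUES
    ('extract', 'Success', '2022-07-18 10:00:00', '2022-07-18 10:00:30'),
    ('extract', 'Failed',  '2022-07-18 11:00:00', '2022-07-18 11:00:10'),
    ('load',    'Success', '2022-07-18 10:01:00', '2022-07-18 10:03:00');
""")

# Per task: number of runs, number of failures, average duration in seconds.
rows = conn.execute("""
    SELECT task_name,
           COUNT(*) AS runs,
           SUM(state = 'Failed') AS failures,
           AVG(strftime('%s', end_time) - strftime('%s', start_time)) AS avg_seconds
    FROM task_run
    GROUP BY task_name
    ORDER BY task_name
""").fetchall()

for row in rows:
    print(row)
```

Grouping by an hour-of-day expression on `start_time` instead of `task_name` would answer the "is there a particular time when our task fails" question in the same way.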
  • e

    Emil Barbuta

    07/18/2022, 1:28 PM
    Hello! I'm trying to create a flow of flows in which some child flows depend on others. When trying to make 2 of them run sequentially using the following code snippet, wait_for_flow_run fails by throwing TypeError: Object of type FlowRunView is not JSON serializable. Am I passing the wrong object type as input?
    from prefect import Flow
    from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

    with Flow("run-pipeline") as flow:
        flow_a_id = create_flow_run(flow_name=flow_a.name, ...[SOME INPUT PARAMS])
        flow_a_wait = wait_for_flow_run(flow_a_id)
    (I'm using prefect 1.0.0)
    5 replies · 3 participants
  • j

    Josh Paulin

    07/18/2022, 1:55 PM
    Is there any documentation on which permissions the Slack app requires?
    4 replies · 2 participants

Kevin Kho

07/18/2022, 2:14 PM
I don't think there's documentation, but you can also make your own webhook for finer-grained control

Josh Paulin

07/18/2022, 2:15 PM
As in sidestep the app altogether?

Kevin Kho

07/18/2022, 2:21 PM
Yeah cuz you just need a webhook for the SlackTask so you can actually make your own and you’ll be in full control

Josh Paulin

07/18/2022, 2:23 PM
I see. Thanks