• yu zeng

    2 months ago
    from prefect import Flow, task
    from prefect.storage import GitHub, GitLab, S3, Webhook

    from prefect.backend.artifacts import create_link_artifact
    import prefect.engine.cache_validators
    import datetime


    @task(task_run_name="mviz_task_{md5}", max_retries=0, cache_for=datetime.timedelta(hours=1))
    def test(md5):
        print('do test', md5)
        
    
    with Flow("epl") as flow:
        test('123')
        test('123')
    flow.run()
    Hi, I try to use the cache within a single flow run, but I got the output below, which shows that the cache does not work. It seems the cache does not work during the same flow run - or is there some mistake in my code?
    [2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'epl'
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
    [2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
    do test 123
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
    [2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
    do test 123
    [2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
    [2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
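    If the goal is simply to avoid recomputing the same inputs within one process, a plain-Python memoization sketch (independent of Prefect's `cache_for` machinery, which, as I understand it, is aimed at reuse across flow runs rather than within one run) looks like:

```python
import functools

@functools.lru_cache(maxsize=None)
def expensive_lookup(md5: str) -> str:
    # The body runs once per distinct argument in this process; repeated
    # calls with the same md5 return the cached value without re-printing.
    print('do test', md5)
    return f"result-{md5}"

expensive_lookup('123')
expensive_lookup('123')  # cache hit: the body does not run a second time
```

    This only demonstrates the in-process behavior being asked about; it is not a substitute for Prefect's backend-aware caching.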
    Anna Geller
    4 replies
• Jehan Abduljabbar

    2 months ago
    Good morning, I've got this issue: I am using LocalDaskExecutor() and Git storage. When I run the flow in the cloud, Dask breaks a task down into multiple tasks with new labels, but the flow in Git storage doesn't recognize the new tasks, which gives me a version mismatch error between the flow stored in the Prefect backend and the flow in storage. How do I get around that?
    Anna Geller
    5 replies
  • jack

    2 months ago
    Are labels applied in the GUI expected to apply to scheduled flow runs?
    Anna Geller
    6 replies
• Jason

    2 months ago
    Regarding Parameters: is there any way to get a parameter's resolved value during a flow? I'm trying to put various parameters into a Pydantic BaseModel that we're using as a config object for a task, but the BaseModel fails validation because the type is Parameter instead of, for example, str.
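    One pattern that addresses this (sketched with a stdlib dataclass standing in for the Pydantic model; all names here are hypothetical, not from the original post): at flow-definition time a Parameter is still a placeholder object, so construct and validate the config object inside a task body, where the parameter has already been resolved to its concrete value.

```python
from dataclasses import dataclass

# Stand-in for the Pydantic BaseModel used as a task config object.
@dataclass
class JobConfig:
    name: str
    retries: int

def build_config(name: str, retries: int) -> JobConfig:
    # Inside a task body, Parameter placeholders have been resolved to
    # plain Python values, so validating against concrete types succeeds
    # here. At flow-definition time, `name` would still be a Parameter
    # object and would fail this kind of type check.
    assert isinstance(name, str)
    return JobConfig(name=name, retries=retries)

cfg = build_config("etl", 3)  # simulating the resolved runtime values
```

    The design point: defer model construction until runtime, rather than validating placeholder objects at build time.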
    Anna Geller
    2 replies
• alex

    2 months ago
    I'm running into an issue where "ghost" tasks/flows are taking up my concurrency limits. I'm using Prefect Cloud 1.0 with either Kubernetes or local agents for my flows. Some issues I'm running into:
    • Some of my task concurrency limits have been used up for many weeks in a row, even when I have no running flows at all - and especially none with that task tag. I have tried the following, but the slots still seem to be used up:
        ◦ I have stopped all runs in progress using the UI
        ◦ For my flows deployed on k8s, I have removed all jobs that had been running for >x days (these were "ghost" jobs too, as no flow had actually been running that long)
        ◦ I have restarted my agent
    • Some cancelled flows are still showing up in the Running tab. They're greyed out and have had "cancelling..." written below them for quite a few days.
    My question:
    • How can I identify why my task concurrency slots are being used, and how can I clean them up?
    Anna Geller
    7 replies
  • Josh

    2 months ago
    Is it possible to create a new service account key from the prefect CLI or the API?
    Anna Geller
    5 replies
  • Andy Dang

    2 months ago
    Hi! I just started to look into Prefect as an alternative to AWS Step Functions. We use Step Functions to orchestrate our Databricks jobs with AWS Lambda, and Prefect supports Databricks (the main logic lives in Databricks - we just need Prefect to orchestrate tasks). What I can't figure out is where to run the Prefect agent that manages my Databricks jobs. Do I spin up an EC2 instance and kick it off there? Can I run it as a Docker container in my ECS cluster? I doubt I can run it as a Lambda (though that would be amazing) - it doesn't look like there's an easy way to do that. Any recommendations?
    Anna Geller
    4 replies
• Divya

    2 months ago
    Hello, I am trying to use Amundsen with Prefect. Is it possible, and if yes, are there any working examples? Right now I am trying to schedule Amundsen code in Prefect, but I am getting the error below. Thank you, Divya
    Anna Geller
    2 replies
  • Michael Reynolds

    2 months ago
    Hello, I am hacking away at something and I had a question about Prefect 2.0 that I cannot seem to find an answer to.
    Anna Geller
    7 replies
  • Michael Reynolds

    2 months ago
    23:29:26.313 | ERROR   | Flow run 'mustard-quetzal' - Encountered exception during execution:
    Traceback (most recent call last):
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 521, in orchestrate_flow_run
        flow_call
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in run_sync_in_interruptible_worker_thread
        cancellable=True,
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
        await coro
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
        func, *args, cancellable=cancellable, limiter=limiter
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/goron/main.py", line 35, in run_pipeline
        messages = poll_kafka( conf[ 'kafka' ] ).result()
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 210, in result
        self._result, timeout=timeout, raise_on_failure=raise_on_failure
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 221, in sync
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
        return self.__get_result()
      File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
        raise self._exception
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 220, in _result
        return final_state.result(raise_on_failure=raise_on_failure)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/states.py", line 136, in result
        raise data
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/task_runners.py", line 314, in _run_and_store_result
        self._results[run_key] = await run_fn(**run_kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 799, in begin_task_run
        client=client,
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 905, in orchestrate_task_run
        result, serializer="cloudpickle"
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/states.py", line 130, in return_value_to_state
        return Completed(data=DataDocument.encode(serializer, result))
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/data.py", line 42, in encode
        blob = lookup_serializer(encoding).dumps(data, **kwargs)
      File "/home/myapp/.local/lib/python3.7/site-packages/prefect/serializers.py", line 59, in dumps
        data_bytes = cloudpickle.dumps(data)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
        cp.dump(obj)
      File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
        return Pickler.dump(self, obj)
    TypeError: can't pickle cimpl.Message objects
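    The bottom of the traceback shows cloudpickle failing on a confluent_kafka `cimpl.Message`, which wraps C-level state that cannot be pickled. One common workaround is to convert each message to plain Python types before returning it from the task; a sketch, with a hypothetical stub standing in for the real Message class:

```python
import pickle

class FakeMessage:
    """Hypothetical stub mimicking confluent_kafka's cimpl.Message interface."""
    def __init__(self, topic, partition, offset, key, value):
        self._fields = (topic, partition, offset, key, value)
    def topic(self): return self._fields[0]
    def partition(self): return self._fields[1]
    def offset(self): return self._fields[2]
    def key(self): return self._fields[3]
    def value(self): return self._fields[4]

def message_to_dict(msg) -> dict:
    # Copy the C-backed Message into plain Python types so that the
    # task's return value can be serialized by pickle/cloudpickle.
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        "key": msg.key(),
        "value": msg.value(),
    }

records = [message_to_dict(FakeMessage("events", 0, 42, b"k", b"v"))]
assert pickle.loads(pickle.dumps(records)) == records  # now round-trips
```

    In the flow above, the polling task would apply `message_to_dict` to each consumed message before returning, so Prefect only ever sees plain dicts.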