• Andrey Tatarinov

    Andrey Tatarinov

    1 year ago
    Question about caching behaviour: I'm actively developing a certain flow. The flow runs with the K8s agent and is packaged in Docker. There's one task that takes a lot of time. The task is decorated with
    result=GCSResult
    and
    cache_for=timedelta(hours=1)
    . I notice that when I'm not rebuilding the Docker image, Prefect respects the cache, i.e. the second run goes much faster than the first. But it seems that each rebuild of the image invalidates the cache. Q: is that true? How can I get more insight into how caching works?
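
    A minimal sketch (not the poster's actual code) of the setup being described: a long-running task whose result is persisted to GCS and cached for an hour. The bucket name and task body are placeholders for illustration.
    from datetime import timedelta

    from prefect import task
    from prefect.engine.results import GCSResult

    @task(
        result=GCSResult(bucket="my-cache-bucket"),  # hypothetical bucket name
        cache_for=timedelta(hours=1),
    )
    def slow_task():
        ...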
    Andrey Tatarinov
    Kyle Moon-Wright
    8 replies
  • Will Milner

    Will Milner

    1 year ago
    Hi, I seem to be having trouble getting any stdout when using a Docker agent. This should show something in the logs of the flow run, correct?
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    task = ShellTask(return_all=True, log_stdout=True, log_stderr=True, stream_output=True)
    with Flow("test") as flow:
        print_test = task(command="echo hi")
    I expect to see
    hi
    in my Prefect logs, but I see nothing. I have my logging level set to DEBUG.
    Will Milner
    Kyle Moon-Wright
    4 replies
  • Levi Leal

    Levi Leal

    1 year ago
    $ kubectl -n agent logs prefect-job-2d655498-sqqlc
    [2020-12-14 20:20:37+0000] INFO - prefect.S3 | Downloading slow-flow/2020-12-07t12-16-09-863598-00-00 from lime-prefect
    Unable to locate credentials
    Traceback (most recent call last):
      File "/usr/local/bin/prefect", line 8, in <module>
        sys.exit(cli())
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
        return self.main(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
        rv = self.invoke(ctx)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
        return _process_result(sub_ctx.command.invoke(sub_ctx))
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
        return callback(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 34, in flow_run
        return _execute_flow_run()
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 99, in _execute_flow_run
        raise exc
      File "/usr/local/lib/python3.8/site-packages/prefect/cli/execute.py", line 84, in _execute_flow_run
        flow = storage.get_flow(storage.flows[flow_data.name])
      File "/usr/local/lib/python3.8/site-packages/prefect/environments/storage/s3.py", line 105, in get_flow
        self._boto3_client.download_fileobj(
      File "/usr/local/lib/python3.8/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
        return future.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 106, in result
        return self._coordinator.result()
      File "/usr/local/lib/python3.8/site-packages/s3transfer/futures.py", line 265, in result
        raise self._exception
      File "/usr/local/lib/python3.8/site-packages/s3transfer/tasks.py", line 255, in _main
        self._submit(transfer_future=transfer_future, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/s3transfer/download.py", line 340, in _submit
        response = client.head_object(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 357, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 662, in _make_api_call
        http, parsed_response = self._make_request(
      File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 682, in _make_request
        return self._endpoint.make_request(operation_model, request_dict)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 102, in make_request
        return self._send_request(request_dict, operation_model)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 132, in _send_request
        request = self.create_request(request_dict, operation_model)
      File "/usr/local/lib/python3.8/site-packages/botocore/endpoint.py", line 115, in create_request
        self._event_emitter.emit(event_name, request=request,
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 356, in emit
        return self._emitter.emit(aliased_event_name, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 228, in emit
        return self._emit(event_name, kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/hooks.py", line 211, in _emit
        response = handler(**kwargs)
      File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 90, in handler
        return self.sign(operation_name, request)
      File "/usr/local/lib/python3.8/site-packages/botocore/signers.py", line 162, in sign
        auth.add_auth(request)
      File "/usr/local/lib/python3.8/site-packages/botocore/auth.py", line 357, in add_auth
        raise NoCredentialsError
    botocore.exceptions.NoCredentialsError: Unable to locate credentials
    Has anyone managed to make EKS Fargate work with Prefect agents?
    Levi Leal
    Kyle Moon-Wright
    6 replies
  • Levi Leal

    Levi Leal

    1 year ago
    I'm using EKS to deploy a Prefect server. I had a nodegroup, and when I added one agent it worked fine: I could start flows, and the agent was picking them up and executing them. My flows are hosted on S3. I then decided to move the agents to a Fargate profile so the k8s nodes are created on demand. I created a FargateProfile that targets the namespace 'agent', and when I deploy an agent, EKS creates a Fargate node for it. The Fargate agent is able to retrieve flows to be executed and create new Fargate nodes with jobs to execute the flow. My problem is that this new Fargate node is unable to connect to S3. This is the log on the job (see the traceback in the message above):
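
    A hedged diagnostic sketch (bucket and key are copied from the traceback above; nothing here is from the thread's replies): running something like this inside the flow-run pod shows whether boto3 can see any credentials at all, e.g. from environment variables or an IAM role associated with the pod's service account.
    import boto3

    session = boto3.Session()
    print(session.get_credentials())  # None means boto3 finds no credentials in this pod

    s3 = boto3.client("s3")
    s3.head_object(Bucket="lime-prefect", Key="slow-flow/2020-12-07t12-16-09-863598-00-00")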
  • Berty

    Berty

    1 year ago
    Hi Prefect(ers?), I stumbled across a piece of documentation I needed last week, but I can't seem to locate it now. I have a task that returns a tuple, from which I only want to pass the first item into the task that consumes it. I recall seeing some way of telling Prefect how many results to expect. Can someone point me in the right direction? e.g.
    with Flow('...') as f:
       t = task(some_func) # <-- returns tuple
       c = task(consume_func, keyword_tasks={'config': t[0]})
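
    A hedged sketch of the "how many results to expect" idea being recalled (assuming it refers to the Prefect 1.x nout parameter; the task bodies are placeholders): declaring nout lets the flow unpack or index the task's tuple result, so only the first item is passed downstream.
    from prefect import Flow, task

    @task(nout=2)
    def some_func():
        return "config-value", "other-value"

    @task
    def consume_func(config):
        print(config)

    with Flow("nout-example") as f:
        first, second = some_func()  # unpacking works because nout=2 is declared
        consume_func(config=first)   # only the first item flows into consume_func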
    Berty
    Michael Adkins
    3 replies
  • DJ Erraballi

    DJ Erraballi

    1 year ago
    2020-12-12T16:53:10.075-08:00	[2020-12-13 00:53:10] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}],)
    2020-12-12T16:53:10.075-08:00	Traceback (most recent call last):
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
    2020-12-12T16:53:10.075-08:00	cache_for=self.task.cache_for,
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
    2020-12-12T16:53:10.075-08:00	version=version,
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
    2020-12-12T16:53:10.075-08:00	raise ClientError(result["errors"])
    2020-12-12T16:53:10.075-08:00	prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
    2020-12-12T16:53:10.075-08:00	ERROR:prefect.CloudTaskRunner:Failed to set task state with error: ClientError([{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}],)
    2020-12-12T16:53:10.075-08:00	Traceback (most recent call last):
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/engine/cloud/task_runner.py", line 123, in call_runner_target_handlers
    2020-12-12T16:53:10.075-08:00	cache_for=self.task.cache_for,
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1104, in set_task_run_state
    2020-12-12T16:53:10.075-08:00	version=version,
    2020-12-12T16:53:10.075-08:00	File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 226, in graphql
    2020-12-12T16:53:10.075-08:00	raise ClientError(result["errors"])
    2020-12-12T16:53:10.075-08:00	prefect.utilities.exceptions.ClientError: [{'path': ['set_task_run_states'], 'message': 'Unable to complete operation', 'extensions': {'code': 'API_ERROR'}}]
    2020-12-12T16:53:10.087-08:00	[2020-12-13 00:53:10] INFO - prefect.CloudTaskRunner | Task 'aggregate_patients': finished task run for task with final state: 'ClientFailed'
    I ran into this yesterday. It seemed intermittent, but is this due to Prefect Cloud being down?
    DJ Erraballi
    1 reply
  • jars

    jars

    1 year ago
    Hello, are there plans for Prefect.io to allow restoring an archived flow? This doesn't seem to be an option in the UI (please let me know if I'm missing it!). Some context: we deployed a flow into our development environment last week that ran successfully locally but then failed in the cloud. If this had happened in our production flows, we would have had a broken flow until the issue was debugged locally and resolved (this could take hours). If we could just have a flow "rollback" feature in the UI, that would be great. As an aside, the issue came down to two things: (1) some project dependencies, and (2) an experimental Docker feature on Macs called gRPC FUSE that needed to be turned off.
    jars
    Kyle Moon-Wright
    2 replies
  • Hui Zheng

    Hui Zheng

    1 year ago
    Hello, could someone explain more about
    Version Locking
    ? Is it saying it enforces that a submitted/triggered flow run will run once and only once?
    <https://docs.prefect.io/orchestration/concepts/flows.html#toggle-version-locking>
    Hui Zheng
    Jim Crist-Harif
    2 replies
  • Robert Bastian

    Robert Bastian

    1 year ago
    Hi Prefect Community. I have a question about the "right" way to do things in Prefect. Here is an example task using Postgres and Secrets. It works, but it doesn't seem "correct" to me, since both PostgresFetch and PrefectSecret are subclasses of Task. Any advice would be appreciated!
    def get_record_count():
        logger = prefect.context.get("logger")
        pg = PostgresFetch(host='hostname.us-east-1.rds.amazonaws.com', db_name='testdatabase', user='test')
        pg.query="select count(*) from testdatabase.test_table"
        secret = PrefectSecret('TEST_PASSWORD')
        password = secret.run()
        result = pg.run(password=password)
        return result[0]
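
    A hedged sketch of a more idiomatic pattern (host, database, and secret names are reused from the snippet above; this is not from the thread's replies): instantiate the task classes once and call them inside a Flow context, so Prefect wires the PrefectSecret output into PostgresFetch instead of calling .run() by hand.
    from prefect import Flow
    from prefect.tasks.postgres import PostgresFetch
    from prefect.tasks.secrets import PrefectSecret

    fetch_count = PostgresFetch(
        host='hostname.us-east-1.rds.amazonaws.com',
        db_name='testdatabase',
        user='test',
        query='select count(*) from testdatabase.test_table',
    )

    with Flow('record-count') as flow:
        password = PrefectSecret('TEST_PASSWORD')
        record = fetch_count(password=password)  # password is resolved at runtime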
    Robert Bastian
    Jim Crist-Harif
    4 replies
  • Rolf Schick

    Rolf Schick

    1 year ago
    Hi. I'm new to Prefect and am trying to deploy a GreatExpectations environment into my Docker/Kubernetes deployment. Unfortunately, the only way I could get files into the container so far was by using the "files" parameter of the "Docker" storage class. Filling my container this way seems a bit odd to me. Is there another way I can just COPY my files into my container environment?
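
    A hedged sketch (assuming Prefect 1.x Docker storage; the registry URL is a placeholder): the Docker storage class accepts a dockerfile argument, so you can point it at your own Dockerfile that COPYs the GreatExpectations project into the image instead of listing everything in files.
    from prefect import Flow
    from prefect.environments.storage import Docker  # exposed as prefect.storage.Docker on newer 1.x releases

    storage = Docker(
        registry_url="registry.example.com/team",  # hypothetical registry
        dockerfile="Dockerfile",  # your own Dockerfile, e.g. with a COPY great_expectations/ line
    )

    with Flow("ge-flow", storage=storage) as flow:
        ...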
    Rolf Schick
    Jim Crist-Harif
    10 replies