• hakki cankaya

    2 years ago
    Thanks @Zachary Hughes. Say the host executor fails mid-execution, or there is a server power outage. How can the flow survive and stay resilient?
    14 replies (Zachary Hughes +1)
  • Dan DiPasquo

    2 years ago
    I have Flows that seem to be restarted in the middle of a long-running task, and I'm trying to track down why. An abbreviated log is below: at 19:39 the run is explicitly killed and rescheduled by Cloud, but before that, at 19:22, there's a seemingly spontaneous and unexplained second "Beginning Flow run". Any suggestions on how to narrow down the cause?
    TIMESTAMP                         LEVEL    MESSAGE
    2020-05-19T18:58:29.732839+00:00  INFO     Submitted for execution: Job prefect-job-35b1423b
    2020-05-19T19:04:28.107917+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:04:28.240831+00:00  INFO     Starting flow run.
    2020-05-19T19:04:28.241095+00:00  DEBUG    Flow 'compute_**_flow': Handling state change from Scheduled to Running
    2020-05-19T19:05:12.954233+00:00  INFO     Task 'compute_**_task': Starting task run...
    2020-05-19T19:05:12.95458+00:00   DEBUG    Task 'compute_**_task': Handling state change from Pending to Running
    2020-05-19T19:05:13.210047+00:00  DEBUG    Task 'compute_**_task': Calling task.run() method...
    2020-05-19T19:22:07.863766+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:22:08.577243+00:00  INFO     Task 'compute_**_task': Starting task run...
    2020-05-19T19:22:08.578027+00:00  DEBUG    Task 'compute_**_task': task is already running.
    2020-05-19T19:22:08.59477+00:00   INFO     Task 'compute_**_task': finished task run for task with final state: 'Running'
    2020-05-19T19:24:34.702197+00:00  ERROR    Marked "Failed" by a Zombie Killer process.
    2020-05-19T19:39:33.646426+00:00  INFO     Rescheduled by a Lazarus process. This is attempt 1.
    2020-05-19T19:39:56.331103+00:00  INFO     Submitted for execution: Job prefect-job-21a433ec
    2020-05-19T19:42:33.754737+00:00  INFO     Beginning Flow run for 'compute_**_flow'
    2020-05-19T19:42:33.869824+00:00  INFO     Starting flow run.
    ...
    4 replies (Zachary Hughes)
  • Marwan Sarieddine

    2 years ago
    Hi again everyone, I am trying to replicate the [Dask Cluster on Kubernetes deployment recipe](https://docs.prefect.io/orchestration/recipes/k8s_dask.html) and I'm hitting the following error:
    AttributeError: 'FunctionTask' object has no attribute 'result'
    Wondering if you have encountered this before. One thing to note: if the flow is empty, it runs successfully. I am using Prefect version 0.10.7.
    19 May 2020,07:027 	agent	INFO	Submitted for execution: Job prefect-job-d3fc6dc5
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	INFO	Beginning Flow run for 'Static Dask Cluster'
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	INFO	Starting flow run.
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	DEBUG	Flow 'Static Dask Cluster': Handling state change from Scheduled to Running
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	ERROR	Unexpected error: AttributeError("'FunctionTask' object has no attribute 'result'")
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 465, in get_flow_run_state
        for t in final_tasks
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/executors/dask.py", line 196, in wait
        return self.client.gather(futures)
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1931, in gather
        asynchronous=asynchronous,
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 780, in sync
        self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 347, in sync
        raise exc.with_traceback(tb)
      File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 331, in f
        result[0] = yield future
      File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
        value = future.result()
      File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1790, in _gather
        raise exception.with_traceback(traceback)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/flow_runner.py", line 569, in run_task
        default_handler = task.result_handler or self.flow.result_handler
    AttributeError: 'FunctionTask' object has no attribute 'result'
    19 May 2020,07:02:12 	prefect.CloudFlowRunner	DEBUG	Flow 'Static Dask Cluster': Handling state change from Running to Failed
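    Since the traceback crosses the Dask client/worker boundary, one possible culprit is a Prefect version mismatch between the flow runner and the Dask workers, so that a task serialized by one version lacks attributes the other expects. A minimal diagnostic sketch, assuming a reachable scheduler (the address below is a placeholder):
    import prefect
    from distributed import Client

    # Compare the flow runner's Prefect version against every worker's.
    client = Client("tcp://dask-scheduler:8786")  # placeholder address
    print("runner prefect:", prefect.__version__)
    # distributed.Client.run executes the function on each worker and
    # returns a dict mapping worker address to result.
    print(client.run(lambda: __import__("prefect").__version__))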
    5 replies (Chris White)
  • Jacques Jamieson

    2 years ago
    Can Prefect be used to build a processing pipeline that pulls data from various REST APIs and then performs analysis on that data? Can it also support this in a multi-tenant architecture?
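    A minimal sketch of the extract-and-analyze pattern being asked about, with a placeholder endpoint and a trivial stand-in for the analysis step:
    import requests
    from prefect import task, Flow

    @task
    def extract(url: str) -> list:
        # Pull JSON records from a REST endpoint (placeholder URL below).
        return requests.get(url).json()

    @task
    def analyze(records: list) -> int:
        # Stand-in analysis: just count the records.
        return len(records)

    with Flow("rest-pipeline") as flow:
        data = extract("https://api.example.com/items")
        analyze(data)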
    10 replies (nicholas)
  • Azuma

    2 years ago
    Hello everyone! I'm new to Prefect. Can someone tell me, or give an example of, where I can set url_endpoint if I want to access Prefect as http://public_dns_name/prefect on my EC2 VM on AWS?
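    One hedged pointer, assuming Prefect 0.x conventions: any key in config.toml can be overridden with a PREFECT__-prefixed environment variable, so the UI's GraphQL endpoint could be pointed at the public address before the server starts (the exact key name may differ between versions):
    import os

    # Double-underscore env vars override Prefect config values; the
    # server.ui.graphql_url key is an assumption based on the 0.10.x-era
    # config layout, and the URL below is a placeholder.
    os.environ["PREFECT__SERVER__UI__GRAPHQL_URL"] = (
        "http://public_dns_name/prefect/graphql"
    )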
    3 replies (nicholas)
  • Sandeep Aggarwal

    2 years ago
    Is there a way to pass parameters/task outputs from the current flow to another flow when using FlowRunTask? I get the error below when trying to do so:
    TypeError: Object of type 'Parameter' is not JSON serializable
    Here is a sample snippet:
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import FlowRunTask

    with Flow("sample flow") as sample_flow:
        param = Parameter("task_param")

        FlowRunTask(flow_name="next flow", parameters={"task_param": param})()
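    A hedged workaround sketch: in Prefect 0.x, arguments supplied when a task is called inside a flow context are resolved to concrete values at run time, so binding parameters at call time rather than in the constructor may avoid serializing the Parameter object itself:
    from prefect import Flow, Parameter
    from prefect.tasks.prefect import FlowRunTask

    start_next_flow = FlowRunTask(flow_name="next flow")

    with Flow("sample flow") as sample_flow:
        param = Parameter("task_param")
        # Bound at call time, param is resolved to its concrete value
        # before being handed to the child flow run.
        start_next_flow(parameters={"task_param": param})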
    12 replies (nicholas)
  • Questionnaire

    2 years ago
    Hello folks, I'm trying to register my flow with the UI. I ran the docker-compose and docker files and added these lines:
    from prefect import Client

    flow.run_agent()
    c = Client()
    c.create_flow_run()
    but I'm facing this error (full traceback in the next message):
    58 replies (nicholas +1)
  • Questionnaire

    2 years ago
    Traceback (most recent call last):
      File "main.py", line 315, in <module>
        flow.register()
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1429, in register
        no_url=no_url,
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/client/client.py", line 619, in register
        serialized_flow = flow.serialize(build=build)  # type: Any
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1279, in serialize
        self.storage.add_flow(self)
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/environments/storage/local.py", line 100, in add_flow
        flow_location = flow.save(flow_location)
      File "/home/tanmay/.local/lib/python3.6/site-packages/prefect/core/flow.py", line 1346, in save
        cloudpickle.dump(self, f)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 1067, in dump
        CloudPickler(file, protocol=protocol).dump(obj)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 477, in dump
        return Pickler.dump(self, obj)
      File "/usr/lib/python3.6/pickle.py", line 409, in dump
        self.save(obj)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 876, in save_set
        save(item)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 544, in save_function
        return self.save_function_tuple(obj)
      File "/home/tanmay/.local/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 756, in save_function_tuple
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 521, in save
        self.save_reduce(obj=obj, *rv)
      File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
        save(state)
      File "/usr/lib/python3.6/pickle.py", line 476, in save
        f(self, obj) # Call unbound method with explicit self
      File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
        self._batch_setitems(obj.items())
      File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
        save(v)
      File "/usr/lib/python3.6/pickle.py", line 496, in save
        rv = reduce(self.proto)
    TypeError: can't pickle _thread.lock objects
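    The final TypeError is the key clue: flow.register() pickles the flow with cloudpickle, and something the flow captures holds a _thread.lock (these typically hide inside clients, sessions, or connections created at module scope). A minimal sketch of the usual remedy, with a bare lock as an illustrative stand-in for the unpicklable object:
    import threading
    from prefect import task, Flow

    # Anti-pattern: an unpicklable object created at module scope (a bare
    # threading.Lock here; a DB connection or HTTP session behaves the
    # same) gets captured by the flow and breaks registration.
    # lock = threading.Lock()

    @task
    def do_work():
        # Remedy: create the unpicklable resource inside the task, so it
        # exists only at run time and is never pickled with the flow.
        lock = threading.Lock()
        with lock:
            return 42

    with Flow("main") as flow:
        do_work()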
  • Darragh

    2 years ago
    Back with another bunch of stupid! Today's questions…
    • I build my flow into Docker storage and register it with the Prefect Server I have running on AWS - what's the simplest execution model I can use to get it running? I'm assuming that if I've built it locally, the registry I push it to needs to be accessible by both the builder [local machine] and the server?
    • Am I right in assuming I want a RemoteEnvironment with a LocalExecutor? And does the execution agent need to be a Docker agent?
    • The agent running alongside my server is crashing out because it's trying to connect to http://localhost:4200 rather than the IP of the AWS EC2 instance it's running on - is there an environment variable override for this?
    [2020-05-18 17:52:01,285] ERROR - agent | HTTPConnectionPool(host='localhost', port=4200): Max retries exceeded with url: /graphql/alpha (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f52470ae0f0>: Failed to establish a new connection: [Errno 111] Connection refused',))
    Thanks in advance 🙂
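    On the last question, a hedged sketch: Prefect 0.x lets environment variables override config, and the agent reads its API endpoint from cloud.api, so pointing it at the server's public address may resolve the localhost connection error (the EC2 address below is a placeholder):
    import os

    # Override the default http://localhost:4200 endpoint; set before
    # importing prefect so the config override is picked up.
    os.environ["PREFECT__CLOUD__API"] = "http://<ec2-public-ip>:4200"

    from prefect.agent.local import LocalAgent

    LocalAgent().start()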
    20 replies (Darragh, Tyler Wanner +1)
  • Mark McDonald

    2 years ago
    Is there a way to add all states to a cloud hook via the GraphQL API? Right now I'm having to provide a list of every possible state, and I'd like not to have to maintain that list if states get introduced or deprecated in the future. So instead of this:
    create_cloud_hook = f"""
    mutation {{
      create_cloud_hook(input: {{
        type: WEBHOOK,
        name: "{cloud_hook_name}",
        version_group_id: "{version_group_id}",
        states: ["Cached", "Cancelled", "Failed", "Finished", "Looped", "Mapped", "Paused", "Pending", "Queued", "Resume", "Retrying", "Running", "Scheduled", "Skipped", "Submitted", "Success", "Timedout", "Triggerfailed"],
        config: {{
          url: "{web_hook_url}"
        }}
      }}) {{
        id
      }}
    }}
    """
    something like this:
    create_cloud_hook = f"""
    mutation {{
      create_cloud_hook(input: {{
        type: WEBHOOK,
        name: "{cloud_hook_name}",
        version_group_id: "{version_group_id}",
        states: ["All"],
        config: {{
          url: "{web_hook_url}"
        }}
      }}) {{
        id
      }}
    }}
    """
    9 replies (Mark McDonald, nicholas +2)