  • John Ramirez

    2 years ago
    Hello - Is there a way to take a generated dict from a task and use the dict.items() function to create separate branches?
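    One pattern that should work, as a minimal sketch assuming Prefect 0.x's Task.map API (the task names here are made up): materialize the dict's items as a list inside a task, then map over that list so each (key, value) pair becomes its own branch.
    from prefect import task, Flow

    @task
    def make_dict():
        return {"a": 1, "b": 2}

    @task
    def get_items(d):
        # Task.map needs a concrete iterable, so turn the dict
        # into a list of (key, value) tuples first.
        return list(d.items())

    @task
    def process(item):
        # Each mapped child task receives one (key, value) pair.
        key, value = item
        return "{}={}".format(key, value)

    with Flow("map-over-dict") as flow:
        branches = process.map(get_items(make_dict()))

    flow.run()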
  • itay livni

    2 years ago
    Hi - I am trying to piece together a state handler that handles a task failing, returns a success, and sets the final result to a None type or df. Can you point me to the specific pattern to copy? Thanks
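    A minimal sketch of one way to do this, assuming Prefect 0.x state handler semantics (the task and fallback value are hypothetical): inspect the new state inside the handler and, when it is Failed, return a Success carrying the fallback result instead.
    from prefect import task
    from prefect.engine import state

    def fallback_on_failure(task, old_state, new_state):
        # When the task transitions to Failed, swap in a Success whose
        # result is the fallback value (None here; it could be a default df).
        if isinstance(new_state, state.Failed):
            return state.Success(result=None, message="Failed; returning fallback")
        return new_state

    @task(state_handlers=[fallback_on_failure])
    def flaky_task():
        raise ValueError("something went wrong")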
  • Kushagara

    2 years ago
    Hi - I am trying to use the Kubernetes CreateNamespacedJob task from my Prefect flow, and I want it to run on a cron schedule. It works fine for the first run, but on the second run it gives me the following error.
    [2020-01-27 17:45:00,419] ERROR - prefect.TaskRunner | Unexpected error: ApiException()
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 870, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 250, in timeout_handler
        return fn(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 267, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/kubernetes/job.py", line 111, in run
        api_client.create_namespaced_job(namespace=namespace, body=body, **kube_kwargs)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 60, in create_namespaced_job
        (data) = self.create_namespaced_job_with_http_info(namespace, body, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 151, in create_namespaced_job_with_http_info
        collection_formats=collection_formats)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 334, in call_api
        _return_http_data_only, collection_formats, _preload_content, _request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 168, in __call_api
        _request_timeout=_request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 266, in POST
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 222, in request
        raise ApiException(http_resp=r)
    kubernetes.client.rest.ApiException: (409)
    Reason: Conflict
    HTTP response headers: HTTPHeaderDict({'Audit-Id': 'eba5fdc2-eb25-417a-b51d-1faee8901466', 'Content-Type': 'application/json', 'Date': 'Mon, 27 Jan 2020 17:45:00 GMT', 'Content-Length': '244'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dbt-1580146621.6663644\" already exists","reason":"AlreadyExists","details":{"name":"dbt-1580146621.6663644","group":"batch","kind":"jobs"},"code":409}
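    The 409 suggests the job name (dbt-<timestamp>) is computed once when the flow is built and then reused on every scheduled run. A minimal sketch of one possible fix, assuming Prefect 0.x's CreateNamespacedJob task (the flow name, schedule, and empty spec below are placeholders): build the job body inside a task, so the name is regenerated on each run.
    import time

    from prefect import task, Flow
    from prefect.schedules import CronSchedule
    from prefect.tasks.kubernetes import CreateNamespacedJob

    @task
    def build_job_body():
        # This runs on every flow run (not once at flow-build time),
        # so each scheduled run gets a fresh, unique job name.
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"name": "dbt-{}".format(time.time())},
            "spec": {},  # actual job spec elided
        }

    create_job = CreateNamespacedJob(namespace="default")

    with Flow("k8s-job", CronSchedule("*/1 */1 * * *")) as flow:
        create_job(body=build_job_body())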
  • Kushagara

    2 years ago
    Has anyone experienced the same issue before? Any help would be very much appreciated.
  • Kushagara

    2 years ago
    Hi, I am relatively new to Prefect and this might be a very basic question, but I need to understand it. How is a function call within a Flow invoked? I am trying to test a cron schedule that calls a function, but the function is never called except on the first run.
    from prefect import Flow, task
    from random import randrange
    from prefect.schedules import CronSchedule
    
    
    def test():
      x = randrange(1000)
      y = randrange(2000)
      print(x, y)
      return x
    
    
    
    def func():
      daily_schedule = CronSchedule("*/1 */1 * * *")
      with Flow("My test flow", daily_schedule) as test_flow:
        data = test()
        print(data)
    
      test_flow.run()
    
    func()
    The output I am getting is:
    367 1629
    367
    [2020-01-28 13:35:34,589] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:36:00+00:00
    [2020-01-28 13:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:36:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:37:00+00:00
    [2020-01-28 13:37:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:37:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:38:00+00:00
    [2020-01-28 13:38:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:38:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:38:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:38:00,009] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:39:00+00:00
    [2020-01-28 13:39:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:39:00,008] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:40:00+00:00
    Is this the expected behaviour? Why isn’t the test function called in each run? How can I achieve it?
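    One likely cause: test is a plain Python function, so its body executes once while the flow is being built; only @task-decorated callables are re-run on each scheduled flow run. A minimal sketch of the fix, assuming that diagnosis is right:
    from random import randrange

    from prefect import Flow, task
    from prefect.schedules import CronSchedule

    @task
    def test():
        # Decorated with @task, so this runs on every scheduled flow run
        # instead of once when the flow object is constructed.
        x = randrange(1000)
        y = randrange(2000)
        print(x, y)
        return x

    with Flow("My test flow", CronSchedule("*/1 */1 * * *")) as test_flow:
        data = test()

    test_flow.run()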
  • Aliza Rayman

    2 years ago
    Hi All! I've been trying to run Prefect on a pretty intense pipeline with about 210,000 tasks in total, split into 7 flows which are created and run in a loop. (The maximum number of tasks mapped to 1 task is 1000.) I'm running it via a DaskEnvironment. I've been getting various errors, including a CancelledError, distributed.client - WARNING - Couldn't gather 3 keys (referring to TCP keys), and a TCP Broken Pipe Error. Has anyone experienced or debugged this?
  • Nate Joselson

    2 years ago
    This is already extremely useful; however, what if I now want to pass the results from imported flows on to a new flow? If I try to print the results of the reduce_task() in the combined flow with print(mr_flow_state.result[reduction].result), then I get the error:
    Traceback (most recent call last):
      File "combine_flow.py", line 14, in <module>
        print(mr_flow_state.result[reduction].result)
    NameError: name 'reduction' is not defined
    This makes sense to me, but is using some kind of cache the answer to getting these results? How do I go about doing that? Thanks so much!
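    For the NameError itself: reduction is a name defined in map_reduce_flow.py, so the task object has to be imported alongside the flow before it can be used as a key into the run state's result dict. A minimal sketch, assuming the file layout above:
    # combine_flow.py
    from map_reduce_flow import mr_flow, reduction  # import the task object too

    mr_flow_state = mr_flow.run(data=[1, 2, 3])
    # Index the flow run state by the task object to recover its result.
    print(mr_flow_state.result[reduction].result)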
  • Nate Joselson

    2 years ago
    [2020-01-29 09:42:21,363] INFO - prefect.FlowRunner | Beginning Flow run for 'combination_flow'
    [2020-01-29 09:42:21,366] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,373] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
    [2020-01-29 09:42:21,376] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,376] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-29 09:42:21,377] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
    [2020-01-29 09:42:21,378] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,385] INFO - prefect.TaskRunner | Task 'data': Starting task run...
    [2020-01-29 09:42:21,387] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,394] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
    [2020-01-29 09:42:21,396] INFO - prefect.TaskRunner | Task 'transform': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,402] INFO - prefect.TaskRunner | Task 'load': Starting task run...
    Here's your data: [10, 20, 30]
    [2020-01-29 09:42:21,405] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,405] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-29 09:42:21,406] INFO - prefect.FlowRunner | Beginning Flow run for 'Map / Reduce 🤓'
    [2020-01-29 09:42:21,408] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,414] INFO - prefect.TaskRunner | Task 'data': Starting task run...
    [2020-01-29 09:42:21,416] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,422] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
    [2020-01-29 09:42:21,427] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
    [2020-01-29 09:42:21,430] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,435] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
    [2020-01-29 09:42:21,437] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,442] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
    [2020-01-29 09:42:21,445] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,447] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
    [2020-01-29 09:42:21,453] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
    [2020-01-29 09:42:21,458] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
    [2020-01-29 09:42:21,461] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,465] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
    [2020-01-29 09:42:21,468] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,474] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
    [2020-01-29 09:42:21,477] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,479] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
    [2020-01-29 09:42:21,486] INFO - prefect.TaskRunner | Task 'reduce_task': Starting task run...
    [2020-01-29 09:42:21,489] INFO - prefect.TaskRunner | Task 'reduce_task': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,496] INFO - prefect.TaskRunner | Task 'print_task': Starting task run...
    Here's your data: 12
    [2020-01-29 09:42:21,498] INFO - prefect.TaskRunner | Task 'print_task': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,499] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
  • Nate Joselson

    2 years ago
    Hi All! 👋 Thanks for building such amazing tools! I am truly enjoying using them! My questions have to do with caching and passing results between flows. I have not understood how to use the cache_key key, though it is mentioned in several places as a way of sharing results between flows. I want to be able to create multiple pipeline flows in separate Python files (for readability) and run them all together through a job. I also want them to be able to share inputs and outputs, so that I don't need to re-calculate the same tasks over and over again in the different flows. I will try to explain an example of what I am trying to do. First, imagine I have 2 different flows, the map/reduce flow and the ETL flow from the docs. I want them both to start from the same variable, the list [1, 2, 3], so I have a third flow that I define as just an extract flow.
    extract_flow.py
    from prefect import task, Flow
    
    @task
    def extract():
        return [1, 2, 3]
    etl_flow.py
    from prefect import task, Flow, Parameter
    
    # ETL Flow
    
    @task
    def transform(data):
        return [i * 10 for i in data]
    
    
    @task
    def load(data):
        print("Here's your data: {}".format(data))
    
    
    with Flow("ETL") as etl_flow:
        e = Parameter('data')
        print(e)
        t = transform(e)
        l = load(t)
    map_reduce_flow.py
    from prefect import task, Flow, Parameter
    
    # Map Reduce Flow
    
    @task
    def map_task(x):
        return x + 1
    
    
    @task
    def reduce_task(x):
        return sum(x)
    
    @task
    def print_task(x):
        print("Here's your data: {}".format(x))
    
    with Flow("Map / Reduce 🤓") as mr_flow:
        numbers = Parameter('data')
        first_map = map_task.map(numbers)
        second_map = map_task.map(first_map)
        reduction = reduce_task(second_map)
        printing = print_task(reduction)
    From here, I want to combine them into a combination flow that I can run with python combine_flow.py:
    from prefect import task, Flow
    from etl_flow import etl_flow
    from map_reduce_flow import mr_flow
    from extract_flow import extract
    
    
    with Flow("combination_flow") as extract_flow:
        data = extract()
    
    extract_flow_state = extract_flow.run()
    
    etl_flow_state = etl_flow.run(data=extract_flow_state.result[data].result)
    mr_flow_state = mr_flow.run(data=extract_flow_state.result[data].result)
    This gives the output (as expected!)
  • Jake Schmidt

    2 years ago
    This might not be the proper place to ask, but can anyone help me understand why I’m seeing so much better performance with joblib than prefect/dask? Obviously dask and joblib aren’t exactly apples-to-apples but MAN joblib blows it out of the water in terms of speed for parallel computing on a single machine. A cursory internet search for “joblib vs dask” didn’t yield any satisfactory answer…