prefect-community

    Ryan Connolly

    01/19/2020, 4:18 PM
    the prefect code above should be identical to the example in cylc

    Sebastian

    01/19/2020, 11:55 PM
    Bumping my previous question: I was wondering if this would be a good example of flow concurrency (simulating a streaming ETL pipeline)?
    concurrency.py

    Jake Schmidt

    01/20/2020, 1:40 PM
    Hello! Wondering if there’s a timeframe for task affinity — would love to be able to pin my model training task to a k8s resource with a GPU.

    Chris O'Brien

    01/20/2020, 10:38 PM
    Hi All! Really enjoying Prefect and have gotten to the point of deploying a Flow on a CronSchedule into AWS. My question is: is there a way to force a run of the Flow in the event that a deployment goes bad, meaning we need to roll back? The rollback would mean that we would have passed the trigger time, which is why I am asking.

    emre

    01/21/2020, 7:54 AM
    Hi everyone! I noticed that when even one of my mapped tasks fails and is scheduled for retry, a lot of redundant logging is done. Only one task among 50 mapped tasks fails, yet upon retry all 50 of these tasks generate "starting task run" and "finished task run" logs, along with the upstream and downstream mapped tasks, which also print these logs. Of course this isn’t anything breaking, just mildly annoying. Has anyone faced this issue, and has anyone come up with a fix?

    Joe Schmid

    01/21/2020, 4:39 PM
    Question on Dask work stealing, mapped task distribution, and adaptive mode in the following scenario:
    • Simple Flow with a single task that maps over items, e.g. 50 items
    • Long-running Dask cluster in adaptive mode with a minimum of 1 worker always running and work stealing disabled
    • Kick off a Flow run
    • One mapped task (i.e. a mapped task for 1 item) begins running on the 1 worker that is immediately available
    • Other workers begin to start up (takes several minutes since we use k8s cluster autoscaler to request new spot instances in AWS)
    • Once new workers are available they don't seem to get allocated tasks and we notice that only one worker is busy
    Do we need to ensure Dask workers are already running and available prior to starting a Flow with a single mapped task? Said another way, does disabling work stealing prevent mapped tasks (that haven't started running) from being allocated to Dask workers that started after the mapped tasks have been submitted to the scheduler?

    Braun Reyes

    01/21/2020, 11:21 PM
    Is there a way to implement task max concurrency when using task.map? I have a list of 6 configurations that I want to execute a task against in parallel. I was thinking I could use the Dask executor for this, but I'm having trouble finding kwargs to pass in for it. Using a Dask local cluster would allow me to test this

    Braun Reyes

    01/21/2020, 11:21 PM
    Locally

    Braun Reyes

    01/21/2020, 11:22 PM
    This would be like if I only wanted to run 2 or 3 at a time
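    One way to cap concurrency for a mapped task is to cap the local Dask cluster itself. A minimal sketch, assuming a Prefect 0.9-era DaskExecutor, which forwards its keyword arguments to the local Dask cluster when no scheduler address is given; the task body is a hypothetical stand-in:

    from prefect import Flow, task
    from prefect.engine.executors import DaskExecutor

    @task
    def run_config(config):
        # hypothetical stand-in for the real per-configuration work
        print("running {}".format(config))

    with Flow("capped-map") as flow:
        run_config.map(list(range(6)))

    # With no address, DaskExecutor spins up a local cluster and passes these
    # kwargs through to it, so at most 2 mapped tasks run at once
    # (2 workers x 1 thread each).
    flow.run(executor=DaskExecutor(n_workers=2, threads_per_worker=1))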

    Chris O'Brien

    01/22/2020, 1:55 AM
    Quick question on the Dask Cluster tutorial: Getting the following error when the flow tries to run
    distributed.core - INFO - Starting established connection
    distributed.core - ERROR - add_client() got an unexpected keyword argument 'versions'
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
        result = handler(comm, **msg)
    TypeError: add_client() got an unexpected keyword argument 'versions'
    With dask-scheduler version 2.2.0 and Prefect version 0.9.0. Makes me think it's a version issue?

    Braun Reyes

    01/23/2020, 6:00 PM
    With Dask, is there a good way to determine a reasonable number of workers and number of threads per worker in a containerized world like Fargate and/or K8s? The Fargate containers that we use for our flows have .25 CPU. I started with 1 worker and 2 threads per worker on a local Dask cluster, but I'm having trouble understanding/finding documentation on how to guide folks to choose the proper number of workers and threads as our container sizes increase for a single-machine local Dask cluster. Any feedback would be greatly appreciated.
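    Not Prefect-specific guidance, but the usual Dask rule of thumb is to keep workers x threads at or below the CPU available to the container, favoring processes for GIL-bound Python code and threads for I/O or NumPy-style work that releases the GIL. A sketch of a starting point under that assumption:

    from prefect.engine.executors import DaskExecutor

    # With a 0.25-vCPU container there is only a fraction of one core, so
    # extra workers/threads mostly add scheduling overhead; 1 worker x 1
    # thread is a sane floor, scaling threads up as the CPU allocation grows
    # (e.g. ~4 threads at 1 vCPU for mostly I/O-bound tasks).
    executor = DaskExecutor(n_workers=1, threads_per_worker=1)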

    itay livni

    01/25/2020, 4:39 PM
    Hello again - I have a question regarding tags. I am trying to retrieve a pd.DataFrame from the result of a merge. Like so:
    definitions = get_defs_flow.get_tasks(tags={"final_definitions"})#[1]
    definitions_df = get_defs_state.result[definitions].result
    The tag in the Flow looks like this:
    with tags("final_definitions"):
            definitions_df = merge(df1, df2)
    When df1 is the result of the merge, a list is returned with merge as the first item in the list:
    [<Task: Merge>, <Task: GetItem>]
    When df2 is the result of the merge, a list is returned with merge as the second item in the list:
    [<Task: GetItem>, <Task: Merge>]
    How do I explicitly call the merge result? Thanks! I could loop through the list and get a df, but that would be a coding travesty 🙂
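    For what it's worth, Flow.get_tasks can also filter by name, which sidesteps the ordering problem entirely; a minimal sketch, assuming the merge task is named "Merge" as in the repr above:

    # filter by name *and* tag so the ordering of the returned list no longer matters
    merge_task = get_defs_flow.get_tasks(name="Merge", tags=["final_definitions"])[0]
    definitions_df = get_defs_state.result[merge_task].result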

    Ryan Connolly

    01/27/2020, 3:59 PM
    I find myself frequently wanting to chain some tasks together and parametrize the set of tasks, and then have other tasks depend on that set. I am currently using the imperative API to add the set of tasks to the flow; the function that adds them returns the first and last task, and then I set dependencies on those. It is not a super clean solution, but it works. Dagster - another Python workflow manager - has a concept of composite_tasks which lets you treat a handful of tasks as one task in your workflow. Something similar in prefect could, I think, be super useful. Just wanted to kindly share my thoughts. I am curious whether this is something prefect is thinking about, or if there are any upcoming plans that address this problem?

    John Ramirez

    01/27/2020, 6:55 PM
    Hello - Is there a way to take a generated dict from a task and use the dict.items() function to create separate branches?
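    One pattern that may fit is to materialize dict.items() into a list in an intermediate task and then map over it, giving one branch per key; a minimal sketch with hypothetical task names:

    from prefect import Flow, task

    @task
    def build_dict():
        return {"a": 1, "b": 2, "c": 3}

    @task
    def to_items(d):
        # task.map needs a sequence, so materialize dict.items() here
        return list(d.items())

    @task
    def branch(item):
        key, value = item
        print(key, value)

    with Flow("branch-per-key") as flow:
        branch.map(to_items(build_dict()))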

    itay livni

    01/27/2020, 9:46 PM
    Hi - I am trying to piece together a state handler that handles a task failing, returns a Success, and sets the final result to a None type or a df. Can you point me to the specific pattern to copy? Thanks
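    A minimal sketch of one way to do this, assuming Prefect 0.x state handlers with the (task, old_state, new_state) signature; the failing task is hypothetical:

    from prefect import task
    from prefect.engine.state import Failed, Success

    def fall_back_to_none(task, old_state, new_state):
        # On failure, swap in a Success whose result is None so downstream
        # tasks still run with a well-defined input.
        if isinstance(new_state, Failed):
            return Success(result=None, message="recovered from failure")
        return new_state

    @task(state_handlers=[fall_back_to_none])
    def fetch_df():
        raise ValueError("boom")  # hypothetical failure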

    Kushagara

    01/28/2020, 9:29 AM
    Hi - I am trying to use the Kubernetes CreateNamespacedJob task from my Prefect flow and I want it to run as a cron. It works fine for the first run, but on the second run it gives me the following error.
    [2020-01-27 17:45:00,419] ERROR - prefect.TaskRunner | Unexpected error: ApiException()
    Traceback (most recent call last):
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
        new_state = method(self, state, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 870, in get_task_run_state
        self.task.run, timeout=self.task.timeout, **raw_inputs
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 250, in timeout_handler
        return fn(*args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 267, in method
        return run_method(self, *args, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/prefect/tasks/kubernetes/job.py", line 111, in run
        api_client.create_namespaced_job(namespace=namespace, body=body, **kube_kwargs)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 60, in create_namespaced_job
        (data) = self.create_namespaced_job_with_http_info(namespace, body, **kwargs)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 151, in create_namespaced_job_with_http_info
        collection_formats=collection_formats)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 334, in call_api
        _return_http_data_only, collection_formats, _preload_content, _request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 168, in __call_api
        _request_timeout=_request_timeout)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 266, in POST
        body=body)
      File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 222, in request
        raise ApiException(http_resp=r)
    kubernetes.client.rest.ApiException: (409)
    Reason: Conflict
    HTTP response headers: HTTPHeaderDict({'Audit-Id': 'eba5fdc2-eb25-417a-b51d-1faee8901466', 'Content-Type': 'application/json', 'Date': 'Mon, 27 Jan 2020 17:45:00 GMT', 'Content-Length': '244'})
    HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dbt-1580146621.6663644\" already exists","reason":"AlreadyExists","details":{"name":"dbt-1580146621.6663644","group":"batch","kind":"jobs"},"code":409}

    Kushagara

    01/28/2020, 9:30 AM
    Has anyone experienced the same issue before? Any help will be very much appreciated.
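    The 409 suggests the Job body, including metadata.name, is built once when the Flow is constructed, so every scheduled run re-submits the same name. One fix is to generate the name inside a task so it is fresh on every run; a sketch with a hypothetical skeleton spec:

    import uuid

    from prefect import task

    @task
    def make_job_body():
        # Runs as a task, so a new name is generated on *every* flow run
        # rather than once at flow-build time.
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"name": "dbt-{}".format(uuid.uuid4().hex[:8])},
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{"name": "dbt", "image": "my-dbt:latest"}],
                        "restartPolicy": "Never",
                    }
                },
            },
        }

    The returned dict can then be passed as the body argument of the CreateNamespacedJob task.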

    Kushagara

    01/28/2020, 1:43 PM
    Hi, I am relatively new to Prefect and this might be a very basic question, but I need to understand it. How is a function call within a Flow invoked? I am trying to test a cron schedule which calls a function, but the function is never called except for the first run.
    from prefect import Flow, task
    from random import randrange
    from prefect.schedules import CronSchedule
    
    
    def test():
      x = randrange(1000)
      y = randrange(2000)
      print(x, y)
      return x
    
    
    
    def func():
      daily_schedule = CronSchedule("*/1 */1 * * *")
      with Flow("My test flow", daily_schedule) as test_flow:
        data = test()
        print(data)
    
      test_flow.run()
    
    func()
    The output I am getting is 
    367 1629
    367
    [2020-01-28 13:35:34,589] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:36:00+00:00
    [2020-01-28 13:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:36:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:37:00+00:00
    [2020-01-28 13:37:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:37:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:38:00+00:00
    [2020-01-28 13:38:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:38:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:38:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:38:00,009] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:39:00+00:00
    [2020-01-28 13:39:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
    [2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-28 13:39:00,008] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:40:00+00:00
    Is this the expected behaviour? Why isn’t the test function called in each run? How can I achieve it?
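    What is happening here is that, in Prefect Core, the body of the "with Flow(...)" block executes once, at flow-build time; plain function calls such as test() run immediately and are never registered as tasks, so the scheduled runs have nothing to execute. A sketch of the likely fix, decorating the functions as tasks so they run on every scheduled flow run:

    from random import randrange

    from prefect import Flow, task
    from prefect.schedules import CronSchedule

    @task
    def test():
        # now a Prefect task: deferred until each scheduled flow run
        x = randrange(1000)
        y = randrange(2000)
        print(x, y)
        return x

    @task
    def show(data):
        print(data)

    daily_schedule = CronSchedule("*/1 */1 * * *")

    with Flow("My test flow", daily_schedule) as test_flow:
        show(test())

    test_flow.run()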

    Aliza Rayman

    01/28/2020, 4:11 PM
    Hi All! I've been trying to run Prefect using a pretty intense pipeline with about 210,000 tasks in total, split into 7 flows which are created and run in a loop. (The maximum number of tasks mapped to 1 task is 1000.) I'm running it via a DaskEnvironment. I've been getting various errors, including a CancelledError, distributed.client - WARNING - Couldn't gather 3 keys (referring to TCP keys), and a TCP Broken Pipe Error. Has anyone experienced this / debugged this?

    Nate Joselson

    01/29/2020, 10:03 AM
    Hi All! 👋 Thanks for building such amazing tools! I am truly enjoying using them! My questions have to do with caching and passing results between flows. I have not understood how to use the cache_key key, though it is mentioned in several places as a way of sharing results between flows. I want to be able to create multiple pipeline flows in separate Python files (for readability) and run them all together through a job. I also want them to be able to share inputs and outputs so that I don't need to re-calculate the same tasks over and over again in the different flows. I will try to explain an example of what I am trying to do. First, imagine I have 2 different flows, the map_reduce flow and the ETL flow from the docs. I want them both to start from the same variable, the list [1, 2, 3], so I have a third flow that I define as just an extract flow.
    extract_flow.py
    from prefect import task, Flow
    
    @task
    def extract():
        return [1, 2, 3]
    etl_flow.py
    from prefect import task, Flow, Parameter
    
    # ETL Flow
    
    @task
    def transform(data):
        return [i * 10 for i in data]
    
    
    @task
    def load(data):
        print("Here's your data: {}".format(data))
    
    
    with Flow("ETL") as etl_flow:
        e = Parameter('data')
        t = transform(e)
        l = load(t)
    map_reduce_flow.py
    from prefect import task, Flow, Parameter
    
    # Map Reduce Flow
    
    @task
    def map_task(x):
        return x + 1
    
    
    @task
    def reduce_task(x):
        return sum(x)
    
    @task
    def print_task(x):
        print("Here's your data: {}".format(x))
    
    with Flow("Map / Reduce 🤓") as mr_flow:
        numbers = Parameter('data')
        first_map = map_task.map(numbers)
        second_map = map_task.map(first_map)
        reduction = reduce_task(second_map)
        printing = print_task(reduction)
    From here, I want to combine them into a combination flow that I can run with
    python combine_flow.py
    from prefect import task, Flow
    from etl_flow import etl_flow
    from map_reduce_flow import mr_flow
    from extract import extract
    
    
    with Flow("combination_flow") as extract_flow:
    data = extract()
    
    extract_flow_state = extract_flow.run()
    
    etl_flow_state = etl_flow.run(data=extract_flow_state.result[data].result)
    mr_flow_state = mr_flow.run(data=extract_flow_state.result[data].result)
    This gives the output (as expected!)

    Nate Joselson

    01/29/2020, 10:03 AM
    [2020-01-29 09:42:21,363] INFO - prefect.FlowRunner | Beginning Flow run for 'combination_flow'
    [2020-01-29 09:42:21,366] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,373] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
    [2020-01-29 09:42:21,376] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,376] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-29 09:42:21,377] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
    [2020-01-29 09:42:21,378] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,385] INFO - prefect.TaskRunner | Task 'data': Starting task run...
    [2020-01-29 09:42:21,387] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,394] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
    [2020-01-29 09:42:21,396] INFO - prefect.TaskRunner | Task 'transform': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,402] INFO - prefect.TaskRunner | Task 'load': Starting task run...
    Here's your data: [10, 20, 30]
    [2020-01-29 09:42:21,405] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,405] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
    [2020-01-29 09:42:21,406] INFO - prefect.FlowRunner | Beginning Flow run for 'Map / Reduce 🤓'
    [2020-01-29 09:42:21,408] INFO - prefect.FlowRunner | Starting flow run.
    [2020-01-29 09:42:21,414] INFO - prefect.TaskRunner | Task 'data': Starting task run...
    [2020-01-29 09:42:21,416] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,422] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
    [2020-01-29 09:42:21,427] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
    [2020-01-29 09:42:21,430] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,435] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
    [2020-01-29 09:42:21,437] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,442] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
    [2020-01-29 09:42:21,445] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,447] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
    [2020-01-29 09:42:21,453] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
    [2020-01-29 09:42:21,458] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
    [2020-01-29 09:42:21,461] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,465] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
    [2020-01-29 09:42:21,468] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,474] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
    [2020-01-29 09:42:21,477] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,479] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
    [2020-01-29 09:42:21,486] INFO - prefect.TaskRunner | Task 'reduce_task': Starting task run...
    [2020-01-29 09:42:21,489] INFO - prefect.TaskRunner | Task 'reduce_task': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,496] INFO - prefect.TaskRunner | Task 'print_task': Starting task run...
    Here's your data: 12
    [2020-01-29 09:42:21,498] INFO - prefect.TaskRunner | Task 'print_task': finished task run for task with final state: 'Success'
    [2020-01-29 09:42:21,499] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

    Nate Joselson

    01/29/2020, 10:03 AM
    This is already extremely useful. However, what if I now want to pass the results from imported flows on to a new flow? If I try to print the results of the reduce_task() in the combined flow with
    print(mr_flow_state.result[reduction].result)
    then I get the error
    Traceback (most recent call last):
      File "combine_flow.py", line 14, in <module>
        print(mr_flow_state.result[reduction].result)
    NameError: name 'reduction' is not defined
    This makes sense to me, but is using some kind of cache the answer to getting these results? How do I go about doing that? Thanks so much!
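    Caching aside, the NameError itself has a simpler fix: state.result is keyed by task objects, so the combining script just needs to import the task object from the flow's module. A minimal sketch, assuming map_reduce_flow.py exposes reduction at module level:

    # combine_flow.py (sketch)
    from map_reduce_flow import mr_flow, reduction

    mr_flow_state = mr_flow.run(data=[1, 2, 3])

    # index the flow-run state by the imported task object
    print(mr_flow_state.result[reduction].result)  # 12 for [1, 2, 3]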

    Jake Schmidt

    01/29/2020, 8:00 PM
    This might not be the proper place to ask, but can anyone help me understand why I’m seeing so much better performance with joblib than with prefect/dask? Obviously dask and joblib aren’t exactly apples-to-apples, but MAN, joblib blows it out of the water in terms of speed for parallel computing on a single machine. A cursory internet search for “joblib vs dask” didn’t yield any satisfactory answer…

    wilsojb

    01/29/2020, 9:24 PM
    Curious / hopefully quick question: is it possible to combine flows into a "master flow"? Or several tasks into a "master task"? Asking here because I looked at the examples and didn't see anything obvious, so I figured I might be getting into anti-pattern territory. My use case is:
    • I have several scripts that can run independently. Each defines a Flow and a __main__ that calls flow.run().
    • These scripts need to get orchestrated. I figured I'd make a master flow that knows how to execute the smaller flows and get that master flow scheduled.
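    One workable pattern, kept close to the question since this predates any built-in flow-of-flows support, is to wrap each child flow's run() in a task of the master flow; the module and flow names below are hypothetical:

    from prefect import Flow, task

    from etl_flow import etl_flow          # hypothetical child-flow modules
    from map_reduce_flow import mr_flow

    @task
    def run_etl():
        state = etl_flow.run()
        if state.is_failed():
            raise RuntimeError("ETL flow failed")

    @task
    def run_map_reduce():
        state = mr_flow.run()
        if state.is_failed():
            raise RuntimeError("Map/Reduce flow failed")

    with Flow("master-flow") as master_flow:
        etl = run_etl()
        # run the second flow only after the first succeeds
        run_map_reduce(upstream_tasks=[etl])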

    Romain

    01/30/2020, 11:36 AM
    Hi folks, I most probably miss something here, but here is an issue I encountered: while running a docker flow (similar to this one: https://docs.prefect.io/core/examples/functional_docker.html) on a Kubernetes distributed dask cluster, there is a chance that the GetContainerLogs and WaitOnContainer tasks do not execute on the same dask worker, and therefore on the same cluster node, as the StartContainer task. In this case, they won't be able to access the container, right? Am I missing something here? How do you handle this?

    Fred Israel

    01/30/2020, 6:50 PM
    Hi there, I am trying to implement a small proof-of-concept project using Prefect.

    Fred Israel

    01/30/2020, 6:51 PM
    I've stumbled upon a problem that probably means I still need to get more used to the concepts

    Fred Israel

    01/30/2020, 6:51 PM
    So I would like to know what is the best way to deal with this

    Fred Israel

    01/30/2020, 6:53 PM
    with Flow("Category Classifier") as flow:
        # data
        dataset_size = Parameter("dataset_size")
        dataset = get_data(dataset_size)
        train_validation = split_dataset(dataset)
        train, validation = train_validation['train'], train_validation['validation']
    
        #train
        preprocessor = init_preprocessor()
        model = init_preprocessor()
        train_preprocessor(preprocessor, train)
        train_preprocessed = preprocess(preprocessor, train)
        train_model(model, train_preprocessed, train.results.category_id)
        pipeline = create_pipeline(preprocessor, model)
    
        # validation
        predictions = predict(pipeline, validation)
        scores = evaluate(predictions, validation)

    Fred Israel

    01/30/2020, 6:53 PM
    I have created a simple flow to train an ML model. The problem is that on the line
    train_model(model, train_preprocessed, train.category_id)
    I get an error that the task train has no attribute category_id. I understand that the task is not the actual data being returned by the task, but a task object. However, how can I access such data's attributes? I have used the [''] notation to handle multiple returns at runtime, but this time I would like to get a specific attribute instead of a dict's item.

    Chris White

    01/30/2020, 7:05 PM
    Hi @Fred Israel - whenever you call a task with another task as the argument, that specifies a data dependency in Prefect; so in this case, running
    train_model(model, train_preprocessed, train)
    will pass whatever data is returned from all three tasks (model, train_preprocessed, and train) as arguments into the train_model task’s run method. So, within train_model you can access train.category_id at runtime.
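    To make that concrete, a minimal sketch of the pattern described above; the model API is hypothetical:

    from prefect import task

    @task
    def train_model(model, train_preprocessed, train):
        # at runtime, train is the data returned by the upstream task,
        # so attribute access works as usual
        labels = train.category_id
        model.fit(train_preprocessed, labels)  # hypothetical model API
        return model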