Ryan Connolly
01/19/2020, 4:18 PM
Sebastian
01/19/2020, 11:55 PM
Jake Schmidt
01/20/2020, 1:40 PM
Chris O'Brien
01/20/2020, 10:38 PM
emre
01/21/2020, 7:54 AM
Joe Schmid
01/21/2020, 4:39 PM
Braun Reyes
01/21/2020, 11:21 PM
Braun Reyes
01/21/2020, 11:21 PM
Braun Reyes
01/21/2020, 11:22 PM
Chris O'Brien
01/22/2020, 1:55 AM
distributed.core - INFO - Starting established connection
distributed.core - ERROR - add_client() got an unexpected keyword argument 'versions'
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
result = handler(comm, **msg)
TypeError: add_client() got an unexpected keyword argument 'versions'
With dask-scheduler version 2.2.0 and Prefect version 0.9.0. Makes me think it's a version issue?
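For what it's worth, distributed can check this kind of mismatch directly; a minimal sketch, assuming the scheduler address below (an assumption) is reachable:

```python
from distributed import Client

# scheduler address is an assumption; use your dask-scheduler's address
client = Client("tcp://127.0.0.1:8786")

# compares package versions across client, scheduler, and workers;
# with check=True a disagreement raises instead of only warning
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"])
```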
Braun Reyes
01/23/2020, 6:00 PM
itay livni
01/25/2020, 4:39 PM
A question about tags. I am trying to retrieve a pd.DataFrame from the result of a merge. Like so:
definitions = get_defs_flow.get_tasks(tags={"final_definitions"})#[1]
definitions_df = get_defs_state.result[definitions].result
The tag in the Flow looks like this:
with tags("final_definitions"):
    definitions_df = merge(df1, df2)
When df1 is the result of the merge, a list is returned with merge as the first item in the list:
[<Task: Merge>, <Task: GetItem>]
When df2 is the result of the merge, a list is returned with merge as the second item in the list:
[<Task: GetItem>, <Task: Merge>]
How do I explicitly call the merge result? Thanks
I could loop through the list and get a df, but that would be a coding travesty 🙂
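One possibility, assuming the merge task keeps its default "Merge" name, is to pick it out of the tagged tasks by name instead of by list position; a sketch built on the snippet above:

```python
# filter the tagged tasks by name rather than relying on list order;
# assumes the merge task kept its default "Merge" name
merge_task = next(
    t for t in get_defs_flow.get_tasks(tags={"final_definitions"})
    if t.name == "Merge"
)
definitions_df = get_defs_state.result[merge_task].result
```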
Ryan Connolly
01/27/2020, 3:59 PM
composite_tasks, which lets you treat a handful of tasks as one task in your workflow. Something similar in prefect could, I think, be super useful.
Just wanted to kindly share my thoughts. And I am curious if this is something prefect is thinking about, or if there are any upcoming plans that address this problem?
John Ramirez
01/27/2020, 6:55 PM
You could use the dict.items() function to create separate branches.
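A minimal sketch of that idea with hypothetical task names, materializing the items into a list so each (key, value) pair becomes its own mapped branch:

```python
from prefect import Flow, task

@task
def get_config():
    return {"a": 1, "b": 2}  # hypothetical input dict

@task
def to_items(d):
    # materialize dict.items() so Prefect can map over it
    return list(d.items())

@task
def process(item):
    key, value = item
    return "{}={}".format(key, value)

with Flow("branch-per-item") as flow:
    results = process.map(to_items(get_config()))
```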
itay livni
01/27/2020, 9:46 PM
A task failing and returning a success, with the final result set to a None type or a df. Can you point me to the specific pattern to copy? Thanks
Kushagara
01/28/2020, 9:29 AM
[2020-01-27 17:45:00,419] ERROR - prefect.TaskRunner | Unexpected error: ApiException()
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 870, in get_task_run_state
self.task.run, timeout=self.task.timeout, **raw_inputs
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 250, in timeout_handler
return fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/utilities/tasks.py", line 267, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/prefect/tasks/kubernetes/job.py", line 111, in run
api_client.create_namespaced_job(namespace=namespace, body=body, **kube_kwargs)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 60, in create_namespaced_job
(data) = self.create_namespaced_job_with_http_info(namespace, body, **kwargs)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/apis/batch_v1_api.py", line 151, in create_namespaced_job_with_http_info
collection_formats=collection_formats)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 334, in call_api
_return_http_data_only, collection_formats, _preload_content, _request_timeout)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 168, in __call_api
_request_timeout=_request_timeout)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 266, in POST
body=body)
File "/usr/local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 222, in request
raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (409)
Reason: Conflict
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'eba5fdc2-eb25-417a-b51d-1faee8901466', 'Content-Type': 'application/json', 'Date': 'Mon, 27 Jan 2020 17:45:00 GMT', 'Content-Length': '244'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch \"dbt-1580146621.6663644\" already exists","reason":"AlreadyExists","details":{"name":"dbt-1580146621.6663644","group":"batch","kind":"jobs"},"code":409}
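The 409 just means a Job named dbt-1580146621.6663644 already exists in that namespace. One hedged workaround, assuming you build the Job manifest yourself before handing it to the Kubernetes task: give each run a unique name, e.g.

```python
import uuid

def with_unique_name(body: dict) -> dict:
    # suffix metadata.name so repeated runs can't collide with an
    # existing Job and trigger AlreadyExists; `body` is your Job manifest
    body = dict(body)
    meta = dict(body.get("metadata", {}))
    meta["name"] = "{}-{}".format(meta.get("name", "job"), uuid.uuid4().hex[:8])
    body["metadata"] = meta
    return body
```

Deleting finished Jobs (or a cleanup policy on your cluster) would be the other half of the fix.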
Kushagara
01/28/2020, 9:30 AM
Kushagara
01/28/2020, 1:43 PM
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule

def test():
    x = randrange(1000)
    y = randrange(2000)
    print(x, y)
    return x

def func():
    daily_schedule = CronSchedule("*/1 */1 * * *")
    with Flow("My test flow", daily_schedule) as test_flow:
        data = test()
        print(data)
    test_flow.run()

func()
The output I am getting is
367 1629
367
[2020-01-28 13:35:34,589] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:36:00+00:00
[2020-01-28 13:36:00,001] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:36:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:36:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:37:00+00:00
[2020-01-28 13:37:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:37:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:37:00,010] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:38:00+00:00
[2020-01-28 13:38:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:38:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:38:00,007] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:38:00,009] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:39:00+00:00
[2020-01-28 13:39:00,003] INFO - prefect.FlowRunner | Beginning Flow run for 'My test flow'
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-28 13:39:00,006] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-28 13:39:00,008] INFO - prefect.Flow: My test flow | Waiting for next scheduled run at 2020-01-28T13:40:00+00:00
Is this the expected behaviour? Why isn't the test function called in each run? How can I achieve it?
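A likely explanation, judging from the snippet: test() isn't decorated with @task, so it executes once while the Flow block is being built, and the flow itself ends up with no tasks to run on each schedule tick. A minimal sketch of the fix:

```python
from prefect import Flow, task
from random import randrange
from prefect.schedules import CronSchedule

@task  # now a real task, executed on every scheduled flow run
def test():
    x = randrange(1000)
    print(x)
    return x

daily_schedule = CronSchedule("*/1 */1 * * *")
with Flow("My test flow", daily_schedule) as test_flow:
    data = test()  # builds the task graph; nothing executes here

test_flow.run()
```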
Aliza Rayman
01/28/2020, 4:11 PM
DaskEnvironment.
I've been getting various errors, including a CancelledError, distributed.client - WARNING - Couldn't gather 3 keys (referring to TCP keys), and a TCP Broken Pipe Error.
Has anyone experienced this / debugged this?
Nate Joselson
01/29/2020, 10:03 AM
[2020-01-29 09:42:21,363] INFO - prefect.FlowRunner | Beginning Flow run for 'combination_flow'
[2020-01-29 09:42:21,366] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-29 09:42:21,373] INFO - prefect.TaskRunner | Task 'extract': Starting task run...
[2020-01-29 09:42:21,376] INFO - prefect.TaskRunner | Task 'extract': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,376] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-29 09:42:21,377] INFO - prefect.FlowRunner | Beginning Flow run for 'ETL'
[2020-01-29 09:42:21,378] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-29 09:42:21,385] INFO - prefect.TaskRunner | Task 'data': Starting task run...
[2020-01-29 09:42:21,387] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,394] INFO - prefect.TaskRunner | Task 'transform': Starting task run...
[2020-01-29 09:42:21,396] INFO - prefect.TaskRunner | Task 'transform': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,402] INFO - prefect.TaskRunner | Task 'load': Starting task run...
Here's your data: [10, 20, 30]
[2020-01-29 09:42:21,405] INFO - prefect.TaskRunner | Task 'load': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,405] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-29 09:42:21,406] INFO - prefect.FlowRunner | Beginning Flow run for 'Map / Reduce 🤓'
[2020-01-29 09:42:21,408] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-29 09:42:21,414] INFO - prefect.TaskRunner | Task 'data': Starting task run...
[2020-01-29 09:42:21,416] INFO - prefect.TaskRunner | Task 'data': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,422] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
[2020-01-29 09:42:21,427] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
[2020-01-29 09:42:21,430] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,435] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
[2020-01-29 09:42:21,437] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,442] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
[2020-01-29 09:42:21,445] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,447] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
[2020-01-29 09:42:21,453] INFO - prefect.TaskRunner | Task 'map_task': Starting task run...
[2020-01-29 09:42:21,458] INFO - prefect.TaskRunner | Task 'map_task[0]': Starting task run...
[2020-01-29 09:42:21,461] INFO - prefect.TaskRunner | Task 'map_task[0]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,465] INFO - prefect.TaskRunner | Task 'map_task[1]': Starting task run...
[2020-01-29 09:42:21,468] INFO - prefect.TaskRunner | Task 'map_task[1]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,474] INFO - prefect.TaskRunner | Task 'map_task[2]': Starting task run...
[2020-01-29 09:42:21,477] INFO - prefect.TaskRunner | Task 'map_task[2]': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,479] INFO - prefect.TaskRunner | Task 'map_task': finished task run for task with final state: 'Mapped'
[2020-01-29 09:42:21,486] INFO - prefect.TaskRunner | Task 'reduce_task': Starting task run...
[2020-01-29 09:42:21,489] INFO - prefect.TaskRunner | Task 'reduce_task': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,496] INFO - prefect.TaskRunner | Task 'print_task': Starting task run...
Here's your data: 12
[2020-01-29 09:42:21,498] INFO - prefect.TaskRunner | Task 'print_task': finished task run for task with final state: 'Success'
[2020-01-29 09:42:21,499] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Nate Joselson
01/29/2020, 10:03 AM
If I try to access the result of the reduce_task() in the combined flow with
print(mr_flow_state.result[reduction].result)
Then I get the error
Traceback (most recent call last):
File "combine_flow.py", line 14, in <module>
print(mr_flow_state.result[reduction].result)
NameError: name 'reduction' is not defined
This makes sense to me, but is using some kind of cache the answer to getting these results? How do I go about doing that?
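For the NameError specifically, a minimal fix sketch: state.result is keyed by Task objects, so combine_flow.py would need to import the reduction task object from map_reduce_flow.py, not just the flow:

```python
# combine_flow.py (sketch): import the task object along with the flow
from map_reduce_flow import mr_flow, reduction

mr_flow_state = mr_flow.run(data=[1, 2, 3])
print(mr_flow_state.result[reduction].result)
```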
Thanks so much!
Nate Joselson
01/29/2020, 10:03 AM
I can't find documentation about the cache_key key, though it is mentioned in several places as a way of sharing results between flows.
I want to be able to create multiple pipeline flows in separate python files (for readability) and run them all together through a job. At the same time, however, I want them to be able to share inputs and outputs so that I don't need to re-calculate the same tasks over and over again in the different flows.
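For reference, a hedged sketch of how cache_key seems intended to help here: tasks that share a cache_key (plus a cache_for window) can reuse one cached result across flows run in the same Python process. Using the extract task from the example explained below; the exact settings are assumptions:

```python
import datetime
from prefect import task

# a shared cache_key lets other flows reuse this task's cached result
# while the cache_for window is open (in-process caching in core Prefect)
@task(cache_for=datetime.timedelta(hours=1), cache_key="shared-extract")
def extract():
    return [1, 2, 3]
```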
I will try to explain an example of what I am trying to do:
First, imagine I have 2 different flows, the map_reduce flow and the ETL flow from the docs. I want them both to start from the same variable, the list [1, 2, 3], so I have a third flow that I define as just an extract flow.
extract_flow.py
from prefect import task, Flow
@task
def extract():
    return [1, 2, 3]
etl_flow.py
from prefect import task, Flow, Parameter
# ETL Flow
@task
def transform(data):
    return [i * 10 for i in data]

@task
def load(data):
    print("Here's your data: {}".format(data))

with Flow("ETL") as etl_flow:
    e = Parameter('data')
    t = transform(e)
    l = load(t)
map_reduce_flow.py
from prefect import task, Flow, Parameter
# Map Reduce Flow
@task
def map_task(x):
    return x + 1

@task
def reduce_task(x):
    return sum(x)

@task
def print_task(x):
    print("Here's your data: {}".format(x))

with Flow("Map / Reduce 🤓") as mr_flow:
    numbers = Parameter('data')
    first_map = map_task.map(numbers)
    second_map = map_task.map(first_map)
    reduction = reduce_task(second_map)
    printing = print_task(reduction)
From here, I want to combine them into a combination flow that I can run with python combine_flow.py
from prefect import task, Flow
from etl_flow import etl_flow
from map_reduce_flow import mr_flow
from extract_flow import extract

with Flow("combination_flow") as extract_flow:
    data = extract()

extract_flow_state = extract_flow.run()
etl_flow_state = etl_flow.run(data=extract_flow_state.result[data].result)
mr_flow_state = mr_flow.run(data=extract_flow_state.result[data].result)
This gives the output (as expected!)
Jake Schmidt
01/29/2020, 8:00 PM
wilsojb
01/29/2020, 9:24 PM
• Each script defines a Flow and a __main__ that calls flow.run().
• These scripts need to get orchestrated. I figured I'd make a master flow that knows how to execute the smaller flows and get that master flow scheduled.
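One way to sketch that master flow, assuming each script exposes its Flow object at module level (the module and flow names below are hypothetical):

```python
from prefect import Flow, task
from prefect.schedules import CronSchedule

# hypothetical imports: each smaller script exposes its Flow at module level
from script_a import flow_a
from script_b import flow_b

@task
def run_flow_a():
    state = flow_a.run()
    if state.is_failed():
        raise RuntimeError("flow_a failed")

@task
def run_flow_b():
    state = flow_b.run()
    if state.is_failed():
        raise RuntimeError("flow_b failed")

with Flow("master", CronSchedule("0 * * * *")) as master:
    a = run_flow_a()
    b = run_flow_b(upstream_tasks=[a])  # run b only after a

if __name__ == "__main__":
    master.run()
```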
Romain
01/30/2020, 11:36 AM
It could be that the GetContainerLogs and WaitOnContainer tasks do not execute on the same dask worker, and therefore not on the same cluster node as the StartContainer task. In this case, they won't be able to access the container, right? Am I missing something here? How do you handle this?
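One hedged idea for co-locating those tasks: Prefect's DaskExecutor understands tags of the form dask-resource:KEY=N and turns them into Dask resource annotations, so every container-touching task could be pinned to the one worker that advertises that resource. The task functions below are placeholders standing in for the real Docker tasks:

```python
from prefect import Flow, task
from prefect.utilities.tasks import tags

# placeholder tasks standing in for the real prefect.tasks.docker tasks
@task
def start_container():
    return "container-id"

@task
def wait_on_container(container_id):
    return 0

@task
def get_container_logs(container_id):
    return "..."

# start exactly one dask worker with:
#   dask-worker <scheduler> --resources "docker=1"
# so the scheduler must place every tagged task on that worker
with Flow("docker-flow") as flow:
    with tags("dask-resource:docker=1"):
        container_id = start_container()
        status = wait_on_container(container_id)
        logs = get_container_logs(container_id)
```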
Fred Israel
01/30/2020, 6:50 PM
Fred Israel
01/30/2020, 6:51 PM
Fred Israel
01/30/2020, 6:51 PM
Fred Israel
01/30/2020, 6:53 PMwith Flow("Category Classifier") as flow:
# data
dataset_size = Parameter("dataset_size")
dataset = get_data(dataset_size)
train_validation = split_dataset(dataset)
train, validation = train_validation['train'], train_validation['validation']
#train
preprocessor = init_preprocessor()
model = init_preprocessor()
train_preprocessor(preprocessor, train)
train_preprocessed = preprocess(preprocessor, train)
train_model(model, train_preprocessed, train.results.category_id)
pipeline = create_pipeline(preprocessor, model)
# validation
predictions = predict(pipeline, validation)
scores = evaluate(predictions, validation)
Fred Israel
01/30/2020, 6:53 PM
On train_model(model, train_preprocessed, train.category_id) I get an error that the task train has no attribute category_id.
I understand that the task is not the actual data being returned by the task, but a task object. However, how can I access that data's attributes? I have used the [''] notation to handle multiple returns at runtime, but this time I would like to get a specific attribute instead of a dict's item.
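One pattern that should work here, sketched with a hypothetical helper name: wrap the attribute access in its own small task, so it runs at flow-execution time when the real DataFrame exists rather than at graph-build time:

```python
from prefect import task

@task
def get_attr(obj, name):
    # runs during the flow run, when `obj` is the real DataFrame,
    # not the Task placeholder used while building the graph
    return getattr(obj, name)

# inside the Flow block:
#   train_model(model, train_preprocessed, get_attr(train, "category_id"))
```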