Chris Hart
07/31/2019, 6:32 PM
I switched from the LocalExecutor, which was working fine, to the DaskExecutor example from this page in the docs: https://docs.prefect.io/guide/tutorials/dask-cluster.html
After making sure the scheduler and workers are running, when I try the flow, I get this error before it exits:
[2019-07-31 18:29:25,853] INFO - prefect.FlowRunner | Starting flow run.
distributed.protocol.pickle - INFO - Failed to serialize <bound method FlowRunner.run_task of <FlowRunner: open_states_to_elasticsearch>>. Exception: can't pickle _thread.lock objects
[2019-07-31 18:29:25,901] INFO - prefect.FlowRunner | Unexpected error: TypeError("can't pickle _thread.lock objects")
[2019-07-31 18:29:25,902] ERROR - prefect.Flow | Unexpected error occured in FlowRunner: TypeError("can't pickle _thread.lock objects")
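For reference, the executor swap that tutorial describes looks roughly like this; a minimal sketch, with a placeholder scheduler address and imports as of prefect 0.6.x:

from prefect import Flow, task
from prefect.engine.executors import DaskExecutor

@task
def say_hello():
    print("hello from a dask worker")

with Flow("dask-example") as flow:
    say_hello()

# point the executor at the running dask-scheduler (placeholder address)
executor = DaskExecutor(address="tcp://localhost:8786")
flow.run(executor=executor)

The "can't pickle _thread.lock objects" error usually means something attached to the flow or its tasks (an open client, a logger handler, a database connection) can't be serialized with cloudpickle, though that's only a guess without seeing the flow itself.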
Chris Hart
07/31/2019, 6:33 PM
Chris Hart
07/31/2019, 6:41 PM
Chris Hart
07/31/2019, 6:42 PM
dask==2.1.0
distributed==2.1.0
prefect==0.6.0
Chris White
Brian McFeeley
07/31/2019, 9:30 PM
2019-07-31T21:26:08.029Z [dask-cluster-worker b47e4b0c1a70]: distributed.worker - INFO - Stopping worker at <tcp://elb-trialspark-19870.aptible.in:45604>
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fb4fcd2d950>, <Future finished exception=TypeError("'NoneType' object is not subscriptable")>)
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: Traceback (most recent call last):
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 758, in _run_callback
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: ret = callback()
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: File "/usr/local/lib/python3.7/site-packages/tornado/stack_context.py", line 300, in null_wrapper
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: return fn(*args, **kwargs)
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: File "/usr/local/lib/python3.7/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: future.result()
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: yielded = self.gen.send(value)
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: File "/usr/local/lib/python3.7/site-packages/distributed/worker.py", line 796, in heartbeat
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: if response["status"] == "missing":
2019-07-31T21:26:08.031Z [dask-cluster-worker b47e4b0c1a70]: TypeError: 'NoneType' object is not subscriptable
2019-07-31T21:26:08.036Z [dask-cluster-worker b47e4b0c1a70]: distributed.nanny - INFO - Closing Nanny at '<tcp://172.17.0.67:43533>'
2019-07-31T21:26:08.037Z [dask-cluster-worker b47e4b0c1a70]: distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
This causes us to rerun a large portion of the previously completed tasks whose results were not persisted.
I still strongly suspect the issue lies in our deployment environment -- Aptible, our PaaS, routinely kills and restarts containers that meet or exceed their memory limits, for example. I've reached out to them to get some logs to see if they're restarting these containers, but if this problem is at all familiar let me know if you have a workaround.
Brian McFeeley
07/31/2019, 9:31 PM
Brian McFeeley
07/31/2019, 9:45 PM
Chris Hart
07/31/2019, 11:09 PM
Alex Cano
08/01/2019, 6:42 PM
Brian McFeeley
08/01/2019, 7:27 PM
items = get_list(...)
t1 = transformation.map(items)
t2 = transformation2.map(t1)
...
Say we expect, due to some data noise, a low but nonzero chance that one of the later transformation steps fails, and only for certain subsets of the data.
From a business logic perspective, what we'd like is: every success proceeds as it happens, individual task failures don't torpedo the whole pipeline, and we just do some failure handling (maybe quarantining the input data in a separate S3 bucket for analysis and notifying the data engineers, for example). The run is still a "success" in business terms if a large majority of tasks complete, but we do want to act on the failures by either creating new test cases or finding better filters for dirty data.
I've seen docs on state transition handling, but I'm wondering how to use that in the context of a larger flow to both 1. get a batch-wide view of how often error transitions occur and 2. not mark the whole pipeline as failed, unless maybe we exceed some configurable error-rate threshold.
Does this run counter to y'all's programming model?
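A rough sketch of one way to get that behavior with pieces Prefect exposes: a state handler on the mapped task for per-item failure handling, an all_finished trigger on a downstream summary task, and set_reference_tasks so only the summary decides overall success. The flow and task names are made up, and the exact shape of failed mapped results in 0.6.x may differ:

from prefect import Flow, task
from prefect.engine import state
from prefect.triggers import all_finished

def quarantine_on_failure(task_obj, old_state, new_state):
    # state handler: react to each failed mapped child (quarantine the
    # offending input, notify engineers, etc.) without failing the run
    if isinstance(new_state, state.Failed):
        print(f"{task_obj.name} failed: {new_state.message}")
    return new_state

@task
def get_list():
    return list(range(10))

@task(state_handlers=[quarantine_on_failure])
def transformation(item):
    if item == 7:  # simulate dirty data for a small subset of inputs
        raise ValueError(f"bad item {item}")
    return item * 2

@task(trigger=all_finished)
def summarize(results):
    # runs even when some mapped children failed; failed children arrive
    # as exceptions in the upstream results
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"{len(failures)} of {len(results)} items failed")
    return failures

with Flow("noisy-pipeline") as flow:
    items = get_list()
    transformed = transformation.map(items)
    summary = summarize(transformed)

# only `summarize` determines overall flow state, so a handful of failed
# mapped tasks won't mark the whole run as Failed
flow.set_reference_tasks([summary])

A configurable error-rate threshold could then live inside summarize: raise (and so fail the reference task) only when len(failures) / len(results) exceeds the limit.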
Andrew Fulton
08/07/2019, 10:43 PM
Alex Kravetz
08/08/2019, 1:22 PM
Will SnowflakeQuery be added back upstream? (I'm assuming to tasks/database?)
Brett Naul
08/09/2019, 2:09 AM
rich
08/09/2019, 7:36 PM
Has anyone used prefect with dbt? I'd like to use prefect to control dbt and other workflows.
https://github.com/fishtown-analytics/dbt
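There was no official dbt task at this point as far as I know, but a minimal sketch of driving the dbt CLI from plain tasks looks like this; the --target flag and the two-step run/test ordering are assumptions about the project:

import subprocess
from prefect import Flow, task

@task
def dbt_run(target: str = "dev") -> str:
    # shell out to the dbt CLI; check=True fails the task if dbt fails
    proc = subprocess.run(
        ["dbt", "run", "--target", target],
        capture_output=True, text=True, check=True,
    )
    return proc.stdout

@task
def dbt_test(run_output: str) -> str:
    # takes dbt_run's output only to enforce ordering in the flow
    proc = subprocess.run(
        ["dbt", "test"], capture_output=True, text=True, check=True,
    )
    return proc.stdout

with Flow("dbt-pipeline") as flow:
    run_logs = dbt_run()
    dbt_test(run_logs)

Prefect's ShellTask could likely wrap the same commands if you'd rather not write the subprocess calls yourself.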
rich
08/09/2019, 7:37 PM
Alex Kravetz
08/10/2019, 10:17 PM
Henry H
08/11/2019, 5:53 PM
Joe Schmid
08/14/2019, 6:04 PM
KJ
08/15/2019, 7:46 PM
James Watt
08/19/2019, 4:03 AM
James Watt
08/19/2019, 6:03 AM
Akshay Verma
08/19/2019, 1:04 PM
I am getting a 'FunctionTask' object is not iterable error. Can anyone point out what it is?
James Watt
08/20/2019, 3:03 AM
Akshay Verma
08/20/2019, 9:50 AM
# Flow #########################
@task
def generate_context(wfr_df, lvnr, tol):
    print(wfr_df, lvnr, tol)
    return wfr_df, tol

with Flow('TDBB') as flow:
    wfr_df = Parameter("wfr_df")
    modeldef = Parameter("modeldef")
    tol = Parameter("tol")
    lvnr = Parameter("lvnr")
    context_level = generate_context(wfr_df, lvnr, tol)
    context_level_ml = context_level[0]
    context_level_ci = context_level[1]
    print(context_level_ml)
    print(context_level_ci)

flow.run(parameters=dict(
    wfr_df=mls,
    modeldef=("tx", "ty", "mwx", "mwy", "rwx", "rwy", "mx", "my", "rx", "ry"),
    tol=1e6,
    lvnr=3,
))
I am getting the following error:
Flow.run received the following unexpected parameters: modeldef
When I check self.parameters() from flow.py I get the following response:
{<Parameter: tol>, <Parameter: lvnr>, <Parameter: wfr_df>}
Can anyone point out what I am doing wrong here?
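One likely explanation, reading the output above: a Parameter only registers with the flow once some task consumes it, so the unused modeldef never shows up in self.parameters() and flow.run rejects it, and indexing or unpacking a task result at build time fails because the FunctionTask hasn't produced a value yet. A sketch of a restructure under those assumptions; the pick helper and the placeholder wfr_df value are mine:

from prefect import Flow, Parameter, task

@task
def generate_context(wfr_df, lvnr, tol):
    return wfr_df, tol

@task
def pick(pair, index):
    # helper task: pull one element out of an upstream tuple result at run time
    return pair[index]

with Flow("TDBB") as flow:
    wfr_df = Parameter("wfr_df")
    tol = Parameter("tol")
    lvnr = Parameter("lvnr")
    context_level = generate_context(wfr_df, lvnr, tol)
    context_level_ml = pick(context_level, 0)
    context_level_ci = pick(context_level, 1)

# modeldef is left out until some task actually consumes it
state = flow.run(parameters=dict(wfr_df="placeholder", lvnr=3, tol=1e6))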
Jerry Thomas
08/22/2019, 4:49 AM
@task
def convert(data, value):
    data['x'] *= value
    return data

data = [{"x": x, "y": x % 5, "z": x % 3} for x in range(10)]
with Flow("convert-using-value") as flow:
    res = convert.map(data, [2 for i in range(10)])
However, is there a better way to, say, mark the value parameter as shared across every execution of a mapped call?
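If the goal is just a constant shared by every mapped call, unmapped (which a later snippet in this thread also uses) should do it; a small sketch, assuming unmapped is importable from the top-level prefect package in this version:

from prefect import Flow, task, unmapped

@task
def convert(data, value):
    data['x'] *= value
    return data

data = [{"x": x, "y": x % 5, "z": x % 3} for x in range(10)]

with Flow("convert-using-value") as flow:
    # unmapped(2) marks `value` as a single shared argument instead of
    # requiring a list the same length as `data`
    res = convert.map(data, value=unmapped(2))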
mooncake4132
08/22/2019, 5:35 AM
AkashB
08/22/2019, 6:04 AM
Jeremiah
Jason Damiani
08/22/2019, 3:42 PM
Is there a way to pass task_args (e.g. max_retries) to tasks created with the @task decorator when mapping:
import culvert.tasks as ct

with Flow("some flow") as f:
    <some other task>
    copy_batches_local = ct.copy_batch_local.map(
        src_conn_str=unmapped(src_conn_str),
        batch_range=create_batches,
        src_table=unmapped(src_table),
        split_by=unmapped(split_by),
        data_path=unmapped(data_path),
        task_args={"max_retries": 3},
    )
TypeError: got an unexpected keyword argument 'task_args'
Also tried:
copy_batches_local = ct.copy_batch_local(task_args={"max_retries": 3}).map(
    src_conn_str=unmapped(src_conn_str),
    batch_range=create_batches,
    src_table=unmapped(src_table),
    split_by=unmapped(split_by),
    data_path=unmapped(data_path),
)
TypeError: missing a required argument: 'src_conn_str'
Is there a way to partially bind my ct.copy_batch_local task?
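One hedged workaround if .map won't accept task_args in this version: attach the retry settings where the task is defined, since @task forwards its keyword arguments to the Task constructor. A sketch, with the signature copied from the call above and the body stubbed out:

from datetime import timedelta
from prefect import task

# e.g. inside culvert/tasks.py (the user's own module in the snippet above)
@task(max_retries=3, retry_delay=timedelta(seconds=30))
def copy_batch_local(src_conn_str, batch_range, src_table, split_by, data_path):
    # copy one batch of rows locally; real implementation elided
    pass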