Chris White
07/31/2019, 6:58 PM
Chris Hart
07/31/2019, 7:03 PM
Chris White
07/31/2019, 7:04 PM
Chris Hart
07/31/2019, 7:04 PM
os to get env vars and, depending on those, sometimes writing mock data to the filesystem (although that’s not in the codepath when I get the threadlock pickling error)
Chris White
07/31/2019, 7:12 PM
Chris Hart
07/31/2019, 7:13 PM
cloudpickle.dumps(task_return_value_dict) seems to work fine for both tasks in my flow
dumps() fail if it wasn’t pickleable?
False, but that pickles fine too
Chris White
07/31/2019, 7:39 PM
Chris Hart
07/31/2019, 7:40 PM
Chris White
07/31/2019, 7:40 PM
Chris Hart
07/31/2019, 7:40 PM
Chris White
07/31/2019, 7:41 PM
Chris Hart
07/31/2019, 7:41 PM
cloudpickle.dumps(index_result)
TypeError: Cannot serialize socket object
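The check used above, calling dumps() on a task's return value, can be pushed one level further to find which part of a result refuses to pickle. What follows is only a minimal sketch: find_unpicklable is a hypothetical helper, not part of Prefect or cloudpickle, and it falls back to the standard-library pickle when cloudpickle is not installed. A threading.Lock stands in for the offending object here; a live socket would be reported the same way.

```python
import pickle
import threading

try:
    import cloudpickle as pickler  # serializer discussed in the thread
except ImportError:
    pickler = pickle  # stdlib fallback so the sketch still runs


def find_unpicklable(value, path="result"):
    """Hypothetical helper: list (path, error) pairs for parts of `value` that fail to pickle."""
    try:
        pickler.dumps(value)
        return []
    except Exception as exc:
        failures = []
        # Descend into plain containers to narrow down the culprit.
        if isinstance(value, dict):
            for key, item in value.items():
                failures += find_unpicklable(item, f"{path}[{key!r}]")
        elif isinstance(value, (list, tuple)):
            for i, item in enumerate(value):
                failures += find_unpicklable(item, f"{path}[{i}]")
        # If no child failed (or this is a leaf), report this value itself.
        return failures or [(path, f"{type(exc).__name__}: {exc}")]


if __name__ == "__main__":
    # The lock is an illustrative stand-in for a socket held inside a result.
    result = {"count": 3, "lock": threading.Lock()}
    for where, why in find_unpicklable(result):
        print(where, "->", why)
```

Running this against each task's return value before switching executors should point at the nested object that needs to be dropped or converted to plain data.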
Chris White
07/31/2019, 7:47 PM
Chris Hart
07/31/2019, 7:48 PM
False but inside it does some yielding and I guess sockets, maybe I can make it safe somehow
Chris White
07/31/2019, 7:49 PM
Chris Hart
07/31/2019, 7:49 PM
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def wrap_docs(index_name, docs):
    # Lazily wrap each document in a bulk action for the given index.
    for doc in docs:
        yield {"_index": index_name, "_id": doc["id"], "doc": doc}

# `es` (an Elasticsearch client) and `docs` (a dict of index name -> list of
# documents) are defined elsewhere and are not shown in this snippet.
for key in docs.keys():
    bulk(es, wrap_docs(index_name=key, docs=docs[key]))
Fatal Python error: Cannot recover from stack overflow.
requests module.. the entire flow and all tasks and return values are pickleable.. and yet Dask still crashes with a stack overflow, but that’s not on Prefect 😉
Chris White
08/05/2019, 9:55 PM
Marvin
08/05/2019, 9:55 PM
Chris Hart
08/05/2019, 10:01 PM
Chris White
08/05/2019, 10:05 PM
Chris Hart
08/05/2019, 10:06 PM
Chris White
08/05/2019, 10:06 PM
Chris Hart
08/07/2019, 3:08 PM
Chris White
08/07/2019, 3:26 PM
Chris Hart
08/07/2019, 3:41 PM
Chris White
08/07/2019, 3:42 PM
Chris Hart
08/07/2019, 3:43 PM
Chris White
08/07/2019, 3:44 PM
Chris Hart
08/07/2019, 3:45 PM
LocalExecutor that if there is a future plan to go parallel, better not get complacent and start testing for (de)serialization early.. could also be done automatically if DEBUG=True or something to catch those issues early
Chris White
08/07/2019, 3:45 PM
Chris Hart
08/07/2019, 3:47 PM
Chris White
08/07/2019, 3:49 PM
Chris Hart
08/07/2019, 3:50 PM
Chris White
08/07/2019, 3:50 PM
Chris Hart
08/07/2019, 4:07 PM
Chris White
08/07/2019, 4:24 PM
Chris Hart
08/07/2019, 4:35 PM
Chris White
08/07/2019, 4:36 PM
Chris Hart
08/07/2019, 4:40 PM
Chris White
08/07/2019, 4:42 PM
Chris Hart
08/07/2019, 4:42 PM
Chris White
08/07/2019, 4:43 PM
Chris Hart
08/07/2019, 4:51 PM
from prefect import task
from sgqlc.endpoint.http import HTTPEndpoint
from sgqlc.operation import Operation
from ..open_states_schema import open_states_schema

@task()
def query():
    schema_query = open_states_schema.Query
    op = Operation(schema_query)
    # <calls to methods provided by "op" to build the query>
    endpoint = HTTPEndpoint(
        "https://openstates.org/graphql",
        base_headers={"X-API-KEY": "my_cool_key"},
    )
    # converts from class to flat graphql query body text
    gql_query = op.__to_graphql__(auto_select_depth=4)
    response = endpoint(gql_query)
    return response
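The earlier suggestion in this thread, testing (de)serialization early even while running on LocalExecutor and possibly switching it on with a debug flag, could be sketched roughly as below. Everything here is an assumption for illustration: check_roundtrip and the PICKLE_DEBUG environment variable are hypothetical names, not Prefect features, and the standard-library pickle is used when cloudpickle is not installed.

```python
import functools
import os
import pickle
import threading

try:
    import cloudpickle as pickler  # serializer discussed in the thread
except ImportError:
    pickler = pickle  # stdlib fallback so the sketch still runs


def check_roundtrip(fn):
    """Hypothetical decorator: when PICKLE_DEBUG=1, round-trip fn's return value through pickling."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        if os.environ.get("PICKLE_DEBUG") == "1":
            try:
                pickle.loads(pickler.dumps(result))
            except Exception as exc:
                raise TypeError(
                    f"{fn.__name__} returned a value that does not survive pickling: {exc}"
                ) from exc
        return result
    return wrapper


@check_roundtrip
def fetch_counts():
    return {"bills": 12}  # plain data, survives the round trip


@check_roundtrip
def fetch_with_lock():
    return {"lock": threading.Lock()}  # stand-in for a result holding a socket


if __name__ == "__main__":
    os.environ["PICKLE_DEBUG"] = "1"
    print(fetch_counts())
    try:
        fetch_with_lock()
    except TypeError as err:
        print("caught:", err)
```

In a flow, a decorator like this would presumably sit underneath @task so the check runs inside the task body. It only inspects return values, so it would not catch a task function that itself cannot be pickled.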
Chris White
08/07/2019, 4:55 PM
endpoint should just return a basic python dictionary…
Chris Hart
08/07/2019, 4:55 PM
sgqlc/types/__init__.py", line 657, and other dunder method magic which is just a corner case that cloudpickle doesn’t support
Chris White
08/07/2019, 4:55 PM
Chris Hart
08/07/2019, 4:56 PM
endpoint() does return a dict
Chris White
08/07/2019, 4:56 PM
Chris Hart
08/07/2019, 4:56 PM
Chris White
08/07/2019, 4:58 PM