Chris Hart (07/31/2019, 7:11 PM): os — to get env vars, and depending on those, sometimes writing mock data to the filesystem (although that's not in the codepath when I get the threadlock pickling error)
Chris Hart (07/31/2019, 7:22 PM): cloudpickle.dumps(task_return_value_dict) seems to work fine for both tasks in my flow
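The check being described is a serialize/deserialize round-trip of a task's return value. A minimal sketch of that spot-check, using the stdlib pickle as a stand-in for cloudpickle (task_return_value_dict here is a placeholder value, not the one from the thread):

```python
import pickle

# stand-in for a task's return value; everything reachable from it
# must itself be picklable for the round-trip to succeed
task_return_value_dict = {"status": "ok", "ids": [1, 2, 3]}

# serialize, deserialize, and compare with the original
blob = pickle.dumps(task_return_value_dict)
restored = pickle.loads(blob)
print(restored == task_return_value_dict)  # True
```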
Chris Hart (07/31/2019, 7:38 PM): wouldn't dumps() fail if it wasn't pickleable?
07/31/2019, 7:39 PMFalse
, but that pickles fine tooChris White
Chris Hart (07/31/2019, 7:46 PM): cloudpickle.dumps(index_result)

    TypeError: Cannot serialize socket object
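That error is reproducible with any object that holds a live socket: OS-level resources cannot be serialized by any pickler. A minimal illustration with the stdlib pickle:

```python
import pickle
import socket

# a live socket wraps an OS file descriptor, which has no
# meaningful serialized form
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    pickle.dumps(s)
    serializable = True
except TypeError:
    serializable = False
finally:
    s.close()

print(serializable)  # False
```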
07/31/2019, 7:48 PMFalse
but inside it does some yielding and I guess sockets, maybe I can make it safe somehowChris White
Chris Hart (07/31/2019, 7:51 PM):

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    def wrap_docs(index_name, docs):
        for doc in docs:
            yield {"_index": index_name, "_id": doc["id"], "doc": doc}

    for key in docs.keys():
        bulk(es, wrap_docs(index_name=key, docs=docs[key]))
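One reason a task built around code like this can trip the serializer (an observation, not something stated in the thread): generators such as the one wrap_docs produces cannot be pickled, while the concrete list of actions they yield can. A small demonstration with the stdlib pickle, using a toy wrap_docs of the same shape:

```python
import pickle

def wrap_docs(index_name, docs):
    # same shape as the snippet above: lazily wraps each doc
    # in a bulk-action dict
    for doc in docs:
        yield {"_index": index_name, "_id": doc["id"], "doc": doc}

# the generator object itself is not picklable...
gen = wrap_docs("people", [{"id": 1}])
try:
    pickle.dumps(gen)
    gen_picklable = True
except TypeError:
    gen_picklable = False

# ...but the materialized list of actions round-trips fine
actions = list(wrap_docs("people", [{"id": 1}]))
round_tripped = pickle.loads(pickle.dumps(actions))

print(gen_picklable)             # False
print(round_tripped == actions)  # True
```

So keeping the generator (and the client connection) strictly inside the task body, and returning only plain data, keeps the task's inputs and outputs serializable.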
Chris Hart (07/31/2019, 8:02 PM): Fatal Python error: Cannot recover from stack overflow.
Chris Hart (07/31/2019, 10:25 PM): …the requests module.. the entire flow and all tasks and return values are pickleable.. and yet Dask still crashes with a stack overflow, but that's not on Prefect 😉
Chris Hart (08/07/2019, 3:45 PM): …LocalExecutor — that if there is a future plan to go parallel, better not get complacent, and start testing for (de)serialization early.. could also be done automatically if DEBUG=True or something, to catch those issues early
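The idea of an automatic debug-time check could be sketched as a small wrapper that round-trips every return value whenever a debug flag is set. This is a hypothetical helper, not part of Prefect, and it uses the stdlib pickle as a stand-in for cloudpickle:

```python
import functools
import os
import pickle

# default the flag on for this sketch; in practice it would come
# from the environment, e.g. DEBUG=True
DEBUG = os.environ.get("DEBUG", "True") == "True"

def check_serializable(fn):
    """In debug mode, fail fast if fn's return value won't survive pickling."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        if DEBUG:
            # raises immediately if the result is not round-trippable,
            # instead of failing later inside a parallel executor
            pickle.loads(pickle.dumps(result))
        return result
    return wrapper

@check_serializable
def my_task():
    return {"rows": [1, 2, 3]}

print(my_task())  # {'rows': [1, 2, 3]}
```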
Chris Hart (08/07/2019, 4:51 PM):

    from prefect import task
    from sgqlc.operation import Operation
    from sgqlc.endpoint.http import HTTPEndpoint
    from ..open_states_schema import open_states_schema

    @task()
    def query():
        schema_query = open_states_schema.Query
        op = Operation(schema_query)
        # <calls to methods provided by "op" to build the query>
        endpoint = HTTPEndpoint(
            "https://openstates.org/graphql",
            base_headers={"X-API-KEY": "my_cool_key"},
        )
        # converts from class to flat graphql query body text
        gql_query = op.__to_graphql__(auto_select_depth=4)
        response = endpoint(gql_query)
        return response
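If a response like this ever carried library-specific wrapper types rather than plain built-ins, one defensive option (an assumption, not something proposed in the thread) is to force it down to plain dicts and lists before returning, for example with a JSON round-trip:

```python
import json

def to_plain(data):
    # serialize to JSON text and back: only dicts, lists, strings,
    # numbers, bools, and None survive, so the result always pickles
    return json.loads(json.dumps(data))

# stand-in for a GraphQL response body
response = {"data": {"jurisdictions": {"edges": []}}}
print(to_plain(response) == response)  # True
```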
Chris White: endpoint should just return a basic python dictionary…
Chris Hart (08/07/2019, 4:55 PM): …sgqlc/types/__init__.py", line 657, and other dunder method magic, which is just a corner case that cloudpickle doesn't support
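Dynamically built classes are a classic source of this kind of failure: an instance whose class is not importable at module level fails even basic pickling. A minimal illustration (unrelated to sgqlc's actual internals), using the stdlib pickle:

```python
import pickle

def make_instance():
    # class created at function scope, like a dynamically
    # generated schema type
    class DynamicType:
        pass
    return DynamicType()

obj = make_instance()
try:
    # pickle stores instances by class reference, and the reference
    # to a local class cannot be resolved on load
    pickle.dumps(obj)
    picklable = True
except (pickle.PicklingError, AttributeError):
    picklable = False

print(picklable)  # False
```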
Chris Hart (08/07/2019, 4:56 PM): endpoint() does return a dict