c

Chris White

07/31/2019, 6:58 PM
What objects are your tasks returning? This looks to me like you are returning an object which has a reference to a thread lock, which isn’t allowable because thread locks are not serializable
c

Chris Hart

07/31/2019, 7:03 PM
hmm, not to my knowledge, I’m not doing any multiprocessing in the app
c

Chris White

07/31/2019, 7:04 PM
Are you using any client-type objects? For example, this error occurs if you return any reference to google objects because google python clients use a bunch of multithreading
c

Chris Hart

07/31/2019, 7:04 PM
just returning normal data, but when debugging I only see a couple threads
hmm
checking..
yeah ok only returning regular lists or dictionaries from tasks.. although there are api calls being done inside the tasks by a graphql client and the elasticsearch client
but I’m also importing os to get env vars and depending on those sometimes writing mock data to the filesystem (although that’s not in the codepath when I get the thread lock pickling error)
c

Chris White

07/31/2019, 7:12 PM
Any chance you could share your code? Alternatively you could add cloudpickle.dumps(object) right before you return from each task for each return object, and I’d bet that one of your return values has something weird in it
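For illustration, that check could be wired in with a tiny helper like this sketch (the helper name is invented, not Prefect API):
import cloudpickle

def assert_picklable(obj):
    # cloudpickle.dumps raises (e.g. TypeError) if obj holds something
    # unserializable such as a thread lock or an open socket
    cloudpickle.dumps(obj)
    return obj

# then, at the end of each task:
#     return assert_picklable(result)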
c

Chris Hart

07/31/2019, 7:13 PM
ahh ok thanks
lemme try that
okay so cloudpickle.dumps(task_return_value_dict) seems to work fine for both tasks in my flow
well, it has a big long pickled string
is there some way to validate, or would dumps() fail if it wasn’t pickleable?
the only task return value that isn’t a list or object is one that is False, but that pickles fine too
c

Chris White

07/31/2019, 7:39 PM
Dumps would fail if it wasn’t pickleable
c

Chris Hart

07/31/2019, 7:40 PM
ok
c

Chris White

07/31/2019, 7:40 PM
Are the tasks themselves pickleable?
c

Chris Hart

07/31/2019, 7:40 PM
oh no idea, there is probably some threading happening behind the scenes within the tasks
going to try with the functions
c

Chris White

07/31/2019, 7:41 PM
Could you try to pickle the tasks? Just use the dumps method on the task object, or better yet the entire flow object
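For illustration, that check is just dumps applied to the objects themselves (my_task and flow here stand in for whatever your task and flow objects are named):
import cloudpickle

cloudpickle.dumps(my_task)  # pickle a single task object
cloudpickle.dumps(flow)     # or the entire flow object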
c

Chris Hart

07/31/2019, 7:41 PM
k thanks trying
aha! cloudpickle.dumps(index_result) fails with TypeError: Cannot serialize socket object
that is the final task that does bulk indexing to Elasticsearch
c

Chris White

07/31/2019, 7:47 PM
💥
c

Chris Hart

07/31/2019, 7:48 PM
even though the task just returns False, inside it does some yielding and I guess sockets, maybe I can make it safe somehow
c

Chris White

07/31/2019, 7:49 PM
Yea maybe try explicitly closing the sockets or something? Hard to say without the code but I’m glad we pinpointed the cause!
c

Chris Hart

07/31/2019, 7:49 PM
yes thanks!
I can share the salient code inside that task, which would theoretically affect all other users of the ES client who do bulk indexing
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()  # shared client; holds open sockets, which is what made the task unpicklable

def wrap_docs(index_name, docs):
    # generator of bulk actions for one index
    for doc in docs:
        yield {"_index": index_name, "_id": doc["id"], "doc": doc}

# docs here is a dict mapping index name -> list of documents
for key in docs.keys():
    bulk(es, wrap_docs(index_name=key, docs=docs[key]))
gonna look for a workaround
ah ok so moving the es client inside the task made it pickleable, but then running on Dask blew up with Fatal Python error: Cannot recover from stack overflow.
can probably just flatten it out and do the bulk index call the normal way without their helper module
oook completely removed any trace of the ES client, converted to only use the plain old requests module.. the entire flow and all tasks and return values are pickleable.. and yet Dask still crashes with a stack overflow, but that’s not on Prefect 😉
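For reference, a minimal sketch of that requests-only approach (the endpoint URL, task name, and docs layout are illustrative assumptions, not code from this thread):
import json

import requests
from prefect import task

ES_URL = "http://localhost:9200"  # hypothetical Elasticsearch endpoint

@task
def bulk_index(docs):
    # docs: dict mapping index name -> list of documents with an "id" field
    lines = []
    for index_name, items in docs.items():
        for doc in items:
            lines.append(json.dumps({"index": {"_index": index_name, "_id": doc["id"]}}))
            lines.append(json.dumps(doc))
    response = requests.post(
        f"{ES_URL}/_bulk",
        data="\n".join(lines) + "\n",
        headers={"Content-Type": "application/x-ndjson"},
    )
    response.raise_for_status()
    # return only plain JSON-serializable data so the task output stays pickleable
    return not response.json()["errors"]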
c

Chris White

08/05/2019, 9:55 PM
@Marvin archive “Issue in running elastic search with dask executor”
c

Chris Hart

08/05/2019, 10:01 PM
fwiw after I converted it to the plain driver and opened up a new client inside the task (rather than sharing), it did become pickleable.. and then the explosion ended up coming from a different place (which was harder to track down and had to do with unpickling a recursive function)
but it’s true you can’t use the elasticsearch-dsl bulk helper
dreaming of a future that uses Ray or general asyncio for parallelizing, where it seems Dask precludes a bunch of normal practices
c

Chris White

08/05/2019, 10:05 PM
Ray actually uses cloudpickle as well so I don’t think you’ll find different behavior there
c

Chris Hart

08/05/2019, 10:06 PM
ah ok cool thanks for the info!
c

Chris White

08/05/2019, 10:06 PM
anytime!
c

Chris Hart

08/07/2019, 3:08 PM
did a quick review of how airflow handles parallelization and it seems Celery is a first-class citizen, and Celery defaults to a more limited JSON serializer just for the task return values.. https://docs.celeryproject.org/en/latest/userguide/calling.html#serializers
so that’s on my radar to look into, if the task code itself is not (de)serializable, and it would be fine to just pass results around.. it would work for our use case… but not sure how the execution itself is parallelized when using json serialization (e.g. maybe can’t be done on remote machines, but that would be ok for a certain scale)
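For context, the Celery behaviour referenced there amounts to configuration like this sketch (the app name and broker URL are placeholders):
from celery import Celery

app = Celery("scraper", broker="redis://localhost:6379/0")
app.conf.update(
    task_serializer="json",    # task arguments are sent as JSON, not pickle
    result_serializer="json",  # return values are stored as JSON
    accept_content=["json"],   # workers reject anything that isn't JSON
)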
c

Chris White

08/07/2019, 3:26 PM
yea I see what you mean -> however, note that your serialization problem sounds like it came from using a shared / global elastic search object, so I think your only option here for distributed compute (regardless of output serialization scheme) is to refactor the task code itself
that being said, I’m super interested in implementing both a Celery and Ray executor but haven’t had the time to dig deep enough yet
c

Chris Hart

08/07/2019, 3:41 PM
oh sure, but actually overcame the ES thing by refactoring.. the last blocker that got uncovered later was this one https://github.com/profusion/sgqlc/issues/58
c

Chris White

08/07/2019, 3:42 PM
ah interesting interesting
c

Chris Hart

08/07/2019, 3:43 PM
so using a non-pickle-based parallelization technique would facilitate more rapid prototyping.. now I’m aware that it’s advisable to include serialization of all the things as part of test suites.. since it’s tough to start using 3rd party libs and building out the logic and then later finding out that it’s not compatible with dask
c

Chris White

08/07/2019, 3:44 PM
what other non-pickle serialization schemes are you thinking of?
c

Chris Hart

08/07/2019, 3:45 PM
could be a “dragons be here” notice in the docs under LocalExecutor that if there is a future plan to go parallel, better not get complacent and start testing for (de)serialization early.. could also be done automatically if DEBUG=True or something to catch those issues early
oh the only other technique I’m aware of now would be Celery with JSON serializer, since IIRC Ray also uses pickle
c

Chris White

08/07/2019, 3:45 PM
ahh i see what you mean
c

Chris Hart

08/07/2019, 3:47 PM
for us right now it’s just going to spin up one-off local/sequential flows with scraping parameters, which can be done in parallel processes that know nothing of one another
feels like missing out on the holy grail of everything prefect-managed but not a major roadblock
c

Chris White

08/07/2019, 3:49 PM
is Prefect Cloud an option for you? only asking because managing multiple workflows can get tricky very fast, especially if you need visibility into the state of the system / ad-hoc runs / etc.
c

Chris Hart

08/07/2019, 3:50 PM
oh quite possibly but if it relies on pickling, our scraper won’t work
c

Chris White

08/07/2019, 3:50 PM
i will admit I’m still not 100% clear on the pickling issue
c

Chris Hart

08/07/2019, 4:07 PM
it’s ok it’s in a 3rd party lib we are using in a task
the issue is unpickling.. pickling works fine
in fact there’s one other potential workaround I can think of but haven’t tried yet (besides fixing that unpickling “bug” in the lib): extracting the offending code from any task/flow, and just passing in the static data results as a param (it’s just building a query from a big fat class hierarchy, and it only happens once for the whole flow, so doesn’t need to be parallelized like the rest)
c

Chris White

08/07/2019, 4:24 PM
and are you actually returning these objects which can’t be unpickled from your tasks? or are you instantiating them outside of your task code?
c

Chris Hart

08/07/2019, 4:35 PM
currently these objects are not returned by the task, the thing that generated them is just executed within the task
c

Chris White

08/07/2019, 4:36 PM
interesting interesting; I’m surprised to hear you’re seeing a pickling error then — are there any global state objects that the tasks are relying on?
for example, we have many tasks which create Google Clients for interacting with GCP APIs, and Google Clients are not pickleable either. However, this doesn’t prevent the tasks from running correctly in a distributed setting because the clients are only created during the task’s run method, and JSON payloads are the only thing returned
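A minimal sketch of that pattern (the bucket-listing task is an invented example, not code from this thread):
from prefect import task
from google.cloud import storage

@task
def list_bucket(bucket_name):
    # the client is created inside the task's run, so it never needs to be pickled
    client = storage.Client()
    # only plain, JSON-serializable data is returned from the task
    return [blob.name for blob in client.list_blobs(bucket_name)]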
c

Chris Hart

08/07/2019, 4:40 PM
not that I can tell.. the module that has the task definition in it imports this python schema object from another module, outside the task, and then inside the task it loads it and starts building a query from it
right
the issue is not pickling
it’s unpickling at runtime, results in a RecursionError
AFAICT both the schema object and the function that builds a query from it have no state
c

Chris White

08/07/2019, 4:42 PM
right; I guess what I’m trying to determine is why the object was pickled to begin with --> the only way I can imagine is if it is instantiated outside of a task and then used within the task
thanks for being patient with me! haha
c

Chris Hart

08/07/2019, 4:42 PM
it translates to graphql query body text
ahh, ok, I don’t know why the schema or the query builder function that takes it would need to be pickled, except that they seem to be when the task itself gets pickled
all good it really stumped me too
c

Chris White

08/07/2019, 4:43 PM
yea, if they are created outside of the task’s run method then that would result in them being pickled
c

Chris Hart

08/07/2019, 4:51 PM
ok so here’s a super distilled snippet of the task:
from prefect import task
from sgqlc.endpoint.http import HTTPEndpoint
from sgqlc.operation import Operation

from ..open_states_schema import open_states_schema

@task()
def query():
    schema_query = open_states_schema.Query
    op = Operation(schema_query)

    # <calls to methods provided by "op" to build the query>

    endpoint = HTTPEndpoint(
        "https://openstates.org/graphql",
        base_headers={"X-API-KEY": "my_cool_key"},
    )
    gql_query = op.__to_graphql__(auto_select_depth=4)  # converts from class to flat graphql query body text

    response = endpoint(gql_query)
    return response
it’s possible that the HTTPEndpoint.endpoint() method thing has state, but a quick scan of their code looks all synchronous to me
(and AFAICT there was nothing wrong with doing http calls, which seemed to work as expected in the later Elasticsearch task after factoring out its stateful helpers and dropping down to direct calls for that)
c

Chris White

08/07/2019, 4:55 PM
interesting, yea I see what you mean - calling endpoint should just return a basic python dictionary…
c

Chris Hart

08/07/2019, 4:55 PM
again, pickling of this whole task works.. it’s just the unpickling at runtime that blows up with a RecursionError.. probably because there’s a recursive call in getattr at sgqlc/types/__init__.py, line 657, and other dunder method magic which is just a corner case that cloudpickle doesn’t support
c

Chris White

08/07/2019, 4:55 PM
yea
c

Chris Hart

08/07/2019, 4:56 PM
yep calling endpoint() does return a dict
c

Chris White

08/07/2019, 4:56 PM
could you move the imports inside the task?
c

Chris Hart

08/07/2019, 4:56 PM
hmm yeah I can try that
will let you know if that pans out.. also going to try just doing all the query building outside prefect and passing in the resulting dicts as params since it doesn’t technically need to be in the flow, although it would be cool
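For reference, moving the imports inside the task would look roughly like this sketch of the distilled snippet above (same schema module and placeholder API key; the point is that nothing sgqlc-related is captured when the task itself gets pickled):
from prefect import task

@task()
def query():
    # imports happen at run time on the worker, so the schema object and
    # query builder never ride along inside the pickled task
    from sgqlc.endpoint.http import HTTPEndpoint
    from sgqlc.operation import Operation

    from ..open_states_schema import open_states_schema

    op = Operation(open_states_schema.Query)
    # <calls to methods provided by "op" to build the query>

    endpoint = HTTPEndpoint(
        "https://openstates.org/graphql",
        base_headers={"X-API-KEY": "my_cool_key"},
    )
    return endpoint(op.__to_graphql__(auto_select_depth=4))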
c

Chris White

08/07/2019, 4:58 PM
👍 👍 and yea I agree, the “best practice” here would be to include it in the flow
🙌 1