# ask-community
j
Hey guys, I’m currently struggling with integrating Datadog into my flow. I want to have a Datadog tracer available throughout my entire flow, but when I try to pass it around I’m encountering a `thread.lock` issue. Is there anything I could do about this? I also tried a ResourceManager to see if I could avoid it, but that isn’t working either.
k
If you have a task that returns a DataDog class, try turning checkpointing off for that task so that it does not serialize that class by default.
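For example (a minimal sketch; `make_tracer` is a hypothetical stand-in for whatever task builds your DataDog object, here assumed to be `ddtrace`’s global tracer):
```python
import prefect

# checkpoint=False tells Prefect not to write (and therefore not
# serialize) this task’s result
@prefect.task(checkpoint=False)
def make_tracer():
    from ddtrace import tracer  # DataDog’s APM tracer (assumed client)
    return tracer
```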
j
Ahh I see. And just to confirm: if I pass that DataDog instance around my flow, there wouldn’t be deserialization issues, right? For context, I’m using the LocalDaskExecutor.
On a related note: if I wanted to one day move to the DaskExecutor, I wouldn’t be able to pass the DataDog class around because of the `thread.lock`, since it can’t be serialized to other machines? Sorry if this is a stupid question; I just haven’t been able to find anything on this.
Also thanks for the suggestion of turning off checkpointing. I’m doing that as we speak.
k
Yes, you are right, and this is true independent of Prefect: anything transferred to Dask workers is serialized with `cloudpickle`.
For local Dask it might work with threads but maybe not processes; I’m not 100% sure, it could work for processes too.
For DaskExecutor, definitely not.
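You can reproduce the error outside Prefect in a couple of lines (a minimal sketch):
```python
import threading
import cloudpickle

try:
    cloudpickle.dumps(threading.Lock())
except TypeError as exc:
    print(exc)  # cannot pickle '_thread.lock' object
```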
j
Darn, so since I’m using the LocalDaskExecutor, I can’t pass the DD instance around even if checkpointing is turned off?
k
For LocalDaskExecutor I am positive it will work with either threads or processes, but I am not sure which. I swear someone got something similar working with a boto3 client.
Please report back when you try, I wanna know haha
j
hahah, alrighty! I will keep you posted
I set checkpoint to false for the task returning the DD instance, but the remaining tasks are still pending, and it’s been over 15 minutes.
k
I am not sure that is related. I suppose we just got here because it was erroring out before. What is your RunConfig and Executor right now?
j
It was running normally when I wasn’t returning the datadog context.
```python
FLOW_CONFIG = {
    'name': 'execrisk-stg',
    'storage': Docker(
        registry_url='xxx.xxx.aws.com',
        image_name='image-name',
        image_tag='flow-stg-latest',
        dockerfile='Dockerfile'
    ),
    'executor': LocalDaskExecutor(
        scheduler='processes',
        num_workers=4
    ),
    'run_config': ECSRun(
        env={
            'PREFECT__CLOUD__HEARTBEAT_MODE': 'thread'
        },
        task_definition_path='task-definitions/staging/flow-task-definition.json',
        run_task_kwargs={
            'networkConfiguration': {}
        }
    ),
    'result': CloudPicklePrefectResult()
}
```
The result is just a pickler that uses cloudpickle instead of the default pickle/JSON serializers.
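(The custom result class isn’t shown in the thread; a minimal sketch of the cloudpickle piece, assuming it is built on Prefect 1.x’s `Serializer` interface, might look like this:)
```python
import cloudpickle
from prefect.engine.serializers import Serializer

class CloudPickleSerializer(Serializer):
    """Swap the default pickle/JSON serialization for cloudpickle."""

    def serialize(self, value) -> bytes:
        return cloudpickle.dumps(value)

    def deserialize(self, value: bytes):
        return cloudpickle.loads(value)
```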
k
Could you try with the default `LocalExecutor()`?
j
Sure! May I ask why you think this would help?
k
If this works, then it seems there is something with multiprocessing, and we would need to instantiate the DataDog class inside the tasks. I would consider making your own `@task` decorator so that you don’t always need to type it out. I can give a sample of that if it comes to that; something roughly like the sketch below.
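(A minimal sketch of such a decorator; `init_tracer` is a hypothetical helper for however you construct your DataDog client in-process:)
```python
import functools
import prefect

def datadog_task(fn):
    """Prefect task that builds the tracer inside the task, so nothing
    un-picklable ever crosses a process boundary."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        tracer = init_tracer()  # hypothetical: build the client here
        return fn(tracer, *args, **kwargs)
    return prefect.task(checkpoint=False)(wrapper)

@datadog_task
def my_task(tracer, x):
    with tracer.trace("my_task"):  # ddtrace’s context-manager API
        return x + 1
```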
j
gotcha okay!
k
LocalExecutor with checkpointing turned off should definitely work
j
It gave me this issue now 😞
```
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
```
k
That’s weird. How did you turn off checkpointing?
j
`@prefect.task(checkpoint=False)` as a decorator on a function
k
Can you try:
```python
@task(checkpoint=True, result=None)
```
just to be explicit?
j
sure thing!
Just to confirm, this doesn’t mean that my function’s return will be ignored right?
Also thank you sooo much for helping me out with this, I’ve been at this for three days, and still not much luck
k
Oh sorry, I meant `checkpoint=False`
The function will still definitely return
And of course!
j
no worries, just pushed up the new change
I think it works now… I see another bug but I think it’s on my end
thank you soooooo much, will keep you posted if I manage to make it work e2e
k
I still believe it should work with either threads or processes on the LocalDaskExecutor if you try it