
Varun Joshi

02/23/2021, 7:28 AM
Hey Prefect experts, I'm using Prefect to push incremental data from SQL Server to a streaming messaging system, which then pushes it on to BigQuery. I have attached tasks to two functions: one extracts metadata, and the other loops through the metadata and calls the incremental-load push function.
#!/usr/bin/env python
# coding: utf-8

import pyodbc
from prefect.storage import GCS
import json
import time
import os
import datetime
from google.cloud import pubsub_v1
import pymysql
from prefect import task, Flow, Parameter



@task(log_stdout=True)
def extract_metadata(source_system, source_database_id):
    # Provides a list of metadata rows to loop through and extract deltas from.
    # (Extraction logic omitted.)
    metadata = []
    return metadata


def delta_push(metadata):
    # Extracts data for the given metadata row and pushes it downstream.
    # (Body omitted.)
    pass

@task
def delta_push_wrapper(metadata):
    # Loop through every metadata row and call the push function.
    for metadata_row in metadata:
        delta_push(metadata_row)



with Flow("data_flow") as flow:
    flow.storage = GCS(bucket="bucketname")
    parameter1 = Parameter("parameter1", default="default")
    parameter2 = Parameter("parameter2", default=1)
    metadata = extract_metadata(parameter1, parameter2)
    delta_push_wrapper(metadata)
    

flow.register(project_name="test_project")
I'm getting an error at the flow.register(project_name="test_project") line which says 'TypeError: Cannot serialize socket object'. Any help would be much appreciated 🙂

emre

02/23/2021, 8:00 AM
Prefect needs two things to be serializable: 1) your Flow, since it has to be serialized and stored in GCS, and this includes the tasks in your flow too; 2) the inputs and outputs of your tasks. When running a flow, Prefect serializes task inputs and outputs to deliver them safely between workers. I'm assuming the problem is the first one; I think the second can only be encountered at flow.run(). You are importing pyodbc and pymysql. Database connections cannot be serialized, and they throw exactly the error you posted. So if you hold one of these connections in your task's __init__, that explains it. Try creating the connections within your task's run(), but I can't say for sure since you modified your code before posting.
🙌 1
upvote 1
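
As an aside, the error can be reproduced outside Prefect by pickling a live connection directly. A minimal sketch, assuming pymysql and placeholder credentials:

import pickle
import pymysql

# A live database connection wraps an open socket.
conn = pymysql.connect(host='localhost', user='user', passwd='pw', db='db')

# Pickling the connection tries to pickle its socket, which raises
# TypeError ("Cannot serialize socket object" on some Python versions).
pickle.dumps(conn)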

Varun Joshi

02/23/2021, 10:57 AM
Hi @emre, thank you for your response. So I am creating one connection outside my task, as shown in the code below:
#!/usr/bin/env python
# coding: utf-8

import pyodbc
from prefect.storage import GCS
import json
import time
import os
import datetime
from google.cloud import pubsub_v1
import pymysql
from prefect import task, Flow, Parameter

metadata_extraction_query = """ """

host = ''
username = ''
password = ''
database = ''
port = ''
charset = ''
metadata_cnxn = pymysql.connect(host=host, user=username, passwd=password, db=database,
                                port=int(port), charset=charset, autocommit=True,
                                cursorclass=pymysql.cursors.DictCursor)


@task(log_stdout=True)
def extract_metadata(source_system, source_database_id):
    # pymysql's execute() returns a row count, so fetch results from the cursor.
    cur = metadata_cnxn.cursor()
    cur.execute(metadata_extraction_query)
    result = cur.fetchall()

    # Provides a list of metadata rows to loop through and extract deltas from.
    metadata = process(result)
    return metadata


def delta_push(metadata):
    # Extracts the incremental data described by one metadata row and pushes it
    # to the messaging system.
    server = ''
    username = ''
    password = ''
    database = ''
    vista_cnxn = pyodbc.connect('DRIVER={};SERVER=' + server + ';DATABASE=' + database + ';UID=' + username + ';PWD=' + password)
    vista_cursor = vista_cnxn.cursor()

    incremental_data = vista_cursor.execute(metadata['query']).fetchall()
    # Push the incremental data to the messaging system (publish logic omitted).


@task
def delta_push_wrapper(metadata):
    # Loop through every metadata row and call the push function.
    for metadata_row in metadata:
        delta_push(metadata_row)



with Flow("data_flow") as flow:
    flow.storage = GCS(bucket="bucketname")
    parameter1 = Parameter("source_system", default="default")
    parameter2 = Parameter("source_database_id", default=1)
    metadata = extract_metadata(parameter1, parameter2)
    delta_push_wrapper(metadata)
    

flow.register(project_name="test_project")
you're saying I should establish the connection inside my task, right?

emre

02/23/2021, 11:32 AM
Yeah, currently you store an established connection (metadata_cnxn) in your task, which cannot be serialized. Instead, metadata_cnxn should be established inside your task function, or inside every task function that needs to use a connection.
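
A minimal sketch of that fix, assuming the same pymysql settings as above (host, credentials, and the query are placeholders):

import pymysql
from prefect import task

metadata_extraction_query = """ """  # placeholder

@task(log_stdout=True)
def extract_metadata(source_system, source_database_id):
    # Open the connection inside the task run, so the flow object never
    # holds an open socket and stays serializable.
    metadata_cnxn = pymysql.connect(
        host='', user='', passwd='', db='',
        autocommit=True, cursorclass=pymysql.cursors.DictCursor,
    )
    try:
        cur = metadata_cnxn.cursor()
        cur.execute(metadata_extraction_query)
        return cur.fetchall()
    finally:
        metadata_cnxn.close()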

Varun Joshi

02/23/2021, 11:33 AM
Okay, understood. I'll try that and let you know

emre

02/23/2021, 11:38 AM
Also, I know establishing connections is an expensive thing to do, and connections should be shared as much as possible. But it is what it is: Prefect has to support distributed execution, so everything should be serializable. I think I remember someone using ResourceManager to share connections, or something similar, but don't take my word for it; I haven't tried sharing connections in Prefect myself. https://docs.prefect.io/api/latest/tasks/resources.html#resourcemanager
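
A rough sketch of that pattern, based on the resource_manager decorator documented at that link (connection details are placeholders; note the resource is still passed between tasks, so a distributed executor may still hit serialization limits):

import pymysql
from prefect import Flow, resource_manager, task

@resource_manager
class MySQLConnection:
    def __init__(self, host, user, password, db):
        self.host, self.user, self.password, self.db = host, user, password, db

    def setup(self):
        # Runs once per flow run; the return value is handed to tasks.
        return pymysql.connect(host=self.host, user=self.user,
                               passwd=self.password, db=self.db)

    def cleanup(self, conn):
        # Runs even if downstream tasks fail.
        conn.close()

@task
def run_query(conn, query):
    cur = conn.cursor()
    cur.execute(query)
    return cur.fetchall()

with Flow("shared-connection") as flow:
    with MySQLConnection('', '', '', '') as conn:
        rows = run_query(conn, "SELECT 1")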

Varun Joshi

02/23/2021, 11:38 AM
Understood, we're just trying a POC now
Thanks a lot for this info 🙂
👍 1

vish

02/23/2021, 1:47 PM
^yup, the ResourceManager is a great pattern for this. Especially useful if you plan to map your tasks
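
For instance, combining the hypothetical MySQLConnection resource manager sketched above with mapping, using unmapped so the single shared connection is not iterated over:

from prefect import Flow, task, unmapped

@task
def delta_push(conn, metadata_row):
    # Push one metadata row's incremental data (logic omitted).
    ...

with Flow("data_flow") as flow:
    with MySQLConnection('', '', '', '') as conn:
        metadata = run_query(conn, "SELECT ...")  # returns a list of rows
        # delta_push runs once per metadata row; unmapped shares the one
        # connection across all mapped runs.
        delta_push.map(unmapped(conn), metadata)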