Varun Joshi
02/23/2021, 7:28 AM#!/usr/bin/env python
# coding: utf-8
import pyodbc
from prefect.storage import GCS
import json
import time
import os
import datetime
from google.cloud import pubsub_v1
import pymysql
from prefect import task, Flow, Parameter
@task(log_stdout=True)
def extract_metadata(source_system,source_database_id):
    # Provides a list of metadata to loop through and extract delta.
    # NOTE(review): the body was elided in this paste -- `metadata` is never
    # assigned here, so this snippet raises NameError if actually run.
    # Presumably the real implementation queries a metadata store using
    # source_system / source_database_id -- confirm against the full code.
    return metadata
def delta_push(metadata):
    """Extract delta data for one metadata record and push it downstream.

    The original paste elided the implementation, leaving the function with
    no statements at all -- a ``def`` whose body is only a comment is a
    SyntaxError. ``pass`` keeps the snippet syntactically valid and
    importable; the real extraction/push logic goes here.
    """
    # This function extracts data for every metadata detail provided and
    # pushes it further (implementation elided in the paste).
    pass
@task
def delta_push_wrapper(metadata):
    """Fan out over the metadata rows, pushing the delta for each one."""
    for row in metadata:
        delta_push(row)
with Flow("data_flow") as flow:
    # With GCS storage the flow object is serialized (pickled) and uploaded
    # at registration time, so everything the flow references must be
    # picklable -- an open DB connection is not, which is where the
    # "TypeError: Cannot serialize socket object" surfaces.
    flow.storage = GCS(bucket="bucketname")
    # NOTE(review): "paramater1" looks like a typo for "parameter1" --
    # callers retrieving the Parameter by name must use this exact string.
    parameter1 = Parameter("paramater1",default="default")
    parameter2 = Parameter("parameter2",default=1)
    metadata = extract_metadata(parameter1,parameter2)
    delta_push_wrapper(metadata)
# register() serializes the flow and sends it to the Prefect backend.
flow.register(project_name="test_project")
I'm getting an error at the flow.register(project_name="test_project") line where it says 'TypeError: Cannot serialize socket object'. Any help will be much appreciated 🙂

emre
02/23/2021, 8:00 AMflow.run()
.
You are importing pyodbc
and pymysql
. Database connections cannot be serialized; they throw the error you posted. So if you have one of these connections in your task's __init__
, that explains it. Try creating connections within your tasks run()
, but I can't say for sure since you modified your code before posting.

Varun Joshi
02/23/2021, 10:57 AM#!/usr/bin/env python
# coding: utf-8
import pyodbc
from prefect.storage import GCS
import json
import time
import os
import datetime
from google.cloud import pubsub_v1
import pymysql
from prefect import task, Flow, Parameter
# Query text and connection settings -- real values elided in this paste.
metadata_extraction_query = """ """
host=''
username=''
password = ''
database = ''
port = ''
charset = ''
# NOTE(review): this connection is opened at module import time.  Because
# the flow below uses GCS storage, the flow and everything it references are
# pickled on register(); an open DB socket cannot be pickled, which is the
# reported "TypeError: Cannot serialize socket object".  The connection
# should be created inside each task's run instead of at module level.
# (Also: with the placeholder port='' above, int(port) raises ValueError.)
metadata_cnxn = pymysql.connect(host=host, user=username, passwd=password,db=database, port=int(port), charset=charset,autocommit=True, cursorclass=pymysql.cursors.DictCursor)
@task(log_stdout=True)
def extract_metadata(source_system,source_database_id):
    """Return the list of metadata rows describing the deltas to extract.

    The connection is created inside the task (not at module level): with
    GCS storage the flow is pickled at registration time, and an open DB
    socket cannot be pickled -- that is the cause of the reported
    "TypeError: Cannot serialize socket object".
    """
    connection = pymysql.connect(
        host=host, user=username, passwd=password, db=database,
        port=int(port), charset=charset, autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
    )
    try:
        with connection.cursor() as cur:
            # pymysql's Cursor.execute() returns the affected-row COUNT
            # (an int), not the cursor, so fetchall() must be a separate
            # call -- chaining `.execute(...).fetchall()` raises.
            cur.execute(metadata_extraction_query)
            result = cur.fetchall()
    finally:
        connection.close()
    # Provides a list of metadata to loop through and extract delta.
    metadata = process(result)
    return metadata
def delta_push(metadata):
    """Extract the delta rows described by one metadata record and push them
    to the downstream messaging system.

    The connection is opened per call (never at module level, which would
    make the flow unpicklable) and is now closed even if the query fails --
    the original leaked both the connection and the cursor.
    """
    server=''
    username=''
    password = ''
    database = ''
    vista_cnxn = pyodbc.connect('DRIVER={};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
    try:
        vista_cursor = vista_cnxn.cursor()
        # pyodbc's execute() returns the cursor itself, so chaining
        # fetchall() here is valid (unlike pymysql).
        incremental_data = vista_cursor.execute(metadata['query']).fetchall()
        # push incremental data to messaging system
        # (implementation elided in the paste -- "will be further provided")
    finally:
        vista_cnxn.close()
@task
def delta_push_wrapper(metadata):
    """Invoke delta_push once for every metadata row."""
    for row in metadata:
        delta_push(row)
with Flow("data_flow") as flow:
    # GCS storage means the flow is pickled and uploaded on register();
    # every object reachable from the flow must therefore be picklable.
    flow.storage = GCS(bucket="bucketname")
    parameter1 = Parameter("source_system",default="default")
    parameter2 = Parameter("source_database_id",default=1)
    metadata = extract_metadata(parameter1,parameter2)
    delta_push_wrapper(metadata)
# This line fails with "Cannot serialize socket object" while the
# module-level metadata_cnxn connection (defined above) exists, because the
# open socket gets pulled into the pickled flow environment.
flow.register(project_name="test_project")
you're saying I should establish the connection inside my task, right?

emre
02/23/2021, 11:32 AMmetadata_cnxn
), which cannot be serialized. Instead, metadata_cnxn
should be established inside your task function. Or every task function that needs to use a connection.Varun Joshi
02/23/2021, 11:33 AMemre
02/23/2021, 11:38 AMResourceManager
to share connections, or something similar — but don't take my word for it; I haven't tried anything about sharing connections in Prefect.
https://docs.prefect.io/api/latest/tasks/resources.html#resourcemanagerVarun Joshi
02/23/2021, 11:38 AMvish
02/23/2021, 1:47 PM