Felix Vemmer
11/24/2020, 8:23 AM
Steve Taylor
01/05/2021, 9:21 PM
Felix Vemmer
01/15/2021, 8:46 AM
Thomas Reynaud
02/08/2021, 10:44 AM
Felix Vemmer
02/08/2021, 10:46 AM
Thomas Reynaud
02/08/2021, 10:50 AM
Felix Vemmer
02/08/2021, 10:54 AM
from notion.client import NotionClient
import prefect
import logging
import pandas as pd
class Notion:
    """Small wrapper around a logged-in ``NotionClient`` session.

    Attributes are set in ``__init__``:
      - ``logger``: object whose ``.info(...)`` is used for all messages.
      - ``notion_token_v2``: the token handed to ``NotionClient``.
      - ``notion_client``: the client returned by ``create_notion_client``.
    """

    def __init__(self, notion_token_v2, loger_type='prefect') -> None:
        """Choose a logger, store the token and log in to Notion.

        Args:
            notion_token_v2: Passed to ``NotionClient`` as ``token_v2``.
            loger_type: ``'prefect'`` takes the logger from
                ``prefect.context``; any other value (including ``None``)
                falls back to the standard ``logging`` module.
        """
        if loger_type == 'prefect':
            self.logger = prefect.context.get("logger")
        else:
            self.logger = logging
        self.notion_token_v2 = notion_token_v2
        # Logging in happens eagerly: constructing this class performs the
        # NotionClient call below.
        self.notion_client = self.create_notion_client()

    def create_notion_client(self):
        """Create a ``NotionClient`` from the stored token and log the user.

        Returns:
            The newly created ``NotionClient`` instance.
        """
        client = NotionClient(token_v2=self.notion_token_v2)
        self.logger.info(
            f"Logged in as: {client.current_user.full_name}")
        return client

    def get_social_media_posts(self, notion_url):
        """Return the first row of the collection view that matches today.

        The query keeps rows where property ``"jUaf"`` is a date equal to
        today AND property ``"U<;g"`` is an unchecked checkbox.
        NOTE(review): these look like Notion property ids (presumably the
        publish date and an "already posted" flag) -- confirm against the
        database schema.

        Args:
            notion_url: URL of the Notion collection view to query.

        Returns:
            The first matching row, or ``None`` when the query is empty.
        """
        cv = self.notion_client.get_collection_view(notion_url)
        filter_params = {
            "filters": [
                {
                    "filter": {
                        "value": {"type": "relative", "value": "today"},
                        "operator": "date_is",
                    },
                    "property": "jUaf",
                },
                {
                    "filter": {
                        "value": {"type": "exact", "value": False},
                        "operator": "checkbox_is",
                    },
                    "property": "U<;g",
                },
            ],
            "operator": "and",
        }
        rows = cv.build_query(filter=filter_params).execute()

        if len(rows) == 0:
            self.logger.info("Nothing to post for today...")
            return None

        # Only the first match is used; any further rows are ignored.
        result = rows[0]
        # NOTE(review): content_found_by[0] raises IndexError when that
        # field is empty -- left as-is, confirm the field is always filled.
        self.logger.info(
            f"Post for today: \nTitle: {result.title}\nLinkedIn Post Text: {result.linkedin_post_text}\nTwitter Post Text: {result.twitter_post_text}\nSuggested by: {result.content_found_by[0].full_name}")
        return result
from typing import Any
import pandas as pd
from core_automation.notion_client import Notion
from prefect.utilities.tasks import defaults_from_attrs
from prefect import Task
class ImportNotionLinkedInPosts(Task):
    """Prefect task that hands a DataFrame to ``Notion.export_linkedin_posts``.

    Every argument may be given at construction time, at call time, or
    both; ``defaults_from_attrs`` fills any ``run`` argument left as
    ``None`` from the attribute of the same name.
    """

    def __init__(
        self,
        notion_token_v2: str = None,
        loger_type: str = None,
        df: pd.DataFrame = None,
        notion_url: str = None,
        **kwargs: Any
    ):
        """Store the task defaults and forward ``kwargs`` to ``Task``.

        Args:
            notion_token_v2: Default token for the ``Notion`` wrapper.
            loger_type: Default logger selector for the ``Notion`` wrapper.
            df: Default DataFrame to export.
            notion_url: Default Notion URL to export to.
            **kwargs: Passed unchanged to ``Task.__init__``.
        """
        self.notion_token_v2 = notion_token_v2
        self.loger_type = loger_type
        self.df = df
        self.notion_url = notion_url
        super().__init__(**kwargs)

    @defaults_from_attrs("notion_token_v2", "loger_type", "df", "notion_url")
    def run(
        self,
        notion_token_v2: str = None,
        loger_type: str = None,
        df: pd.DataFrame = None,
        notion_url: str = None,
    ):
        """Build a ``Notion`` wrapper and export ``df`` through it.

        Args:
            notion_token_v2: Token used to construct ``Notion``.
            loger_type: Logger selector passed to ``Notion``.
            df: DataFrame passed to ``export_linkedin_posts``.
            notion_url: URL passed to ``export_linkedin_posts``.

        Returns:
            None. The result of ``export_linkedin_posts`` is discarded.
        """
        notion = Notion(
            notion_token_v2=notion_token_v2,
            loger_type=loger_type
        )
        # NOTE(review): export_linkedin_posts is not defined in the part of
        # the Notion class visible here -- confirm it exists on the
        # imported core_automation.notion_client.Notion.
        notion.export_linkedin_posts(
            df=df,
            notion_url=notion_url
        )
# Instantiate the task once at module level; it is called inside the flow
# context below to add it to the flow.
import_notion_linkedin_posts = ImportNotionLinkedInPosts()

with Flow(
    "LinkedIn Flow",
    run_config=LocalRun(),
    schedule=weekly_schedule,
) as flow:
    # The original snippet elided the upstream tasks here with runs of
    # dots. Presumably they produce notion_token_v2, liked_post_feed and
    # notion_url used below -- TODO restore them from the real flow file.
    ...
    import_notion_linkedin_posts(
        notion_token_v2=notion_token_v2,
        loger_type='prefect',
        df=liked_post_feed,
        notion_url=notion_url
    )