feat: complete table population
This commit is contained in:
parent
f7b7cc5144
commit
f7dea32866
5 changed files with 82 additions and 22 deletions
|
@ -8,8 +8,5 @@ class Controller:
|
||||||
try:
|
try:
|
||||||
entries = self.parse_file_service.parse_source_directory()
|
entries = self.parse_file_service.parse_source_directory()
|
||||||
self.sqlite_service.populate_tables(entries)
|
self.sqlite_service.populate_tables(entries)
|
||||||
except Exception as e:
|
|
||||||
raise Exception(e)
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
self.database_service.disconnect()
|
self.database_service.disconnect()
|
||||||
|
|
5
src/models/tag.py
Normal file
5
src/models/tag.py
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
from typing import List, TypedDict
|
||||||
|
|
||||||
|
|
||||||
|
class Tag(TypedDict):
|
||||||
|
name: str
|
|
@ -14,7 +14,7 @@ class ParseFileService:
|
||||||
self.parse_markdown_service = ParseMarkdownService()
|
self.parse_markdown_service = ParseMarkdownService()
|
||||||
|
|
||||||
def __get_title(self, file):
|
def __get_title(self, file):
|
||||||
return os.path.basename(file)
|
return os.path.splitext(os.path.basename(file))[0]
|
||||||
|
|
||||||
def __parse_file(self, file) -> Entry:
|
def __parse_file(self, file) -> Entry:
|
||||||
markdown_data = self.parse_markdown_service.parse(file)
|
markdown_data = self.parse_markdown_service.parse(file)
|
||||||
|
@ -29,7 +29,7 @@ class ParseFileService:
|
||||||
"body": markdown_data.get("body", []),
|
"body": markdown_data.get("body", []),
|
||||||
}
|
}
|
||||||
|
|
||||||
def parse_source_directory(self):
|
def parse_source_directory(self) -> list[Entry]:
|
||||||
print(colored("INFO Indexing entries in source directory", "light_blue"))
|
print(colored("INFO Indexing entries in source directory", "light_blue"))
|
||||||
parsed_entries = []
|
parsed_entries = []
|
||||||
with os.scandir(self.source_directory) as dir:
|
with os.scandir(self.source_directory) as dir:
|
||||||
|
|
|
@ -19,7 +19,10 @@ class ParseMarkdownService:
|
||||||
internal_link = re.findall(link_rgx, line)
|
internal_link = re.findall(link_rgx, line)
|
||||||
if internal_link:
|
if internal_link:
|
||||||
internal_links.append(
|
internal_links.append(
|
||||||
[os.path.basename(link) for link in internal_link]
|
[
|
||||||
|
os.path.splitext(os.path.basename(link))[0]
|
||||||
|
for link in internal_link
|
||||||
|
]
|
||||||
)
|
)
|
||||||
return [item for row in internal_links for item in row]
|
return [item for row in internal_links for item in row]
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
from termcolor import colored
|
||||||
|
|
||||||
|
from models.entry import Entry
|
||||||
from sql.create_tables import tables
|
from sql.create_tables import tables
|
||||||
from src.models.entry import Entry
|
|
||||||
|
|
||||||
|
|
||||||
class SqliteService:
|
class SqliteService:
|
||||||
|
@ -10,31 +12,84 @@ class SqliteService:
|
||||||
self.connection = connection
|
self.connection = connection
|
||||||
self.cursor = connection.cursor()
|
self.cursor = connection.cursor()
|
||||||
|
|
||||||
def __query(self, sql, errorMessage: Optional[str] = None):
|
def __query(self, sql, params=None, errorMessage: Optional[str] = None):
|
||||||
try:
|
try:
|
||||||
self.cursor.execute(sql)
|
if params:
|
||||||
|
self.cursor.execute(sql, params)
|
||||||
|
else:
|
||||||
|
self.cursor.execute(sql)
|
||||||
self.connection.commit()
|
self.connection.commit()
|
||||||
|
|
||||||
except sqlite3.Error as sqliteError:
|
|
||||||
raise Exception(f"ERROR SQLite: {sqliteError}")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if errorMessage:
|
if errorMessage:
|
||||||
raise Exception(f"ERROR {errorMessage}: {e}")
|
raise Exception(f"ERROR {errorMessage}: {e}")
|
||||||
else:
|
raise
|
||||||
raise Exception(f"ERROR Problem with database operation: {e}")
|
|
||||||
|
|
||||||
def create_tables(self):
|
def __create_tables(self):
|
||||||
for table in tables:
|
for table in tables:
|
||||||
self.__query(
|
self.__query(
|
||||||
table["create_statement"], f"Problem creating table {table['name']}"
|
table["create_statement"],
|
||||||
|
errorMessage=f"Problem creating table {table['name']}",
|
||||||
)
|
)
|
||||||
print("INFO Created tables")
|
print(colored("INFO Created tables", "light_blue"))
|
||||||
|
|
||||||
def truncate_tables(self):
|
def __drop_tables(self):
|
||||||
for table in tables:
|
# Reverse the order of `tables` list to avoid foreign key violation when
|
||||||
|
# deleting
|
||||||
|
for table in reversed(tables):
|
||||||
self.__query(
|
self.__query(
|
||||||
f"DELETE FROM {table['name']}",
|
f"DROP TABLE IF EXISTS {table['name']}",
|
||||||
f"Problem truncating table {table['name']}",
|
errorMessage=f"Problem truncating table {table['name']}",
|
||||||
)
|
)
|
||||||
print("INFO Cleared tables")
|
print(colored("INFO Cleared tables", "light_blue"))
|
||||||
|
|
||||||
|
def __entry_exists(self, title) -> bool:
|
||||||
|
self.__query("SELECT 1 FROM entries WHERE title = :title", {"title": title})
|
||||||
|
result = self.cursor.fetchone()
|
||||||
|
return result is not None
|
||||||
|
|
||||||
|
def __populate_base_tables(self, entries: list[Entry]):
|
||||||
|
for entry in entries:
|
||||||
|
self.__query(
|
||||||
|
"INSERT INTO entries (title, last_modified, size, body) VALUES (:title, :last_modified, :size, :body)",
|
||||||
|
entry,
|
||||||
|
errorMessage=f"The following entry could not be added to `entries` table: {entry}",
|
||||||
|
)
|
||||||
|
tags = entry.get("tags")
|
||||||
|
if tags:
|
||||||
|
for tag in tags:
|
||||||
|
self.__query(
|
||||||
|
"INSERT OR IGNORE INTO tags (name) VALUES (:tag_name)",
|
||||||
|
{"tag_name": tag},
|
||||||
|
)
|
||||||
|
|
||||||
|
print(colored("INFO Base tables populated", "light_blue"))
|
||||||
|
|
||||||
|
def __populate_junction_tables(self, entries: list[Entry]):
|
||||||
|
for entry in entries:
|
||||||
|
tags = entry.get("tags")
|
||||||
|
links = entry.get("links")
|
||||||
|
if tags:
|
||||||
|
for tag in tags:
|
||||||
|
self.__query(
|
||||||
|
"INSERT INTO entries_tags (entry_title, tag_name) VALUES (:entry_title, :tag_name)",
|
||||||
|
{"entry_title": entry.get("title"), "tag_name": tag},
|
||||||
|
)
|
||||||
|
if links:
|
||||||
|
for link in links:
|
||||||
|
if self.__entry_exists(link):
|
||||||
|
self.__query(
|
||||||
|
"INSERT OR IGNORE INTO backlinks (source_entry_title, target_entry_title) VALUES (:source_entry_title, :target_entry_title)",
|
||||||
|
{
|
||||||
|
"source_entry_title": entry.get("title"),
|
||||||
|
"target_entry_title": link,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
print(colored("INFO Junction tables populated", "light_blue"))
|
||||||
|
|
||||||
|
def populate_tables(self, entries: list[Entry]):
|
||||||
|
self.__drop_tables()
|
||||||
|
self.__create_tables()
|
||||||
|
self.__populate_base_tables(entries)
|
||||||
|
self.__populate_junction_tables(entries)
|
||||||
|
|
Loading…
Add table
Reference in a new issue