feat: add graph service

This commit is contained in:
thomasabishop 2024-12-29 15:13:49 +00:00
parent 428f1c435f
commit 7c199adba9
13 changed files with 7755 additions and 37 deletions

Binary file not shown.

7618
out/eolas-graph.json Normal file

File diff suppressed because it is too large Load diff

View file

@ -3,6 +3,7 @@ import argparse
from constants import EOLAS_DIRECTORY from constants import EOLAS_DIRECTORY
from controllers.controller import Controller from controllers.controller import Controller
from services.database_service import DatabaseService from services.database_service import DatabaseService
from services.graph_service import GraphService
from services.parse_file_service import ParseFileService from services.parse_file_service import ParseFileService
from services.table_service import TableService from services.table_service import TableService
@ -10,7 +11,10 @@ database_service = DatabaseService("eolas")
database_connection = database_service.connect() database_connection = database_service.connect()
table_service = TableService(database_connection) table_service = TableService(database_connection)
parse_file_service = ParseFileService(EOLAS_DIRECTORY) parse_file_service = ParseFileService(EOLAS_DIRECTORY)
controller = Controller(database_service, table_service, parse_file_service) graph_service = GraphService(database_connection)
controller = Controller(
database_service, table_service, parse_file_service, graph_service
)
def main(): def main():
@ -18,13 +22,18 @@ def main():
prog="eolas-db", description="Eolas database manager." prog="eolas-db", description="Eolas database manager."
) )
parser.add_argument( parser.add_argument(
"command", choices=["populate-database"], help="Command to execute" "command",
choices=["populate-database", "generate-graph"],
help="Command to execute",
) )
args = parser.parse_args() args = parser.parse_args()
if args.command == "populate-database": if args.command == "populate-database":
controller.populate_database() controller.populate_database()
if args.command == "generate-graph":
controller.generate_graph()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -1 +1,2 @@
EOLAS_DIRECTORY = "/home/thomas/repos/eolas/zk" EOLAS_DIRECTORY = "/home/thomas/repos/eolas/zk"
GRAPH_OUTPUT_DIRECTORY = "/home/thomas/repos/eolas-db/out"

View file

@ -1,12 +1,30 @@
from termcolor import colored
class Controller: class Controller:
def __init__(self, database_service, table_service, parse_file_service): def __init__(
self, database_service, table_service, parse_file_service, graph_service
):
self.database_service = database_service self.database_service = database_service
self.table_service = table_service self.table_service = table_service
self.parse_file_service = parse_file_service self.parse_file_service = parse_file_service
self.graph_service = graph_service
def populate_database(self): def populate_database(self):
try: try:
entries = self.parse_file_service.parse_source_directory() entries = self.parse_file_service.parse_source_directory()
self.table_service.populate_tables(entries) self.table_service.populate_tables(entries)
print(colored("SUCCESS Database populated", "green"))
except Exception as e:
raise Exception(colored(f"ERROR {e}", "red"))
finally: finally:
self.database_service.disconnect() self.database_service.disconnect()
print(colored("INFO Database connection closed", "blue"))
def generate_graph(self):
    """Generate the graph through the graph service, then disconnect.

    Prints a green success message once
    ``graph_service.generate_graph()`` returns. Whether or not generation
    succeeds, ``database_service.disconnect()`` is called and an
    informational message is printed.

    Raises:
        Exception: wrapping any failure raised during generation, with
            the message rendered in red.
    """
    try:
        self.graph_service.generate_graph()
        print(colored("SUCCESS Graph generated", "green"))
    except Exception as e:
        # The colour belongs to colored(), not to Exception(): with the
        # parenthesis closed early the message was left uncoloured and
        # the exception carried a stray second argument ("red").
        raise Exception(colored(f"ERROR {e}", "red")) from e
    finally:
        self.database_service.disconnect()
        print(colored("INFO Database connection closed", "blue"))

View file

@ -1,7 +1,7 @@
from typing import List, TypedDict from typing import List, TypedDict
class Entry(TypedDict): class IEntry(TypedDict):
title: str title: str
last_modified: str last_modified: str
size: int size: int

7
src/models/graph_edge.py Normal file
View file

@ -0,0 +1,7 @@
from typing import TypedDict
class IGraphEdge(TypedDict):
    """Shape of a single edge in the generated graph."""

    # Identifier of the node the edge starts from.
    source: str
    # Identifier of the node the edge points to.
    target: str

7
src/models/graph_node.py Normal file
View file

@ -0,0 +1,7 @@
from typing import TypedDict
class IGraphNode(TypedDict):
    """Shape of a single node in the generated graph."""

    # Unique identifier of the node.
    id: str
    # Category label of the node.
    type: str

View file

@ -18,10 +18,10 @@ class DatabaseService:
try: try:
if not os.path.exists(self.db_path): if not os.path.exists(self.db_path):
os.makedirs(self.db_path) os.makedirs(self.db_path)
print(colored("INFO Created database directory", "light_blue")) print(colored("INFO Created database directory", "blue"))
self.connection = sqlite3.connect(f"{self.db_path}/{self.db_name}.db") self.connection = sqlite3.connect(f"{self.db_path}/{self.db_name}.db")
self.connection.execute("PRAGMA foreign_keys = ON") self.connection.execute("PRAGMA foreign_keys = ON")
print(colored("INFO Database connection established", "light_blue")) print(colored("INFO Database connection established", "blue"))
return self.connection return self.connection
except Exception as e: except Exception as e:

View file

@ -0,0 +1,49 @@
import json
from constants import GRAPH_OUTPUT_DIRECTORY
from services.sqlite_service import SqliteService
from models.graph_node import IGraphNode
from models.graph_edge import IGraphEdge
class GraphService(SqliteService):
    """Read tags, entries and their relations and export them as a graph.

    The graph is a dict with a ``nodes`` list and an ``edges`` list, written
    as JSON to ``eolas-graph.json`` inside ``GRAPH_OUTPUT_DIRECTORY``.
    """

    error_message_stub = "Could not retrieve contents of table:"

    def __init__(self, db_connection):
        super().__init__(db_connection)

    def __get_nodes(self) -> list[IGraphNode]:
        """Return one node per tag followed by one node per entry.

        Tag ids are prefixed with ``#``; entry ids are the entry titles.
        """
        tag_rows = self._query(
            "SELECT name FROM tags",
            error_message=f"{self.error_message_stub} tags",
        )
        tag_nodes = [IGraphNode(id=f"#{name}", type="tag") for (name,) in tag_rows]

        entry_rows = self._query(
            "SELECT title FROM entries",
            error_message=f"{self.error_message_stub} entries",
        )
        entry_nodes = [
            IGraphNode(id=title, type="entry") for (title,) in entry_rows
        ]

        return tag_nodes + entry_nodes

    def __get_edges(self) -> list[IGraphEdge]:
        """Return tag-to-entry edges followed by entry-to-entry backlink edges.

        Columns are selected by name so that edge direction does not depend
        on the order in which the tables happen to declare their columns.
        """
        tag_rows = self._query(
            "SELECT entry_title, tag_name FROM entries_tags",
            error_message=f"{self.error_message_stub} entries_tags",
        )
        # The tag is the source and the tagged entry is the target; the "#"
        # prefix matches the tag node ids built in __get_nodes.
        tag_edges = [
            IGraphEdge(source=f"#{tag_name}", target=entry_title)
            for entry_title, tag_name in tag_rows
        ]

        backlink_rows = self._query(
            "SELECT source_entry_title, target_entry_title FROM backlinks",
            error_message=f"{self.error_message_stub} backlinks",
        )
        backlink_edges = [
            IGraphEdge(source=f"{source_title}", target=target_title)
            for source_title, target_title in backlink_rows
        ]

        return tag_edges + backlink_edges

    def generate_graph(self):
        """Collect nodes and edges and write them to the graph JSON file.

        The graph is assembled before the output file is opened, so a failed
        query does not leave a truncated file behind.
        """
        graph = {"nodes": self.__get_nodes(), "edges": self.__get_edges()}
        output_path = f"{GRAPH_OUTPUT_DIRECTORY}/eolas-graph.json"
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(graph, f, indent=4)

View file

@ -4,7 +4,7 @@ from pathlib import Path
from termcolor import colored from termcolor import colored
from models.entry import Entry from models.entry import IEntry
from services.parse_markdown_service import ParseMarkdownService from services.parse_markdown_service import ParseMarkdownService
@ -16,7 +16,7 @@ class ParseFileService:
def __get_title(self, file): def __get_title(self, file):
return os.path.splitext(os.path.basename(file))[0] return os.path.splitext(os.path.basename(file))[0]
def __parse_file(self, file) -> Entry: def __parse_file(self, file) -> IEntry:
markdown_data = self.parse_markdown_service.parse(file) markdown_data = self.parse_markdown_service.parse(file)
return { return {
"title": self.__get_title(file), "title": self.__get_title(file),
@ -29,8 +29,8 @@ class ParseFileService:
"body": markdown_data.get("body", []), "body": markdown_data.get("body", []),
} }
def parse_source_directory(self) -> list[Entry]: def parse_source_directory(self) -> list[IEntry]:
print(colored("INFO Indexing entries in source directory", "light_blue")) print(colored("INFO Indexing entries in source directory", "blue"))
parsed_entries = [] parsed_entries = []
with os.scandir(self.source_directory) as dir: with os.scandir(self.source_directory) as dir:
for file in dir: for file in dir:

View file

@ -1,18 +1,13 @@
import sqlite3
from typing import Optional from typing import Optional
from termcolor import colored
from models.entry import Entry
from sql.create_tables import tables
class SqliteService: class SqliteService:
def __init__(self, db_connection): def __init__(self, db_connection):
self.connection = db_connection self.connection = db_connection
self.cursor = db_connection.cursor() self.cursor = db_connection.cursor()
def _query(self, sql, params=None, errorMessage: Optional[str] = None): def _execute(self, sql, params=None, error_message: Optional[str] = None):
"""Use for CREATE, INSERT, UPDATE, DELETE"""
try: try:
if params: if params:
self.cursor.execute(sql, params) self.cursor.execute(sql, params)
@ -21,6 +16,20 @@ class SqliteService:
self.connection.commit() self.connection.commit()
except Exception as e: except Exception as e:
if errorMessage: if error_message:
raise Exception(f"ERROR {errorMessage}: {e}") raise Exception(f"ERROR {error_message}: {e}")
raise
def _query(self, sql, params=None, error_message: Optional[str] = None):
"""Use for SELECT"""
try:
if params:
self.cursor.execute(sql, params)
else:
self.cursor.execute(sql)
return self.cursor.fetchall()
except Exception as e:
if error_message:
raise Exception(f"ERROR {error_message}: {e}")
raise raise

View file

@ -1,6 +1,6 @@
from termcolor import colored from termcolor import colored
from models.entry import Entry from models.entry import IEntry
from services.sqlite_service import SqliteService from services.sqlite_service import SqliteService
from sql.create_tables import tables from sql.create_tables import tables
@ -11,58 +11,58 @@ class TableService(SqliteService):
def __create_tables(self): def __create_tables(self):
for table in tables: for table in tables:
self._query( self._execute(
table["create_statement"], table["create_statement"],
errorMessage=f"Problem creating table {table['name']}", error_message=f"Problem creating table {table['name']}",
) )
print(colored("INFO Created tables", "light_blue")) print(colored("INFO Created tables", "blue"))
def __drop_tables(self): def __drop_tables(self):
# Reverse the order of `tables` list to avoid foreign key violation when # Reverse the order of `tables` list to avoid foreign key violation when
# deleting # deleting
for table in reversed(tables): for table in reversed(tables):
self._query( self._execute(
f"DROP TABLE IF EXISTS {table['name']}", f"DROP TABLE IF EXISTS {table['name']}",
errorMessage=f"Problem truncating table {table['name']}", error_message=f"Problem truncating table {table['name']}",
) )
print(colored("INFO Cleared tables", "light_blue")) print(colored("INFO Cleared tables", "blue"))
def __entry_exists(self, title) -> bool:
    """Return True when a row with this title is present in `entries`."""
    # This is a lookup, so it goes through _query (which returns the
    # fetched rows) rather than _execute, which commits after every call
    # and is meant for CREATE, INSERT, UPDATE and DELETE statements.
    rows = self._query(
        "SELECT 1 FROM entries WHERE title = :title", {"title": title}
    )
    return bool(rows)
def __populate_base_tables(self, entries: list[Entry]): def __populate_base_tables(self, entries: list[IEntry]):
for entry in entries: for entry in entries:
self._query( self._execute(
"INSERT INTO entries (title, last_modified, size, body) VALUES (:title, :last_modified, :size, :body)", "INSERT INTO entries (title, last_modified, size, body) VALUES (:title, :last_modified, :size, :body)",
entry, entry,
errorMessage=f"The following entry could not be added to `entries` table: {entry}", error_message=f"The following entry could not be added to `entries` table: {entry}",
) )
tags = entry.get("tags") tags = entry.get("tags")
if tags: if tags:
for tag in tags: for tag in tags:
self._query( self._execute(
"INSERT OR IGNORE INTO tags (name) VALUES (:tag_name)", "INSERT OR IGNORE INTO tags (name) VALUES (:tag_name)",
{"tag_name": tag}, {"tag_name": tag},
) )
print(colored("INFO Base tables populated", "light_blue")) print(colored("INFO Base tables populated", "blue"))
def __populate_junction_tables(self, entries: list[Entry]): def __populate_junction_tables(self, entries: list[IEntry]):
for entry in entries: for entry in entries:
tags = entry.get("tags") tags = entry.get("tags")
links = entry.get("links") links = entry.get("links")
if tags: if tags:
for tag in tags: for tag in tags:
self._query( self._execute(
"INSERT INTO entries_tags (entry_title, tag_name) VALUES (:entry_title, :tag_name)", "INSERT INTO entries_tags (entry_title, tag_name) VALUES (:entry_title, :tag_name)",
{"entry_title": entry.get("title"), "tag_name": tag}, {"entry_title": entry.get("title"), "tag_name": tag},
) )
if links: if links:
for link in links: for link in links:
if self.__entry_exists(link): if self.__entry_exists(link):
self._query( self._execute(
"INSERT OR IGNORE INTO backlinks (source_entry_title, target_entry_title) VALUES (:source_entry_title, :target_entry_title)", "INSERT OR IGNORE INTO backlinks (source_entry_title, target_entry_title) VALUES (:source_entry_title, :target_entry_title)",
{ {
"source_entry_title": entry.get("title"), "source_entry_title": entry.get("title"),
@ -70,9 +70,9 @@ class TableService(SqliteService):
}, },
) )
print(colored("INFO Junction tables populated", "light_blue")) print(colored("INFO Junction tables populated", "blue"))
def populate_tables(self, entries: list[Entry]): def populate_tables(self, entries: list[IEntry]):
self.__drop_tables() self.__drop_tables()
self.__create_tables() self.__create_tables()
self.__populate_base_tables(entries) self.__populate_base_tables(entries)