diff --git a/db/eolas.db b/db/eolas.db new file mode 100644 index 0000000..afb6270 Binary files /dev/null and b/db/eolas.db differ diff --git a/setup.py b/setup.py index a4ca775..1cace18 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( install_requires=["python-frontmatter", "termcolor"], entry_points={ "console_scripts": [ - "run=app:main", + "eolas-db=cli:main", ], }, ) diff --git a/src/cli.py b/src/cli.py index 010739e..159b6bd 100644 --- a/src/cli.py +++ b/src/cli.py @@ -4,13 +4,13 @@ from constants import EOLAS_DIRECTORY from controllers.controller import Controller from services.database_service import DatabaseService from services.parse_file_service import ParseFileService -from services.sqlite_service import SqliteService +from services.table_service import TableService database_service = DatabaseService("eolas") database_connection = database_service.connect() -sqlite_service = SqliteService(database_connection) +table_service = TableService(database_connection) parse_file_service = ParseFileService(EOLAS_DIRECTORY) -controller = Controller(database_service, sqlite_service, parse_file_service) +controller = Controller(database_service, table_service, parse_file_service) def main(): @@ -18,11 +18,11 @@ def main(): prog="eolas-db", description="Eolas database manager." ) parser.add_argument( - "command", choices=["parse", "populate"], help="Command to execute" + "command", choices=["populate-database"], help="Command to execute" ) args = parser.parse_args() - if args.command == "populate": + if args.command == "populate-database": controller.populate_database() diff --git a/src/controllers/controller.py b/src/controllers/controller.py index b2ee1f0..d16c494 100644 --- a/src/controllers/controller.py +++ b/src/controllers/controller.py @@ -1,12 +1,12 @@ class Controller: - def __init__(self, database_service, sqlite_service, parse_file_service): + def __init__(self, database_service, table_service, parse_file_service): self.database_service = database_service - self.sqlite_service = sqlite_service + self.table_service = table_service self.parse_file_service = parse_file_service def populate_database(self): try: entries = self.parse_file_service.parse_source_directory() - self.sqlite_service.populate_tables(entries) + self.table_service.populate_tables(entries) finally: self.database_service.disconnect() diff --git a/src/services/database_service.py b/src/services/database_service.py index 062914f..9737c5a 100644 --- a/src/services/database_service.py +++ b/src/services/database_service.py @@ -1,3 +1,4 @@ +import os import sqlite3 from typing import Optional @@ -15,6 +16,9 @@ class DatabaseService: return self.connection try: + if not os.path.exists(self.db_path): + os.makedirs(self.db_path) + print(colored("INFO Created database directory", "light_blue")) self.connection = sqlite3.connect(f"{self.db_path}/{self.db_name}.db") self.connection.execute("PRAGMA foreign_keys = ON") print(colored("INFO Database connection established", "light_blue")) diff --git a/src/services/sqlite_service.py b/src/services/sqlite_service.py index b181f59..159dd9f 100644 --- a/src/services/sqlite_service.py +++ b/src/services/sqlite_service.py @@ -8,11 +8,11 @@ from sql.create_tables import tables class SqliteService: - def __init__(self, connection): - self.connection = connection - self.cursor = connection.cursor() + def __init__(self, db_connection): + self.connection = db_connection + self.cursor = db_connection.cursor() - def __query(self, sql, params=None, errorMessage: Optional[str] = None): + def _query(self, sql, params=None, errorMessage: Optional[str] = None): try: if params: self.cursor.execute(sql, params) @@ -24,72 +24,3 @@ class SqliteService: if errorMessage: raise Exception(f"ERROR {errorMessage}: {e}") raise - - def __create_tables(self): - for table in tables: - self.__query( - table["create_statement"], - errorMessage=f"Problem creating table {table['name']}", - ) - print(colored("INFO Created tables", "light_blue")) - - def __drop_tables(self): - # Reverse the order of `tables` list to avoid foreign key violation when - # deleting - for table in reversed(tables): - self.__query( - f"DROP TABLE IF EXISTS {table['name']}", - errorMessage=f"Problem truncating table {table['name']}", - ) - print(colored("INFO Cleared tables", "light_blue")) - - def __entry_exists(self, title) -> bool: - self.__query("SELECT 1 FROM entries WHERE title = :title", {"title": title}) - result = self.cursor.fetchone() - return result is not None - - def __populate_base_tables(self, entries: list[Entry]): - for entry in entries: - self.__query( - "INSERT INTO entries (title, last_modified, size, body) VALUES (:title, :last_modified, :size, :body)", - entry, - errorMessage=f"The following entry could not be added to `entries` table: {entry}", - ) - tags = entry.get("tags") - if tags: - for tag in tags: - self.__query( - "INSERT OR IGNORE INTO tags (name) VALUES (:tag_name)", - {"tag_name": tag}, - ) - - print(colored("INFO Base tables populated", "light_blue")) - - def __populate_junction_tables(self, entries: list[Entry]): - for entry in entries: - tags = entry.get("tags") - links = entry.get("links") - if tags: - for tag in tags: - self.__query( - "INSERT INTO entries_tags (entry_title, tag_name) VALUES (:entry_title, :tag_name)", - {"entry_title": entry.get("title"), "tag_name": tag}, - ) - if links: - for link in links: - if self.__entry_exists(link): - self.__query( - "INSERT OR IGNORE INTO backlinks (source_entry_title, target_entry_title) VALUES (:source_entry_title, :target_entry_title)", - { - "source_entry_title": entry.get("title"), - "target_entry_title": link, - }, - ) - - print(colored("INFO Junction tables populated", "light_blue")) - - def populate_tables(self, entries: list[Entry]): - self.__drop_tables() - self.__create_tables() - self.__populate_base_tables(entries) - self.__populate_junction_tables(entries) diff --git a/src/services/table_service.py b/src/services/table_service.py new file mode 100644 index 0000000..930bde1 --- /dev/null +++ b/src/services/table_service.py @@ -0,0 +1,79 @@ +from termcolor import colored + +from models.entry import Entry +from services.sqlite_service import SqliteService +from sql.create_tables import tables + + +class TableService(SqliteService): + def __init__(self, db_connection): + super().__init__(db_connection) + + def __create_tables(self): + for table in tables: + self._query( + table["create_statement"], + errorMessage=f"Problem creating table {table['name']}", + ) + print(colored("INFO Created tables", "light_blue")) + + def __drop_tables(self): + # Reverse the order of `tables` list to avoid foreign key violation when + # deleting + for table in reversed(tables): + self._query( + f"DROP TABLE IF EXISTS {table['name']}", + errorMessage=f"Problem truncating table {table['name']}", + ) + print(colored("INFO Cleared tables", "light_blue")) + + def __entry_exists(self, title) -> bool: + self._query("SELECT 1 FROM entries WHERE title = :title", {"title": title}) + result = self.cursor.fetchone() + return result is not None + + def __populate_base_tables(self, entries: list[Entry]): + for entry in entries: + self._query( + "INSERT INTO entries (title, last_modified, size, body) VALUES (:title, :last_modified, :size, :body)", + entry, + errorMessage=f"The following entry could not be added to `entries` table: {entry}", + ) + tags = entry.get("tags") + if tags: + for tag in tags: + self._query( + "INSERT OR IGNORE INTO tags (name) VALUES (:tag_name)", + {"tag_name": tag}, + ) + + print(colored("INFO Base tables populated", "light_blue")) + + def __populate_junction_tables(self, entries: list[Entry]): + for entry in entries: + tags = entry.get("tags") + links = entry.get("links") + if tags: + for tag in tags: + self._query( + "INSERT INTO entries_tags (entry_title, tag_name) VALUES (:entry_title, :tag_name)", + {"entry_title": entry.get("title"), "tag_name": tag}, + ) + if links: + for link in links: + if self.__entry_exists(link): + self._query( + "INSERT OR IGNORE INTO backlinks (source_entry_title, target_entry_title) VALUES (:source_entry_title, :target_entry_title)", + { + "source_entry_title": entry.get("title"), + "target_entry_title": link, + }, + ) + + print(colored("INFO Junction tables populated", "light_blue")) + + def populate_tables(self, entries: list[Entry]): + self.__drop_tables() + self.__create_tables() + self.__populate_base_tables(entries) + self.__populate_junction_tables(entries)