105 lines
2.5 KiB
Python
Executable file
105 lines
2.5 KiB
Python
Executable file
#! /usr/local/bin/python3
|
|
|
|
# Export time entries from TimeWarrior CLI and upload to AWS DB via Lambda
|
|
|
|
import subprocess
|
|
import json
|
|
import warnings
|
|
from datetime import datetime
|
|
|
|
|
|
def execute_shell_command(command):
    """Run *command* through the shell and return its captured output.

    Args:
        command: The command line to execute, as a single string.

    Returns:
        The command's standard output with surrounding whitespace removed
        when it exits with status 0; otherwise its standard error, likewise
        stripped.
    """
    # NOTE: shell=True means the string is interpreted by the shell; only
    # pass trusted command lines here.
    completed = subprocess.run(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    # subprocess.run() only raises CalledProcessError when check=True is
    # given, so a failure has to be detected from the exit status instead.
    if completed.returncode != 0:
        return completed.stderr.strip()
    return completed.stdout.strip()
|
|
|
|
|
|
def seconds_to_digital_time(seconds):
    """Convert a number of seconds to decimal hours, rounded to 2 places."""
    seconds_per_hour = 3600
    return round(seconds / seconds_per_hour, 2)
|
|
|
|
|
|
def get_tw_entries():
    """Return the parsed JSON output of the ``timew export`` command.

    Raises:
        json.JSONDecodeError: If the command output is not valid JSON.
    """
    return json.loads(execute_shell_command("timew export"))
|
|
|
|
|
|
def utc_to_sql_stamp(utc):
    """Reformat a compact timestamp as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        utc: A timestamp string shaped like ``20240131T235959Z``.

    Returns:
        The same date and time as a ``YYYY-MM-DD HH:MM:SS`` string. The
        value is only reformatted; no timezone conversion is applied.

    Raises:
        ValueError: If *utc* does not match the expected input format.
    """
    parsed = datetime.strptime(utc, "%Y%m%dT%H%M%SZ")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def get_entry_duration(utc1, utc2):
    """Return the time from *utc1* to *utc2* in decimal hours.

    Args:
        utc1: Start timestamp shaped like ``20240131T235959Z``.
        utc2: End timestamp in the same format.

    Returns:
        The difference ``utc2 - utc1`` converted by
        ``seconds_to_digital_time``; negative when *utc2* precedes *utc1*.

    Raises:
        ValueError: If either timestamp does not match the expected format.
    """
    stamp_format = "%Y%m%dT%H%M%SZ"
    began = datetime.strptime(utc1, stamp_format)
    finished = datetime.strptime(utc2, stamp_format)
    elapsed = finished - began
    return seconds_to_digital_time(elapsed.total_seconds())
|
|
|
|
|
|
def active_timer():
    """Return True when ``timew get dom.active`` outputs exactly ``"1"``."""
    return execute_shell_command("timew get dom.active") == "1"
|
|
|
|
|
|
def get_tags(tag_list):
    """Return a tag list that is guaranteed to hold at least two items.

    Args:
        tag_list: The tags of one entry; may be empty or ``None``.

    Returns:
        *tag_list* itself when it already has two or more items. A list
        with one tag is returned as a new list padded with ``"null"``; an
        empty or missing list yields ``["null", "null"]``. A warning is
        issued whenever a placeholder has to be inserted.
    """
    if not tag_list:
        warnings.warn("No tag data")
        return ["null", "null"]

    if len(tag_list) < 2:
        # Callers index both position 0 and position 1, so a lone tag would
        # otherwise end in an IndexError. Pad a copy rather than mutating
        # the caller's list.
        warnings.warn("Only one tag found")
        return [*tag_list, "null"]

    return tag_list
|
|
|
|
|
|
def process_entries():
    """Convert every exported time entry into a flat record.

    Returns:
        A list of dicts, one per entry returned by ``get_tw_entries``, with
        the keys ``activity_type`` (first tag), ``start`` and ``end``
        (reformatted by ``utc_to_sql_stamp``), ``duration`` (from
        ``get_entry_duration``) and ``description`` (second tag).

    Raises:
        KeyError: If an entry has no ``start`` or ``end`` key.
    """
    processed = []

    for entry in get_tw_entries():
        # An entry without a "tags" key must not abort the whole run; an
        # empty list lets get_tags() supply its placeholder values.
        # NOTE(review): Timewarrior appears to omit "tags" for untagged
        # intervals -- confirm against the export format documentation.
        tags = get_tags(entry.get("tags", []))
        started, ended = entry["start"], entry["end"]

        processed.append(
            {
                "activity_type": tags[0],
                "start": utc_to_sql_stamp(started),
                "end": utc_to_sql_stamp(ended),
                "duration": get_entry_duration(started, ended),
                "description": tags[1],
            }
        )

    return processed
|
|
|
|
|
|
def main():
    """Print the processed time entries, pausing a running timer meanwhile.

    If a timer is active it is stopped before the entries are processed and
    resumed afterwards with ``timew continue``. Any exception is caught and
    reported on standard output.
    """
    try:
        was_active = active_timer()
        if was_active:
            execute_shell_command("timew stop")

        try:
            time_entries = process_entries()
            print(time_entries)
        finally:
            # Resume the timer even when processing fails, so an error
            # never leaves the user's tracking stopped.
            if was_active:
                execute_shell_command("timew continue")

    except Exception as e:
        print(f"An error occurred: {e}")
|
|
|
|
|
|
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|