Source code for hermes.common


#########################################################
#   HERMES - github.com/baronefr/hermes
#--------------------------------------------------------

import os
import csv
from termcolor import colored

import hermes



[docs] class hprint: """Class wrapper to print fancy colorful messages."""
[docs] def std(string : str, head : str = 'hermes', color : str = 'blue') -> None: if head is not None: print('[', colored(head, color, attrs=['bold']), '] ', end = '') print(string)
[docs] def err(string : str, head : str = 'error') -> None: if head is not None: print('[', colored(head, 'red', attrs=['bold']), '] ', end = '') print(string)
[docs] def warn(string : str, head : str = 'warning') -> None: if head is not None: print('[', colored(head, 'yellow', attrs=['bold']), '] ', end = '') print(string)
[docs] def info(string : str, head : str = 'info', color : str = 'green') -> None: if head is not None: print('[', colored(head, color, attrs=['bold']), '] ', end = '') print(string)
[docs] def append(string : str) -> None: print(' ∟ ', string)
# DEFAULT NAMESPACE ----------------------------------------------- # # My advice is to not change these options... # namespace = { # configuration files ------------ 'SETUP_FILE' : 'setup.hermes', 'SETTINGS_FILE' : 'settings.cfg', 'AUTH_FILE' : 'users.key', 'ONESHOTS' : 'oneshot.py', # bot log files 'LOG_DIR' : 'log/', # dev: do not change this... there might be unmanaged references 'BLOG_UNAUTH' : "unauth.log", # log of all unauthorized users 'BLOG_AUTH_USER' : "user_{}.log", # a log for each authorized user 'BLOG_REGISTER' : "register.tmp", # a register for user which ask for access # bot etc 'BLOG_HEAD' : "Hermes Telegram bot - log file [{}]\n init: {}\n============================\n\n", 'LOG_TIMESTAMP' : "%Y/%m/%d %H:%M:%S", # task files 'TASK_INDEX' : 'tasks.list', 'TASK_PRIVATE' : 'task_{}.log', # task etc 'TINDEX_HEAD' : "Hermes Task - index file [{}]\n", 'TINDEX_FIELDS' : "id,alias,pid,spawntime,status", # cli 'CLI_HOOK' : "%HERMES%" }
[docs] def hook_default() -> str: """Returns the default CLI hook flag.""" return namespace['CLI_HOOK']
bot_timeout = 30 # max age of messages to process [seconds] # SETTINGS POLICY ------------------------------------------------- # # Each module (bot & task) has an optional argument 'settings' # of the init function, which overrides the following policy. # In priority order, the default policy hereby implemented is: # 0) (override, managed by the calling function) # 1) environment variable # 2) current path of the executable #
[docs] def settings_default_policy(): # [1] environment variable prefix = os.environ.get( hermes.env_key ) # [2] local path if prefix is None: prefix = './' print('[ warning ] no env variable detected, using pwd') return prefix
# USER PERMISSIONS FILE ------------------------------------------- # # This function reads the user permission file. # # Returns: - dictionary of (key : USER_NAME, value : CHAT_ID) # - dictionary of special lists (bonjour, ...) #
[docs] def read_permissions(ufile): users_dict = {}; extra_dict = {}; bonjour_list = [] try: auth_file = open(ufile, mode='r') except Exception as err: raise Exception('cannot open user file ' + ufile) try: auth_csv = csv.DictReader(auth_file) except Exception as err: raise Exception('cannot read user file ' + ufile) # process each record for usr in auth_csv: # converting types & parsing boolean values try: usr['chatid'] = int( usr['chatid'] ) usr['active'] = True if usr['active'] in ['true', 'True'] else False usr['bonjour'] = True if usr['bonjour'] in ['true', 'True'] else False except Exception as err: print(' >>', err) raise Exception("cannot parse user field, check file ({}) syntax".format(ufile)) # skip users users marked as unactive if not usr['active']: continue # dispatch if usr['bonjour']: bonjour_list.append( usr['chatid'] ) users_dict[ usr['name'] ] = usr['chatid'] auth_file.close() extra_dict[ 'bonjour' ] = bonjour_list return users_dict, extra_dict
# INDEX checks ------------------------------------------------- #
[docs] def index_access(ipath, ifile, time = '', autocreate = True) -> None: # check the path if autocreate: os.makedirs(ipath, exist_ok=True) if os.path.exists(ipath) is False: if autocreate: raise Exception("Task path does not exist and cannot be created.\n" "Check if you have permission to create this folder.") else: raise Exception("Task path does not exist, autocreate = False.") # create index, if not exist if os.path.isfile(ifile) is False: if autocreate: index_header = namespace['TINDEX_HEAD'] + namespace['TINDEX_FIELDS'] + '\n' with open(ifile, 'w') as f: f.write( index_header.format( time ) ) else: raise Exception('index does not exists, autocreate = False.') # check if index is writable with open(ifile, 'a') as f: if f.writable() is False: raise Exception('index file is not writable')
[docs] def index_get_tasks(ifile : str): # Arg: ifile name of auth file # # Output: - list of alive tasks (id) # - list of closed tasks (id) (includes failed tasks) # - dictionary with (key:id, alias) alive_tasks = []; closed_tasks = []; alias_tasks = {}; with open(ifile, mode='r') as tindex: next(tindex) # skip header rtasks = csv.DictReader(tindex) for task in rtasks: if(task['status'] in ['failed','closed']): closed_tasks.append( task['id'] ) else: alive_tasks.append( task['id'] ) alias_tasks[ str(task['id']) ] = str(task['alias']) return alive_tasks, closed_tasks, alias_tasks