#! /usr/local/bin/python3 # # this file: # http://user.it.uu.se/~embe8573/bot/sth.py # https://dataswamp.org/~incal/bot/sth.py import argparse import html.parser import importlib import os import pathlib import pickle import random import re import socket import ssl import subprocess import sys import textwrap import time from numpy import loadtxt from os import listdir from os.path import isfile, join from pprint import pformat from textwrap import indent sys.path.append('py/') import hello_world mail = 'moasenwood@zoho.eu' __author__ = 'incal, michi' __contact__ = 'irc.dataswamp.org #dataswamp ' + mail __credits__ = ['demsh', 'hhvn', 'incal', 'james', 'josuah', 'katzeilla', 'Lucas', 'maxxe', 'michi', 'solene', 'vrl'] __email__ = mail __maintainer__ = 'incal' __version__ = time.strftime('%Y-%m-%d', time.gmtime(os.path.getmtime(pathlib.Path(__file__).absolute()))) cli = argparse.ArgumentParser(description='swamp thing (sth) IRC bot') cli.add_argument('-c', '--channel', type=str, help='IRC channel', default='#swamp-thing') cli.add_argument('-D', '--debug', help='enable debug messages', action='store_true') cli.add_argument('-d', '--dry', help="dry run, don't connect", action='store_true') cli.add_argument('-e', '--encode', help='use SSL', action='store_true') cli.add_argument('-lf', '--log-file', type=str, help='log file') cli.add_argument('-ll', '--log-level', type=int, help='log level, high is more', default=3) cli.add_argument('-m', '--track-messages', help='track messages', action='store_true') cli.add_argument('-n', '--nick', type=str, help='bot nick', default='sth') cli.add_argument('-p', '--port', type=int, help='IRC port', default=6697) cli.add_argument('-s', '--server', type=str, help='IRC server (host)', default='irc.freenode.net') cli.add_argument('-v', '--version', help='show version and exit', action='store_true') cli.add_argument('-vl', '--verbose-level', type=int, help='verbose level, high is more', default=2) opts = cli.parse_args() class Bot: def __init__(self): self.channel = opts.channel self.debug = opts.debug self.dry = opts.dry self.encode = opts.encode self.log_file = opts.log_file self.log_lvl = opts.log_level self.nick = opts.nick self.port = opts.port self.server = opts.server self.track_msgs = opts.track_messages self.ver = opts.version self.verb_lvl = opts.verbose_level self.data_dir = 'data/' self.aliases = [['l', 'elem']] self.alias_cmd_n = 'alias' self.alias_db = self.data_dir + self.alias_cmd_n + '.db' self.bot_cmd_pf = ',' + self.nick self.bot_cmd_pf_alt = self.bot_cmd_pf[0] * 2 self.last_cmd = '' self.rules_url = 'https://dataswamp.org/channel_guidelines.txt' self.scripts_loc = 'scripts/' self.timestamp = '%Y-%m-%dT%H:%M:%S' self.users = [] self.users_file = self.data_dir + 'users.txt' self.zsh = '/usr/local/bin/zsh' if self.ver: print(__version__) quit() else: self.create_dirs() self.init_users() def alias_dump(self): with open(self.alias_db, 'wb+') as file: file.write(pickle.dumps(self.aliases)) def alias_load(self): try: with open(self.alias_db, 'rb') as file: self.aliases = pickle.load(file) except: self.error('error loading alias file' + self.alias_db) def file_lines(self, f): # https://www.w3resource.com/python-exercises/file/python-io-exercise-9.php with open(f) as f: for i, l in enumerate(f): pass return i + 1 def alias_print(self, channel): s = '' if self.aliases: for [a, f] in self.aliases: s += a + ' - ' + f + ", " s = s[:-2] if not s: s = "none" self.send_privmsg(channel, s) def alias_add(self, a, f): if a != self.alias_cmd_n: old = self.aliases ali = [a, f] if ali not in self.aliases: self.aliases = [ali] + old self.alias_dump() def alias_clear(self): self.aliases = [] def alias_lookup(self, cmd): key = '' for [a, f] in self.aliases: key = a if cmd.find(key) == 0 and (len(cmd) == len(a) or cmd[len(a)] == ' '): return cmd.replace(key, f, 1) return cmd def alias_rm(self, alias, channel): new = list(filter(lambda ai: ai[0] != alias, self.aliases)) if new != self.aliases: self.aliases = new self.alias_dump() def make_printable(self, s): # https://stackoverflow.com/a/54451837 NOPRINT_TRANS_TABLE = { i: None for i in range(0, sys.maxunicode + 1) if not chr(i).isprintable() } return s.translate(NOPRINT_TRANS_TABLE) def leet(self, s, rev=False): new = s reps = [ ['a', '4'], ['A', '4'], ['e', '3'], ['E', '3'], ['l', '1'], ['L', '1'], ['o', '0'], ['O', '0'], ['s', '5'], ['S', '5'], ['t', '7'], ['T', '7'], ] for lamer, leet in reps: if rev: new = new.replace(leet, lamer) else: new = new.replace(lamer, leet) return new def find_urls(self, s): # https://www.geeksforgeeks.org/python-check-url-string/ url_re = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z] {2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))" urls = re.findall(url_re, s) return [u[0] for u in urls] def url_title(self, url, channel): # https://dataswamp.org/~incal/bot/scripts/url2title url2title = self.scripts_loc + 'url2title' self.shell_cmd([self.zsh, url2title, url], channel) def create_dirs(self): if not os.path.exists(self.data_dir): os.mkdir(self.data_dir) def greet(self, nick, channel): greeting = '%s: welcome to %s <%s>' % (nick, channel, self.rules_url) self.send_privmsg(channel, greeting) def add_users(self, users, channel=''): usrs = [u.replace('@','') for u in users] if channel: for u in usrs: if not u in self.users: self.greet(u, channel) self.users += usrs self.users = list(dict.fromkeys(self.users)) self.users.sort(key=str.lower) self.save_users() def add_user(self, user, channel): self.add_users([user], channel) def init_users(self): if not os.path.exists(self.users_file): with open(self.users_file, 'w'): pass if os.path.getsize(self.users_file) > 0: user_arr = loadtxt(self.users_file, dtype=str, comments='#', delimiter='\n', unpack=False) self.add_users(user_arr.tolist()) def save_users(self): f = open(self.users_file, 'w') write_str = '\n'.join(str(u) for u in self.users) print(write_str, file=f) f.close() def shell_cmd(self, cmd, channel=''): res = '' try: res = subprocess.run(cmd, stdout=subprocess.PIPE) except FileNotFoundError as e: self.error('command \'' + cmd[0] + '\' not found', e, channel) return if res: out = res.stdout.decode('utf-8') out = html.parser.unescape(out) if out and out != '\x0A': if channel: print('sending ' + out + ' to channel') self.send_privmsg(channel, out) return out def send_privmsg(self, channel, msg): # https://tools.ietf.org/rfc/rfc1459.txt - sect. 2.3.1, item 2 max_len = 256 lines = [] m = self.make_printable(msg) if len(msg) > max_len: lines = textwrap.wrap(msg, max_len, break_long_words=False) for l in lines: self.send('PRIVMSG ' + channel + ' :' + l) self.sleep() else: self.send('PRIVMSG ' + channel + ' :' + m) def make_full_docstring(self, data): ds = self.bot_cmd_pf + ' ' + data[0] if data[1]: ds += ' ' + data[1] if data[2]: ds += ' ' + data[2] return ds def help_cmd_f(self, cmd, help_cmd, cmds, nick, channel): help_msg = '' search_key = cmd[len(help_cmd[0]) + 1:] if search_key: if search_key == 'all': ds = '' for c in cmds: ds = self.make_full_docstring(c) self.send_privmsg(nick, ds) self.sleep() elif search_key == 'list': help_msg = '' for c in cmds: help_msg += c[0] + ' ' help_msg += "(try '" + self.bot_cmd_pf + " help CMD')" help_msg += ' [%s commands]' % len(cmds) else: found = False for c in cmds: if search_key == c[0]: help_msg = self.make_full_docstring(c) found = True break if not found: help_msg = 'no hit: ' + search_key else: help_msg = 'syntax: ' + self.bot_cmd_pf + ' CMD[; ...] (or just ' + self.bot_cmd_pf_alt + ' CMD) where CMD is: ' for c in cmds: help_msg += c[0] if c[1]: help_msg += ' ' + c[1] help_msg += ', ' help_msg = help_msg[:-2] if help_msg: self.send_privmsg(channel, help_msg) def dot_str(self, s): new = '' if s: new = s[:1] + '.' + s[1:] return new def sleep(self, dur=0.1): time.sleep(dur) def bot_cmd(self, cmd, channel, nick): alias_cmd = [self.alias_cmd_n, '[[ALIAS FUN] | rm ALIAS]', 'list aliases; create an alias for a function; remove alias. \cmd to inhibit'] alice_cmd = ['alice', 'NICK', "don't try this"] bem_cmd = ['bem', '[dot]', 'print known users'] cred_cmd = ['cred', '', 'print credits'] echo_cmd = ['echo', 'STR', 'have ' + self.nick + ' say STR'] echos_cmd = ['echos', 'DATA', 'send DATA'] elem_cmd = ['elem', 'CITY', 'weather in CITY. output: location temperature relative-humidity wind-direction/speed [rain] pressure sunrise-zenit-sunset summary feels-like'] geoip_cmd = ['geoip', '[IP]', 'print GeoIP of IP/the bot'] help_cmd = ['help', '[CMD|all|list]', 'print command syntax/CMD docstring/all help/a list of commands'] hi_cmd = ['hi', '', 'say hi to the bot'] hist_cmd = ['hist', '[TECH]', 'look up TECH in computer history [print db URL]'] ip_cmd = ['ip', '', 'print bot IP'] join_cmd = ['join', 'CHANNEL', 'have ' + self.nick + ' join CHANNEL'] junior_cmd = ['junior', '', 'ask this person for help'] kill_cmd = ['kill', '', 'kill the bot'] leet_cmd = ['leet', '[rev] STR', 'translate STR to/from h4x0r'] name_cmd = ['name', '', "print the bot's name"] nick_cmd = ['nick', 'NICK', 'set bot NICK'] quiz_cmd = ['quiz', '', 'print quiz URLs'] r_cmd = ['r', '', 'execute the previous command'] rules_cmd = ['rules', '', 'print channel guidelines URL'] senior_cmd = ['senior', '[POS|NICK]', "print NICK's ancienty POS. put nick as n.ick"] src_cmd = ['src', '', 'print bot source URL'] stats_cmd = ['stats', '', 'print system info'] time_cmd = ['time', '[TZ]', 'print time, set timezone TZ'] ver_cmd = ['ver', '', 'print bot version'] cmds = [ alias_cmd, bem_cmd, cred_cmd, echo_cmd, echos_cmd, elem_cmd, geoip_cmd, help_cmd, hi_cmd, hist_cmd, ip_cmd, join_cmd, junior_cmd, kill_cmd, leet_cmd, name_cmd, nick_cmd, quiz_cmd, r_cmd, rules_cmd, senior_cmd, src_cmd, stats_cmd, time_cmd, ver_cmd ] cmds.sort() ip_shell_cmd = ['curl', '-s', 'icanhazip.com'] cmd = cmd.strip() delim_char = ';' if cmd.find(delim_char) != -1: cmds = cmd.split(delim_char) for c in cmds: self.bot_cmd(c, channel, nick) return if cmd: if cmd[0] == '\\': cmd = cmd[1:] else: cmd = self.alias_lookup(cmd) else: return if cmd.find(nick_cmd[0]) == 0: nick = cmd[len(nick_cmd[0]) + 1:] if nick: self.nick_set(nick) elif cmd.find(alice_cmd[0]) == 0: args = cmd[len(alice_cmd[0]) + 1:] args_l = args.split() nick = args_l[0] msg = args[len(nick) + 1:] self.send_privmsg(nick, msg) elif cmd.find(join_cmd[0]) == 0: cnl = cmd[len(join_cmd[0]) + 1:] if cnl: if cnl[0] != "#": cnl = '#' + cnl self.join(cnl) # put this before echo elif cmd.find(echos_cmd[0]) == 0: send_cmd = cmd[len(echos_cmd[0]) + 1:] if send_cmd: self.send(send_cmd) elif cmd.find(echo_cmd[0]) == 0: say_what = cmd[len(echo_cmd[0]) + 1:] if say_what: self.send_privmsg(channel, say_what) elif cmd.find(alias_cmd[0]) == 0: args = cmd[len(alias_cmd[0]) + 1:] args_l = args.split() rm = alias_cmd[1][alias_cmd[1].find('|') + 2:] rm = rm[:rm.find(' ')] if args == "": self.alias_print(channel) elif args_l[0] == rm: for a in args_l[1:]: self.alias_rm(a, channel) else: if len(args_l) >= 2: ali = args_l[0] fun_l = args_l[1:] fun = ' '.join(fun_l) self.alias_add(ali, fun) elif cmd.find(elem_cmd[0]) == 0: city = cmd[len(elem_cmd[0]) + 1:] if city: elem = self.scripts_loc + elem_cmd[0] res = self.shell_cmd([self.zsh, elem, city]) if res: self.send_privmsg(channel, res) elif cmd.find(senior_cmd[0]) == 0: arg = cmd[len(senior_cmd[0]) + 1:] senior = self.scripts_loc + senior_cmd[0] res = self.shell_cmd([self.zsh, senior, arg]) if res: self.send_privmsg(channel, res) elif cmd == stats_cmd[0]: # https://dataswamp.org/~incal/bot/scripts/stats stats = self.scripts_loc + stats_cmd[0] res = self.shell_cmd([self.zsh, stats]) if res: self.send_privmsg(channel, res) elif cmd.find(leet_cmd[0]) == 0: s = cmd[len(leet_cmd[0]) + 1:] out = '' rev_kwd = leet_cmd[1][leet_cmd[1].find('[') + 1 : leet_cmd[1].find(']')] if s.find(rev_kwd) == 0: s = s[len(rev_kwd) + 1:] out = self.leet(s, True) else: out = self.leet(s) self.send_privmsg(channel, out) elif cmd.find(bem_cmd[0]) == 0: style = cmd[len(bem_cmd[0]) + 1:] usrs = [] new = '' if style == 'dot': for u in self.users: new = self.dot_str(u) usrs.append(new) else: for u in self.users: new = self.leet(u) if new == u: new = self.dot_str(u) usrs.append(new) usrs_str = ' '.join(sorted(usrs, key=str.lower)) l = len(usrs) out = '%s (%s users)' % (usrs_str, l) self.send_privmsg(channel, out) elif cmd == r_cmd[0]: if self.last_cmd: self.bot_cmd(self.last_cmd, channel, nick) return elif cmd == cred_cmd[0]: cred = ' '.join(sorted(__credits__, key=str.lower)) self.send_privmsg(nick, cred) elif cmd.find(geoip_cmd[0]) == 0: ip = cmd[len(geoip_cmd[0]) + 1:] if ip == '': ip = self.shell_cmd(ip_shell_cmd) geoip_shell_cmd = ['geoiplookup', ip][:23] res = self.shell_cmd(geoip_shell_cmd, channel) if res: self.send_privmsg(channel, res[27:]) elif cmd.find(help_cmd[0]) == 0: self.help_cmd_f(cmd, help_cmd, cmds, nick, channel) elif cmd == junior_cmd[0]: # https://dataswamp.org/~incal/bot/scripts/junior jnr = self.scripts_loc + junior_cmd[0] res = self.shell_cmd([self.zsh, jnr]) self.send_privmsg(channel, res) elif cmd.find(hist_cmd[0]) == 0: # https://dataswamp.org/~incal/bot/scripts/hist tech = cmd[len(hist_cmd[0]) + 1:] if not tech: tech = 'db' hist = self.scripts_loc + hist_cmd[0] res = self.shell_cmd([self.zsh, hist, tech]) if res: self.send_privmsg(channel, res) else: self.send_privmsg(channel, tech + ': no hit') elif cmd == hi_cmd[0] or cmd.find(hi_cmd[0] + ' ') == 0: end='' rand=random.randint(0, 9) if rand == 0: end=' :$' self.send_privmsg(channel, 'hi ' + nick + end) elif cmd == ip_cmd[0]: self.shell_cmd(ip_shell_cmd, channel) elif cmd == kill_cmd[0]: quit() elif cmd == name_cmd[0]: out = 'swamp thing ' self.send_privmsg(channel, out) elif cmd == rules_cmd[0]: out = self.rules_url self.send_privmsg(channel, out) elif cmd == src_cmd[0]: url = 'https://dataswamp.org/~incal' bot_sec = url + '/#bot' dir_url = url + '/bot/' bot_src = 'sth.py' bot_url = dir_url + bot_src outp = "%s %s [%d lines] " % (bot_sec, bot_url, self.file_lines(bot_src)) scripts_url = dir_url + self.scripts_loc outp += scripts_url + " " # https://stackoverflow.com/questions/3207219/how-do-i-list-all-files-of-a-directory scripts = [s for s in listdir(self.scripts_loc) if isfile(join(self.scripts_loc, s))] for s in scripts: sp = self.scripts_loc + s outp += "%s [%d] " % (s, self.file_lines(sp)) outp = outp[:-1] self.send_privmsg(channel, outp) elif cmd.find(time_cmd[0]) == 0: # https://stackoverflow.com/a/1398742 tz_arg = cmd[len(time_cmd[0]) + 1:] now = time.time() if tz_arg: os.environ['TZ'] = tz_arg time.tzset() t = time.strftime('%Y-%m-%d %H:%M:%S %Z', time.localtime(now)) self.send_privmsg(channel, t) elif cmd == ver_cmd[0]: self.send_privmsg(channel, __version__) elif cmd == quiz_cmd[0]: bike_quiz = 'bikes: https://dataswamp.org/~incal/BIKE-QUIZ' cmp_quiz = 'computers: https://dataswamp.org/~incal/CMP-QUIZ' interactive = 'interactive versions: http://nifty-networks.net:6969/' self.send_privmsg(channel, cmp_quiz) self.sleep() self.send_privmsg(channel, bike_quiz) self.sleep() self.send_privmsg(channel, interactive) else: cmd_file = 'py/' + cmd + '.py' if os.path.isfile(cmd_file): mod = __import__(cmd) importlib.reload(mod) out = mod.engage() self.send_privmsg(channel, out) else: self.send_privmsg(channel, "DNC") self.last_cmd = cmd def err(self, reason): self.output(reason, 0, 'err') exit def log(self, entry, msg_lvl=0): if self.log_file and msg_lvl <= self.log_lvl: entry = entry + '\n' f = open(self.log_file, 'a') f.write(entry) f.close() def output(self, text, msg_lvl=2, from_where=''): # msg lvl: 0 - error, the program terminates # 1 - warning # 2 - some info (default) # 3 - all info ts_prefix = time.strftime(self.timestamp) + ' ' err_prefix = 'ERR: ' wrn_prefix = 'WRN: ' prefix = ts_prefix if msg_lvl == 0: prefix += err_prefix elif msg_lvl == 1: prefix += wrn_prefix if self.track_msgs and from_where: prefix += ' [' + from_where + '] ' out = prefix + text self.log(out, msg_lvl) if msg_lvl <= self.verb_lvl: print(out) def send(self, data): self.socket.send((data + '\r\n').encode()) def error(self, desc, e='', channel=''): errstr = '' if e: errstr = "%s [exception %s]" % (desc, e.__class__.__name__) else: errstr = desc + " [failed]" if errstr and channel: self.send_privmsg(channel, errstr) elif errstr: print(errstr) def recv(self): bts = self.socket.recv(4096) try: decode = bts.decode() except UnicodeDecodeError as e: self.error("non-UTF-8 char(s)?", e) return lines = decode.splitlines() for l in lines: if l: self.output(l, from_where='recv') self.init_process(l) def connect(self, server, port, nick, encode): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((server, port)) if encode: self.socket = ssl.wrap_socket(s) else: self.socket = s self.nick_set(nick) self.send('USER %s %s %s :%s' % (nick, nick, nick, nick)) def nick_set(self, nick): self.nick = nick self.bot_cmd_pf = self.bot_cmd_pf[0] + self.nick self.send('NICK ' + self.nick) def join(self, channel): self.send('JOIN ' + channel) def names(self, channel): self.send('NAMES ' + channel) def mode(self, channel, nick, mode): self.send('MODE %s %s %s' % (channel, mode, nick)) def voice(self, channel, nick): self.mode(channel, nick, '+v') def init_process(self, msg): here = 'init_process' params = [] trailing = '' nick = '' channel = '' tkn = msg.split() if len(tkn) < 2: self.output('msg too small', 3, here) return if tkn[0] and tkn[0][0] == ':': prefix = tkn.pop(0)[1:] if '!' in prefix: nick, idathost = re.split('!', prefix, 1) else: nick = prefix cmd = tkn.pop(0) while tkn: if tkn[0] and tkn[0][0] != ':': params.append(tkn.pop(0)) else: tkn[0] = tkn[0][1:] trailing = ' '.join(tkn) break # parse server cmd if cmd: self.output('cmd: ' + cmd, from_where=here) else: self.output('cannot tokenize', 1, here) if cmd == 'PING': daemon = trailing self.send('PONG ' + daemon) self.output('pong ' + daemon, from_where=here+'/PING') elif cmd == 'JOIN': if trailing: channel = trailing else: channel = params[0] if channel and nick: self.output(nick + ' joined ' + channel, from_where=here+'/JOIN') self.add_user(nick, channel) elif cmd == 'PART': channel = params[0] out = nick + ' quit ' + channel self.output(out, from_where=here+'/PART') elif cmd == 'PRIVMSG': channel = params[0] usr_msg = trailing out = nick + ' [' + channel + '] ' + usr_msg self.output(out, from_where=here+'/PRIVMSG') bot_cmd_hit = usr_msg.find(self.bot_cmd_pf) bot_cmd_alt_hit = usr_msg.find(self.bot_cmd_pf_alt) if bot_cmd_hit >= 0: cmd_index = bot_cmd_hit + len(self.bot_cmd_pf) + 1 cmd_str = usr_msg[cmd_index:] self.bot_cmd(cmd_str, channel, nick) elif bot_cmd_alt_hit >= 0: empty_cmd = False i = 2 index = bot_cmd_alt_hit + 2 if index <= len(usr_msg) - 1: if usr_msg[index] == ' ': i = 3 else: empty_cmd = True index = bot_cmd_alt_hit + i if index <= len(usr_msg) - 1: cmd_str = usr_msg[bot_cmd_alt_hit + i:] print(cmd_str) self.bot_cmd(cmd_str, channel, nick) else: empty_cmd = True if empty_cmd: self.send_privmsg(channel, "empty barrels make the most noise") else: urls = self.find_urls(usr_msg) if urls: urls = list(dict.fromkeys(urls)) for u in urls: self.url_title(u, channel) elif cmd == '266': current = params[1] max_users = params[2] out = 'global users %s, max %s' % (current, max_users) self.output(out, from_where=here+'/266') elif cmd == '353': channel = params[2] users = trailing.split() self.add_users(users, channel) def connect_and_join(self): online = False wait = 1 while not online: try: self.connect(self.server, self.port, self.nick, self.encode) online = True except: err_string = 'connect error. reconnecting in ' + wait + 's' self.error(err_string) wait *= 2 self.sleep(wait) self.alias_load() self.join(self.channel) self.join("#ghost") def main_loop(self): while True: try: self.recv() except: self.error('receive error') self.connect_and_join() bot = Bot() if not bot.dry: bot.connect_and_join() bot.main_loop()