#! /usr/local/bin/python3
#
# this file:
#   https://dataswamp.org/~incal/sth/sth.py

import argparse
import html.parser
import importlib
import inspect
import numpy
import os
import pathlib
import pickle
import queue
import random
import re
import select
import socket
import ssl
import subprocess
import sys
import textwrap
import threading
import time

# Queue shared by all bots in this process: a bot created with do_send puts
# the channel traffic it sees here, a bot created with do_get takes it out.
msgs = queue.Queue()

# Directory holding the Python command modules.  It has to be on sys.path
# before hello_world is imported (presumably that module lives there).
pymods_dir = 'py'
sys.path.append(pymods_dir)
import hello_world

program_name = 'sth'
sth_nick = program_name
tess_nick = 'tess'
dataswamp_channel = '#dataswamp'
dataswamp_server = 'irc.dataswamp.org'

# Shutdown flag: set by the bot's kill command, polled by every main loop.
threads = threading.Event()

__email__ = 'incal@dataswamp.org'
__author__ = 'incal, michi'
__contact__ = f'{dataswamp_server} {dataswamp_channel}, {__email__}'
__credits__ = [
    'demsh', 'hhvn', 'incal', 'james', 'josuah', 'katzeilla',
    'leslie', 'Lucas', 'maxxe', 'michi', 'solene', 'vrl',
]
__maintainer__ = 'incal'
# The version is the (UTC) modification date of this very file.
__version__ = time.strftime(
    '%Y-%m-%d',
    time.gmtime(os.path.getmtime(pathlib.Path(__file__).absolute())),
)

# Command line interface; the parsed result ends up in the global `opts`.
cli = argparse.ArgumentParser(description=f'swamp thing ({sth_nick}) IRC bot')
cli.add_argument(
    '-c', '--channel',
    default='#swamp-thing', type=str, help='IRC channel')
cli.add_argument(
    '-D', '--debug',
    action='store_true', help='enable debug messages')
cli.add_argument(
    '-d', '--dry',
    action='store_true', help="dry run, don't connect")
cli.add_argument(
    '-e', '--encode',
    action='store_true', help='use SSL')
cli.add_argument(
    '-lf', '--log-file',
    type=str, help='log file')
cli.add_argument(
    '-ll', '--log-level',
    default=3, type=int, help='log level, high is more')
cli.add_argument(
    '-m', '--track-messages',
    action='store_true', help='track messages')
cli.add_argument(
    '-n', '--nick',
    default=sth_nick, type=str, help='bot nick')
cli.add_argument(
    '-p', '--port',
    default=6697, type=int, help='IRC port')
cli.add_argument(
    '-s', '--server',
    default='irc.freenode.net', type=str, help='IRC server (host)')
cli.add_argument(
    '-v', '--version',
    action='store_true', help='show version and exit')
cli.add_argument('-vl', '--verbose-level', type=int,
                 help='verbose level, high is more', default=2)

opts = cli.parse_args()


def day(t):
    """Return the first three fields of the time tuple *t* as a tuple."""
    return (t[0], t[1], t[2])


def dot_str(s):
    """Return *s* with a dot inserted after its first character.

    An empty *s* yields an empty string.
    """
    if not s:
        return ''
    return s[:1] + '.' + s[1:]


def file_lines(filename):
    """Return the number of lines in the text file *filename*.

    An empty file has 0 lines (the previous implementation reported 1).
    """
    with open(filename) as f:
        return sum(1 for _ in f)


# https://www.geeksforgeeks.org/python-check-url-string/
# Compiled once at import time.  The third alternative of the first group
# matches scheme-less "host.tld/" URLs; it used to contain a stray space
# ("[a-z] {2,4}") which kept it from ever matching.
_URL_RE = re.compile(
    r"(?i)\b((?:(https?|gemini|gopher)://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
)


def find_urls(s):
    """Return the list of URLs found in the string *s*, in order."""
    # findall() returns one tuple per match; group 1 is the whole URL.
    return [groups[0] for groups in _URL_RE.findall(s)]


# Single-pass translation tables for leet().  The reverse direction can only
# restore lower-case letters because both cases map to the same digit.
_LEET_TABLE = str.maketrans('aAeElLoOsStT', '443311005577')
_UNLEET_TABLE = str.maketrans('431057', 'aelost')


def leet(s, rev=False):
    """Translate *s* to leet speak, or back to plain text if *rev* is true."""
    return s.translate(_UNLEET_TABLE if rev else _LEET_TABLE)


def make_channel(cnl):
    """Return *cnl* as a channel name, i.e. with a leading '#'.

    startswith() is used instead of indexing so that an empty *cnl* no
    longer raises IndexError (it yields '#').
    """
    if not cnl.startswith('#'):
        return f"#{cnl}"
    return cnl


def make_printable(s):
    """Return *s* with every non-printable character removed.

    Filtering the characters of *s* gives the same result as translating
    through a deletion table for the whole Unicode range, without building
    that table (over a million entries) on every call.
    """
    return ''.join(ch for ch in s if ch.isprintable())


def sleep(dur=0.025):
    """Sleep for *dur* seconds (a short pause by default)."""
    time.sleep(dur)


# Bot command keywords.
alias_kw = 'alias'
alice_kw = 'alice'
ascii_kw = 'ascii'
bem_kw = 'bem'
bridge_kw = 'bridge'
cred_kw = 'cred'
echo_kw = 'echo'
echos_kw = 'echos'
elem_kw = 'elem'
fifo_kw = 'fifo'
help_kw = 'help'
hi_kw = 'hi'
hist_kw = 'hist'
ip_kw = 'ip'
join_kw = 'join'
junior_kw = 'junior'
kill_kw = 'kill'
leet_kw = 'leet'
name_kw = 'name'
nick_kw = 'nick'
plonk_kw = 'plonk'
r_kw = 'r'
rand_kw = 'rand'
rules_kw = 'rules'
senior_kw = 'senior'
src_kw = 'src'
stats_kw = 'stats'
time_kw = 'time'
ver_kw = 'ver'

# Placeholders for formal parameters in the command help texts.
chnl_fp = 'CHNL'
nick_fp = 'NICK'
chnl_dst_fp = '%s_DST' % chnl_fp
chnl_src_fp = '%s_SRC' % chnl_fp
chnl_or_nick_fp = '%s | %s' % (chnl_fp, nick_fp)
alias_fp = alias_kw.upper()
city_fp = 'CITY'
cmd_fp = 'CMD'
data_fp = 'DATA'
fun_fp = 'FUN'
msg_fp = 'MSG'
name_fp = 'NAME'
pos_fp = 'POS'
str_fp = 'STR'
tech_fp = 'TECH'
tz_fp = 'TZ'

# Literal arguments understood by some of the commands.
all_arg = 'all'
dot_arg = 'dot'
fall_arg = 'fall'
list_arg = 'list'
name_arg = 'name'
rev_arg = 'rev'
rm_arg = 'rm'
senior_arg = 'senior'
usr_arg = 'user'

# Command table.  Every entry is [keyword, argument syntax, description].
help_cmd = [help_kw, '[%s | %s | %s]' % (cmd_fp, all_arg, list_arg), "print command syntax; print %s's docstring; print %s help; print a %s of commands" % (cmd_fp, all_arg, list_arg)]
alias_cmd = [alias_kw, '[%s | %s [%s] | %s %s]' % (list_arg, alias_fp, fun_fp, rm_arg, alias_fp), 'print (%s) all aliases; create an %s for %s; remove (%s) %s. \\cmd to inhibit' % (list_arg, alias_fp, fun_fp, rm_arg, alias_fp)]
alice_cmd = [alice_kw, '%s %s' % (chnl_or_nick_fp, msg_fp), "say %s to %s or %s" % (msg_fp, chnl_fp, nick_fp)]
ascii_cmd = [ascii_kw, '[%s [force]]' % (name_fp), 'list all ascii art; show %s (unconditionally)' % (name_fp)]
bem_cmd = [bem_kw, '[%s]' % dot_arg, 'print known users; use %s notation' % dot_str(dot_arg)]
bridge_cmd = [bridge_kw, '%s | %s [%s]' % (fall_arg, chnl_src_fp, chnl_dst_fp), '%s, i.e. stop; or echo traffic on %s here (default) or on %s' % (fall_arg, chnl_src_fp, chnl_dst_fp)]
cred_cmd = [cred_kw, '', 'credits']
echo_cmd = [echo_kw, '%s' % str_fp, 'have the bot say %s' % str_fp]
echos_cmd = [echos_kw, '%s' % data_fp, 'send %s' % data_fp]
elem_cmd = [elem_kw, '%s' % city_fp, 'print weather in %s' % city_fp]
fifo_cmd = [fifo_kw, '', "read from FIFO"]
hi_cmd = [hi_kw, '', 'say hi to the bot']
hist_cmd = [hist_kw, '[%s [, ...]]' % tech_fp, 'print computer history database URL or item %s' % tech_fp]
ip_cmd = [ip_kw, '', "print the bot's IP"]
join_cmd = [join_kw, '%s' % chnl_fp, 'have the bot join channel %s' % chnl_fp]
junior_cmd = [junior_kw, '[%s]' % senior_arg, 'ask this person for help (or the most recent %s)' % senior_arg]
kill_cmd = [kill_kw, '', 'kill the bot']
leet_cmd = [leet_kw, '[%s] %s' % (rev_arg, str_fp), 'translate (or %s) %s from h4x0r)' % (rev_arg, str_fp)]
name_cmd = [name_kw, '', "about the bot's name"]
nick_cmd = [nick_kw, '%s' % nick_fp, 'set bot %s' % nick_fp]
r_cmd = [r_kw, '', "execute (repeat) the channel's previous bot command"]
rand_cmd = [rand_kw, '', 'print a random number']
rules_cmd = [rules_kw, '', 'print channel guidelines URL']
plonk_cmd = [plonk_kw, '', 'print how to not annoy/be annoyed with the bot']
senior_cmd = [senior_kw, '[%s | %s]' % (pos_fp, nick_fp), "print %s's ancienty %s (put %s as %s)" % (nick_fp, pos_fp, usr_arg, dot_str(usr_arg))]
src_cmd = [src_kw, '', 'print source URLs']
stats_cmd = [stats_kw, '', 'print system info']
time_cmd = [time_kw, '[%s]' % tz_fp, 'print time in timezone %s' % tz_fp]
ver_cmd = [ver_kw, '', "print version"]

cmds = [
    alias_cmd, alice_cmd, ascii_cmd, bem_cmd, bridge_cmd, cred_cmd,
    echo_cmd, echos_cmd, elem_cmd, fifo_cmd, help_cmd, hi_cmd, hist_cmd,
    ip_cmd, join_cmd, junior_cmd, kill_cmd, leet_cmd, name_cmd, nick_cmd,
    plonk_cmd, r_cmd, rand_cmd, rules_cmd, senior_cmd, src_cmd, stats_cmd,
    time_cmd, ver_cmd
]
cmds.sort()


class Bot:
    """An IRC bot: one connection, one nick, and a set of chat commands.

    Commands are recognized in channel messages after the prefix
    ``,<nick>`` or the short prefix ``,,``.
    """

    def __init__(self, channel=opts.channel, encode=opts.encode,
                 nick=opts.nick, port=opts.port, server=opts.server,
                 do_send=False, do_get=False, echo_day=False,
                 check_img=False):
        """Set up the bot state; does not connect.

        channel, encode, nick, port, server -- connection settings, the
            defaults come from the command line
        do_send   -- put the PRIVMSG traffic seen on the global queue `msgs`
        do_get    -- take messages from `msgs` and say them
        echo_day  -- announce when the (UTC) date changes
        check_img -- announce when files appear in the image directory

        If the version option was given, prints the version and exits.
        """
        self.debug = opts.debug
        self.dry = opts.dry
        self.log_file = opts.log_file
        self.log_lvl = opts.log_level
        self.track_msgs = opts.track_messages
        self.ver = opts.version
        self.verb_lvl = opts.verbose_level

        self.channel = channel
        self.encode = encode
        self.nick = nick
        self.port = port
        self.server = server
        self.socket = ''

        self.ghost_channel = '#ghost'
        self.ghost_channel_2 = '#ghost-reporting'

        self.do_send = do_send
        self.do_get = do_get

        self.bridge_src = ''
        self.bridge_dst = ''

        self.data_dir = 'data/'
        self.ascii_dir = self.data_dir + 'ascii/'

        self.aliases = []
        self.alias_db = self.data_dir + alias_kw + '.db'

        self.bot_cmd_p = ','
        self.bot_cmd_pf = self.bot_cmd_p + self.nick
        self.bot_cmd_pf_alt = self.bot_cmd_p * 2
        self.last_cmd = {}

        self.rules_url = 'https://dataswamp.org/doc/channel_guidelines.txt'
        self.scripts_loc = 'scripts/'
        self.timestamp = '%Y-%m-%dT%H:%M:%S'

        self.users = []
        self.users_file = self.data_dir + 'users.txt'

        self.zsh = '/usr/local/bin/zsh'

        self.check_img = check_img
        self.img_path = r'/var/www/htdocs/dataswamp.org/~incal/show'
        # The image directory only exists on the production host; a missing
        # directory must not prevent the bot from starting.
        try:
            self.len_ori = len(os.listdir(self.img_path))
        except OSError:
            self.len_ori = 0

        self.echo_day = echo_day
        self.day = day(time.gmtime())

        if self.ver:
            print(__version__)
            sys.exit()
        else:
            self.create_dirs()
            self.init_users()

    def img_check(self, channel):
        """Announce on *channel* when the image directory has grown.

        New files are also made world-readable.  Does nothing unless the
        bot was created with check_img.
        """
        if not self.check_img:
            return
        files = os.listdir(self.img_path)
        len_fin = len(files)
        if self.len_ori < len_fin:
            msg = "material sent to https://dataswamp.org/~incal/show/ ..."
            self.send_privmsg(channel, msg)
            for name in files:
                os.chmod(os.path.join(self.img_path, name), 0o644)
        self.len_ori = len_fin

    def fifo(self, channel):
        """Send whatever can be read from the file 'FIFO' to *channel*.

        The read is non-blocking; a missing FIFO or one without data is
        silently ignored.  Other OS errors are raised.
        """
        # Imported here because the module header does not import errno.
        import errno

        fifo = 'FIFO'
        res = None
        try:
            client_file = os.open(fifo, os.O_RDONLY | os.O_NONBLOCK)
        except OSError as exc:
            if exc.errno == errno.ENOENT:
                return
            raise
        try:
            rlist, _, _ = select.select([client_file], [], [], 0.01)
            if client_file in rlist:
                res = os.read(client_file, 1024)
        except OSError as exc:
            if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
                raise
        finally:
            # Close the descriptor on every path, also when re-raising.
            os.close(client_file)
        if res:
            for m in res.decode('utf-8', errors='replace').splitlines():
                if m:
                    self.send_privmsg(channel, m)

    def ascii(self, ascii_name, force, channel):
        """List the ASCII art files, or print the one called *ascii_name*.

        Printing is only done in the ghost channel unless *force* is true.
        """
        msg = ''
        if not ascii_name:
            msg = ' '.join(sorted(os.listdir(self.ascii_dir)))
        elif not force and channel != self.ghost_channel:
            msg = f"Use {self.ghost_channel} for this command (or append 'force')."
        else:
            ascii_file = self.ascii_dir + ascii_name
            # ascii_name comes straight from IRC: accept plain file names
            # only, so that '../..' cannot reach outside the ascii directory.
            plain_name = os.path.basename(ascii_name) == ascii_name
            if plain_name and os.path.isfile(ascii_file):
                with open(ascii_file, 'r') as fp:
                    lines = fp.readlines()
                for l in lines:
                    self.send_privmsg(channel, l)
            else:
                msg = f"ASCII file not found: {ascii_file}"
        if msg:
            self.send_privmsg(channel, msg)

    def alias_dump(self):
        """Write the aliases to the alias database file."""
        with open(self.alias_db, 'wb+') as file:
            file.write(pickle.dumps(self.aliases))

    def alias_load(self):
        """Read the aliases from the alias database file.

        A missing, unreadable or corrupt file is reported and leaves the
        current aliases untouched.
        """
        # NOTE(review): pickle executes code from the file it loads; this is
        # only acceptable as long as nobody else can write the alias file.
        try:
            with open(self.alias_db, 'rb') as file:
                self.aliases = pickle.load(file)
        except (IOError, EOFError, pickle.UnpicklingError) as e:
            self.error(e, f'with file {self.alias_db}')

    def alias_print_alias(self, alias, channel):
        """Print the definition of *alias* on *channel*."""
        for a, f in self.aliases:
            if a == alias:
                self.send_privmsg(channel, f"{a} - {f}")
                return
        self.send_privmsg(channel, f"undefined alias: {alias}")

    def alias_print(self, channel):
        """Print all aliases, sorted; the output goes to a ghost channel."""
        if channel not in (self.ghost_channel, self.ghost_channel_2):
            s = f"(see {self.ghost_channel} for output)"
            self.send_privmsg(channel, s)
            channel = self.ghost_channel
        self.aliases = sorted(self.aliases)
        for a, f in self.aliases:
            self.send_privmsg(channel, f"{a} - {f}")
            sleep()

    def alias_add(self, a, f):
        """Define (or redefine) the alias *a* as *f* and save the aliases.

        The alias command itself cannot be aliased.
        """
        if a != alias_kw:
            old = [ali for ali in self.aliases if ali[0] != a]
            self.aliases = old + [[a, f]]
            self.alias_dump()

    def alias_clear(self):
        """Forget all aliases (the database file is not touched)."""
        self.aliases = []

    def alias_lookup(self, cmd):
        """Return *cmd* with a leading alias, if any, replaced.

        An alias only matches the whole first word of *cmd*.
        """
        for a, f in self.aliases:
            if cmd == a or cmd.startswith(a + ' '):
                return cmd.replace(a, f, 1)
        return cmd

    def alias_rm(self, alias):
        """Remove *alias*; save the aliases if that changed anything."""
        new = [ai for ai in self.aliases if ai[0] != alias]
        if new != self.aliases:
            self.aliases = new
            self.alias_dump()

    def url_title(self, url, channel):
        """Run the url2title script on *url*, output goes to *channel*."""
        # https://dataswamp.org/~incal/sth/scripts/url2title
        script = self.scripts_loc + 'url2title.py'
        py3 = '/usr/local/bin/python3'
        self.shell_cmd([py3, script, url], channel)

    def create_dirs(self):
        """Create the data and ASCII art directories if they are missing."""
        os.makedirs(self.data_dir, exist_ok=True)
        os.makedirs(self.ascii_dir, exist_ok=True)

    def greet(self, nick, channel):
        """Welcome *nick*, but only on the dataswamp server and channel."""
        if self.server == dataswamp_server and channel == dataswamp_channel:
            greet = f'{nick}: welcome to {channel} <{self.rules_url}>'
            self.send_privmsg(channel, greet)

    def add_users(self, users, channel=''):
        """Add *users* to the known users and save them.

        With a *channel*, users not known before are greeted there.  A
        trailing '_' or '1' is ignored when looking for a known user.
        """
        usrs = [u.replace('@', '').replace('+', '') for u in users]
        # A name consisting only of mode characters is empty by now.
        usrs = [u for u in usrs if u]
        if channel:
            for u in usrs:
                look_for = u[:-1] if u[-1] in ('_', '1') else u
                if look_for not in self.users:
                    self.greet(u, channel)
        self.users += usrs
        self.users = list(dict.fromkeys(self.users))
        self.users.sort(key=str.lower)
        self.save_users()

    def add_user(self, user, channel):
        """Add the single *user*, see add_users."""
        self.add_users([user], channel)

    def init_users(self):
        """Load the known users from the users file, creating it if needed."""
        if not os.path.exists(self.users_file):
            with open(self.users_file, 'w'):
                pass
        # The file is plain whitespace-separated text.  It used to be read
        # with numpy.loadtxt, which yields a 0-d array (i.e. a bare string
        # after tolist()) for a file with a single nick.
        with open(self.users_file) as f:
            users = f.read().split()
        if users:
            self.add_users(users)

    def save_users(self):
        """Write the known users, space-separated, to the users file."""
        with open(self.users_file, 'w') as f:
            print(' '.join(str(u) for u in self.users), file=f)

    def shell_cmd(self, cmd, channel=''):
        """Run the program *cmd* (an argument list) and return its output.

        HTML entities in the output are unescaped.  With a *channel*, the
        output is sent there as well.  Returns '' if the program could not
        be run or failed; that is reported with error().
        """
        res = None
        out = ''
        try:
            res = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
        except IOError as e:
            self.error(e, f'{cmd[0]} not found?', channel)
        except subprocess.CalledProcessError as e:
            self.error(e, f'{cmd[0]} crashed', channel)
        if res:
            out = res.stdout.decode('utf-8', errors='replace')
            # html.parser.unescape is gone since Python 3.9.
            out = html.unescape(out)
        if out and out != '\x0A' and channel:
            print(f"sending {out} to channel")
            self.send_privmsg(channel, out)
        return out

    def send_privmsg_many(self, channel, msg, delim='\n'):
        """Send every *delim*-separated part of *msg* to *channel*."""
        for part in msg.split(delim):
            self.send_privmsg(channel, part)

    def send_privmsg(self, channel, msg):
        """Send *msg* to *channel* (or nick), wrapping a long message.

        Non-printable characters, line breaks included, are removed first.
        """
        # https://tools.ietf.org/rfc/rfc1459.txt - sect. 2.3.1, item 2
        max_len = 256
        m = make_printable(msg)
        if not m:
            return
        if len(m) > max_len:
            for l in textwrap.wrap(m, max_len, break_long_words=False):
                self.send(f"PRIVMSG {channel} :{l}")
                sleep()
        else:
            self.send(f"PRIVMSG {channel} :{m}")

    def make_full_docstring(self, data):
        """Return the help line for the command table entry *data*."""
        ds = f"{self.bot_cmd_pf} {data[0]}"
        if data[1]:
            ds += ' ' + data[1]
        if data[2]:
            ds += ' ' + data[2]
        return ds

    def help_cmd_f(self, cmd, help_c, nick, channel):
        """Execute the help command.

        cmd     -- the complete command string, starting with the keyword
        help_c  -- the command table entry of the help command
        nick    -- who asked; the help on all commands is sent in private
        channel -- where everything else is sent
        """
        help_msg = ''
        search_key = cmd[len(help_c[0]) + 1:]
        if not search_key:
            parts = [c[0] + (' ' + c[1] if c[1] else '') for c in cmds]
            help_msg = f"syntax: {self.bot_cmd_pf} CMD[; ...] (or just {self.bot_cmd_pf_alt} CMD) where CMD is: "
            help_msg += ', '.join(parts)
        elif search_key == all_arg:
            for c in cmds:
                self.send_privmsg(nick, self.make_full_docstring(c))
                sleep()
        elif search_key == list_arg:
            help_msg = ''.join(c[0] + ' ' for c in cmds)
            help_msg += f"(try '{self.bot_cmd_pf} help CMD')"
            help_msg += ' [%s commands]' % len(cmds)
        elif search_key == 'elem':
            # This script provides its own help text.
            elem = self.scripts_loc + search_key
            help_msg = self.shell_cmd([self.zsh, elem, 'help'])
        else:
            help_msg = f"no such command: {search_key}"
            for c in cmds:
                if search_key == c[0]:
                    help_msg = self.make_full_docstring(c)
                    break
        if help_msg:
            self.send_privmsg(channel, help_msg)

    def bot_cmd(self, cmd, channel, nick):
        """Execute the bot command(s) in *cmd*.

        cmd     -- the text after the command prefix; several commands may
                   be separated with ';', a leading backslash inhibits
                   alias expansion
        channel -- where the command was given, and where most output goes
        nick    -- who gave the command

        A command that is not built in is looked up as a Python module in
        `pymods_dir`, then as a script in the scripts directory.  The
        command is remembered per channel for the repeat command.
        """
        ip_shell_cmd = ['curl', '-s', 'icanhazip.com']
        cmd = cmd.strip()

        delim_char = ';'
        if delim_char in cmd:
            for c in cmd.split(delim_char):
                self.bot_cmd(c, channel, nick)
            return

        if not cmd:
            return
        if cmd[0] == '\\':
            cmd = cmd[1:]
        else:
            cmd = self.alias_lookup(cmd)

        # Most commands are matched by prefix, so the order of the tests
        # below matters.
        if cmd.startswith(nick_kw):
            new_nick = cmd[len(nick_kw) + 1:]
            if new_nick:
                self.nick_set(new_nick)
        elif cmd.startswith(alice_kw):
            beg = len(alice_kw) + 1
            if beg < len(cmd):
                args = cmd[beg:]
                args_l = args.split()
                if args_l:
                    target = args_l[0]
                    msg = args[len(target) + 1:]
                    if msg:
                        self.send_privmsg(target, msg)
        elif cmd.startswith(fifo_kw):
            self.fifo(channel)
        elif cmd.startswith(ascii_kw):
            args = cmd.split()
            ascii_name = args[1] if len(args) >= 2 else ''
            force = len(args) >= 3
            self.ascii(ascii_name, force, channel)
        elif cmd.startswith(join_kw):
            cnl = cmd[len(join_kw) + 1:]
            if cnl:
                self.join(make_channel(cnl))
        # keep this before echo
        elif cmd.startswith(echos_kw):
            # NOTE(review): this sends raw data to the server on behalf of
            # anyone who can talk to the bot.
            send_cmd = cmd[len(echos_kw) + 1:]
            if send_cmd:
                self.send(send_cmd)
        elif cmd.startswith(echo_kw):
            say_what = cmd[len(echo_kw) + 1:]
            if say_what:
                self.send_privmsg(channel, say_what)
        elif cmd.startswith(alias_kw):
            args_l = cmd[len(alias_kw) + 1:].split()
            if not args_l or args_l[0] == list_arg:
                self.alias_print(channel)
            elif args_l[0] == rm_arg:
                for a in args_l[1:]:
                    self.alias_rm(a)
            elif len(args_l) == 1:
                self.alias_print_alias(args_l[0], channel)
            else:
                self.alias_add(args_l[0], ' '.join(args_l[1:]))
        elif cmd.startswith(bridge_kw):
            args_l = cmd[len(bridge_kw) + 1:].split()
            if args_l:
                # The stop word used to be taken from bridge_cmd[0], which
                # is the keyword 'bridge' itself, so the documented
                # 'bridge fall' never stopped anything.
                if args_l[0] == fall_arg:
                    self.bridge_src = ''
                    self.bridge_dst = ''
                else:
                    self.bridge_src = make_channel(args_l[0])
                    if len(args_l) == 1:
                        self.bridge_dst = make_channel(channel)
                    else:
                        self.bridge_dst = make_channel(args_l[1])
                    if self.bridge_src and self.bridge_dst:
                        self.join(self.bridge_src)
                        self.join(self.bridge_dst)
        elif cmd.startswith(elem_kw):
            city = cmd[len(elem_kw) + 1:]
            if city:
                elem = self.scripts_loc + elem_cmd[0]
                res = self.shell_cmd([self.zsh, elem, city])
                if res:
                    self.send_privmsg(channel, res)
        elif cmd.startswith(senior_kw):
            arg = cmd[len(senior_kw) + 1:]
            senior = self.scripts_loc + senior_cmd[0]
            res = self.shell_cmd([self.zsh, senior, arg])
            if res:
                self.send_privmsg(channel, res)
        elif cmd == stats_kw:
            # https://dataswamp.org/~incal/sth/scripts/stats
            stats = self.scripts_loc + stats_kw
            res = self.shell_cmd([self.zsh, stats])
            if res:
                self.send_privmsg(channel, res)
        elif cmd.startswith(leet_kw):
            s = cmd[len(leet_kw) + 1:]
            if s.startswith(rev_arg):
                out = leet(s[len(rev_arg) + 1:], True)
            else:
                out = leet(s)
            self.send_privmsg(channel, out)
        elif cmd.startswith(bem_kw):
            style = cmd[len(bem_kw) + 1:]
            usrs = []
            if style == dot_arg:
                usrs = [dot_str(u) for u in self.users]
            else:
                # Disguise every nick; one that leet leaves unchanged gets
                # a dot instead.
                for u in self.users:
                    new = leet(u)
                    if new == u:
                        new = dot_str(u)
                    usrs.append(new)
            usrs_str = ' '.join(sorted(usrs, key=str.lower))
            out = '%s (%s BEMs)' % (usrs_str, len(usrs))
            self.send_privmsg(channel, out)
        elif cmd == r_kw:
            cmd_last = self.last_cmd.get(channel)
            if cmd_last:
                self.bot_cmd(cmd_last, channel, nick)
            else:
                # Say it; this text used to be executed as a command.
                self.send_privmsg(channel, "(no previous command!)")
            # Do not record the repeat command itself as the last command.
            return
        elif cmd == cred_kw:
            cred = ' '.join(sorted(__credits__, key=str.lower))
            self.send_privmsg(nick, cred)
        elif cmd.startswith(help_kw):
            self.help_cmd_f(cmd, help_cmd, nick, channel)
        elif cmd.startswith(junior_kw):
            # https://dataswamp.org/~incal/sth/scripts/junior
            jnr = self.scripts_loc + junior_kw
            sen = cmd[len(junior_kw) + 1:]
            res = self.shell_cmd([self.zsh, jnr, sen])
            if res:
                self.send_privmsg(channel, res)
        elif cmd.startswith(hist_kw):
            # https://dataswamp.org/~incal/sth/scripts/hist
            tech = cmd[len(hist_kw) + 1:]
            techs = tech.split(',') if tech else ['db']
            hist = self.scripts_loc + hist_kw
            for t in techs:
                ts = t.strip()
                res = self.shell_cmd([self.zsh, hist, ts])
                if res:
                    self.send_privmsg_many(channel, res)
                else:
                    self.send_privmsg(channel, f"{ts}: no hit")
        elif cmd == hi_kw or cmd.startswith(f"{hi_kw} "):
            end = ''
            if random.randint(0, 9) == 0:
                end = ' :$'
            self.send_privmsg(channel, f"{hi_kw} {nick}{end}")
        elif cmd == ip_kw:
            self.shell_cmd(ip_shell_cmd, channel)
        elif cmd == kill_kw:
            threads.set()
        elif cmd == name_kw:
            # Empty for any other nick; `out` used to be unbound then.
            out = ''
            if self.nick == sth_nick:
                out = 'Swamp Thing '
            elif self.nick == tess_nick:
                out = 'Tess Mattisson '
            if out:
                self.send_privmsg(channel, out)
        elif cmd == rand_kw:
            out = '%d' % random.randint(0, sys.maxsize)
            self.send_privmsg(channel, out)
        elif cmd == rules_kw:
            self.send_privmsg(channel, self.rules_url)
        elif cmd == plonk_kw:
            # The nick is now interpolated; before, the text
            # "' + sth_nick + '" was part of the message.
            plonk_msg = (
                f"Use {self.ghost_channel} to test and play with the bot. "
                f"In catgirl, put 'ignore = {sth_nick}' and "
                "'ignore = * PRIVMSG * ,, *' in $XDG_CONFIG_HOME/catgirl "
                "or ~/.config/catgirl to block the bot including other "
                "people's use of it. "
                "In ERC, do (setq-default erc-ignore-list "
                f"`(,@erc-ignore-list \"{sth_nick}\"))"
            )
            self.send_privmsg(channel, plonk_msg)
        elif cmd.startswith(src_kw):
            arg = cmd[len(src_kw) + 1:]
            url = 'https://dataswamp.org/~incal'
            dir_url = url + '/' + program_name + '/'
            scripts_url = dir_url + self.scripts_loc
            # https://stackoverflow.com/questions/3207219/how-do-i-list-all-files-of-a-directory
            scripts = [s for s in sorted(os.listdir(self.scripts_loc))
                       if os.path.isfile(os.path.join(self.scripts_loc, s))]
            if not arg:
                bot_sec = url + '/#bot'
                bot_src = program_name + '.py'
                bot_url = dir_url + bot_src
                bot_lines = file_lines(bot_src)
                outp = '%s %s (%d lines) ' % (bot_sec, bot_url, bot_lines)
                outp += scripts_url + 'SCRIPT where SCRIPT is '
                total_lines = bot_lines
                for s in scripts:
                    lines = file_lines(self.scripts_loc + s)
                    total_lines += lines
                    outp += '%s (%d) ' % (s, lines)
                outp += 'total %d' % total_lines
            elif arg in scripts:
                outp = scripts_url + arg
            else:
                outp = 'no script: ' + arg
            self.send_privmsg(channel, outp)
        elif cmd.startswith(time_kw):
            # https://stackoverflow.com/a/1398742
            tz_arg = cmd[len(time_kw) + 1:]
            now = time.time()
            old_tz = os.environ.get('TZ')
            if tz_arg:
                os.environ['TZ'] = tz_arg
                time.tzset()
            try:
                t = time.strftime('%Y-%m-%d %H:%M:%S %Z (%j)',
                                  time.localtime(now))
            finally:
                # Put the process timezone back, or the requested one
                # would stick for every later timestamp.
                if tz_arg:
                    if old_tz is None:
                        os.environ.pop('TZ', None)
                    else:
                        os.environ['TZ'] = old_tz
                    time.tzset()
            self.send_privmsg(channel, t)
        elif cmd == ver_kw:
            self.send_privmsg(channel, __version__)
        else:
            # cmd comes straight from IRC and ends up in a file path: only
            # a plain file name may select a module or script to run.
            plain_name = (os.path.basename(cmd) == cmd
                          and not cmd.startswith('.'))
            py_cmd_file = f'{pymods_dir}/{cmd}.py'
            sh_cmd_file = self.scripts_loc + cmd
            if plain_name and os.path.isfile(py_cmd_file):
                mod = importlib.import_module(cmd)
                # Reload, so that an edited module takes effect at once.
                importlib.reload(mod)
                res = mod.engage()
                self.send_privmsg(channel, res)
            elif plain_name and os.path.isfile(sh_cmd_file):
                res = self.shell_cmd([self.zsh, sh_cmd_file])
                if res:
                    self.send_privmsg(channel, res)
            else:
                self.send_privmsg(channel, 'command DNC')

        self.last_cmd[channel] = cmd

    def err(self, reason):
        """Output *reason* as an error message."""
        self.output(reason, 0, 'err')

    def log(self, entry, msg_lvl=0):
        """Append *entry* to the log file, if there is one.

        Nothing is written if *msg_lvl* exceeds the log level.
        """
        if self.log_file and msg_lvl <= self.log_lvl:
            with open(self.log_file, 'a') as f:
                f.write(entry + '\n')

    def output(self, text, msg_lvl=2, from_where=''):
        """Log *text*, and print it if the verbose level allows.

        msg_lvl    -- the verbosity level of the message, see below
        from_where -- origin of the message, shown if messages are tracked
        """
        # messages verbosity level:
        #   0 - error, the program terminates
        #   1 - warning
        #   2 - some info (default)
        #   3 - all info
        prefix = time.strftime(self.timestamp) + ' '
        if msg_lvl == 0:
            prefix += 'ERR: '
        elif msg_lvl == 1:
            prefix += 'WRN: '
        if self.track_msgs and from_where:
            prefix += ' [' + from_where + '] '
        out = prefix + text
        self.log(out, msg_lvl)
        if msg_lvl <= self.verb_lvl:
            print(out)

    def send(self, data):
        """Send the line *data* to the server; CR-LF is appended."""
        try:
            self.socket.sendall((data + '\r\n').encode())
        except (ssl.SSLError, socket.timeout) as e:
            self.error(e, f'on data {data}')

    def error(self, e='', desc='', channel=''):
        """Report an error, on *channel* if given, else on stdout.

        e    -- the exception, only its class name is shown
        desc -- a description of what went wrong
        """
        pre = 'error'
        if e:
            pre += ': %s' % e.__class__.__name__
        pre = f'[{pre}]'
        errstr = f'{pre} {desc}' if desc else pre
        if channel:
            self.send_privmsg(channel, errstr)
        else:
            print(errstr)

    def recv(self):
        """Receive data from the server and process it line by line.

        Returns -1 if the connection is gone, so that the caller can
        reconnect, else None (also if there was nothing to read in time).
        """
        here = inspect.currentframe().f_code.co_name
        try:
            bts = self.socket.recv(4096)
        except socket.timeout:
            return
        except ConnectionError as e:
            self.error(e, 'connection lost')
            return -1
        if not bts:
            # An empty read means the server has closed the connection.
            return -1
        try:
            decode = bts.decode()
        except UnicodeDecodeError as e:
            self.error(e, 'non-UTF-8 char(s)?')
            # Do not drop the lot; it may contain e.g. a PING.
            decode = bts.decode(errors='replace')
        for l in decode.splitlines():
            if l:
                self.output(l, from_where=here)
                self.init_process(l)

    def connect(self, server, port, nick, encode):
        """Connect to *server*:*port* and register as *nick*.

        With *encode*, TLS is used; the server certificate is verified
        (ssl.wrap_socket, used before, is gone since Python 3.12 and did
        not verify anything).

        Returns True when connected, False on a timeout.  Other connection
        errors are raised.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(1)
        try:
            s.connect((server, port))
            if encode:
                context = ssl.create_default_context()
                s = context.wrap_socket(s, server_hostname=server)
        except socket.timeout as e:
            self.error(e, f'connecting to {server}')
            s.close()
            return False
        except ConnectionError:
            s.close()
            raise
        self.socket = s
        self.nick_set(nick)
        self.send(f'USER {nick} {nick} {nick} :{nick}')
        return True

    def nick_set(self, nick=''):
        """Ask the server for *nick*, by default for the current nick."""
        if not nick and self.nick:
            nick = self.nick
        if nick:
            self.send('NICK ' + nick)

    def nick_set_confirmed(self, nick):
        """Record *nick* as the bot's nick; the command prefix follows."""
        self.nick = nick
        self.bot_cmd_pf = self.bot_cmd_p + self.nick

    def join(self, channel):
        """Join *channel*."""
        self.send('JOIN ' + channel)

    def names(self, channel):
        """Ask for the names of the users on *channel*."""
        self.send('NAMES ' + channel)

    def mode(self, channel, nick, mode):
        """Set *mode* for *nick* on *channel*."""
        self.send('MODE %s %s %s' % (channel, mode, nick))

    def voice(self, channel, nick):
        """Give voice to *nick* on *channel*."""
        self.mode(channel, nick, '+v')

    def get_and_say(self, channel=''):
        """Say one message from the global queue, if any, on *channel*.

        Does nothing unless the bot was created with do_get.
        """
        here = inspect.currentframe().f_code.co_name
        if not self.do_get:
            return
        try:
            msg = msgs.get(False)
        except queue.Empty:
            return
        if channel and msg:
            self.output(msg, from_where=here)
            self.send_privmsg(channel, msg)

    def init_process(self, msg):
        """Parse the server line *msg* and act on it.

        Handled are NICK, PING, JOIN, PART, PRIVMSG (bot commands, the
        bridge and URL titles) and the numerics 266 and 353.  A message
        that lacks an expected parameter is ignored.
        """
        here = inspect.currentframe().f_code.co_name
        tkn = msg.split()
        if len(tkn) < 2:
            self.output('msg too small', 3, here)
            return

        # [:prefix] command [params ...] [:trailing ...]
        nick = ''
        if tkn[0][0] == ':':
            prefix = tkn.pop(0)[1:]
            nick = prefix.split('!')[0]
        cmd = tkn.pop(0)
        params = []
        trailing = ''
        while tkn:
            if tkn[0][0] != ':':
                params.append(tkn.pop(0))
            else:
                tkn[0] = tkn[0][1:]
                trailing = ' '.join(tkn)
                break
        first = params[0] if params else ''

        # parse server cmd
        if cmd:
            self.output('cmd: ' + cmd, from_where=here)
        else:
            self.output('cannot tokenize', 1, here)

        if cmd == 'NICK' and nick == self.nick and (trailing or first):
            self.nick_set_confirmed(trailing or first)
        elif cmd == 'PING':
            # The argument may or may not be sent as a trailing parameter.
            daemon = trailing or first
            self.send('PONG ' + daemon)
            self.output('pong ' + daemon, from_where=here+'/PING')
        elif cmd == 'JOIN':
            channel = trailing or first
            if channel and nick:
                self.output(nick + ' joined ' + channel,
                            from_where=here+'/JOIN')
                self.add_user(nick, channel)
        elif cmd == 'PART':
            if first:
                out = nick + ' quit ' + first
                self.output(out, from_where=here+'/PART')
        elif cmd == 'PRIVMSG':
            if first:
                self._process_privmsg(first, nick, trailing, here)
        elif cmd == '266':
            # Not every server sends the numbers as separate parameters.
            if len(params) >= 3:
                out = 'global users %s, max %s' % (params[1], params[2])
                self.output(out, from_where=here+'/266')
        elif cmd == '353':
            if len(params) >= 3:
                self.add_users(trailing.split(), params[2])

    def _process_privmsg(self, channel, nick, usr_msg, here):
        """Handle the message *usr_msg* that *nick* sent to *channel*."""
        chnl_blk = f"[{channel}]"
        out = f"{nick} {chnl_blk} {usr_msg}"
        msg = f"({self.server}) {chnl_blk} <{nick}> {usr_msg}"
        self.output(out, from_where=here+'/PRIVMSG')
        if self.do_send:
            msgs.put(msg)
        if channel == self.bridge_src:
            self.send_privmsg(self.bridge_dst, msg)

        bot_cmd_hit = usr_msg.find(self.bot_cmd_pf)
        bot_cmd_alt_hit = usr_msg.find(self.bot_cmd_pf_alt)
        if bot_cmd_hit >= 0:
            # Skip the prefix and the character that follows it.
            cmd_index = bot_cmd_hit + len(self.bot_cmd_pf) + 1
            self.bot_cmd(usr_msg[cmd_index:], channel, nick)
        elif bot_cmd_alt_hit >= 0:
            # The command follows the short prefix, one space is optional.
            cmd_str = usr_msg[bot_cmd_alt_hit + len(self.bot_cmd_pf_alt):]
            if cmd_str.startswith(' '):
                cmd_str = cmd_str[1:]
            if cmd_str:
                print(cmd_str)
                self.bot_cmd(cmd_str, channel, nick)
            else:
                self.send_privmsg(channel,
                                  'empty barrels make the most noise')
        else:
            # Not a command: print the title of every URL, once.
            for u in dict.fromkeys(find_urls(usr_msg)):
                self.url_title(u, channel)

    def connect_and_join(self):
        """Connect, retrying with increasing delay, then join the channels.

        The aliases are loaded as well.
        """
        online = False
        wait = 1
        while not online:
            try:
                # connect() returns False on a timeout; going on without a
                # connection would fail on the first send.
                online = self.connect(self.server, self.port, self.nick,
                                      self.encode)
            except ConnectionError as e:
                self.error(e, f'reconnecting in {wait}s ...')
            if not online:
                sleep(wait)
                wait *= 2
        self.alias_load()
        self.join(self.channel)
        if self.nick == sth_nick:
            self.join(self.ghost_channel)
            self.join(self.ghost_channel_2)

    def new_day(self, channel):
        """Note a change of the (UTC) date; announce it if so configured."""
        today = day(time.gmtime())
        if self.day != today:
            self.day = today
            if self.echo_day:
                msg = "finally a new day"
                self.send_privmsg(channel, msg)

    def main_loop(self):
        """Serve until the global event `threads` is set, then quit."""
        while True:
            if threads.is_set():
                self.send('QUIT')
                self.socket.close()
                return
            if self.recv() == -1:
                # The connection is gone: drop the socket and start over.
                self.socket.close()
                self.connect_and_join()
            self.get_and_say(dataswamp_channel)
            self.img_check(dataswamp_channel)
            self.new_day(dataswamp_channel)
            sleep(1)


# Start-up.  With two bots, what the second one (tess) sees is relayed by the
# first one through the global queue `msgs`.
two_bots = True
if two_bots:
    bot_do_get = True
    bot_do_send = True

    bot1 = Bot(do_get=bot_do_get)
    bot2 = Bot(nick=tess_nick,
               server="irc.eu.libera.chat",
               channel=dataswamp_channel,
               do_send=bot_do_send)

    bot1.connect_and_join()
    bot2.connect_and_join()

    l1 = threading.Thread(target=bot1.main_loop)
    l2 = threading.Thread(target=bot2.main_loop)

    l1.start()
    l2.start()

    l1.join()
    l2.join()
else:
    bot = Bot()
    bot.connect_and_join()
    l = threading.Thread(target=bot.main_loop)
    l.start()
    l.join()