changed tabs to 4 spaces + some minor fixes in rtxHelpers
This commit is contained in:
parent
267167769d
commit
0876bef079
236
rtxHelpers.py
236
rtxHelpers.py
@ -13,151 +13,151 @@ import logging
|
||||
import config
|
||||
|
||||
def process_byte(in_byte):
|
||||
""" takes the received byte and processes the command line
|
||||
if a new command has been entered, it will call process_command()
|
||||
if a short link has been entered, the link_list of the current page will be searched
|
||||
and the found target will be sent to process_command()
|
||||
"""
|
||||
instr=str(in_byte, encoding="latin-1")
|
||||
|
||||
if config.MODE=="modem":
|
||||
logging.debug("> %s < CD: %s", instr, glob.ser.getCD())
|
||||
""" takes the received byte and processes the command line
|
||||
if a new command has been entered, it will call process_command()
|
||||
if a short link has been entered, the link_list of the current page will be searched
|
||||
and the found target will be sent to process_command()
|
||||
"""
|
||||
instr=str(in_byte, encoding="latin-1")
|
||||
|
||||
if config.MODE=="modem":
|
||||
logging.debug("> %s < CD: %s", instr, glob.ser.getCD())
|
||||
|
||||
# what to send back to the client
|
||||
if instr == cept.INI:
|
||||
logging.debug("INI")
|
||||
echo_char = "*"
|
||||
elif instr == cept.TER:
|
||||
logging.debug("TER")
|
||||
echo_char = "#"
|
||||
else:
|
||||
echo_char=instr
|
||||
# what to send back to the client
|
||||
if instr == cept.INI:
|
||||
logging.debug("INI")
|
||||
echo_char = "*"
|
||||
elif instr == cept.TER:
|
||||
logging.debug("TER")
|
||||
echo_char = "#"
|
||||
else:
|
||||
echo_char=instr
|
||||
|
||||
# start of line: clear rest of cmdline
|
||||
if len(glob.cmdline) == 0:
|
||||
glob.ser.write(bytes(cept.CLEARLINE, "latin-1"))
|
||||
# start of line: clear rest of cmdline
|
||||
if len(glob.cmdline) == 0:
|
||||
glob.ser.write(bytes(cept.CLEARLINE, "latin-1"))
|
||||
|
||||
# echo the character to the client
|
||||
glob.ser.write(bytes(echo_char, "latin-1"))
|
||||
# echo the character to the client
|
||||
glob.ser.write(bytes(echo_char, "latin-1"))
|
||||
|
||||
glob.cmdline += instr
|
||||
glob.cmdline += instr
|
||||
|
||||
# Backspace:
|
||||
if instr == cept.BSP:
|
||||
glob.cmdline = glob.cmdline[:-1]
|
||||
return
|
||||
# Backspace:
|
||||
if instr == cept.BSP:
|
||||
glob.cmdline = glob.cmdline[:-1]
|
||||
return
|
||||
|
||||
logging.debug("Cmdline: >%s<", glob.cmdline)
|
||||
logging.debug("Cmdline: >%s<", glob.cmdline)
|
||||
|
||||
# process links inside the page:
|
||||
if ( len(glob.cmdline) == 1 or len(glob.cmdline) == 2 ) and glob.curpage.get_link(glob.cmdline):
|
||||
glob.cmdline = cept.INI + glob.curpage.get_link(glob.cmdline) + cept.TER
|
||||
process_command(glob.cmdline)
|
||||
return
|
||||
# process links inside the page:
|
||||
if ( len(glob.cmdline) == 1 or len(glob.cmdline) == 2 ) and glob.curpage.get_link(glob.cmdline):
|
||||
glob.cmdline = cept.INI + glob.curpage.get_link(glob.cmdline) + cept.TER
|
||||
process_command(glob.cmdline)
|
||||
return
|
||||
|
||||
# reset cmdline:
|
||||
if instr == cept.INI and glob.cmdline[-2:] == cept.INI+cept.INI:
|
||||
logging.debug("reset cmdline")
|
||||
glob.cmdline = ""
|
||||
glob.ser.write(bytes(cept.clear_line24, "latin-1"))
|
||||
return
|
||||
# reset cmdline:
|
||||
if instr == cept.INI and glob.cmdline[-2:] == cept.INI+cept.INI:
|
||||
logging.debug("reset cmdline")
|
||||
glob.cmdline = ""
|
||||
glob.ser.write(bytes(cept.clear_line24, "latin-1"))
|
||||
return
|
||||
|
||||
# goto last page:
|
||||
if instr == cept.TER and glob.cmdline == cept.INI+cept.TER:
|
||||
if glob.prevpage.get_page_id() == -1:
|
||||
glob.cmdline = ""
|
||||
send_error("Keine vorige Seite vorhanden.")
|
||||
return
|
||||
glob.cmdline = "" # reset cmdline
|
||||
goto_last_page()
|
||||
return
|
||||
# goto last page:
|
||||
if instr == cept.TER and glob.cmdline == cept.INI+cept.TER:
|
||||
if glob.prevpage.get_page_id() == -1:
|
||||
glob.cmdline = ""
|
||||
send_error("Keine vorige Seite vorhanden.")
|
||||
return
|
||||
glob.cmdline = "" # reset cmdline
|
||||
goto_last_page()
|
||||
return
|
||||
|
||||
# "Make it so":
|
||||
if instr == cept.TER:
|
||||
# or instr == cept.SEND or instr == cept.CR:
|
||||
logging.info("Kommando: >%s<", glob.cmdline)
|
||||
process_command(glob.cmdline)
|
||||
return
|
||||
# "Make it so":
|
||||
if instr == cept.TER:
|
||||
# or instr == cept.SEND or instr == cept.CR:
|
||||
logging.info("Kommando: >%s<", glob.cmdline)
|
||||
process_command(glob.cmdline)
|
||||
return
|
||||
|
||||
def process_command(cmd):
|
||||
""" processes the given command line
|
||||
"""
|
||||
""" processes the given command line
|
||||
"""
|
||||
|
||||
# reset cmdline
|
||||
glob.cmdline = ""
|
||||
# reset cmdline
|
||||
glob.cmdline = ""
|
||||
|
||||
# process *cmdline#, normally a page-loading command:
|
||||
if cmd[0] == cept.INI and cmd[-1:] == cept.TER:
|
||||
pgnr = cmd[1:-1]
|
||||
check_and_send_page(pgnr, "Die Seite kann nicht gefunden werden.")
|
||||
# process *cmdline#, normally a page-loading command:
|
||||
if cmd[0] == cept.INI and cmd[-1:] == cept.TER:
|
||||
pgnr = cmd[1:-1]
|
||||
check_and_send_page(pgnr, "Die Seite kann nicht gefunden werden.")
|
||||
|
||||
# if only # is entered, either go to the link behind '#'
|
||||
# or to the next sub-page (2000a => 2000b) if it exists
|
||||
if cmd == cept.TER:
|
||||
if glob.curpage.get_link("#"):
|
||||
pgnr = glob.curpage.get_link("#")
|
||||
check_and_send_page(pgnr, "Die Seite kann nicht gefunden werden.")
|
||||
else:
|
||||
# check if the next page exists, e.g. 2000a => 2000b
|
||||
curid = glob.curpage.get_page_id()
|
||||
if curid[-1:].isalpha():
|
||||
# aus 2000a 2000b machen:
|
||||
nextid = curid[:-1] + chr(ord(curid[-1:])+1)
|
||||
check_and_send_page(nextid, "Keine n"+str(cept['UMLAUT'])+"achste Seite gefunden.")
|
||||
# if only # is entered, either go to the link behind '#'
|
||||
# or to the next sub-page (2000a => 2000b) if it exists
|
||||
if cmd == cept.TER:
|
||||
if glob.curpage.get_link("#"):
|
||||
pgnr = glob.curpage.get_link("#")
|
||||
check_and_send_page(pgnr, "Die Seite kann nicht gefunden werden.")
|
||||
else:
|
||||
# check if the next page exists, e.g. 2000a => 2000b
|
||||
curid = glob.curpage.get_page_id()
|
||||
if curid[-1:].isalpha():
|
||||
# aus 2000a 2000b machen:
|
||||
nextid = curid[:-1] + chr(ord(curid[-1:])+1)
|
||||
check_and_send_page(nextid, "Keine n"+str(cept['UMLAUT'])+"achste Seite gefunden.")
|
||||
|
||||
# special commands:
|
||||
if cmd == "!1" + cept.TER:
|
||||
send_welcome_page()
|
||||
return
|
||||
if cmd == "!2" + cept.TER:
|
||||
glob.ser.write(bytes(cept.init_screen, "latin-1"))
|
||||
return
|
||||
# special commands:
|
||||
if cmd == "!1" + cept.TER:
|
||||
send_welcome_page()
|
||||
return
|
||||
if cmd == "!2" + cept.TER:
|
||||
glob.ser.write(bytes(cept.init_screen, "latin-1"))
|
||||
return
|
||||
|
||||
def goto_last_page():
|
||||
""" go back to the previous page and make the current page the previous page
|
||||
"""
|
||||
logging.info("Letzte Seite: %s", str(glob.prevpage.get_page_id()))
|
||||
""" go back to the previous page and make the current page the previous page
|
||||
"""
|
||||
logging.info("Letzte Seite: %s", str(glob.prevpage.get_page_id()))
|
||||
|
||||
# swap prevpage and curpage
|
||||
temp = glob.curpage
|
||||
glob.curpage = glob.prevpage
|
||||
glob.prevpage = temp
|
||||
# swap prevpage and curpage
|
||||
temp = glob.curpage
|
||||
glob.curpage = glob.prevpage
|
||||
glob.prevpage = temp
|
||||
|
||||
# send the previous page (now curpage) to the client
|
||||
send_page(glob.curpage)
|
||||
# send the previous page (now curpage) to the client
|
||||
send_page(glob.curpage)
|
||||
|
||||
def send_welcome_page():
|
||||
""" get the btxlogo, make a rtxPage of it and send it to the client """
|
||||
wpage = rtxPage()
|
||||
wpage.set_page(bytes(cept.btxlogo, "latin-1"))
|
||||
send_page(wpage)
|
||||
""" get the btxlogo, make a rtxPage of it and send it to the client """
|
||||
wpage = rtxPage()
|
||||
wpage.set_page(bytes(cept.btxlogo, "latin-1"))
|
||||
send_page(wpage)
|
||||
|
||||
def check_and_send_page(pgnr, errormsg):
|
||||
""" checks if the page "pgnr" exists
|
||||
if yes, it loads it into curpage and sends it to the client
|
||||
if not, it sends the error message "errormsg" to the client
|
||||
"""
|
||||
logging.info("suche Seite: >%s<", pgnr)
|
||||
""" checks if the page "pgnr" exists
|
||||
if yes, it loads it into curpage and sends it to the client
|
||||
if not, it sends the error message "errormsg" to the client
|
||||
"""
|
||||
logging.info("suche Seite: >%s<", pgnr)
|
||||
|
||||
if rtxPage.exists(pgnr):
|
||||
glob.prevpage = glob.curpage
|
||||
glob.curpage = rtxPage(pgnr)
|
||||
send_page(glob.curpage) # takes a rtxPage object!
|
||||
return True
|
||||
elif pgnr[-1:].isnumeric() and rtxPage.exists(pgnr + "a"):
|
||||
glob.prevpage = glob.curpage
|
||||
glob.curpage = rtxPage(pgnr + "a")
|
||||
send_page(glob.curpage) # takes a rtxPage object!
|
||||
return True
|
||||
else:
|
||||
send_error(errormsg)
|
||||
return False
|
||||
if rtxPage.exists(pgnr):
|
||||
glob.prevpage = glob.curpage
|
||||
glob.curpage = rtxPage(pgnr)
|
||||
send_page(glob.curpage) # takes a rtxPage object!
|
||||
return True
|
||||
elif pgnr[-1:].isnumeric() and rtxPage.exists(pgnr + "a"):
|
||||
glob.prevpage = glob.curpage
|
||||
glob.curpage = rtxPage(pgnr + "a")
|
||||
send_page(glob.curpage) # takes a rtxPage object!
|
||||
return True
|
||||
else:
|
||||
send_error(errormsg)
|
||||
return False
|
||||
|
||||
def send_error(msg):
|
||||
""" send an error message to the client """
|
||||
glob.ser.write(bytes(cept.error_prefix + msg + cept.error_suffix, "latin-1"))
|
||||
""" send an error message to the client """
|
||||
glob.ser.write(bytes(cept.error_prefix + msg + cept.error_suffix, "latin-1"))
|
||||
|
||||
def send_page(page):
|
||||
""" sends a page to the client """
|
||||
glob.ser.write(page.get_page())
|
||||
""" sends a page to the client """
|
||||
glob.ser.write(page.get_page())
|
||||
|
||||
|
84
rtxModem.py
84
rtxModem.py
@ -12,51 +12,51 @@ import config
|
||||
import logging
|
||||
|
||||
def init_modem():
|
||||
logging.debug("clearing input/output buffers")
|
||||
glob.ser.reset_input_buffer()
|
||||
glob.ser.reset_output_buffer()
|
||||
""" send the modem init strings to the modem """
|
||||
logging.debug("init_modem")
|
||||
if config.MODEM_INIT1 != "":
|
||||
logging.debug("sending MODEM_INIT1")
|
||||
glob.ser.write(bytes(config.MODEM_INIT1 + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT1: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT1: %s", rc)
|
||||
logging.debug("clearing input/output buffers")
|
||||
glob.ser.reset_input_buffer()
|
||||
glob.ser.reset_output_buffer()
|
||||
""" send the modem init strings to the modem """
|
||||
logging.debug("init_modem")
|
||||
if config.MODEM_INIT1 != "":
|
||||
logging.debug("sending MODEM_INIT1")
|
||||
glob.ser.write(bytes(config.MODEM_INIT1 + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT1: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT1: %s", rc)
|
||||
|
||||
if config.MODEM_INIT2 != "":
|
||||
logging.debug("sending MODEM_INIT2")
|
||||
glob.ser.write(bytes(config.MODEM_INIT2 + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT2: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT2: %s", rc)
|
||||
if config.MODEM_INIT2 != "":
|
||||
logging.debug("sending MODEM_INIT2")
|
||||
glob.ser.write(bytes(config.MODEM_INIT2 + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT2: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("MODEM_INIT2: %s", rc)
|
||||
|
||||
def wait_for_caller():
|
||||
""" waits for the RING of the modem and answers it """
|
||||
modem_answered = False
|
||||
logging.debug("wait_for_caller")
|
||||
while not modem_answered:
|
||||
rc = _serial_readline()
|
||||
if rc.startswith(config.MODEM_RING):
|
||||
logging.info("modem ringing! Answering.")
|
||||
glob.ser.write(bytes(config.MODEM_ANSWER + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("Answer: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("Answer: %s", rc)
|
||||
modem_answered=True
|
||||
else:
|
||||
modem_answered=False
|
||||
""" waits for the RING of the modem and answers it """
|
||||
modem_answered = False
|
||||
logging.debug("wait_for_caller")
|
||||
while not modem_answered:
|
||||
rc = _serial_readline()
|
||||
if rc.startswith(config.MODEM_RING):
|
||||
logging.info("modem ringing! Answering.")
|
||||
glob.ser.write(bytes(config.MODEM_ANSWER + "\r", "latin-1"))
|
||||
rc = _serial_readline()
|
||||
logging.info("Answer: %s", rc)
|
||||
rc = _serial_readline()
|
||||
logging.info("Answer: %s", rc)
|
||||
modem_answered=True
|
||||
else:
|
||||
modem_answered=False
|
||||
|
||||
def _serial_readline():
|
||||
line = ""
|
||||
in_byte = ""
|
||||
|
||||
while not in_byte == b"\n":
|
||||
in_byte = glob.ser.read(1)
|
||||
#print(in_byte)
|
||||
line += str(in_byte, encoding="latin-1")
|
||||
line = ""
|
||||
in_byte = ""
|
||||
|
||||
while not in_byte == b"\n":
|
||||
in_byte = glob.ser.read(1)
|
||||
#print(in_byte)
|
||||
line += str(in_byte, encoding="latin-1")
|
||||
|
||||
return line
|
||||
return line
|
||||
|
172
rtxPage.py
172
rtxPage.py
@ -12,103 +12,103 @@ import config
|
||||
import logging
|
||||
|
||||
class rtxPage:
|
||||
""" Klasse zur Benutzung einer CEPT-Seite
|
||||
"""
|
||||
""" Klasse zur Benutzung einer CEPT-Seite
|
||||
"""
|
||||
|
||||
die_seite = ""
|
||||
seiten_nummer = -1
|
||||
link_liste = {}
|
||||
die_seite = ""
|
||||
seiten_nummer = -1
|
||||
link_liste = {}
|
||||
|
||||
def __init__(self, page = None):
|
||||
""" create a new page object for the page 'page'
|
||||
returns False if the page cannot be found
|
||||
returns True if the page has been loaded
|
||||
"""
|
||||
if page == None:
|
||||
return
|
||||
else:
|
||||
self.die_seite = ""
|
||||
self.seiten_nummer = -1
|
||||
self.link_liste = {}
|
||||
self._load_page(page)
|
||||
def __init__(self, page = None):
|
||||
""" create a new page object for the page 'page'
|
||||
returns False if the page cannot be found
|
||||
returns True if the page has been loaded
|
||||
"""
|
||||
if page == None:
|
||||
return
|
||||
else:
|
||||
self.die_seite = ""
|
||||
self.seiten_nummer = -1
|
||||
self.link_liste = {}
|
||||
self._load_page(page)
|
||||
|
||||
@staticmethod
|
||||
def exists(page):
|
||||
""" checks if the page (i.e. the file) exists """
|
||||
if os.path.isfile(config.PAGES + page):
|
||||
return True
|
||||
elif os.path.isfile(config.DEMOPAGES + page):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
@staticmethod
|
||||
def exists(page):
|
||||
""" checks if the page (i.e. the file) exists """
|
||||
if os.path.isfile(config.PAGES + page):
|
||||
return True
|
||||
elif os.path.isfile(config.DEMOPAGES + page):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_page(self):
|
||||
""" returns the page if loaded
|
||||
returns False if no page has been loaded
|
||||
"""
|
||||
if self.die_seite != "":
|
||||
return self.die_seite
|
||||
else:
|
||||
return False
|
||||
def get_page(self):
|
||||
""" returns the page if loaded
|
||||
returns False if no page has been loaded
|
||||
"""
|
||||
if self.die_seite != "":
|
||||
return self.die_seite
|
||||
else:
|
||||
return False
|
||||
|
||||
def set_page(self,pagedata):
|
||||
""" write the pagedata as the new page and (probably) the link list """
|
||||
self.die_seite = pagedata
|
||||
self.link_liste = self._get_link_list()
|
||||
def set_page(self,pagedata):
|
||||
""" write the pagedata as the new page and (probably) the link list """
|
||||
self.die_seite = pagedata
|
||||
self.link_liste = self._get_link_list()
|
||||
|
||||
def get_links(self):
|
||||
""" returns the list of links found in this page """
|
||||
return self.link_liste
|
||||
def get_links(self):
|
||||
""" returns the list of links found in this page """
|
||||
return self.link_liste
|
||||
|
||||
def get_link(self, link):
|
||||
""" returns the target for a given link
|
||||
returns False if this link does not exist
|
||||
"""
|
||||
if link in self.link_liste:
|
||||
return self.link_liste[link]
|
||||
else:
|
||||
return False
|
||||
def get_link(self, link):
|
||||
""" returns the target for a given link
|
||||
returns False if this link does not exist
|
||||
"""
|
||||
if link in self.link_liste:
|
||||
return self.link_liste[link]
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_page_id(self):
|
||||
""" returns the id of the current page """
|
||||
return self.seiten_nummer
|
||||
def get_page_id(self):
|
||||
""" returns the id of the current page """
|
||||
return self.seiten_nummer
|
||||
|
||||
def _load_page(self, page):
|
||||
""" Load a CEPT page from the file system
|
||||
returns False if the page cannot be found
|
||||
returns True if the page has been loaded
|
||||
"""
|
||||
if os.path.isfile(config.PAGES + page):
|
||||
filename = config.PAGES + page
|
||||
elif os.path.isfile(config.DEMOPAGES + page):
|
||||
filename = config.DEMOPAGES + page
|
||||
else:
|
||||
return False
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
self.die_seite = f.read()
|
||||
self.seiten_nummer = page
|
||||
link_liste = self._get_link_list()
|
||||
return True
|
||||
def _load_page(self, page):
|
||||
""" Load a CEPT page from the file system
|
||||
returns False if the page cannot be found
|
||||
returns True if the page has been loaded
|
||||
"""
|
||||
if os.path.isfile(config.PAGES + page):
|
||||
filename = config.PAGES + page
|
||||
elif os.path.isfile(config.DEMOPAGES + page):
|
||||
filename = config.DEMOPAGES + page
|
||||
else:
|
||||
return False
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
self.die_seite = f.read()
|
||||
self.seiten_nummer = page
|
||||
link_liste = self._get_link_list()
|
||||
return True
|
||||
|
||||
def _get_link_list(self):
|
||||
""" private function which parses the CEPT page and extracts the link list
|
||||
returns True if links have been found
|
||||
returns False if no links have been found
|
||||
"""
|
||||
links = []
|
||||
links = re.findall("\x1f\x3d([^\x1f\x9b\x1b]+)", str(self.die_seite, "latin-1"))
|
||||
def _get_link_list(self):
|
||||
""" private function which parses the CEPT page and extracts the link list
|
||||
returns True if links have been found
|
||||
returns False if no links have been found
|
||||
"""
|
||||
links = []
|
||||
links = re.findall("\x1f\x3d([^\x1f\x9b\x1b]+)", str(self.die_seite, "latin-1"))
|
||||
|
||||
for item in links:
|
||||
if item[0] != "0":
|
||||
link = item[1:3].strip()
|
||||
target = item[3:].strip()
|
||||
self.link_liste[link] = target
|
||||
for item in links:
|
||||
if item[0] != "0":
|
||||
link = item[1:3].strip()
|
||||
target = item[3:].strip()
|
||||
self.link_liste[link] = target
|
||||
|
||||
logging.info(self.link_liste)
|
||||
logging.info(self.link_liste)
|
||||
|
||||
if self.link_liste == {}:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
if self.link_liste == {}:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user