Fixed AU and UK

This commit is contained in:
Benjamyn Love 2020-05-19 09:48:54 -04:00
parent 772c0d5779
commit a09941ccae
3 changed files with 110 additions and 96 deletions

View File

@ -1,5 +1,5 @@
server = { server = {
"host": "whois.auda.ltd", "host": "whois.auda.org.au",
"redirect": "\s+Whois Server: (.*)", # Should never apply to .au domains "redirect": "\s+Whois Server: (.*)", # Should never apply to .au domains
} }
@ -17,6 +17,7 @@ parse = {
"EligibilityName": "Eligibility Name:(.+)", "EligibilityName": "Eligibility Name:(.+)",
"EligibilityType": "Eligibility Type:(.+)", "EligibilityType": "Eligibility Type:(.+)",
"EligibilityID": "Eligibility ID:(.+)", "EligibilityID": "Eligibility ID:(.+)",
"RegistrantID": "Registrant ID:(.+)",
"AdminID": "Registrant Contact ID:(.+)", "AdminID": "Registrant Contact ID:(.+)",
"AdminName": "Registrant Contact Name:(.+)", "AdminName": "Registrant Contact Name:(.+)",
"TechName": "Tech Contact Name:(.+)", "TechName": "Tech Contact Name:(.+)",

View File

@ -8,7 +8,7 @@ parse = {
"NotFound": "No match for", "NotFound": "No match for",
"DomainName": "Domain name:\s+(.+|\n)", "DomainName": "Domain name:\s+(.+|\n)",
"Registrar": "Registrar:\s+(.+)", "Registrar": "Registrar:\s+(.+)",
"NameServer": "Name servers:\s+(.+)", "NameServer": "Name servers:\s+(.+?\n\n)",
"Status": "Registration status:\s+(.+)", "Status": "Registration status:\s+(.+)",
"UpdatedDate": "Last updated:\s+(.+)", "UpdatedDate": "Last updated:\s+(.+)",
"CreationDate": "Registered on:\s+(.+)", "CreationDate": "Registered on:\s+(.+)",

View File

@ -1,137 +1,150 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______ # __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\ # /\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \ # \ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\ # \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/ # \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys import sys
import os import os
import socket import socket
import re import re
import logging import logging
import urllib.request, urllib.parse, urllib.error import urllib.request
import urllib.parse
import urllib.error
from . import flags from . import flags
#import error #import error
#import flags # import flags
class Whois(object): class Whois(object):
def __init__(self, domain, debug=False): def __init__(self, domain, debug=False):
if debug: if debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True") logging.debug("__init__: DEBUG is set to True")
self.domain = domain self.domain = domain
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__)) self.tld = self.domain.split(".")[-1]
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s" self.currPath = os.path.dirname(os.path.realpath(__file__))
%(self.currPath, self.tldPath, self.tldList)) self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
self.settings = {} logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
% (self.currPath, self.tldPath, self.tldList))
if self.tld in self.tldList: self.settings = {}
logging.debug("__init__: Loading tld configuration file...")
_settings = {} if self.tld in self.tldList:
exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings) logging.debug("__init__: Loading tld configuration file...")
if "server" in _settings: _settings = {}
logging.debug("__init__: Settings: %s"%(_settings["server"])) exec(compile(open(os.path.join(self.tldPath, self.tld)).read(),
self.settings.update(_settings["server"]) os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings)
else:
logging.debug("__init__: No server settings found")
def chooseServer(self): if "server" in _settings:
'''Choose whois server by detecting tld of given domain.''' logging.debug("__init__: Settings: %s" % (_settings["server"]))
if "host" in self.settings: self.settings.update(_settings["server"])
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"])) else:
return self.settings["host"] logging.debug("__init__: No server settings found")
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer): def chooseServer(self):
param = urllib.parse.urlencode({self.settings["http-arg"]: self.domain}) '''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s" %
(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s" %
(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
if self.settings.get("http-method").lower() == "post": def sendHTTPQuery(self, whoisServer):
logging.debug("sendHTTPQuery: Connecting to whois server using POST") param = urllib.parse.urlencode(
req = urllib.request.Request(whoisServer, param) {self.settings["http-arg"]: self.domain})
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib.request.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib.request.urlopen(req).read() if self.settings.get("http-method").lower() == "post":
print(data) logging.debug(
"sendHTTPQuery: Connecting to whois server using POST")
req = urllib.request.Request(whoisServer, param)
else: # GET
logging.debug(
"sendHTTPQuery: Connecting to whois server using GET")
req = urllib.request.Request((whoisServer.endswith(
"?") and whoisServer or whoisServer+"?") + param)
return data data = urllib.request.urlopen(req).read()
print(data)
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: return data
s.connect((whoisServer, 43))
except: def sendQuery(self, whoisServer):
# FIXME: Create a exception class for this '''Send query to whois server.'''
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer)) logging.debug("sendQuery: Connecting to whois server")
return False s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n" s.connect((whoisServer, 43))
except: except:
msg = self.domain + "\r\n" # FIXME: Create a exception class for this
logging.error(
"sendQuery: Error connecting to whois server %s" % (whoisServer))
return False
logging.debug("sendQuery: Sending data.. %s"%(msg)) try:
# print(msg.encode()) msg = self.settings['format'][whoisServer].replace(
s.send(msg.encode('utf-8')) "%DOMAIN%", self.domain) + "\r\n"
result = ""
while True: except:
buffer = s.recv(512) msg = self.domain + "\r\n"
if not buffer: logging.debug("sendQuery: Sending data.. %s" % (msg))
break # print(msg.encode())
s.send(msg.encode('utf-8'))
result += buffer.decode() result = ""
finalResult = result.replace("\r\n", "\n") while True:
buffer = s.recv(512)
logging.debug("sendQuery: result: %s"%(finalResult)) if not buffer:
break
return finalResult result += buffer.decode()
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST): finalResult = result.replace("\r\n", "\n")
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http": logging.debug("sendQuery: result: %s" % (finalResult))
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings: return finalResult
logging.debug("query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE) def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
while redirection and len(redirection) >= 1: if self.settings.get("method") == "http":
whoisServer = redirection[0] result = self.sendHTTPQuery(whoisServer)
result = self.sendQuery(whoisServer) else:
redirection = re.findall(self.settings["redirect"], result) result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings:
logging.debug(
"query: Redirection found. Connecting to given server address")
if return_type == flags.RETURN_TYPE_LIST: redirection = re.findall(
return whoisServer, result self.settings["redirect"], result, re.MULTILINE)
else:
return {"whoisServer": whoisServer, "result": result} while redirection and len(redirection) >= 1:
whoisServer = redirection[0]
result = self.sendQuery(whoisServer)
redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}