Merge pull request '2to3port' (#2) from 2to3port into master

This commit is contained in:
Benjamyn Love 2020-10-10 18:48:33 -04:00
commit 0aa3911622
10 changed files with 709 additions and 106 deletions

View File

@ -5,7 +5,7 @@ from setuptools import setup, find_packages
import sys, os import sys, os
__version__ = '0.1' __version__ = '0.2'
setup(name='whois', setup(name='whois',
version=__version__, version=__version__,

View File

@ -6,6 +6,7 @@
# \ \_\\ \_\\"\_\\ \_\ \ \_\ # \ \_\\ \_\\"\_\\ \_\ \ \_\
# \/_/ \/_/ \/_/ \/_/ \/_/ # \/_/ \/_/ \/_/ \/_/ \/_/
from whois import Whois as whois from .whois import Whois
from parser import * # import flags
from flags import * from . import flags
from . import parser

View File

@ -6,7 +6,7 @@
# \ \_\ \ \_\ \_\\ \_\ \_\\/\_____\\ \_____\\ \_\ \_\ # \ \_\ \ \_\ \_\\ \_\ \_\\/\_____\\ \_____\\ \_\ \_\
# \/_/ \/_/\/_/ \/_/ /_/ \/_____/ \/_____/ \/_/ /_/ # \/_/ \/_/\/_/ \/_/ /_/ \/_____/ \/_____/ \/_/ /_/
import error from . import error
import re import re
import sys import sys
import os import os
@ -45,7 +45,7 @@ class Parser(object):
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True") logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna") self.domain = str(domain)
if not text: if not text:
raise error.InvalidInputText(text) raise error.InvalidInputText(text)
@ -63,13 +63,13 @@ class Parser(object):
self.parseDefaultConf = {} self.parseDefaultConf = {}
logging.debug("__init__: Loading default tld configuration file") logging.debug("__init__: Loading default tld configuration file")
execfile(os.path.join(self.tldPath, "default"), {}, self.parseDefaultConf) exec(compile(open(os.path.join(self.tldPath, "default")).read(), os.path.join(self.tldPath, "default"), 'exec'), {}, self.parseDefaultConf)
self.parseDefaultConf = self.parseDefaultConf.get("parse") self.parseDefaultConf = self.parseDefaultConf.get("parse")
self.parseConf = {} self.parseConf = {}
try: try:
execfile(os.path.join(self.tldPath, self.tld), {}, self.parseConf) exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, self.parseConf)
self.parseConf = self.parseConf.get("parse") self.parseConf = self.parseConf.get("parse")
@ -113,7 +113,7 @@ class Parser(object):
logging.debug("__init__: Loading configuration file of tld name %s"%(lcTLD)) logging.debug("__init__: Loading configuration file of tld name %s"%(lcTLD))
execfile(os.path.join(self.tldPath, "%s"%(lcTLD)), {}, lcConf) exec(compile(open(os.path.join(self.tldPath, "%s"%(lcTLD))).read(), os.path.join(self.tldPath, "%s"%(lcTLD)), 'exec'), {}, lcConf)
lcConf = lcConf.get("parse") lcConf = lcConf.get("parse")
self.parseConf.update(lcConf.get(lcWS)) self.parseConf.update(lcConf.get(lcWS))
@ -134,8 +134,8 @@ class Parser(object):
for key in self.parseConf: for key in self.parseConf:
matches = re.findall(self.parseConf[key], self.text, re.MULTILINE) matches = re.findall(self.parseConf[key], self.text, re.MULTILINE)
if matches: if matches:
logging.debug("run: regex matches found for key %s. %s"%(key, matches)) logging.debug("run: regex matches found for key %s. %s"%(key, matches))
result.update({key: map(lambda x: x.strip(), matches)}) result.update({key: [x.strip() for x in matches]})
else: else:
logging.debug("run: No match for %s"%(key)) logging.debug("run: No match for %s"%(key))

145
whois/parser.py2 Normal file
View File

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
# ______ ______ ______ ______ ______ ______
#/\ == \/\ __ \ /\ == \ /\ ___\ /\ ___\ /\ == \
#\ \ _-/\ \ __ \\ \ __< \ \___ \\ \ __\ \ \ __<
# \ \_\ \ \_\ \_\\ \_\ \_\\/\_____\\ \_____\\ \_\ \_\
# \/_/ \/_/\/_/ \/_/ /_/ \/_____/ \/_____/ \/_/ /_/
from . import error
import re
import sys
import os
import time
import logging
def convertDate(s):
"""Convert any date string found in WHOIS to a datetime object.
"""
# Source from https://code.google.com/p/pywhois/source/browse/whois/parser.py
known_formats = [
'%d-%b-%Y', # 02-jan-2000
'%Y-%m-%d', # 2000-01-02
'%d.%m.%Y', # 2.1.2000
'%Y.%m.%d', # 2000.01.02
'%Y/%m/%d', # 2000/01/02
'%d-%b-%Y %H:%M:%S %Z', # 24-Jul-2009 13:20:03 UTC
'%a %b %d %H:%M:%S %Z %Y', # Tue Jun 21 23:59:59 GMT 2011
'%Y-%m-%dT%H:%M:%SZ', # 2007-01-26T19:10:31Z
'%Y. %m. %d.', # 2012. 04. 03. - whois.krnic.net
'%d/%m/%Y %H:%M:%S', # 14/09/2013 00:59:59 - whois.nic.im
'%Y/%m/%d %H:%M:%S (%Z)', # 2012/07/01 01:05:01 (JST) - whois.jprs.jp
]
for known_format in known_formats:
try:
return time.mktime(time.strptime(s.strip(), known_format))
except ValueError as e:
pass # Wrong format, keep trying
return s
class Parser(object):
def __init__(self, domain, text, whoisServer=None, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna")
if not text:
raise error.InvalidInputText(text)
self.text = text
self.whoisServer = whoisServer and whoisServer or "default"
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
logging.debug("__init__: Setting initial variables...\nself.domain: %s\nself.text = %s\nself.whoisServer = %s\nself.tld = %s\nself.currPath = %s\nself.tldPath = %s"
%(self.domain, self.text, self.whoisServer, self.tld, self.currPath, self.tldPath))
self.parseDefaultConf = {}
logging.debug("__init__: Loading default tld configuration file")
execfile(os.path.join(self.tldPath, "default"), {}, self.parseDefaultConf)
self.parseDefaultConf = self.parseDefaultConf.get("parse")
self.parseConf = {}
try:
execfile(os.path.join(self.tldPath, self.tld), {}, self.parseConf)
self.parseConf = self.parseConf.get("parse")
# THERE IS NO "parse" in the tld config AND THERE IS regex for specified server in default conf
if not self.parseConf and whoisServer not in self.parseDefaultConf:
self.parseConf = self.parseDefaultConf.get("default")
# THERE IS NO "parse" in the tld config
elif not self.parseConf:
self.parseConf = self.parseDefaultConf.get(whoisServer)
# THERE IS "parse" in the tld config AND THERE IS regex for specified server
elif self.whoisServer in self.parseConf:
self.parseConf = self.parseConf.get(self.whoisServer)
# THERE IS "parse" in the tld config AND THERE IS "default" regex in the tld config AND
# THERE IS NO regex for specified server
elif "default" in self.parseConf:
self.parseConf = self.parseConf.get("default")
# THEE IS "parse" in the tld config AND THERE IS NO "default" regex in the tld config
# MAYBE empty file?
else:
self.parseConf = self.parseDefaultConf.get("default")
# Check for LoadConf
_parseConf = self.parseConf
self.parseConf = {}
if "LoadConf" in _parseConf:
logging.debug("__init__: LoadConf found in parser config")
try:
# <tld>/<whois server>
# e.g. org/whois.publicinternetregistry.net
lc = _parseConf["LoadConf"].split("/", 1)
lcTLD = lc[0]
lcWS = lc[1]
lcConf = {}
logging.debug("__init__: Loading configuration file of tld name %s"%(lcTLD))
execfile(os.path.join(self.tldPath, "%s"%(lcTLD)), {}, lcConf)
lcConf = lcConf.get("parse")
self.parseConf.update(lcConf.get(lcWS))
except:
pass
self.parseConf.update(_parseConf)
except:
self.parseConf = self.parseDefaultConf.get("default")
logging.debug("__init__: self.parseConf = %s"%(self.parseConf))
def parse(self):
result = {}
for key in self.parseConf:
matches = re.findall(self.parseConf[key], self.text, re.MULTILINE)
if matches:
logging.debug("run: regex matches found for key %s. %s"%(key, matches))
result.update({key: map(lambda x: x.strip(), matches)})
else:
logging.debug("run: No match for %s"%(key))
return result

145
whois/py3/parser.py Normal file
View File

@ -0,0 +1,145 @@
# -*- coding: utf-8 -*-
# ______ ______ ______ ______ ______ ______
#/\ == \/\ __ \ /\ == \ /\ ___\ /\ ___\ /\ == \
#\ \ _-/\ \ __ \\ \ __< \ \___ \\ \ __\ \ \ __<
# \ \_\ \ \_\ \_\\ \_\ \_\\/\_____\\ \_____\\ \_\ \_\
# \/_/ \/_/\/_/ \/_/ /_/ \/_____/ \/_____/ \/_/ /_/
from . import error
import re
import sys
import os
import time
import logging
def convertDate(s):
"""Convert any date string found in WHOIS to a datetime object.
"""
# Source from https://code.google.com/p/pywhois/source/browse/whois/parser.py
known_formats = [
'%d-%b-%Y', # 02-jan-2000
'%Y-%m-%d', # 2000-01-02
'%d.%m.%Y', # 2.1.2000
'%Y.%m.%d', # 2000.01.02
'%Y/%m/%d', # 2000/01/02
'%d-%b-%Y %H:%M:%S %Z', # 24-Jul-2009 13:20:03 UTC
'%a %b %d %H:%M:%S %Z %Y', # Tue Jun 21 23:59:59 GMT 2011
'%Y-%m-%dT%H:%M:%SZ', # 2007-01-26T19:10:31Z
'%Y. %m. %d.', # 2012. 04. 03. - whois.krnic.net
'%d/%m/%Y %H:%M:%S', # 14/09/2013 00:59:59 - whois.nic.im
'%Y/%m/%d %H:%M:%S (%Z)', # 2012/07/01 01:05:01 (JST) - whois.jprs.jp
]
for known_format in known_formats:
try:
return time.mktime(time.strptime(s.strip(), known_format))
except ValueError as e:
pass # Wrong format, keep trying
return s
class Parser(object):
def __init__(self, domain, text, whoisServer=None, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = str(domain, "utf-8").encode("idna")
if not text:
raise error.InvalidInputText(text)
self.text = text
self.whoisServer = whoisServer and whoisServer or "default"
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
logging.debug("__init__: Setting initial variables...\nself.domain: %s\nself.text = %s\nself.whoisServer = %s\nself.tld = %s\nself.currPath = %s\nself.tldPath = %s"
%(self.domain, self.text, self.whoisServer, self.tld, self.currPath, self.tldPath))
self.parseDefaultConf = {}
logging.debug("__init__: Loading default tld configuration file")
exec(compile(open(os.path.join(self.tldPath, "default")).read(), os.path.join(self.tldPath, "default"), 'exec'), {}, self.parseDefaultConf)
self.parseDefaultConf = self.parseDefaultConf.get("parse")
self.parseConf = {}
try:
exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, self.parseConf)
self.parseConf = self.parseConf.get("parse")
# THERE IS NO "parse" in the tld config AND THERE IS regex for specified server in default conf
if not self.parseConf and whoisServer not in self.parseDefaultConf:
self.parseConf = self.parseDefaultConf.get("default")
# THERE IS NO "parse" in the tld config
elif not self.parseConf:
self.parseConf = self.parseDefaultConf.get(whoisServer)
# THERE IS "parse" in the tld config AND THERE IS regex for specified server
elif self.whoisServer in self.parseConf:
self.parseConf = self.parseConf.get(self.whoisServer)
# THERE IS "parse" in the tld config AND THERE IS "default" regex in the tld config AND
# THERE IS NO regex for specified server
elif "default" in self.parseConf:
self.parseConf = self.parseConf.get("default")
# THEE IS "parse" in the tld config AND THERE IS NO "default" regex in the tld config
# MAYBE empty file?
else:
self.parseConf = self.parseDefaultConf.get("default")
# Check for LoadConf
_parseConf = self.parseConf
self.parseConf = {}
if "LoadConf" in _parseConf:
logging.debug("__init__: LoadConf found in parser config")
try:
# <tld>/<whois server>
# e.g. org/whois.publicinternetregistry.net
lc = _parseConf["LoadConf"].split("/", 1)
lcTLD = lc[0]
lcWS = lc[1]
lcConf = {}
logging.debug("__init__: Loading configuration file of tld name %s"%(lcTLD))
exec(compile(open(os.path.join(self.tldPath, "%s"%(lcTLD))).read(), os.path.join(self.tldPath, "%s"%(lcTLD)), 'exec'), {}, lcConf)
lcConf = lcConf.get("parse")
self.parseConf.update(lcConf.get(lcWS))
except:
pass
self.parseConf.update(_parseConf)
except:
self.parseConf = self.parseDefaultConf.get("default")
logging.debug("__init__: self.parseConf = %s"%(self.parseConf))
def parse(self):
result = {}
for key in self.parseConf:
matches = re.findall(self.parseConf[key], self.text, re.MULTILINE)
if matches:
logging.debug("run: regex matches found for key %s. %s"%(key, matches))
result.update({key: [x.strip() for x in matches]})
else:
logging.debug("run: No match for %s"%(key))
return result

136
whois/py3/whois.py Normal file
View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys
import os
import socket
import re
import logging
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
from . import error
from . import flags
class Whois(object):
def __init__(self, domain, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = str(domain, "utf-8").encode("idna")
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
%(self.currPath, self.tldPath, self.tldList))
self.settings = {}
if self.tld in self.tldList:
logging.debug("__init__: Loading tld configuration file...")
_settings = {}
exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings)
if "server" in _settings:
logging.debug("__init__: Settings: %s"%(_settings["server"]))
self.settings.update(_settings["server"])
else:
logging.debug("__init__: No server settings found")
def chooseServer(self):
'''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer):
param = urllib.parse.urlencode({self.settings["http-arg"]: self.domain})
if self.settings.get("http-method").lower() == "post":
logging.debug("sendHTTPQuery: Connecting to whois server using POST")
req = urllib.request.Request(whoisServer, param)
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib.request.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib.request.urlopen(req).read()
print(data)
return data
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((whoisServer, 43))
except:
# FIXME: Create a exception class for this
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer))
return False
try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n"
except:
msg = self.domain + "\r\n"
logging.debug("sendQuery: Sending data.. %s"%(msg))
s.send(msg)
result = ""
while True:
buffer = s.recv(512)
if not buffer:
break
result += buffer
finalResult = result.replace("\r\n", "\n")
logging.debug("sendQuery: result: %s"%(finalResult))
return finalResult
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http":
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings:
logging.debug("query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE)
while redirection and len(redirection) >= 1:
whoisServer = redirection[0]
result = self.sendQuery(whoisServer)
redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}

View File

@ -1,5 +1,5 @@
server = { server = {
"host": "whois.auda.ltd", "host": "whois.auda.org.au",
"redirect": "\s+Whois Server: (.*)", # Should never apply to .au domains "redirect": "\s+Whois Server: (.*)", # Should never apply to .au domains
} }
@ -17,6 +17,7 @@ parse = {
"EligibilityName": "Eligibility Name:(.+)", "EligibilityName": "Eligibility Name:(.+)",
"EligibilityType": "Eligibility Type:(.+)", "EligibilityType": "Eligibility Type:(.+)",
"EligibilityID": "Eligibility ID:(.+)", "EligibilityID": "Eligibility ID:(.+)",
"RegistrantID": "Registrant ID:(.+)",
"AdminID": "Registrant Contact ID:(.+)", "AdminID": "Registrant Contact ID:(.+)",
"AdminName": "Registrant Contact Name:(.+)", "AdminName": "Registrant Contact Name:(.+)",
"TechName": "Tech Contact Name:(.+)", "TechName": "Tech Contact Name:(.+)",

25
whois/tlds/online Normal file
View File

@ -0,0 +1,25 @@
server = {
"host": "whois.nic.online",
"redirect": "\s+Whois Server: (.*)", # Should never apply to .au domains
}
parse = {
"default": {
"NotFound": "No Data Found",
"DomainName": "Domain Name:\s+(.+)",
"Registrar": "Registrar Name:\s+(.+)",
"NameServer": "Name Server:\s+(.+)",
"Status": "Status:\s+(.+)",
"UpdatedDate": "Last Modified:\s+(.+)",
# "CreationDate": "Creation Date:\s+(.+)",
# "ExpirationDate": "Expiration Date:\s+(.+)",
"RegistrantName": "Registrant:(.+)",
"EligibilityName": "Eligibility Name:(.+)",
"EligibilityType": "Eligibility Type:(.+)",
"EligibilityID": "Eligibility ID:(.+)",
"AdminID": "Registrant Contact ID:(.+)",
"AdminName": "Registrant Contact Name:(.+)",
"TechName": "Tech Contact Name:(.+)",
"TechID": "Tech Contact ID:(.+)",
},
}

View File

@ -1,136 +1,150 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______ # __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\ # /\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \ # \ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\ # \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/ # \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys import sys
import os import os
import socket import socket
import re import re
import logging import logging
import urllib import urllib.request
import urllib2 import urllib.parse
import urllib.error
from . import flags
#import error
# import flags
import error
import flags
class Whois(object): class Whois(object):
def __init__(self, domain, debug=False): def __init__(self, domain, debug=False):
if debug: if debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True") logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna") self.domain = domain
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__)) self.tld = self.domain.split(".")[-1]
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s" self.currPath = os.path.dirname(os.path.realpath(__file__))
%(self.currPath, self.tldPath, self.tldList)) self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
self.settings = {} logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
% (self.currPath, self.tldPath, self.tldList))
if self.tld in self.tldList: self.settings = {}
logging.debug("__init__: Loading tld configuration file...")
_settings = {} if self.tld in self.tldList:
execfile(os.path.join(self.tldPath, self.tld), {}, _settings) logging.debug("__init__: Loading tld configuration file...")
if "server" in _settings: _settings = {}
logging.debug("__init__: Settings: %s"%(_settings["server"])) exec(compile(open(os.path.join(self.tldPath, self.tld)).read(),
self.settings.update(_settings["server"]) os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings)
else:
logging.debug("__init__: No server settings found")
def chooseServer(self): if "server" in _settings:
'''Choose whois server by detecting tld of given domain.''' logging.debug("__init__: Settings: %s" % (_settings["server"]))
if "host" in self.settings: self.settings.update(_settings["server"])
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"])) else:
return self.settings["host"] logging.debug("__init__: No server settings found")
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer): def chooseServer(self):
param = urllib.urlencode({self.settings["http-arg"]: self.domain}) '''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s" %
(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s" %
(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
if self.settings.get("http-method").lower() == "post": def sendHTTPQuery(self, whoisServer):
logging.debug("sendHTTPQuery: Connecting to whois server using POST") param = urllib.parse.urlencode(
req = urllib2.Request(whoisServer, param) {self.settings["http-arg"]: self.domain})
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib2.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib2.urlopen(req).read() if self.settings.get("http-method").lower() == "post":
print data logging.debug(
"sendHTTPQuery: Connecting to whois server using POST")
req = urllib.request.Request(whoisServer, param)
else: # GET
logging.debug(
"sendHTTPQuery: Connecting to whois server using GET")
req = urllib.request.Request((whoisServer.endswith(
"?") and whoisServer or whoisServer+"?") + param)
return data data = urllib.request.urlopen(req).read()
print(data)
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: return data
s.connect((whoisServer, 43))
except: def sendQuery(self, whoisServer):
# FIXME: Create a exception class for this '''Send query to whois server.'''
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer)) logging.debug("sendQuery: Connecting to whois server")
return False s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n" s.connect((whoisServer, 43))
except: except:
msg = self.domain + "\r\n" # FIXME: Create a exception class for this
logging.error(
"sendQuery: Error connecting to whois server %s" % (whoisServer))
return False
logging.debug("sendQuery: Sending data.. %s"%(msg)) try:
msg = self.settings['format'][whoisServer].replace(
"%DOMAIN%", self.domain) + "\r\n"
s.send(msg) except:
msg = self.domain + "\r\n"
result = ""
while True: logging.debug("sendQuery: Sending data.. %s" % (msg))
buffer = s.recv(512) # print(msg.encode())
s.send(msg.encode('utf-8'))
if not buffer: result = ""
break
result += buffer while True:
buffer = s.recv(512)
finalResult = result.replace("\r\n", "\n") if not buffer:
break
logging.debug("sendQuery: result: %s"%(finalResult)) result += buffer.decode()
return finalResult finalResult = result.replace("\r\n", "\n")
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST): logging.debug("sendQuery: result: %s" % (finalResult))
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http": return finalResult
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings: def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
logging.debug("query: Redirection found. Connecting to given server address") '''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE) if self.settings.get("method") == "http":
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
while redirection and len(redirection) >= 1: if redirect and "redirect" in self.settings:
whoisServer = redirection[0] logging.debug(
result = self.sendQuery(whoisServer) "query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result)
redirection = re.findall(
self.settings["redirect"], result, re.MULTILINE)
if return_type == flags.RETURN_TYPE_LIST: while redirection and len(redirection) >= 1:
return whoisServer, result whoisServer = redirection[0]
else: result = self.sendQuery(whoisServer)
return {"whoisServer": whoisServer, "result": result} redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}

136
whois/whois.py2 Normal file
View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys
import os
import socket
import re
import logging
import urllib
import urllib2
import error
import flags
class Whois(object):
def __init__(self, domain, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna")
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
%(self.currPath, self.tldPath, self.tldList))
self.settings = {}
if self.tld in self.tldList:
logging.debug("__init__: Loading tld configuration file...")
_settings = {}
execfile(os.path.join(self.tldPath, self.tld), {}, _settings)
if "server" in _settings:
logging.debug("__init__: Settings: %s"%(_settings["server"]))
self.settings.update(_settings["server"])
else:
logging.debug("__init__: No server settings found")
def chooseServer(self):
'''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer):
param = urllib.urlencode({self.settings["http-arg"]: self.domain})
if self.settings.get("http-method").lower() == "post":
logging.debug("sendHTTPQuery: Connecting to whois server using POST")
req = urllib2.Request(whoisServer, param)
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib2.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib2.urlopen(req).read()
print data
return data
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((whoisServer, 43))
except:
# FIXME: Create a exception class for this
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer))
return False
try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n"
except:
msg = self.domain + "\r\n"
logging.debug("sendQuery: Sending data.. %s"%(msg))
s.send(msg)
result = ""
while True:
buffer = s.recv(512)
if not buffer:
break
result += buffer
finalResult = result.replace("\r\n", "\n")
logging.debug("sendQuery: result: %s"%(finalResult))
return finalResult
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http":
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings:
logging.debug("query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE)
while redirection and len(redirection) >= 1:
whoisServer = redirection[0]
result = self.sendQuery(whoisServer)
redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}