2to3port #2

Merged
benjamyn merged 12 commits from 2to3port into master 2020-10-10 18:48:33 -04:00
3 changed files with 286 additions and 14 deletions
Showing only changes of commit d9f11956c2 - Show all commits

136
whois/py3/whois.py Normal file
View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys
import os
import socket
import re
import logging
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
from . import error
from . import flags
class Whois(object):
def __init__(self, domain, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = str(domain, "utf-8").encode("idna")
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
%(self.currPath, self.tldPath, self.tldList))
self.settings = {}
if self.tld in self.tldList:
logging.debug("__init__: Loading tld configuration file...")
_settings = {}
exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings)
if "server" in _settings:
logging.debug("__init__: Settings: %s"%(_settings["server"]))
self.settings.update(_settings["server"])
else:
logging.debug("__init__: No server settings found")
def chooseServer(self):
'''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer):
param = urllib.parse.urlencode({self.settings["http-arg"]: self.domain})
if self.settings.get("http-method").lower() == "post":
logging.debug("sendHTTPQuery: Connecting to whois server using POST")
req = urllib.request.Request(whoisServer, param)
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib.request.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib.request.urlopen(req).read()
print(data)
return data
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((whoisServer, 43))
except:
# FIXME: Create a exception class for this
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer))
return False
try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n"
except:
msg = self.domain + "\r\n"
logging.debug("sendQuery: Sending data.. %s"%(msg))
s.send(msg)
result = ""
while True:
buffer = s.recv(512)
if not buffer:
break
result += buffer
finalResult = result.replace("\r\n", "\n")
logging.debug("sendQuery: result: %s"%(finalResult))
return finalResult
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http":
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings:
logging.debug("query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE)
while redirection and len(redirection) >= 1:
whoisServer = redirection[0]
result = self.sendQuery(whoisServer)
redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}

View File

@ -11,19 +11,19 @@ import os
import socket
import re
import logging
import urllib
#import urllib2
import urllib.request, urllib.parse, urllib.error
#import errormyhost
import error
import flags
class Whois(object):
def __init__(self, domain, debug=False):
def __init__(self, domain, debug=True):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna")
self.domain = domain
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
@ -39,7 +39,7 @@ class Whois(object):
logging.debug("__init__: Loading tld configuration file...")
_settings = {}
execfile(os.path.join(self.tldPath, self.tld), {}, _settings)
exec(compile(open(os.path.join(self.tldPath, self.tld)).read(), os.path.join(self.tldPath, self.tld), 'exec'), {}, _settings)
if "server" in _settings:
logging.debug("__init__: Settings: %s"%(_settings["server"]))
@ -57,16 +57,16 @@ class Whois(object):
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer):
param = urllib.urlencode({self.settings["http-arg"]: self.domain})
param = urllib.parse.urlencode({self.settings["http-arg"]: self.domain})
if self.settings.get("http-method").lower() == "post":
logging.debug("sendHTTPQuery: Connecting to whois server using POST")
req = urllib.Request(whoisServer, param)
req = urllib.request.Request(whoisServer, param)
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
req = urllib.request.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib.urlopen(req).read()
data = urllib.request.urlopen(req).read()
print(data)
return data
@ -91,8 +91,8 @@ class Whois(object):
msg = self.domain + "\r\n"
logging.debug("sendQuery: Sending data.. %s"%(msg))
s.send(msg)
# print(msg.encode())
s.send(msg.encode('utf-8'))
result = ""
@ -102,7 +102,7 @@ class Whois(object):
if not buffer:
break
result += buffer
result += buffer.decode()
finalResult = result.replace("\r\n", "\n")

136
whois/whois.py2 Normal file
View File

@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
# __ __ __ __ ______ __ ______
#/\ \ _ \ \ /\ \_\ \ /\ __ \ /\ \ /\ ___\
#\ \ \/ ".\ \\ \ __ \\ \ \/\ \\ \ \\ \___ \
# \ \__/".~\_\\ \_\ \_\\ \_____\\ \_\\/\_____\
# \/_/ \/_/ \/_/\/_/ \/_____/ \/_/ \/_____/
import sys
import os
import socket
import re
import logging
import urllib
import urllib2
import error
import flags
class Whois(object):
def __init__(self, domain, debug=False):
if debug:
logging.basicConfig(level=logging.DEBUG)
logging.debug("__init__: DEBUG is set to True")
self.domain = unicode(domain, "utf-8").encode("idna")
self.tld = self.domain.split(".")[-1]
self.currPath = os.path.dirname(os.path.realpath(__file__))
self.tldPath = os.path.join(self.currPath, "tlds")
self.tldList = os.listdir(self.tldPath)
logging.debug("__init__: Setting initial variables.. self.currPath = %s / self.tldPath = %s / self.tldList = %s"
%(self.currPath, self.tldPath, self.tldList))
self.settings = {}
if self.tld in self.tldList:
logging.debug("__init__: Loading tld configuration file...")
_settings = {}
execfile(os.path.join(self.tldPath, self.tld), {}, _settings)
if "server" in _settings:
logging.debug("__init__: Settings: %s"%(_settings["server"]))
self.settings.update(_settings["server"])
else:
logging.debug("__init__: No server settings found")
def chooseServer(self):
'''Choose whois server by detecting tld of given domain.'''
if "host" in self.settings:
logging.debug("chooseServer: Whois server addr: %s"%(self.settings["host"]))
return self.settings["host"]
else:
logging.debug("chooseServer: Whois server addr: %s"%(self.tld + ".whois-servers.net"))
return self.tld + ".whois-servers.net"
def sendHTTPQuery(self, whoisServer):
param = urllib.urlencode({self.settings["http-arg"]: self.domain})
if self.settings.get("http-method").lower() == "post":
logging.debug("sendHTTPQuery: Connecting to whois server using POST")
req = urllib2.Request(whoisServer, param)
else: # GET
logging.debug("sendHTTPQuery: Connecting to whois server using GET")
req = urllib2.Request((whoisServer.endswith("?") and whoisServer or whoisServer+"?") + param)
data = urllib2.urlopen(req).read()
print data
return data
def sendQuery(self, whoisServer):
'''Send query to whois server.'''
logging.debug("sendQuery: Connecting to whois server")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((whoisServer, 43))
except:
# FIXME: Create a exception class for this
logging.error("sendQuery: Error connecting to whois server %s"%(whoisServer))
return False
try:
msg = self.settings['format'][whoisServer].replace("%DOMAIN%", self.domain) + "\r\n"
except:
msg = self.domain + "\r\n"
logging.debug("sendQuery: Sending data.. %s"%(msg))
s.send(msg)
result = ""
while True:
buffer = s.recv(512)
if not buffer:
break
result += buffer
finalResult = result.replace("\r\n", "\n")
logging.debug("sendQuery: result: %s"%(finalResult))
return finalResult
def query(self, redirect=True, return_type=flags.RETURN_TYPE_LIST):
'''Start whole process of whois query. This method will do them all.'''
whoisServer = self.chooseServer()
if self.settings.get("method") == "http":
result = self.sendHTTPQuery(whoisServer)
else:
result = self.sendQuery(whoisServer)
if redirect and "redirect" in self.settings:
logging.debug("query: Redirection found. Connecting to given server address")
redirection = re.findall(self.settings["redirect"], result, re.MULTILINE)
while redirection and len(redirection) >= 1:
whoisServer = redirection[0]
result = self.sendQuery(whoisServer)
redirection = re.findall(self.settings["redirect"], result)
if return_type == flags.RETURN_TYPE_LIST:
return whoisServer, result
else:
return {"whoisServer": whoisServer, "result": result}