Updated the documentation The only thing that is still broken is the getStream() as it is 400ing and I cannot tell why
205 lines
6.0 KiB
Python
205 lines
6.0 KiB
Python
import requests
|
|
import json
|
|
import base64
|
|
from pprint import pprint
|
|
|
|
proxies = {
|
|
'http': "localhost:8080"
|
|
}
|
|
|
|
class gotipy(object):
|
|
def __init__(self, token, url):
|
|
self.token = token
|
|
self.url = url
|
|
self.headers = {'X-Gotify-Key' : self.token}
|
|
|
|
## Auth Function
|
|
def basicAuth(self, user, passwd):
|
|
self.user = user
|
|
self.passwd = passwd
|
|
self.string = self.user + ':' + self.passwd
|
|
self.encoded = base64.b64encode(self.string.encode("utf-8"))
|
|
print(self.string.encode('utf-8'))
|
|
self.headers["authorization"] = "Basic " + base64.b64encode(self.string)
|
|
print(self.encoded)
|
|
return self.headers
|
|
|
|
## Token Functions
|
|
def getApplications(self):
|
|
self.requrl = self.url + '/application'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def createApplication(self, name, description):
|
|
self.name = name
|
|
self.description = description
|
|
self.requrl = self.url + '/application'
|
|
|
|
data = {}
|
|
data["name"] = self.name
|
|
data["description"] = self.description
|
|
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
pprint(req)
|
|
return req
|
|
|
|
def deleteApplication(self, appid):
|
|
self.appid = appid
|
|
self.requrl = self.url + '/application/' + str(self.appid)
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def updateApplicationImage(self, appid, imgData):
|
|
self.id = appid
|
|
self.imgData = { 'file': imgData}
|
|
self.requrl = self.url + '/application/' + self.id + '/image'
|
|
|
|
req = requests.post(self.requrl, headers=self.headers, files=self.imgData)
|
|
return req
|
|
|
|
def getClients(self):
|
|
self.requrl = self.url + '/client'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def addClient(self, name):
|
|
self.name = name
|
|
self.requrl = self.url + '/client'
|
|
data = {"name": self.name}
|
|
pprint(data)
|
|
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
return req
|
|
|
|
def deleteClient(self, clientid):
|
|
self.clientid = clientid
|
|
self.requrl = self.url + '/client/' + str(self.clientid)
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
## Message Functions
|
|
def getMessagesByApp(self, appid):
|
|
self.appid = appid
|
|
self.requrl = self.url + '/application/' + str(self.appid) + "/message"
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def deleteAllMessagesFromApp(self, appid):
|
|
self.appid = appid
|
|
self.requrl = self.url + '/application/' + str(self.appid) + "/message"
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def getAllMessages(self):
|
|
self.requrl = self.url + '/message'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def sendMessage(self, title, message):
|
|
self.title = title
|
|
self.message = message
|
|
self.requrl = self.url + '/message'
|
|
|
|
#define a new dict for the message
|
|
data = {}
|
|
data["message"] = self.message
|
|
data["title"] = self.title
|
|
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
return req
|
|
|
|
def deleteAllMessages(self):
|
|
self.requrl = self.url + '/message'
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def deleteMessageByID(self, msgid):
|
|
self.msgid = msgid
|
|
self.requrl = self.url + '/message/' + str(self.msgid)
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def getStream(self): ## Currently broken, returns 400 error, no idea why
|
|
self.requrl = self.url + '/stream'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
## User Functions
|
|
|
|
def getCurrentUser(self):
|
|
self.requrl = self.url + '/current/user'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def changeUserPassword(self, passwd):
|
|
self.passwd = str(passwd)
|
|
self.requrl = self.url + '/current/user/password'
|
|
|
|
data = {"pass": self.passwd}
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
return req
|
|
|
|
def getUsers(self):
|
|
self.requrl = self.url + '/user'
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def createUser(self, isAdmin, username, password):
|
|
self.isAdmin = bool(isAdmin)
|
|
self.username = str(username)
|
|
self.password = str(password)
|
|
self.requrl = self.url + '/user'
|
|
|
|
data = {"admin": self.isAdmin,
|
|
"name": self.username,
|
|
"pass": self.password}
|
|
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
return req
|
|
|
|
def getUserByID(self, userID):
|
|
self.userID = str(userID)
|
|
self.requrl = self.url + '/user/' + self.userID
|
|
|
|
req = requests.get(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
def updateUserByID(self, userID, isAdmin, username, password): ## I really do not like this implementation will look into a better way
|
|
self.userID = str(userID)
|
|
self.isAdmin = bool(isAdmin)
|
|
self.username = str(username)
|
|
self.password = str(password)
|
|
self.requrl = self.url + '/user/' + self.userID
|
|
|
|
data = {"admin": self.isAdmin,
|
|
"name": self.username,
|
|
"pass": self.password}
|
|
req = requests.post(self.requrl, headers=self.headers, json=data)
|
|
return req
|
|
|
|
def deleteUserByID(self, userID):
|
|
self.userID = str(userID)
|
|
self.requrl = self.url + '/user/' + self.userID
|
|
|
|
req = requests.delete(self.requrl, headers=self.headers)
|
|
return req
|
|
|
|
## Get version information
|
|
|
|
def getVersion(self):
|
|
self.requrl = self.url + '/version'
|
|
|
|
req = requests.get(self.requrl)
|
|
return req |