Merge pull request 'stable-diffuse-api' (#4) from stable-diffuse-api into master

Reviewed-on: #4
This commit is contained in:
Benjamyn Love 2023-09-17 22:20:47 -04:00
commit ce4e23b7c8
5 changed files with 460 additions and 19 deletions

219
commands/diffuse_cog.py Normal file
View File

@ -0,0 +1,219 @@
import discord
import hashlib
from discord import app_commands
from discord.ext import commands
from modules.diffuseapi import DiffuseAPI
from base64 import b64decode, b64encode
from typing import Literal
import io
class Confirm(discord.ui.View):
def __init__(self, api: DiffuseAPI):
super().__init__(timeout=None)
self.value = None
self.api = api
async def callback(self, interaction: discord.Interaction):
await interaction.response.send_message("You wish to upscale this image!")
# When the confirm function is run we will send the image to the upscale endpoint
@discord.ui.button(label='Upscale', style=discord.ButtonStyle.green)
async def confirm(self, interaction: discord.Interaction, button: discord.ui.Button):
image_data = await interaction.message.attachments[0].read()
await interaction.response.defer()
upscaled_image = await self.api.generate_upscale(b64encode(image_data).decode('utf-8'))
data = io.BytesIO(b64decode(upscaled_image))
await interaction.followup.send("", file=discord.File(data, "upscaled.png"))
self.stop()
# The cancel function will delete the message
@discord.ui.button(label='Delete', style=discord.ButtonStyle.grey)
async def cancel(self, interaction: discord.Interaction, button: discord.ui.Button):
await interaction.message.delete()
self.stop()
class DiffuseCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.api = DiffuseAPI("https://art.jurydoak.com", ["default"], True, 28)
self.user_settings = {}
self.prev_prompt = None
def _get_user_settings(self, user_id: int):
user_data = self.user_settings.get(user_id)
if user_data is None:
self.user_settings[user_id] = {
"prompt": "",
"styles": ["default"],
"steps": 28,
"seed": -1,
"n_iter": 1,
"height": 768,
"width": 512,
"negative_prompts": "",
"cfg_scale": 12
}
user_data = self.user_settings.get(user_id)
return user_data
@app_commands.command(name="pregen", description="Generate an image appending your prompt to the end")
async def pregenerate_image(self, interaction: discord.Interaction, prompt: str):
"""/pregen prompt (Generates an image using the prompt)"""
# This can take a while so we defer the response, this also tells the client the bot is thinking
await interaction.response.defer()
# Create an instance of the Confirm view so we can use it in our response
view = Confirm(self.api)
new_prompt = f"{self.prev_prompt}, {prompt}"
# Get any custom settings the user has entered
user_data = self._get_user_settings(interaction.user.id)
print(f"SENDING PROMPT: {new_prompt} from User: {interaction.user.display_name}")
# Send the prompt + the users custom payload to the API
image_data = await self.api.generate_image(prompt=new_prompt, payload=user_data)
# print(image_data)
# Verify that an image was returned
if image_data is None:
await interaction.followup.send("Failed to generate image, please bitch at admin :3")
# ### Bonk check
decoded_image = b64decode(image_data)
# h = hashlib.md5()
# h.update(decoded_image)
# digest = h.digest()
# return await self.bonk_check(interaction, digest)
### /Bonk check
# Decode the image to bytes and send it in the response as a discord.File object
data = io.BytesIO(decoded_image)
self.prev_prompt = new_prompt
await interaction.followup.send(new_prompt, file=discord.File(data, "prompt.png"), view=view)
@app_commands.command(name="gen", description="Generate an image with the following prompt")
async def generate_image(self, interaction: discord.Interaction, prompt: str):
"""/gen prompt (Generates an image using the prompt)"""
# This can take a while so we defer the response, this also tells the client the bot is thinking
await interaction.response.defer()
# Create an instance of the Confirm view so we can use it in our response
view = Confirm(self.api)
# Get any custom settings the user has entered
user_data = self._get_user_settings(interaction.user.id)
print(f"SENDING PROMPT: {prompt} from User: {interaction.user.display_name}")
# Send the prompt + the users custom payload to the API
image_data = await self.api.generate_image(prompt=prompt, payload=user_data)
# print(image_data)
# Verify that an image was returned
if image_data is None:
await interaction.followup.send("Failed to generate image, please bitch at admin :3")
# ### Bonk check
decoded_image = b64decode(image_data)
# h = hashlib.md5()
# h.update(decoded_image)
# digest = h.digest()
# return await self.bonk_check(interaction, digest)
### /Bonk check
# Decode the image to bytes and send it in the response as a discord.File object
data = io.BytesIO(decoded_image)
self.prev_prompt = prompt
await interaction.followup.send(prompt, file=discord.File(data, "prompt.png"), view=view)
# async def bonk_check(self, interaction, digest):
# digests = [b'i\xac`\xde\xbak\xba\xab{2Z\xcc\tK\xc2~']
# if digest in digests :
# await interaction.followup.send("Were no stranger to lewds, but you know the rules, and so do I", file=discord.File("images/404.jpg", "404.jpg"))
# return
@app_commands.command(name="steps", description="Set the amount of steps the bot will run")
async def set_steps(self, interaction: discord.Interaction, steps: int):
user_settings = self._get_user_settings(interaction.user.id)
try:
user_settings['steps'] = int(steps)
await interaction.response.send_message("Set steps uWu", ephemeral=True)
except Exception:
await interaction.response.send_message("Failed to set steps, baka", ephemeral=True)
@app_commands.command(name="api", description="Swaps between the main and test API", )
async def swap_api(self, interaction: discord.Interaction):
user_settings = self._get_user_settings(interaction.user.id)
'''Test function, currently changes the URL and bot settings'''
if self.api.url != "http://localhost:7860":
self.api.url = "http://localhost:7860"
user_settings['style'] = ["default"]
self.api.styles = ["default"]
activity = discord.Activity(type=discord.ActivityType.listening, name="Waiting for your prompt :3 (Test API)")
await self.bot.change_presence(activity=activity)
await interaction.response.send_message("Set to fastboi")
else:
self.api.url = "https://art.jurydoak.com"
self.api.styles = ["Bot"]
user_settings['style'] = ["Bot"]
activity = discord.Activity(type=discord.ActivityType.listening, name="Waiting for your prompt :3 (Main API)")
await self.bot.change_presence(activity=activity)
await interaction.response.send_message("Set to main api")
@app_commands.command(name="orientation", description="Set the orientation of the generated image")
# @app_commands.choices([app_commands.Choice("portrait"), app_commands.Choice("landscape"), app_commands.Choice("square")])
async def change_orientation(self, interaction: discord.Interaction, orientation: Literal["portrait", "landscape", "square"]):
if orientation.lower() not in ["portrait", "landscape", "square"]:
await interaction.response.send_message("You can only set the orientation to portrait, landscape or square")
return
user_data = self._get_user_settings(interaction.user.id)
match orientation.lower():
case "portrait":
user_data['height'] = 768
user_data['width'] = 512
case "landscape":
user_data['height'] = 512
user_data['width'] = 768
case "square":
user_data['height'] = 512
user_data['width'] = 512
await interaction.response.send_message(f"Set the orientation to {orientation}", ephemeral=True)
@app_commands.command(name="seed", description="Set a static seed (-1 is random)")
async def set_seed(self, interaction: discord.Interaction, seed: int):
try:
seed = int(seed)
user_settings = self._get_user_settings(interaction.user.id)
user_settings["seed"] = seed
await interaction.response.send_message(f"Set the seed to {seed}", ephemeral=True)
except:
await interaction.response.send_message("Failed to set the seed")
async def setup(bot: commands.Bot):
await bot.add_cog(DiffuseCog(bot), guild=discord.Object(id=775297479783874570))

171
commands/game_cog.py Normal file
View File

@ -0,0 +1,171 @@
import discord
import hashlib
from discord import app_commands
from discord.ext import commands
from modules.diffuseapi import DiffuseAPI
from base64 import b64decode, b64encode
from typing import Literal
import io
class Game:
def __init__(self, player1, player2, turns_per_player, base_prompt):
self.player1 = player1
self.player2 = player2
self.current_turn = 1
self.player1_turn_count = 0
self.player2_turn_count = 0
self.turns_per_player = turns_per_player
self.prompt = base_prompt
# Adds new_prompt to self.prompt
def prompt_add(self, new_prompt):
self.prompt = f"{self.prompt}, {new_prompt}"
def get_settings(self):
data = {
"prompt": self.prompt,
"styles": ["default"],
"steps": 28,
"seed": -1,
"n_iter": 1,
"height": 768,
"width": 512,
"negative_prompts": "",
"cfg_scale": 12
}
return data
def get_players(self):
return [self.player1.id, self.player2.id]
def is_player_ingame(self, player_id):
if player_id in self.get_players():
return True
return False
# return True if player_id in self.get_players() else False
def take_turn(self, new_prompt):
match(self.current_turn):
case 0:
if self.player1_turn_count <= self.turns_per_player:
# Generate new prompt string
self.prompt_add(new_prompt)
# Increment turn and swap player
self.current_turn = 1
self.player1_turn_count += 1
return self.prompt
return None
case 1:
if self.player2_turn_count <= self.turns_per_player:
# Generate new prompt string
self.prompt_add(new_prompt)
# Increment turn and swap player
self.current_turn = 0
self.player2_turn_count += 1
return self.prompt
return None
class GameCog(commands.Cog):
def __init__(self, bot: commands.Bot):
self.bot = bot
self.api = DiffuseAPI("https://art.jurydoak.com", ["default"], True, 28)
self.games = {}
@app_commands.command(name="startgame", description="Start a DEGEN GAME with another player")
async def start_game(self, interaction: discord.Integration, player: discord.Member, base_prompt: str):
"""/startgame @username (starts a game)"""
for game in self.games:
if self.games[game].is_player_ingame(interaction.user.id):
await interaction.response.send_message(f"{interaction.user.display_name} is still in a game")
if self.games[game].is_player_ingame(player.id):
await interaction.response.send_message(f"{player.display_name} is still in a game")
game_name = interaction.user.display_name + player.display_name
game = Game(interaction.user, player, 5, base_prompt)
self.games[game_name] = game
await interaction.response.defer()
print(f"SENDING PROMPT: {game.prompt} from User: {interaction.user.display_name}")
# Send the prompt + the users custom payload to the API
image_data = await self.api.generate_image(prompt=game.prompt, payload=game.get_settings())
# Verify that an image was returned
if image_data is None:
await interaction.followup.send("Failed to generate image, please bitch at admin :3")
# ### Bonk check
decoded_image = b64decode(image_data)
# Decode the image to bytes and send it in the response as a discord.File object
data = io.BytesIO(decoded_image)
await interaction.followup.send(f"Game started with {interaction.user.display_name} with {player.display_name} | 5 Turns | {game.prompt} \n Current player {game.player2.mention}", file=discord.File(data, "prompt.png"))
await interaction.response.send_message(f"Game started with {interaction.user.display_name} with {player.display_name} | 5 Turns | {game.prompt}")
@app_commands.command(name="listgames", description="List running degen games")
async def list_games(self, interaction: discord.Interaction):
"""/listgames (Lists running games)"""
ret = ""
for game in self.games:
ret += f"{self.games[game].player1.display_name} vs. {self.games[game].player2.display_name}: {self.games[game].player1_turn_count + self.games[game].player2_turn_count} turns in\n"
await interaction.response.send_message(ret)
@app_commands.command(name="stopgame", description="Stop the current game you are in.")
async def stop_game(self, interaction: discord.Interaction):
# Make sure this user is in a game
for game in self.games:
if self.games[game].is_player_ingame(interaction.user.id):
del self.games[game]
break
await interaction.response.send_message(f"Game has been removed")
@app_commands.command(name="taketurn", description="Take your turn in degen game")
async def take_turn(self, interaction: discord.Interaction, prompt: str):
# Make sure this user is in a game
game_instance = None
for game in self.games:
if self.games[game].is_player_ingame(interaction.user.id):
game_instance = self.games[game]
break
if game_instance is None:
await interaction.response.send_message(f"{interaction.user.display_name} is not in a game")
# Get the current turn and check the appropriate players id
match(game_instance.current_turn):
case 0:
if interaction.user.id != game_instance.player1.id:
await interaction.response.send_message("It's not your turn.", ephemeral=True)
case 1:
if interaction.user.id != game_instance.player2.id:
await interaction.response.send_message("It's not your turn.")
# We are all gucci for the turn so we defer as we will be generating images now
channel = interaction.channel
await interaction.response.defer(ephemeral=True)
game_instance.take_turn(prompt)
print(f"SENDING PROMPT: {game_instance.prompt} from User: {interaction.user.display_name}")
# Send the prompt + the users custom payload to the API
image_data = await self.api.generate_image(prompt=game_instance.prompt, payload=game_instance.get_settings())
# Verify that an image was returned
if image_data is None:
await interaction.followup.send("Failed to generate image, please bitch at admin :3")
# ### Bonk check
decoded_image = b64decode(image_data)
# Decode the image to bytes and send it in the response as a discord.File object
data = io.BytesIO(decoded_image)
await interaction.followup.send(game_instance.prompt, file=discord.File(data, "prompt.png"), ephemeral=True)
if game_instance.current_turn == 0:
player_name = game_instance.player1.mention
else:
player_name = game_instance.player2.mention
await channel.send(f"Turn taken: {player_name}'s turn")
async def setup(bot: commands.Bot):
await bot.add_cog(GameCog(bot), guild=discord.Object(id=775297479783874570))

View File

@ -18,7 +18,7 @@ class DiffuseAPI():
def set_steps(self, steps):
try:
new_steps = int(steps)
if 0 > new_steps < 50:
if new_steps > 0 and new_steps <= 50:
self.num_steps = new_steps
return True
return False
@ -83,7 +83,8 @@ class DiffuseAPI():
def get_nsfw_filter(self):
return self.nsfw_enabled
async def generate_image(self, prompt, neg_prompt=""):
async def generate_image(self, prompt, neg_prompt="", payload=None):
if payload is None:
payload = {
"prompt": prompt,
"styles": self.styles,
@ -96,9 +97,12 @@ class DiffuseAPI():
"cfg_scale": self.cfg_scale
}
payload['prompt'] = prompt
settings = {
"filter_nsfw": not self.nsfw_enabled,
"enable_pnginfo": False
# "filter_nsfw": not self.nsfw_enabled,
"enable_pnginfo": False,
# "sd_hypernetwork": "anime_3"
}
override_payload = {
@ -122,7 +126,7 @@ class DiffuseAPI():
"gfpgan_visibility": 0,
"codeformer_visibility": 0,
"codeformer_weight": 0,
"upscaling_resize": 4,
"upscaling_resize": 2,
"upscaling_resize_w": 512,
"upscaling_resize_h": 1024,
"upscaling_crop": True,

47
slash_commands.py Normal file
View File

@ -0,0 +1,47 @@
import discord
from discord import app_commands
from discord.ext import commands
import os
import asyncio
try:
with open("token.secret", 'r') as f:
discord_client_token = f.read()
except FileNotFoundError:
print("Cannot locate token.secret please generate a token")
exit(69)
class aclient(commands.Bot):
def __init__(self):
activity = discord.Activity(type=discord.ActivityType.listening, name="Waiting for your prompt :3 (Main API)")
intents = discord.Intents.default()
intents.message_content = True
super().__init__(intents=intents, command_prefix="!", activity=activity, help_command=commands.DefaultHelpCommand())
self.synced = False
async def load(self):
for filename in os.listdir("./commands"):
if filename.endswith(".py"):
print(f"Loading commands.{filename[:-3]}")
await self.load_extension(f"commands.{filename[:-3]}")
async def on_ready(self):
await self.wait_until_ready()
if not self.synced:
await tree.sync(guild=discord.Object(id=775297479783874570))
self.synced = True
print(f"We have logged in as {self.user}.")
pass
client = aclient()
tree = client.tree
async def main():
await client.load()
await client.start(discord_client_token)
# client.run(discord_client_token)
asyncio.run(main())