from discord.ext import commands from discord import option import discord import os import get_from_reddit import core_utils import requests class ActualUtils(commands.Cog): def __init__(self, bot): self.bot: commands.Bot = bot @commands.slash_command( guild_ids=None, name="youtube", description="Search youtube for the passed in query", ) @option( "query", description="The search string you want to enter on youtube", input_type="str", requred=True, ) async def youtube(self, ctx, query: str): import re from urllib import parse, request query_string = parse.urlencode({"search_query": query}) html_content = request.urlopen("http://www.youtube.com/results?" + query_string) search_results = re.findall("\/watch\?v=(.{11})", html_content.read().decode()) result = "https://www.youtube.com/watch?v=" + search_results[0] await ctx.defer() await ctx.followup.send(result) @commands.command(name="issue") async def issue(self, ctx: commands.Context): import gitlab await ctx.send(gitlab.parse_message(ctx.message)) @commands.command(name="tts") async def tts(self, ctx: commands.Context): import tts if ctx.message.content.split()[1] == "langs": await ctx.send("Ok {}, check your DMs".format(ctx.message.author.mention)) return await ctx.message.author.send(tts.get_all_langs()) file_path = tts.text_to_speech(ctx.message.content) await ctx.send( file=discord.File( file_path, filename="A Message From {}.mp3".format(ctx.message.author.name), ) ) await ctx.message.delete() os.remove(file_path) @commands.command(name="ask", aliases=["wolfram"]) async def ask(self, ctx: commands.Context, *, query): import questions await ctx.reply( questions.answer_question(query), ) @commands.command(name="openai") async def openai(self, ctx: commands.Context, *, query): if ctx.message.author.id != core_utils.my_id: await ctx.send("Sorry, this is a paid dale-bot feature") return import questions await ctx.reply( questions.open_ai(query), ) @commands.has_role("stable-diffuser") @commands.slash_command( guild_ids=core_utils.my_guilds, name="sd", description="Pass a prompt and optinal negative prompt to stable diffusion", ) @option( "positive_prompt", description="The positive prompt to pass to stable diffusion", input_type="str", requred=True, ) @option( "negative_prompt", description="An options set of negatives you want to pass to stablediffusion", requred=False, ) async def sd( self, ctx: commands.Context, positive_prompt: str, negative_prompt: str ): if ctx.channel.name == "stable-diffusion": import socket import stable_diffusion port = "7860" ip = "192.168.1.80" steps = 20 # Send my requests to my gaming computer with the 3080 (if its up) if ctx.author.id == core_utils.my_id: ip = "192.168.1.188" steps = 60 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(1) try: s.connect((ip, int(port))) except: ip = "192.168.1.80" try: await ctx.defer() original_message = await ctx.followup.send( "Please be patient, I'm generating your image" ) file_path, time_taken = await stable_diffusion.generate_image( ip=ip, port=port, positives=positive_prompt, negatives=negative_prompt or None, steps=steps, ) await original_message.edit( content=time_taken, file=discord.File( file_path, filename="unknown.png", ), ) os.remove(file_path) except Exception as e: await ctx.reply( "Stable diffusion isnt running right now, sorry.\n%s" % e, ) else: await ctx.respond("You can only do that in the stable-diffusion channel") @commands.has_role("Track day gamers") @commands.slash_command( guild_ids=core_utils.my_guilds, name="trackdays", description="Query motorsportsreg.com for a list of trackdays going on at Buttonwillow and Thunderhill", ) async def trackdays(self, ctx: commands.Context): import trackdays shid = await trackdays.get_msreg() for track, events in shid.items(): embed = discord.Embed( description=":checkered_flag: **Upcoming events at %s**:checkered_flag: " % track, color=0x428BCA, type="rich", ) for track_day in events: embed.add_field( name="-------------\n**Event Name**", value=track_day["event_name"], inline=False, ) embed.add_field( name="Date", value=track_day["event_date"], inline=False ) embed.add_field( name="Event URL", value=track_day["event_url"], inline=False ) await ctx.send(embed=embed) @commands.command(name="corona", aliases=["covid"]) async def corona(self, ctx: commands.Context, *, location=None): import corona async with ctx.message.channel.typing(): result = corona.parse_message(location) await ctx.send(embed=result) @commands.command(name="stock") async def stock(self, ctx: commands.Context): msg = ctx.message.content if len(msg.split()) < 2: import help_methods await ctx.send(help_methods.get_help_message("stock")) import stock results = stock.parse_message(msg) for res in results: await ctx.reply(embed=res) def setup(bot): bot.add_cog(ActualUtils(bot))