A szurubooru discord bot
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

131 lines
4.4 KiB

from aiohttp_requests import requests
from config import REVERSE_SEARCH_CONFIG
import requests as fuck_aiohttp
import asyncio
import functools
import time
from datetime import datetime
from fake_useragent import UserAgent
import logging
logger = logging.getLogger(__name__)
last_call = datetime.now()
categories = ['default', 'artist', '???', 'copyright', 'character', 'species']
ua = UserAgent()
user_agent = {
"User-Agent": "BooruBot/1.0 (by DiamondsDroog on e621)"
}
DANBOORU_USERNAME = REVERSE_SEARCH_CONFIG["danbooru_username"]
DANBOORU_API_KEY = REVERSE_SEARCH_CONFIG["danbooru_api_key"]
async def get_category_danbooru(tag_name):
url = "https://danbooru.donmai.us/tags.json"
loop = asyncio.get_event_loop()
r = await loop.run_in_executor(None, functools.partial(fuck_aiohttp.get, url, params={"search[name_matches]": tag_name}, auth=(DANBOORU_USERNAME, DANBOORU_API_KEY)))
try:
for cat in r.json():
return cat['category']
except: # pragma: no cover
pass
return 0
async def get_category_e621(tag_name):
url = "https://e621.net/tag/index.json"
r = await requests.get(url, headers=user_agent, params={"name":tag_name})
try:
for cat in await r.json():
return cat['type']
except: # pragma: no cover
pass
return 0
async def reverse_categorize_tag(tag) -> dict:
"""Reverse looks up a tag resource to get a category."""
global last_call
while (datetime.now() - last_call).total_seconds() < 1: # pragma: no cover
time.sleep(0.1)
last_call = datetime.now()
# Get category
name = tag['names'][0]
cat_dan = await get_category_danbooru(name)
cat_e621 = await get_category_e621(name)
# Skip if both default
if(cat_dan == 0 and cat_e621 == 0):
logger.info(f"Processed tag: {tag['names']} (no category found)")
return tag
if(cat_dan == 0):
cat = categories[cat_e621]
else:
cat = categories[cat_dan]
tag['category'] = cat
logger.info(f"Processed tag: {tag['names']} (category:{tag['category']})")
return tag
async def get_implications_danbooru(tag_name):
implications = []
loop = asyncio.get_event_loop()
url = "https://danbooru.donmai.us/tag_implications.json"
r = await loop.run_in_executor(None, functools.partial(fuck_aiohttp.get, url=url, params={"search[antecedent_name]": tag_name}, auth=(DANBOORU_USERNAME, DANBOORU_API_KEY)))
try:
for imp in r.json():
implications.append(imp['consequent_name'])
except: # pragma: no cover
pass
return implications
async def get_implications_e621(tag_name):
implications = []
url = "https://e621.net/tag_implication/index.json"
request_obtain = await requests.get(url, headers=user_agent, params={'query': tag_name, 'approved': 'true'})
try:
for imp in await request_obtain.json():
tag_api = await requests.get("https://e621.net/tag/show.json", headers=user_agent, params={'id': imp['consequent_id']})
tag_json = await tag_api.json()
implications.append(tag_json["name"])
except: # pragma: no cover
pass
return implications
async def reverse_tag_implications(tag) -> dict:
"""Reverse looks up a tag resource to get the implications for the tag."""
implications = []
for name in tag["names"]:
global last_call
while (datetime.now() - last_call).total_seconds() < 1: # pragma: no cover
time.sleep(0.1)
last_call = datetime.now()
implications += await get_implications_danbooru(name)
implications += await get_implications_e621(name)
for implication in implications:
if implication not in tag["implications"]:
tag["implications"].append(implication)
if tag["implications"]:
logger.warn(f"Processed tag: {tag['names']} (implications: {tag['implications']})")
else:
logger.warn(f"Processed tag: {tag['names']} (no implications)")
return tag
async def reverse_alias_tag(tag) -> list:
aliases = []
for name in tag["names"]:
global last_call
while (datetime.now() - last_call).total_seconds() < 1: # pragma: no cover
time.sleep(0.1)
last_call = datetime.now()
# TODO: Implement these methods
# aliases += await get_aliases_danbooru(name)
# aliases += await get_implications_e621(name)
logger.warn(f"Processed tag: {tag['names']}")
return aliases