Jump to content

Wiki Bot Example: Difference between revisions

From Photonicamp Wiki
Created page with "This code is an example of how to automate wiki page creation. It also features integration with Perplexity AI-Powered web search to generate instrument pages base text.<syntaxhighlight lang="python" line="1"> ''' This script scrapes info of all equipments in the old LCO database, including pictures and manuals, uses AI to generate a summary about each equipment using web searches to improve accuracy, and then creates a wiki article about it. Created by pfjarschel Dec..."
 
No edit summary
 
(One intermediate revision by the same user not shown)
Line 437: Line 437:


# Main definitions
# Main definitions
lco_url = "http://143.106.153.11/LCOSys"
lco_url = "http://beltza.ifi.unicamp.br/LCOSys"
wiki_type = "prod"  # test or prod
wiki_type = "prod"  # test or prod
test_on_random_id = False  # To test on a random equipment. Overrides id set on test_id.
test_on_random_id = False  # To test on a random equipment. Overrides id set on test_id.
Line 464: Line 464:
     credsfile = "photonbot.txt"
     credsfile = "photonbot.txt"
else:
else:
     wiki_url = "http://143.106.153.11/media_wiki_demo/api.php"
     wiki_url = "http://beltza.ifi.unicamp.br/media_wiki_demo/api.php"
     credsfile = "testbot.txt"
     credsfile = "testbot.txt"
with open(credsfile, 'r', encoding='utf-8') as file:
with open(credsfile, 'r', encoding='utf-8') as file:

Latest revision as of 19:35, 3 February 2025

This code is an example of how to automate wiki page creation.

It also features integration with Perplexity AI-Powered web search to generate instrument pages base text.

'''
This script scrapes info of all equipments in the old LCO database,
including pictures and manuals, uses AI to generate a summary about each
equipment using web searches to improve accuracy, and then creates a
wiki article about it.

Created by pfjarschel
Dec 2024  
'''

# Basic imports
import os
import time
import numpy as np  # For random number generation
import requests

# Specific imports
import pandoc
from PIL import Image
from openai import OpenAI

# To try and suppress some useless warnings
import warnings

# Suppress https warnings
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

# This file's path
main_directory = os.path.dirname(os.path.abspath(__file__)).replace("\\", "/")


class ScrapedEquip:
    '''
    This class simply holds all the information found on a certain equipment
    '''
    
    raw = requests.Response()
    id = "LCO000000"
    inst_type = ""
    name = "Brand Model"
    status = "OK"
    location = ""
    serial = ""
    patrimonio = ""
    date = ""
    keywords = ""
    summary = ""
    summary_citations = []
    img_url = ""
    img_file = ""


class PPLX:
    '''
    This class is the interface to Perplexity's API
    A valid API key is needed ¯\_(ツ)_/¯
    '''
    
    def __init__(self, key):
        self.key = key
        self.url = "https://api.perplexity.ai"
        self.client = OpenAI(api_key=self.key, base_url=self.url)
        
    def build_msgs(self, msg, system_msg="") -> list:
        msgs = [
            {
                "role": "system",
                "content": (system_msg)
            },
            {
                "role": "user",
                "content": (msg)
            }
        ]
        
        return msgs

    def request(self, msg: str, system_msg="", model="llama-3.1-sonar-small-128k-online"):
        msgs = self.build_msgs(msg, system_msg)
        citations = []
        response = self.client.chat.completions.create(
            model=model,
            messages=msgs,
        )
        reply = response.choices[0].message.content
        
        try:
            for cit in response.citations:
                if not cit in citations:
                    citations.append(cit)
        except:
            pass
        
        return reply, response, citations


class Scraper:
    '''
    This class is responsible for all scraping operations
    '''
    
    def __init__(self, lco_url: str, pplx: PPLX):
        self.lco_url = lco_url
        self.lco_equips = f"{lco_url}/ubrowse.php"
        self.dl_folder = f"{main_directory}/scraped_files"
        self.imgs_folder = f"{self.dl_folder}/imgs"
        self.others_folder = f"{self.dl_folder}/other"
        self.equips_ids = []
        self.pplx = pplx
        
        if not os.path.isdir(self.dl_folder):
            os.makedirs(self.dl_folder)
        if not os.path.isdir(self.imgs_folder):
            os.makedirs(self.imgs_folder)
        if not os.path.isdir(self.others_folder):
            os.makedirs(self.others_folder)
    
    def get_equips_id_list(self) -> list:
        equips_raw = requests.get(self.lco_equips)
        self.equips_ids = []
        if equips_raw.status_code == 200:
            try:
                equips_list_raw = equips_raw.text.split("<a href='equip.php?id=")
                for span in equips_list_raw:
                    if span[:3] == "LCO":
                        self.equips_ids.append(span[:9])
            except:
                print("Error parsing data. are you sure the site is up and the ID is correct?")
        
        return self.equips_ids

    def get_equip_data(self, id: str, print_data=False) -> ScrapedEquip:
        equip = ScrapedEquip()
        equip.id = id
        equip_link = f"{self.lco_url}/equip.php?id={equip.id}"

        equip.raw = requests.get(equip_link, verify=False)
        if equip.raw.status_code == 200:
            equip.raw.encoding = 'utf-8'
            
            # Get basic information
            try:
                equip.inst_type = equip.raw.text.split("Tipo: </td><td>")[1].split("</td></tr>")[0].split(": ")[1]
                equip.name = equip.raw.text.split("Nome Completo: </td><td>")[1].split("</td></tr>")[0]
                equip.status = equip.raw.text.split("Status: </td>")[1].split("<span")[1].split(";'>")[1].split("</span>")[0]
                equip.location = equip.raw.text.split("Local: </td><td>")[1].split("</td></tr>")[0]
                equip.serial = equip.raw.text.split("Serial: </td><td>")[1].split("</td></tr>")[0]
                equip.patrimonio = equip.raw.text.split("Patrimônio: </td><td>")[1].split("</td></tr>")[0]
                equip.date = equip.raw.text.split("Data de Entrada: </td><td>")[1].split("</td></tr>")[0]
                equip.keywords = equip.raw.text.split("Keywords: </td><td>")[1].split("</td></tr>")[0]
                
                try:
                    imgurl_split_l = "<image src='"
                    imgurl_split_r = "'"
                    equip.img_url = f"{self.lco_url}/{equip.raw.text.split(imgurl_split_l)[1].split(imgurl_split_r)[0]}"
                except:
                    pass
            except:
                print("Error parsing some data. are you sure the site is up and the ID is correct?")
        
        if print_data:
            print(equip.id)
            print(equip.inst_type)
            print(equip.name)
            print(equip.status)
            print(equip.location)
            print(equip.serial)
            print(equip.patrimonio)
            print(equip.date)
            print(equip.keywords)
            print(equip.img_url)
        
        return equip

    def save_equip_img(self, equip: ScrapedEquip):
        if equip.img_url != "":
            try:
                response = requests.get(equip.img_url)
                if response.status_code == 200:
                    filename = f"{self.imgs_folder}/{equip.id}"
                    if "jpg" in equip.img_url.lower() or "jpeg" in equip.img_url:
                        filename = f"{filename}.jpg"
                    elif "png" in equip.img_url.lower():
                        filename = f"{filename}.png"
                    equip.img_file = filename.split("/")[-1]
                    if not os.path.isfile(filename):
                        with open(filename, 'wb') as file:
                            file.write(response.content)
                        print(f"Image downloaded as: {filename}")
                    else:
                        print(f"Image {filename} already exists.")
                else:
                    print("Failed to download image")
            except:
                pass

    def create_ai_summary(self, equip: ScrapedEquip, wiki_conv=True):
        model = "llama-3.1-sonar-huge-128k-online"
        system_msg = "You are a technology test and measurement expert, a brilliant scientist that knows a lot about scientific experiments, " + \
                    "especially in the fields of electronics, photonics, optics, and telecommunications, " + \
                    "and also has access to all kowledge available in the internet." + \
                    "Your job is to search the internet and find information about the equipments and instruments that are going to be input in the prompt." + \
                    "There are going to be some keywords that can help and the type of instrument it is, that can contain some typos or weird characters " + \
                    "due to text encoding. Ignore them if it harms more than help." + \
                    "Summarize all the information you can find, but in a way that can be useful for students and researchers to quickly know what that " + \
                    "equipment or instrument can and cannot do, and on which scenarios it could be best used." + \
                    "Always reply in english, and only the summary of your findings, please ommit interactions with the user."              
        prompt = f"Please find information and summarize it for me, about {equip.name}. It should be of the type {equip.inst_type}. " + \
                 f"Some keywords that may or may not help: {equip.keywords}. Try to format the summary on a way that will look good in a wiki page." + \
                 f"Also please, create a References section at the end of the reply, with all the info about the sources used, even if it would be empty! " + \
                 f"Some information here may be in portuguese, use it, but please write everything in english." 

        try:
            print(f"Getting AI summary for {equip.name}. This can take a little while...")
            reply, response, citations = self.pplx.request(prompt, system_msg, model)
            print(f"AI summary request for {equip.name} completed successfully.")
        except Exception as e:
            print(f"Error getting AI summary for {equip.name}.")
            print(e)
        
        if wiki_conv:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                md_doc = pandoc.read(reply, format='markdown')
                reply = pandoc.write(md_doc, format='mediawiki').replace("===", "==")
        
        equip.summary = reply
        equip.summary_citations = citations

    def get_lorem_ipsum_filler(ps = 5) -> str:
        raw = requests.get("https://loremipsum.io/generator?n=5&t=p").text.split('id="text"')[1].split("</div>")[0]
        paragraphs = raw.replace("</p>", "").replace("<p>", "\n").replace("<", "").replace(">", "")
        return paragraphs
    
    def scrape_equip(self, id: str, ai_summary=False) -> ScrapedEquip:
        equip = self.get_equip_data(id)
        self.save_equip_img(equip)
        if ai_summary:
            self.create_ai_summary(equip, True)
        else:
            equip.summary = self.get_lorem_ipsum_filler()
            
        return equip
  

class WikiAPI:
    '''
    This class is the interface to MediaWiki Action API
    '''
    
    def __init__(self, url, user, passwd):
        self.url = url
        self.csrf_token = ""
        self.session = requests.Session()
        
        self.login(user, passwd)
        
    def login(self, user, passwd) -> str:        
        try:
            PARAMS_0 = {
                "action": "query",
                "meta": "tokens",
                "type": "login",
                "format": "json"
            }
            R = self.session.get(url=self.url, params=PARAMS_0, verify=False)
            DATA = R.json()
            LOGIN_TOKEN = DATA['query']['tokens']['logintoken']

            PARAMS_1 = {
                "action": "login",
                "lgname": user,
                "lgpassword": passwd,
                "lgtoken": LOGIN_TOKEN,
                "format": "json"
            }
            R = self.session.post(url=self.url, data=PARAMS_1, verify=False)

            PARAMS_2 = {
                "action": "query",
                "meta": "tokens",
                "format": "json"
            }
            R = self.session.get(url=self.url, params=PARAMS_2, verify=False)
            DATA = R.json()
            self.csrf_token = DATA['query']['tokens']['csrftoken']
        except Exception as e:
            print("Error logging in:")
            print(e)
            
    def send_edit(self, params) -> bool:
        R = self.session.post(self.url, data=params)
        DATA = R.json()
        try:
            return DATA['edit']['result'] == 'Success'
        except Exception as e:
            try:
                if DATA['error']['code'] == 'articleexists':
                    print("Page already exists, moving to override sections")
            except Exception as e2:
                print(e2)
                print(DATA)
            return False

    def create_page(self, name: str, text: str) -> bool:
        PARAMS = {
            "action": "edit",
            "title": name,
            "token": self.csrf_token,
            "format": "json",
            "text": text,
            "createonly": "true"
        }

        return self.send_edit(PARAMS)
        
    def edit_page(self, name: str, text: str, section = 0, new_section = False, section_title="") -> bool:
        if new_section:
            section = "new"
        elif section != 0:
            text = f"== {section_title} ==\n{text}"

        PARAMS = {
            "action": "edit",
            "title": name,
            "token": self.csrf_token,
            "format": "json",
            "section": section,
            "sectiontitle": section_title,
            "text": text,
            "nocreate": "true"
        }

        return self.send_edit(PARAMS)
        
    def prepend_to_page(self, name: str, text: str, section = 0) -> bool:
        PARAMS = {
            "action": "edit",
            "title": name,
            "token": self.csrf_token,
            "format": "json",
            "section": section,
            "prependtext": text,
            "nocreate": "true"
        }

        return self.send_edit(PARAMS)
        
    def append_to_page(self, name: str, text: str, section = 0) -> bool:
        PARAMS = {
            "action": "edit",
            "title": name,
            "token": self.csrf_token,
            "format": "json",
            "section": section,
            "appendtext": text,
            "nocreate": "true"
        }

        return self.send_edit(PARAMS)
        
    def upload_file(self, file_path: str, comment="", text="") -> bool:
        name = file_path.replace("\\", "/").split("/")[-1]
        text = text.encode('utf-8')
        PARAMS = {
            "action": "upload",
            "filename": name,
            "comment": comment,
            "text": text,
            "format": "json",
            "token": self.csrf_token,
            "ignorewarnings": 0
        }

        FILE = {'file':(name, open(file_path, 'rb'), 'multipart/form-data')}

        try:
            R = self.session.post(self.url, files=FILE, data=PARAMS, verify=False)
            DATA = R.json()
            return DATA['upload']['result'] == 'Success'
        except Exception as e:
            print(e)
            return False
        
def create_equip_wiki(equip: ScrapedEquip, abort_if_exists=True, stages_to_override=10*[False]):
    # Create page and add main text
    page_created = wiki.create_page(equip.name, "")
    if page_created or (not abort_if_exists):       
        # Set main text
        s = 0
        if page_created or (not page_created and stages_to_override[s]):
            wiki.edit_page(equip.name, equip.summary)

        # Add other scraped info
        s = 1
        other_text = f"* '''Status''': {equip.status}\n" + \
                     f"* '''Last known location''': {equip.location}\n" + \
                     f"* '''Serial''': {equip.serial}\n" + \
                     f"* '''Inventory ID''': {equip.patrimonio}\n" + \
                     f"* '''Arrival Date''': {equip.date}\n" + \
                     f"* '''Keywords''': {equip.keywords}\n" + \
                     f"* '''Old LCO ID''': {equip.id}\n" + \
                     f"* '''Old LCO Category''': {equip.inst_type}\n"
        if page_created or (not page_created and stages_to_override[s]):
            wiki.edit_page(equip.name, text=other_text.replace("  ", ""), new_section=True, section_title="Other info")
        
        # Upload Image
        s = 2
        if page_created or (not page_created and stages_to_override[s]):
            img_file = f"{scraper.imgs_folder}/{equip.img_file}"
            
            # Resize image to 2160 px high (4K)
            target_height = 2160
            img = Image.open(img_file)
            if img.size[1] > target_height:
                height_percent = (target_height / float(img.size[1]))
                target_width = int((float(img.size[0]) * float(height_percent)))
                resized_image = img.resize((target_width, target_height), Image.Resampling.LANCZOS)
                resized_image.save(img_file)
            
            wiki.upload_file(img_file, comment=f"Photo of {equip.name}", text=f"Photo of {equip.name}")
        
        # Place image in text
        s = 3
        if page_created or (not page_created and stages_to_override[s]):
            img_text = f"[[File:{equip.img_file}|alt={equip.name}|thumb|{equip.name}]]"
            wiki.prepend_to_page(equip.name, text=img_text, section=0)
        
        # Set Category
        s = 4
        if page_created or (not page_created and stages_to_override[s]):
            wiki.append_to_page(equip.name, "\n\n[[Category:Instruments]]")

# Main definitions
lco_url = "http://beltza.ifi.unicamp.br/LCOSys"
wiki_type = "prod"  # test or prod
test_on_random_id = False  # To test on a random equipment. Overrides id set on test_id.
test_id = "LCO110101"
ai_summary = True
abort_if_exists = False
overrides = [False, False, False, False, False]  # [Main text, Extra text, Image upload, Image in text, Category set]

# Enable this to run for many equipments
# To do ALL, set start_i to 0, and end_i to -1
DO_MANY = True
start_i = 100
end_i = -1

# Connect to Perplexity
with open("pplx_api.txt", 'r', encoding='utf-8') as file:
    pplx_key = file.readline()
pplx = PPLX(pplx_key)

# Create Scraper
scraper = Scraper(lco_url, pplx)

# Switch between test and prod Wiki
if wiki_type == "prod":
    wiki_url = "https://photonwiki.ifi.unicamp.br/wiki/api.php"
    credsfile = "photonbot.txt"
else:
    wiki_url = "http://beltza.ifi.unicamp.br/media_wiki_demo/api.php"
    credsfile = "testbot.txt"
with open(credsfile, 'r', encoding='utf-8') as file:
    bot_login = file.readline()[:-1]
    bot_passwd = file.readline()

# Connect to Wiki
wiki = WikiAPI(wiki_url, bot_login, bot_passwd)

# Get equipment id list
scraper.get_equips_id_list()
if end_i < 0:
    end_i = len(scraper.equips_ids) - 1
if DO_MANY:
    n_equips = end_i - start_i + 1
else:
    n_equips = 1
    
if not DO_MANY:
    # Specify equip to test, or try a random one
    if test_on_random_id:
        id_idx = np.random.randint(0, len(scraper.equips_ids))
        test_id = scraper.equips_ids[id_idx]

    # Scrape data for selected equipment(s)
    test_equip = scraper.scrape_equip(test_id, ai_summary=ai_summary)

    # Create Wiki page and fill it
    create_equip_wiki(test_equip, abort_if_exists, overrides)
else:
    # Do the above for the range of equipments selected
    t00 = time.time()
    t0 = time.time()
    for i in range(start_i, end_i + 1):
        equip_id = scraper.equips_ids[i]
        equip = scraper.scrape_equip(equip_id, ai_summary=ai_summary)
        create_equip_wiki(equip, abort_if_exists, overrides)
        perc = (i - start_i + 1)/(n_equips)
        t1 = time.time()
        dt = t1 - t0
        el_t = t1 - t00
        tt = el_t/perc
        rt = tt - el_t
        t0 = time.time()
        print(f"{i - start_i + 1}/{n_equips} pages done ({100.0*perc:.2f}%). {rt:.2f} s left.")