MarkdownPreviewer/Server/MarkdownPreviewer/server.py
2025-08-28 20:18:42 +02:00

403 lines
12 KiB
Python

from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler, StaticFileHandler
from tornado.websocket import WebSocketHandler
from tornado.template import Loader
import sys
import asyncio
import msgpack
import json
import re
import tempfile
import subprocess
import os
import time
from datetime import datetime
from .render_pipeline import RenderPipeline, MultiCallback, CallbackClass
from .pandoc import Pandoc, Div, Span, Header, Attr, Plain, PString, Inline, Image
# Matches the marker that opens a special block quote, e.g. "![warning]".
# Group 1 captures the lowercase kind name between the brackets.
blockquote_re = re.compile('!\\[([a-z]+)\\]')
# Shared attribute objects (CSS classes) used by the panel builders below.
# The first positional argument is passed as "" everywhere in this file —
# presumably the element identifier; confirm against the pandoc helper module.
panel_content = Attr("", classes=["panel-content"])
header_attr = Attr("", classes=["panel-title"])
# Icon spans: the class names look like Font Awesome icon classes.
warning_icon = Attr("", classes=["fas", "fa-exclamation-triangle", "panel-icon"])
information_icon = Attr("", classes=["fas", "fa-info-circle", "panel-icon"])
header_title_attr = Attr("", classes=["panel-header"])
# Outer wrappers: one for warning panels, one for information panels.
outer_attr = Attr("", classes=["panel", "panel-warning"])
outer_info_attr = Attr("", classes=["panel", "panel-info"])
def blockquote_filter(block):
    """Return the marker kind of a special block quote, or None.

    A block quote is treated as special when its first child is a ``Para``
    whose first inline is a ``Str`` that starts with ``![kind]`` (matched by
    the module-level ``blockquote_re``).

    Args:
        block: Pandoc JSON node of a block quote; ``block['c']`` is the list
            of its child blocks.

    Returns:
        The lowercase kind captured from the marker (e.g. ``"warning"``), or
        ``None`` when the quote is empty or does not start with a marker.
    """
    children = block['c']
    # An empty block quote has no first child to inspect.
    if not children or children[0]['t'] != 'Para':
        return None
    inlines = children[0]['c']
    # Likewise, an empty paragraph has no first inline.
    if not inlines or inlines[0]['t'] != 'Str':
        return None
    match = blockquote_re.match(inlines[0]['c'])
    return match.group(1) if match else None
def _create_panel(content, label_text, icon_attr, outer):
    """Build the panel markup shared by the warning and information boxes.

    Args:
        content: Block-quote node whose ``content['c']`` is the list of child
            blocks. It is mutated in place: the first two inlines of the
            first child (the ``![kind]`` marker and the inline after it) are
            removed.
        label_text: Text shown in the panel header.
        icon_attr: ``Attr`` applied to the (empty) icon span.
        outer: ``Attr`` applied to the outermost div.

    Returns:
        The JSON form of the outer div, as produced by ``Div(...).toJson()``.
    """
    blocks = content['c']
    # Slice deletion removes up to two leading inlines and, unlike two
    # pop(0) calls, does not raise when the marker is the only inline.
    del blocks[0]['c'][:2]
    content_div = Div(panel_content, blocks).toJson()
    label = PString(label_text).toJson()
    header = Header(header_attr, 3, [label]).toJson()
    icon = Plain(Span(icon_attr, []).toJson()).toJson()
    header_div = Div(header_title_attr, [icon, header]).toJson()
    return Div(outer, [header_div, content_div]).toJson()


def create_warning(content):
    """Turn a ``![warning]`` block quote into a "Warning" panel div.

    Args:
        content: Block-quote node; its leading marker is stripped in place.

    Returns:
        JSON for a div with classes ``panel panel-warning`` wrapping a header
        (icon plus level-3 "Warning" heading) and the remaining content.
    """
    return _create_panel(content, "Warning", warning_icon, outer_attr)


def create_information(content):
    """Turn an ``![information]`` block quote into an "Information" panel div.

    Args:
        content: Block-quote node; its leading marker is stripped in place.

    Returns:
        JSON for a div with classes ``panel panel-info`` wrapping a header
        (icon plus level-3 "Information" heading) and the remaining content.
    """
    return _create_panel(content, "Information", information_icon, outer_info_attr)
def _meta_to_text(content):
    """Flatten a metadata node to plain text.

    Args:
        content: Metadata node with a type tag ``content['t']`` and a payload
            ``content['c']``.

    Returns:
        The payload itself for ``MetaString``; for ``MetaInlines`` the
        concatenation of its ``Str`` inlines with every ``Space`` rendered as
        a single blank (other inline kinds are skipped). Any other node type
        yields ``""`` instead of failing on an unbound local.
    """
    kind = content['t']
    if kind == 'MetaString':
        return content['c']
    if kind == 'MetaInlines':
        pieces = []
        for inline in content['c']:
            if inline['t'] == 'Str':
                pieces.append(inline['c'])
            elif inline['t'] == 'Space':
                pieces.append(" ")
        return "".join(pieces)
    return ""


def parse_title(content):
    """Publish the document title from its metadata node on topic "title"."""
    Publisher.publish("title", _meta_to_text(content))


def parse_course(content):
    """Publish the course name from its metadata node on topic "course"."""
    Publisher.publish("course", _meta_to_text(content))


def parse_date(content):
    """Publish the document date from its metadata node on topic "date"."""
    Publisher.publish("date", _meta_to_text(content))
class Theorem(CallbackClass):
    """Callback that renders a ``![theorem]`` block quote as a numbered theorem.

    The instance keeps a running counter that starts at 1, is used in the
    title of each rendered theorem and is reset by ``clear()``.
    """

    def __init__(self):
        self.counter = 1

    def __call__(self, content):
        """Wrap the block quote in theorem divs and prepend its numbered title.

        Args:
            content: Block-quote node; mutated in place (the marker inlines
                are removed from its first child and the title is inserted).

        Returns:
            JSON for a ``theorem`` div containing a ``theorem-content`` div.
        """
        blocks = content['c']
        first_inlines = blocks[0]['c']
        # Strip the "![theorem]" marker and the inline after it; slice
        # deletion does not raise when fewer than two inlines are present.
        del first_inlines[:2]
        outer_div_attr = Attr("", classes=["theorem"])
        inner_div_attr = Attr("", classes=["theorem-content"])
        # NOTE(review): the original built an Attr with class "theorem-title"
        # that was never used; the title span is emitted without classes.
        # Confirm whether the span was meant to carry that class.
        span_attr = Attr("")
        theorem_string = "Theorem {}. ".format(self.counter)
        title_content = Inline("Emph", [PString(theorem_string).toJson()]).toJson()
        title = Span(span_attr, [title_content]).toJson()
        first_inlines.insert(0, title)
        content_div = Div(inner_div_attr, blocks).toJson()
        outer_div = Div(outer_div_attr, [content_div]).toJson()
        self.counter += 1
        return outer_div

    def clear(self):
        """Reset the numbering so the next theorem is number 1 again."""
        self.counter = 1
class Proof(CallbackClass):
    """Callback that renders a ``![proof]`` block quote as a numbered proof.

    The instance keeps a running counter that starts at 1, is used in the
    title of each rendered proof and is reset by ``clear()``.
    """

    def __init__(self):
        self.counter = 1

    def clear(self):
        """Reset the numbering so the next proof is number 1 again."""
        self.counter = 1

    def __call__(self, content):
        """Wrap the block quote in proof divs and prepend its numbered title.

        Args:
            content: Block-quote node; mutated in place (the marker inlines
                are removed from its first child and the title is inserted).

        Returns:
            JSON for a ``proof`` div containing the ``proof-content`` div
            followed by a ``proof-qed`` div.
        """
        blocks = content['c']
        first_inlines = blocks[0]['c']
        # Strip the "![proof]" marker and the inline after it; slice
        # deletion does not raise when fewer than two inlines are present.
        del first_inlines[:2]
        outer_div_attr = Attr("", classes=["proof"])
        inner_div_attr = Attr("", classes=["proof-content"])
        span_attr = Attr("", classes=["proof-title"])
        qed_attr = Attr("", classes=["proof-qed"])
        proof_string = "Proof {}. ".format(self.counter)
        title_content = Span(span_attr, [PString(proof_string).toJson()]).toJson()
        title = Inline("Emph", [title_content]).toJson()
        first_inlines.insert(0, title)
        # The QED div holds an empty string; presumably the visible end-of-
        # proof mark comes from the stylesheet (not shown here).
        qed_string = Plain(PString("").toJson()).toJson()
        qed = Div(qed_attr, [qed_string]).toJson()
        inner_div = Div(inner_div_attr, blocks).toJson()
        outer_div = Div(outer_div_attr, [inner_div, qed]).toJson()
        # Advance the numbering, mirroring Theorem; without this every proof
        # was titled "Proof 1." and clear() had nothing to reset.
        self.counter += 1
        return outer_div
class Penrose(CallbackClass):
    """Callback that renders a ``penrose`` code block to an SVG image.

    The code block's text is written to a temporary ``.substance`` file and
    passed, together with a ``.domain`` and a ``.style`` file from
    ``<base_path>/data/penrose/``, to the external ``roger`` command
    (presumably the Penrose command-line tool). The command's standard output
    is stored as a temporary ``.svg`` file and referenced by an image node.
    """

    def __init__(self, base_path):
        """Remember the directory that holds the domain and style files."""
        self.data_path = base_path + "/data/penrose/"

    def run(self, input_file_name, domain, style):
        """Run ``roger trio`` on a substance file.

        Args:
            input_file_name: Path of the ``.substance`` file.
            domain: Base name (without extension) of the ``.domain`` file.
            style: Base name (without extension) of the ``.style`` file.

        Returns:
            The ``subprocess.CompletedProcess`` with captured text output.
        """
        domain_path = self.data_path + domain + ".domain"
        # The style path is built from `style`; the original used `domain`
        # here, which silently ignored the `style` argument.
        style_path = self.data_path + style + ".style"
        return subprocess.run(
            ["roger", "trio", input_file_name, domain_path, style_path, '--path', "/"],
            text=True,
            capture_output=True)

    def __call__(self, content):
        """Render the code block and return an image node wrapped in Plain.

        Args:
            content: Code-block node; ``content['c'][1]`` is the block's text.

        Returns:
            JSON for a ``Plain`` block holding an image whose URL is
            ``/generated/<svg file name>``.
        """
        handle, substance_path = tempfile.mkstemp(suffix=".substance", text=True)
        try:
            with os.fdopen(handle, 'w') as f:
                f.write(content['c'][1])
            data = self.run(substance_path, "set", "set")
        finally:
            # The substance file is only an input for the finished subprocess;
            # remove it so temporary files do not pile up on every render.
            os.remove(substance_path)
        handle, svg_path = tempfile.mkstemp(suffix=".svg", text=True)
        with os.fdopen(handle, 'w') as f:
            f.write(data.stdout)
        img_attr = Attr("")
        # Use the bare file name instead of slicing off a fixed "/tmp/" prefix,
        # which produced a wrong URL whenever the temp directory differed.
        # NOTE(review): the /generated/ route serves "/tmp", so the image is
        # only reachable when the temp directory is /tmp — confirm.
        url = "/generated/{}".format(os.path.basename(svg_path))
        new_content = Image(img_attr, [{'t' : 'Str', 'c' : 'Penrose'}], url).toJson()
        return Plain(new_content).toJson()
class Publisher:
    """Minimal in-process publish/subscribe registry.

    ``topics`` maps a topic name to the list of its subscribers, each stored
    as a dict with the keys ``"id"`` and ``"callback"``.
    """

    topics = {}

    @classmethod
    def publish(cls, topic, content):
        """Call every callback subscribed to ``topic`` with ``content``.

        Topics without subscribers are ignored.
        """
        for subscriber in Publisher.topics.get(topic, ()):
            subscriber["callback"](content)

    @classmethod
    def subscribe(cls, id, topic, callback):
        """Register ``callback`` under ``topic``, tagged with ``id``."""
        entry = {"id": id, "callback": callback}
        Publisher.topics.setdefault(topic, []).append(entry)
class MainBodyHandler(RequestHandler):
    """Serve the preview page rendered from the most recently stored content.

    The latest body, title, date and course are kept as class attributes so
    that every request sees the values stored by the ``update_*`` methods.
    """

    body = ""
    title = ""
    date = ""
    course = ""

    @classmethod
    def update_body(cls, content):
        """Store the rendered document body."""
        MainBodyHandler.body = content

    @classmethod
    def update_title(cls, content):
        """Store the document title."""
        MainBodyHandler.title = content

    @classmethod
    def update_date(cls, content):
        """Store the document date."""
        MainBodyHandler.date = content

    @classmethod
    def update_course(cls, content):
        """Store the course name."""
        MainBodyHandler.course = content

    def initialize(self, loader):
        """Keep the template loader and load ``index.html`` from it."""
        self.loader = loader
        self.template = loader.load("index.html")

    def get(self):
        """Write the index template filled with the stored values."""
        context = {
            "body_content": MainBodyHandler.body,
            "title": MainBodyHandler.title,
            "date": MainBodyHandler.date,
            "course": MainBodyHandler.course,
        }
        page = self.template.generate(**context)
        self.write(page)
# Currently open PushPull connections; maintained by open() / on_close().
websockets = []


class PushPull(WebSocketHandler):
    """WebSocket endpoint that pushes document updates to connected pages.

    Each ``update_*`` class method sends a one-key JSON message to every
    socket currently listed in the module-level ``websockets`` list.
    """

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin.

        NOTE(review): this disables the origin check entirely — confirm that
        the server is only reachable locally.
        """
        return True

    @classmethod
    def _broadcast(cls, key, value):
        """Send ``{key: value}`` to every open socket."""
        for socket in websockets:
            socket.write_message({key: value})

    @classmethod
    def update_body(cls, content):
        """Push the rendered body under the key ``"show"``."""
        cls._broadcast("show", content)

    @classmethod
    def update_title(cls, content):
        """Push the document title under the key ``"title"``."""
        cls._broadcast("title", content)

    @classmethod
    def update_date(cls, content):
        """Push the document date under the key ``"date"``."""
        cls._broadcast("date", content)

    @classmethod
    def update_course(cls, content):
        """Push the course name under the key ``"course"``."""
        cls._broadcast("course", content)

    @classmethod
    def update_scroll(cls, content):
        """Push the scroll position as a ratio under the key ``"scroll"``.

        Args:
            content: Sequence whose first item is divided by its second —
                presumably (current line, total lines); confirm with the
                sender. A zero denominator yields a ratio of 0.0 instead of
                raising ZeroDivisionError.
        """
        position, total = content[0], content[1]
        ratio = position / float(total) if total else 0.0
        cls._broadcast("scroll", ratio)

    def open(self):
        """Register this connection for broadcasts."""
        if self not in websockets:
            websockets.append(self)

    def on_message(self, message):
        """Print any message received from the client."""
        print(message)

    def on_close(self):
        """Unregister this connection."""
        if self in websockets:
            websockets.remove(self)
def _read_exact(fd, size):
    """Read exactly ``size`` bytes from ``fd``, or return None at end of data.

    A single ``read`` on an unbuffered stream may return fewer bytes than
    requested, so reads are repeated until the requested amount is collected.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = fd.read(remaining)
        if not chunk:
            # Nothing more to read: the stream ended before `size` bytes.
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def on_stdin(fd, pipeline):
    """Read one length-prefixed msgpack message from ``fd`` and publish it.

    The message is framed as a 4-byte big-endian length followed by that many
    bytes of msgpack data, which is expected to decode to a mapping. Each
    key/value pair is published on the topic named by the key; the value of
    the ``"show"`` key is first passed through ``pipeline``.

    Args:
        fd: Binary stream to read from (registered with the event loop's
            ``add_reader`` by ``main``).
        pipeline: Callable applied to the ``"show"`` payload before publishing.

    When the stream ends (before or inside a message) the reader is removed
    from the running event loop and nothing is published; otherwise the
    callback would keep firing on a stream that stays readable at EOF.
    """
    header = _read_exact(fd, 4)
    payload = None if header is None else _read_exact(fd, int.from_bytes(header, "big"))
    if payload is None:
        asyncio.get_running_loop().remove_reader(fd)
        return
    data = msgpack.unpackb(payload)
    for key, value in data.items():
        if key == "show":
            Publisher.publish(key, pipeline(value))
        else:
            Publisher.publish(key, value)
def route_handler(base_path, loader):
    """Return the URL routing table for the preview application.

    Args:
        base_path: Directory containing the ``data/css`` and ``data/lib``
            static folders.
        loader: Template loader handed to ``MainBodyHandler``.

    Returns:
        A list of route tuples: the main page, the WebSocket endpoint, the two
        static folders under ``base_path`` and ``/generated/`` served from
        ``/tmp``.
    """
    css_dir = r"{}/data/css".format(base_path)
    lib_dir = r"{}/data/lib".format(base_path)
    routes = [
        (r"/", MainBodyHandler, {"loader" : loader}),
        (r"/ws", PushPull),
        (r"/css/(.*)", StaticFileHandler, {"path" : css_dir}),
        (r"/lib/(.*)", StaticFileHandler, {"path" : lib_dir}),
        (r"/generated/(.*)", StaticFileHandler, {"path" : r"/tmp"}),
    ]
    return routes
def codeblock_filter(block):
    """Return the first class of a code block, or None if it has none.

    Args:
        block: Code-block node; ``block['c'][0]`` is its attribute triple and
            ``block['c'][0][1]`` the list of classes.

    Returns:
        The first class name (used to pick the callback, e.g. ``"penrose"``),
        or ``None`` for a code block without classes instead of raising
        IndexError.
    """
    classes = block['c'][0][1]
    return classes[0] if classes else None
async def main(base_path, port=8888):
    """Set up the render pipeline and web server, then run until cancelled.

    Args:
        base_path: Directory containing the ``template`` folder and the
            ``data`` folder used by the routes and by ``Penrose``.
        port: TCP port the web application listens on (default 8888, the
            value that was previously hard-coded).

    The coroutine registers ``on_stdin`` as a reader for standard input,
    wires the publish/subscribe topics to the HTTP and WebSocket handlers and
    then waits on an event that is never set.
    """
    loader = Loader("{}/template".format(base_path))

    # Metadata callbacks publish title / course / date.
    pipeline = RenderPipeline()
    pipeline.AddMetaCallback('title', parse_title)
    pipeline.AddMetaCallback('course', parse_course)
    pipeline.AddMetaCallback('date', parse_date)

    # Block quotes starting with "![kind]" are dispatched by kind.
    blockquote = MultiCallback(blockquote_filter)
    blockquote.AddCallback('warning', create_warning)
    blockquote.AddCallback('information', create_information)
    blockquote.AddCallback('theorem', Theorem())
    blockquote.AddCallback('proof', Proof())

    # Code blocks are dispatched by their first class.
    codeblock = MultiCallback(codeblock_filter)
    codeblock.AddCallback("penrose", Penrose(base_path))

    pipeline.AddCallback('BlockQuote', blockquote, replace=True)
    pipeline.AddCallback('CodeBlock', codeblock, replace=True)

    # Inside a coroutine the running loop is the one to register with.
    loop = asyncio.get_running_loop()
    # Unbuffered binary view of standard input for the framed messages.
    fd = open(sys.stdin.fileno(), 'rb', buffering=0)
    loop.add_reader(fd, on_stdin, fd, pipeline)

    application = Application(route_handler(base_path, loader))

    # MainBodyHandler is subscribed before PushPull, so for each topic the
    # stored page state is updated before the update is pushed to clients.
    main_body_topics = (
        ("show", MainBodyHandler.update_body),
        ("title", MainBodyHandler.update_title),
        ("course", MainBodyHandler.update_course),
        ("date", MainBodyHandler.update_date),
    )
    for topic, callback in main_body_topics:
        Publisher.subscribe("MainbodyHandler", topic, callback)

    push_pull_topics = (
        ("show", PushPull.update_body),
        ("title", PushPull.update_title),
        ("course", PushPull.update_course),
        ("date", PushPull.update_date),
        ("scroll", PushPull.update_scroll),
    )
    for topic, callback in push_pull_topics:
        Publisher.subscribe("PushPull", topic, callback)

    application.listen(port)
    # Block forever: the event is never set, so this ends only on cancellation.
    await asyncio.Event().wait()