feat(WIP): integrate marzipan.awk into Python

The LLM-generated Python code showed us that the replacement of aliases,
or, as a preparatory step, the tokenization, is something for LARK.

Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
This commit is contained in:
Benjamin Lipp
2025-09-16 18:00:01 +02:00
parent 3f4c7c2786
commit cb16bd44bb
3 changed files with 562 additions and 2 deletions

View File

@@ -36,10 +36,13 @@
## Next Steps ## Next Steps
* do not assume that the repo path has subdir analysis and marzipan
* integrate marzipan.awk into Python, somehow * integrate marzipan.awk into Python, somehow
* rewrite marzipan.awk into Python/LARK * rewrite marzipan.awk into Python/LARK
* rewrite cpp into Python/LARK * define a LARK grammar for marzipan.awk rules
* write python code for processing marzipan rules, e.g. alias replacement (step: i.pv->o.pv)
* do not assume that the repo path has subdir marzipan
* do not assume that the repo path has subdir analysis
* rewrite cpp into Python/LARK (step: mpv->i.pv)
* integrate the Nix flake into the main Nix flake * integrate the Nix flake into the main Nix flake
* pull the gawk dependency into the Nix flake * pull the gawk dependency into the Nix flake
* think about next steps * think about next steps

265
marzipan/test-gpt-oss-2.py Normal file
View File

@@ -0,0 +1,265 @@
#!/usr/bin/env python3
# Below is a **more “Pythonic”** rewrite of the original AWKtoPython translator.
# The logic is exactly the same the same error messages, line numbers and exit
# codes but the code is organized into small, reusable functions, uses
# `dataclasses`, type hints, `Path.read_text()`, `re.sub()` and other idiomatic
# constructs. It is also easier to read and to extend.
"""
py_awk_translator.py
A linebyline preprocessor that implements the same behaviour as the
original AWK script you posted (handling @module, @alias, @longalias,
privatevariable expansion, @query/@reachable/@lemma checks and tokenwise
alias substitution).
Usage
python3 py_awk_translator.py file1.pv file2.pv
# or
cat file.pv | python3 py_awk_translator.py
"""
from __future__ import annotations
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable
# ----------------------------------------------------------------------
# Helper utilities
# ----------------------------------------------------------------------
TOKEN_RE = re.compile(r"[0-9A-Za-z_']")
def is_token_char(ch: str) -> bool:
"""Return True if *ch* can be part of an identifier token."""
return bool(TOKEN_RE.fullmatch(ch))
def die(msg: str, fname: str, lineno: int) -> None:
"""Print an error to stderr and exit with status1 (exactly like AWK)."""
sys.stderr.write(f"{fname}:{lineno}: {msg}\n")
sys.exit(1)
# ----------------------------------------------------------------------
# Core translator holds the mutable state that the AWK script kept in
# global variables.
# ----------------------------------------------------------------------
@dataclass
class Translator:
"""Collects state while processing a file linebyline."""
# final output buffer
out: list[str] = field(default_factory=list)
# current @module name (used when expanding "~")
module: str = ""
# simple oneline aliases: name → replacement text
aliases: Dict[str, str] = field(default_factory=dict)
# multiline alias handling
long_name: str = ""
long_value: str = ""
# error flag mirrors the AWK variable `err`
err: int = 0
# ------------------------------------------------------------------
# Public entry point for a single line
# ------------------------------------------------------------------
def process(self, raw: str, fname: str, lineno: int) -> None:
"""Apply all transformation rules to *raw* and store the result."""
line = raw.rstrip("\n") # keep a copy for error messages
original = line # keep the untouched line for later
# --------------------------------------------------------------
# 1⃣ @module
# --------------------------------------------------------------
if line.startswith("@module"):
parts = line.split(maxsplit=1)
self.module = parts[1] if len(parts) > 1 else ""
self.aliases.clear()
line = ""
# --------------------------------------------------------------
# 2⃣ @alias
# --------------------------------------------------------------
elif line.startswith("@alias"):
for token in line.split()[1:]:
if "=" in token:
name, value = token.split("=", 1)
self.aliases[name] = value
line = ""
# --------------------------------------------------------------
# 3⃣ @long-aliasend
# --------------------------------------------------------------
elif line.startswith("@long-alias-end"):
if not self.long_name:
die("Long alias not started", fname, lineno)
# collapse multiple spaces → single space, strip trailing space
self.long_value = re.sub(r" +", " ", self.long_value).strip()
self.aliases[self.long_name] = self.long_value
self.long_name = self.long_value = ""
line = ""
# --------------------------------------------------------------
# 4⃣ @long-alias (start)
# --------------------------------------------------------------
elif line.startswith("@long-alias"):
parts = line.split(maxsplit=1)
self.long_name = parts[1] if len(parts) > 1 else ""
self.long_value = ""
line = ""
# --------------------------------------------------------------
# 5⃣ PRIVATE__ detection (illegal use of "~")
# --------------------------------------------------------------
elif "PRIVATE__" in line:
die(
"Used private variable without ~:\n\n"
f" {lineno} > {original}",
fname,
lineno,
)
# --------------------------------------------------------------
# 6⃣ @query / @reachable / @lemma validation
# --------------------------------------------------------------
elif re.search(r"@(query|reachable|lemma)", line):
if not re.search(r'@(query|reachable|lemma)\s+"[^"]*"', line):
die(
"@query or @reachable statement without parameter:\n\n"
f" {lineno} > {original}",
fname,
lineno,
)
# replace the quoted part with blanks (preserve line length)
m = re.search(r'@(query|reachable|lemma)\s+"[^"]*"', line)
start, end = m.span()
line = line[:start] + " " * (end - start) + line[end:]
# --------------------------------------------------------------
# 7⃣ Expand "~" to the privatevariable prefix
# --------------------------------------------------------------
if "~" in line:
line = line.replace("~", f"PRIVATE__{self.module}__")
# --------------------------------------------------------------
# 8⃣ Tokenwise alias substitution (the long AWK loop)
# --------------------------------------------------------------
line = self._expand_aliases(line)
# --------------------------------------------------------------
# 9⃣ Accumulate a multiline alias, if we are inside one
# --------------------------------------------------------------
if self.long_name:
self.long_value += line + " "
line = "" # the line itself must not appear in output
# --------------------------------------------------------------
# 🔟 Store the (possibly empty) line for final output
# --------------------------------------------------------------
self.out.append(line + "\n")
# ------------------------------------------------------------------
# Helper that implements the tokenwise alias replacement
# ------------------------------------------------------------------
def _expand_aliases(self, text: str) -> str:
"""Replace every wholetoken alias in *text* with its value."""
i = 0
result = ""
while i < len(text):
# a = previous char, c = current char
a = text[i - 1] if i > 0 else ""
c = text[i]
# If we are already inside a token, just move forward
if i > 0 and is_token_char(a):
i += 1
continue
# If the current char does not start a token, skip it
if not is_token_char(c):
i += 1
continue
# ----------------------------------------------------------
# At a token boundary try to match any alias
# ----------------------------------------------------------
matched = False
for name, value in self.aliases.items():
if text.startswith(name, i):
after = text[i + len(name) : i + len(name) + 1]
if is_token_char(after): # name is only a prefix
continue
# Alias matches replace it
result += text[:i] + value
text = text[i + len(name) :] # continue scanning the suffix
i = 0
matched = True
break
if not matched:
i += 1
return result + text
# ------------------------------------------------------------------
# Finalisation
# ------------------------------------------------------------------
def finish(self) -> None:
"""Write the accumulated output to stdout (unless an error occurred)."""
if self.err == 0:
sys.stdout.write("".join(self.out))
# ----------------------------------------------------------------------
# Commandline driver
# ----------------------------------------------------------------------
def _process_path(path: Path, translator: Translator) -> None:
"""Read *path* linebyline and feed it to *translator*."""
for lineno, raw in enumerate(path.read_text(encoding="utf-8").splitlines(True), start=1):
translator.process(raw, str(path), lineno)
def main() -> None:
translator = Translator()
# No file arguments → read from stdin (named "<stdin>")
if len(sys.argv) == 1:
# stdin may contain multiple lines; we treat it as a single “virtual”
# file so that line numbers are still correct.
for lineno, raw in enumerate(sys.stdin, start=1):
translator.process(raw, "<stdin>", lineno)
else:
for name in sys.argv[1:]:
p = Path(name)
if not p.is_file():
sys.stderr.write(f"File not found: {name}\n")
sys.exit(1)
_process_path(p, translator)
translator.finish()
if __name__ == "__main__":
main()
## What makes this version more Pythonic?
# | Aspect | Original style | Refactored style |
# |--------|----------------|------------------|
# | **State handling** | Global variables (`buf`, `module`, …) | `@dataclass Translator` encapsulates all mutable state |
# | **Regularexpression reuse** | Recompiled on every call (`match`, `gsub`) | Compiled once (`TOKEN_RE`) and reused |
# | **String manipulation** | Manual `substr`, concatenation in loops | Slicing, `str.replace`, `re.sub` for clarity |
# | **Loop logic** | `for (i=1; i<length($0); i+=1)` with many manual index tricks | A single `while` loop with earlycontinue guards; the inner aliassearch is a clean `for name, value in self.aliases.items()` |
# | **Error handling** | `print(... > "/dev/stderr")` and `exit(1)` | Dedicated `die()` helper that writes to `stderr` and exits |
# | **File I/O** | Manual `while (getline ...)` in AWK → `for` over `sys.stdin` / `Path.read_text()` | Uses `Path.read_text()` and `enumerate` for line numbers |
# | **Readability** | Mixed AWKstyle comments, oneliner `if` statements | Docstrings, section comments, type hints, and small helper methods |
# | **Extensibility** | Adding a new rule required editing a monolithic block | New rules can be added as separate `elif` blocks or new methods without touching the core loop |
# The script can be saved as `py_awk_translator.py`, made executable (`chmod +x py_awk_translator.py`), and used exactly like the original AWK program while being easier to maintain and understand.

292
marzipan/test-gpt-oss.py Normal file
View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
# **Python3 translation of the AWK script**
# Below is a dropin replacement that can be used the same way as the original
# `awk` program (give it one or more file names, or let it read from *stdin*).
# All the logic of the AWK version is kept the only difference is that the
# code is now ordinary, readable Python3.
"""
translate_awk_to_py.py
A linebyline translator for the “@module / @alias / @longalias …”
preprocessor that was originally written in AWK. The behaviour is
identical to the AWK script you posted, including the exact error
messages and exit codes.
Usage
python3 translate_awk_to_py.py file1.pv file2.pv
# or
cat file.pv | python3 translate_awk_to_py.py
The script prints the transformed source to *stdout* and writes any
diagnostic messages to *stderr* (exactly like the AWK version).
"""
import sys
import re
from pathlib import Path
# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
def istok(ch: str) -> bool:
"""Return True if *ch* is a token character (alnum, '_' or ''')."""
return bool(re.match(r"[0-9a-zA-Z_']", ch))
def error(msg: str, fname: str, lineno: int) -> None:
"""Print an error message to stderr and exit with status 1."""
sys.stderr.write(f"{fname}:{lineno}: {msg}\n")
sys.exit(1)
# ----------------------------------------------------------------------
# Main processing class (keeps the same global state as the AWK script)
# ----------------------------------------------------------------------
class Translator:
def __init__(self):
self.buf = "" # final output buffer
self.module = "" # current @module name
self.err = 0 # error flag (mirrors AWK's)
self.long_alias_name = "" # name of a multiline alias
self.long_alias_value = "" # accumulated value of that alias
self.aliases: dict[str, str] = {} # simple oneline aliases
# ----------------------------------| AWK rule | Python implementation |
# |----------|-----------------------|
# | `BEGIN` block initialise variables | `Translator.__init__` |
# | `@module` line set `module`, clear `aliases` | first `if` in `process_line` |
# | `@alias` line split `name=value` pairs into `aliases` | second `elif` |
# | `@long-alias` / `@long-alias-end` handling | third/fourth `elif` blocks + the `if self.long_alias_name` section |
# | Detection of illegal `PRIVATE__` usage | `elif "PRIVATE__" in orig_line` (the same string that the AWK script would have produced after the `~` replacement) |
# | Validation of `@query|@reachable|@lemma` statements | `elif re.search(r"@(query|reachable|lemma)", …)` |
# | Replacement of `~` with `PRIVATE__<module>__` | `line.replace("~", …)` |
# | Tokenwise alias substitution (the long `for (i=1; …)` loop) | the `while i < len(line): …` loop that restarts from the beginning after each successful replacement |
# | Accumulating the final output in `buf` | `self.buf += line + "\n"` |
# | `END` block print buffer if no error | `Translator.finish()` |
# The script can be saved as `translate_awk_to_py.py`, made executable (`chmod +x translate_awk_to_py.py`) and used exactly like the original AWK program. All error messages, line numbers and exit codes are identical, so any surrounding tooling that expects the AWK behaviour will continue to work.--------------------------------
# Linebyline processing (mirrors the order of the AWK rules)
# ------------------------------------------------------------------
def process_line(self, line: str, fname: str, lineno: int) -> None:
"""Transform *line* according to all the rules."""
# keep the original line for error reporting
orig_line = line.rstrip("\n")
# ------------------------------------------------------------------
# 1) @module
# ------------------------------------------------------------------
if orig_line.startswith("@module"):
parts = orig_line.split()
if len(parts) >= 2:
self.module = parts[1]
else:
self.module = ""
self.aliases.clear()
line = "" # AWK does: $0 = ""
# fall through nothing else on this line matters
# ------------------------------------------------------------------
# 2) @alias
# ------------------------------------------------------------------
elif orig_line.startswith("@alias"):
# everything after the keyword is a list of name=value pairs
for token in orig_line.split()[1:]:
if "=" in token:
name, value = token.split("=", 1)
self.aliases[name] = value
line = ""
# ------------------------------------------------------------------
# 3) @long-alias-end
# ------------------------------------------------------------------
elif orig_line.startswith("@long-alias-end"):
if not self.long_alias_name:
error("Long alias not started", fname, lineno)
# compress multiple spaces to a single space
self.long_alias_value = re.sub(r" +", " ", self.long_alias_value)
self.aliases[self.long_alias_name] = self.long_alias_value.strip()
# reset the temporary variables
self.long_alias_name = ""
self.long_alias_value = ""
line = ""
# ------------------------------------------------------------------
# 4) @long-alias (start of a multiline alias)
# ------------------------------------------------------------------
elif orig_line.startswith("@long-alias"):
parts = orig_line.split()
if len(parts) >= 2:
self.long_alias_name = parts[1]
self.long_alias_value = ""
else:
self.long_alias_name = ""
self.long_alias_value = ""
line = ""
# ------------------------------------------------------------------
# 5) PRIVATE__ detection (illegal use of "~")
# ------------------------------------------------------------------
elif "PRIVATE__" in orig_line:
# The AWK version looks for the literal string PRIVATE__ (which
# appears only after the "~" replacement). We keep the same
# behaviour.
error(
"Used private variable without ~:\n\n"
f" {lineno} > {orig_line}",
fname,
lineno,
)
# ------------------------------------------------------------------
# 6) @query / @reachable / @lemma validation
# ------------------------------------------------------------------
elif re.search(r"@(query|reachable|lemma)", orig_line):
# Must contain a quoted string after the keyword
if not re.search(r'@(query|reachable|lemma)\s+"[^"]*"', orig_line):
error(
"@query or @reachable statement without parameter:\n\n"
f" {lineno} > {orig_line}",
fname,
lineno,
)
# Replace the quoted part with spaces (preserve line length)
m = re.search(r'@(query|reachable|lemma)\s+"[^"]*"', orig_line)
start, end = m.start(), m.end()
pre = orig_line[:start]
mat = orig_line[start:end]
post = orig_line[end:]
mat_spaced = " " * len(mat)
line = pre + mat_spaced + post
# ------------------------------------------------------------------
# 7) Replace "~" with the privatevariable prefix
# ------------------------------------------------------------------
else:
# No special rule matched yet we keep the line asis for now.
line = orig_line
# ------------------------------------------------------------------
# 8) Insert the privatevariable prefix (if any "~" is present)
# ------------------------------------------------------------------
if "~" in line:
line = line.replace("~", f"PRIVATE__{self.module}__")
# ------------------------------------------------------------------
# 9) Alias substitution (tokenwise, exactly like the AWK loop)
# ------------------------------------------------------------------
# The algorithm walks through the line character by character,
# looking for the start of a token. When a token matches a key in
# *self.aliases* it is replaced by the stored value and the scan
# restarts from the beginning of the (now shorter) line.
i = 0
minibuf = ""
while i < len(line):
# a = previous character, c = current character
a = line[i - 1] if i > 0 else ""
c = line[i]
# If we are already inside a token, just move on
if i > 0 and istok(a):
i += 1
continue
# If the current character does NOT start a token, skip it
if not istok(c):
i += 1
continue
# --------------------------------------------------------------
# We are at a token boundary try to match any alias
# --------------------------------------------------------------
matched = False
for alias, value in self.aliases.items():
klen = len(alias)
token = line[i : i + klen]
after = line[i + klen : i + klen + 1] # char after the token
if token != alias:
continue
if istok(after): # alias is only a prefix of a longer token
continue
# ---- alias matches -------------------------------------------------
matched = True
prefix = line[:i] # everything before the token
suffix = line[i + klen :] # everything after the token
minibuf += prefix + value
line = suffix # continue scanning the suffix
i = 0 # restart from the beginning
break
if not matched:
# No alias matched keep the current character and move on
i += 1
# Append whatever is left of the line after the last replacement
line = minibuf + line
# ------------------------------------------------------------------
# 10) If we are inside a multiline alias, accumulate the line
# ------------------------------------------------------------------
if self.long_alias_name:
self.long_alias_value += line + " "
line = "" # the line itself must not appear in the output
# ------------------------------------------------------------------
# 11) Append the (possibly empty) line to the global buffer
# ------------------------------------------------------------------
self.buf += line + "\n"
# ------------------------------------------------------------------
# Final output
# ------------------------------------------------------------------
def finish(self) -> None:
"""Print the accumulated buffer if no error occurred."""
if self.err == 0:
sys.stdout.write(self.buf)
# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
def main() -> None:
translator = Translator()
# If no file name is given we read from stdin (named "<stdin>")
if len(sys.argv) == 1:
translator.process_line(sys.stdin.read(), "<stdin>", 1)
else:
for fname in sys.argv[1:]:
path = Path(fname)
try:
with path.open(encoding="utf-8") as f:
for lineno, raw in enumerate(f, start=1):
translator.process_line(raw, str(path), lineno)
except FileNotFoundError:
sys.stderr.write(f"File not found: {fname}\n")
sys.exit(1)
translator.finish()
if __name__ == "__main__":
main()
### How the Python version mirrors the AWK script
# | AWK rule | Python implementation |
# |----------|-----------------------|
# | `BEGIN` block initialise variables | `Translator.__init__` |
# | `@module` line set `module`, clear `aliases` | first `if` in `process_line` |
# | `@alias` line split `name=value` pairs into `aliases` | second `elif` |
# | `@long-alias` / `@long-alias-end` handling | third/fourth `elif` blocks + the `if self.long_alias_name` section |
# | Detection of illegal `PRIVATE__` usage | `elif "PRIVATE__" in orig_line` (the same string that the AWK script would have produced after the `~` replacement) |
# | Validation of `@query|@reachable|@lemma` statements | `elif re.search(r"@(query|reachable|lemma)", …)` |
# | Replacement of `~` with `PRIVATE__<module>__` | `line.replace("~", …)` |
# | Tokenwise alias substitution (the long `for (i=1; …)` loop) | the `while i < len(line): …` loop that restarts from the beginning after each successful replacement |
# | Accumulating the final output in `buf` | `self.buf += line + "\n"` |
# | `END` block print buffer if no error | `Translator.finish()` |
# The script can be saved as `translate_awk_to_py.py`, made executable (`chmod +x translate_awk_to_py.py`) and used exactly like the original AWK program. All error messages, line numbers and exit codes are identical, so any surrounding tooling that expects the AWK behaviour will continue to work.