Compare commits

...

41 Commits

Author SHA1 Message Date
Benjamin Lipp
6af6fb6b2a fix: pterm can resolve to NAT 2025-11-25 16:00:16 +01:00
Benjamin Lipp
e3b43a59bf fix: transcription errors in grammar relating to processes 2025-11-25 15:59:29 +01:00
Benjamin Lipp
3942bfa65e chore: add comment indicating CryptoVerif terms 2025-11-25 15:58:45 +01:00
Benjamin Lipp
7b1a62b6bb feat: do not try to reject reserved words
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-11-18 17:55:32 +01:00
Benjamin Lipp
d1a33981b1 feat: add missing option terms 2025-11-18 16:15:20 +01:00
Benjamin Lipp
f20fd1acc3 feat: module declaration, WIP: missing ProVerif option terms
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-10-14 18:01:13 +02:00
Benjamin Lipp
3ce0d262d9 feat: add parser for multi-line comments, without nesting
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-10-14 17:31:31 +02:00
Anja Rabich
a389e3c222 WIP: lark parser for awk
Co-authored-by: Benjamin Lipp <blipp@rosenpass.eu>
2025-09-30 17:46:33 +02:00
Benjamin Lipp
cb16bd44bb feat(WIP): integrate marzipan.awk into Python
The LLM-generated Python code showed us that the replacement of aliases,
or, as a preparatory step, the tokenization, is something for LARK.

Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-09-16 18:00:01 +02:00
Benjamin Lipp
3f4c7c2786 feat: add CLI param for configurable output/target directory
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-09-16 16:47:32 +02:00
Anja Rabich
475f4593f9 feat: improve exception handling and error codes + refactor run_proverif
Co-authored-by: Benjamin Lipp <blipp@rosenpass.eu>
2025-09-02 17:56:13 +02:00
Anja Rabich
13c5edbe44 fix: change awk mode from append to write
Co-authored-by: Benjamin Lipp <blipp@rosenpass.eu>
2025-09-02 16:35:46 +02:00
Benjamin Lipp
b1ac5d9244 feat(WIP): clean up code and TODOs, add clean command
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-08-12 18:02:20 +02:00
Anja Rabich
a4ff3d4eb5 feat: WIP clean-up TODOs
Co-authored-by: Benjamin Lipp <blipp@rosenpass.eu>
2025-08-05 17:57:07 +02:00
Anja Rabich
19ebce79f1 Merge branch 'main' into analyze_py 2025-08-05 16:43:19 +02:00
Benjamin Lipp
a5ae83e726 chore: add TODOs 2025-07-22 17:42:13 +02:00
Karolin Varner
9327c2c4f3 chore: Add code review 2025-07-22 17:10:28 +02:00
Benjamin Lipp
b140c56359 feat: move analyze to Python, including parallel execution
Co-authored-by: Anja Rabich <a.rabich@uni-luebeck.de>
2025-07-15 17:35:30 +02:00
Anja Rabich
d20bb137c9 feat: metaverif function
Co-Authored-By: Benjamin Lipp <blipp@rosenpass.eu>
2025-07-01 17:37:38 +02:00
Anja Rabich
c259be76c8 WIP: metaverif part 2
Co-Authored-By: Benjamin Lipp <blipp@rosenpass.eu>
2025-07-01 17:08:11 +02:00
Anja Rabich
73d180c4cf WIP: metaverif in python
Co-Authored-By: Benjamin Lipp <blipp@rosenpass.eu>
2025-06-17 18:01:03 +02:00
Benjamin Lipp
d44a96e6b6 fix: add +x flag in mode of marzipan/src/analyze.sh 2025-06-17 16:30:31 +02:00
Benjamin Lipp
ff20fbbe3a WIP: move metaverif to Python
Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-06-10 18:06:33 +02:00
Benjamin Lipp
5232ab3a8e feat: move parsing of RESULT lines to Python
Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-06-10 17:45:21 +02:00
Benjamin Lipp
2e27753f4a feat: move pretty_output to Python
* Remove pretty_output_line from Bash
* Remove pretty_output_line from click CLI

Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-06-10 17:13:42 +02:00
Benjamin Lipp
2628adbac8 feat(WIP): pretty output in Python
Work on repairing the regexes in pretty_output

Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-06-03 18:00:27 +02:00
Benjamin Lipp
744cf6fb50 feat: pretty output line 2025-06-03 16:22:56 +02:00
Benjamin Lipp
4bdc464b5b chore: plan TODOs for the next times
Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-05-06 17:40:10 +02:00
Benjamin Lipp
eb64f50d99 chore: update TODO, remove unused code
Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-05-06 17:28:25 +02:00
Benjamin Lipp
73b04cdc12 feat: move awk prep call to Python
Co-Authored-By: Anja Rabich <a.rabich@uni-luebeck.de>
2025-05-06 17:25:58 +02:00
Benjamin Lipp
437c591b2d chore: update TODO 2025-05-06 17:09:27 +02:00
Benjamin Lipp
7cbd6576d4 feat: move cpp call to Python 2025-05-06 17:08:21 +02:00
Benjamin Lipp
ac5a5cf76d chore: clarify path in README 2025-05-06 16:34:59 +02:00
Benjamin Lipp
18359ef3f4 chore: clean up unused log parameter 2025-05-06 16:34:35 +02:00
Anja Rabich
3e161d8c8d feat: add clean_warnings() in __init__.py
chore: update TODO.md

Co-Authored-By: Benjamin Lipp <blipp@rosenpass.eu>
2025-04-22 18:09:14 +02:00
Benjamin Lipp
56db757de3 chore: add TODO 2025-04-08 17:54:29 +02:00
Benjamin Lipp
5ff3bc944e chore: add README for marzipan 2025-04-08 17:52:07 +02:00
Benjamin Lipp
fb93258fcc feat: fix issue with empty extra_args 2025-04-08 17:44:37 +02:00
Benjamin Lipp
9ab120843a try nix shell 2025-04-08 16:43:31 +02:00
Benjamin Lipp
25f2abac80 WIP 2025-02-20 15:04:24 +01:00
Benjamin Lipp
c7ec12be9a feat: add nix setup for marzipan 2025-02-20 15:04:24 +01:00
16 changed files with 4580 additions and 0 deletions

marzipan/README.md Normal file

@@ -0,0 +1,8 @@
# Rewriting analyze.sh in Python
* `../analyze.sh` is the old script
* `src/__init__.py` is the new script
* call the old script from the Rosenpass repository's root directory with `./analyze.sh`
* call the new script from the marzipan directory:
* `nix run .# -- analyze $repo` where `$repo` is the absolute(?) path to the root directory of the Rosenpass repository.

marzipan/TODO.md Normal file

@@ -0,0 +1,64 @@
# TODO for the project of rewriting Marzipan
## Done
* ~~figure out why ProVerif is started on the non-processed mpv file~~
* ~~rework rebound warnings (`clean_warnings` Bash function)~~
```bash
rosenpass$ rosenpass-marzipan run-proverif target/proverif/03_identity_hiding_responder.entry.o.pv target/proverif/03_identity_hiding_responder.entry.log
```
* ~~provide log parameter to `rosenpass-marzipan`-call~~ (no, it was intentionally not used)
* ~~cpp pre-processing stuff~~
* ~~awk pre-processing stuff~~
* ~~`pretty_output` Bash function~~
* ~~pretty_output_line~~
* ~~click function intervention weirdness~~
* ~~why is everything red in the pretty output? (see line 96 in __init__.py)~~
* ~~awk RESULT flush in marzipan()~~
* ~~move the whole metaverif function to Python~~
* ~~move the whole analyze function to Python~~
* ~~find the files~~
* ~~start subprocesses in parallel~~
* ~~wait for them to finish~~
* ~~rebase from main~~
* ~~see if we still need the `extra_args is None` check in `_run_proverif`~~
* ~~set colors differently to prevent injection attack~~
* ~~by calling a function~~
* ~~by prepared statements~~
* ~~standalone function parse_result_line is no longer necessary~~
* ~~is the clean function still necessary?~~
* ~~implement better main function for click~~
* ~~why does analyze fail when the target/proverif directory is not empty?~~
* ~~return an exit status that is meaningful for CI~~
* ~~exception handling in analyze() and in run_proverif()~~
* ~~refactor filtering in run_proverif (see karo's comment)~~
* ~~configurable target directory~~
* ~~lark parser: multiline comments, how???~~
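The non-nesting multi-line comment handling ticked off above boils down to a non-greedy match. A plain-`re` sketch of the same idea (a standalone illustration, not the actual LARK rule):

```python
import re

# ProVerif-style comments: (* ... *), without nesting.
# The non-greedy match stops at the first closing marker, which is exactly
# the non-nesting behaviour; a nested (* (* x *) *) would be cut short.
COMMENT = re.compile(r"\(\*.*?\*\)", re.DOTALL)

def strip_comments(source: str) -> str:
    return COMMENT.sub("", source)

print(strip_comments("free c: channel. (* the\n public channel *) fun f."))
```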
## Next Steps
* integrate marzipan.awk into Python, somehow
* options term special cases (c.f. manual page 133, starting with "fun" term)
* complete with CryptoVerif options
* error when trying with: `nix run .# -- parse ../target/proverif/01_secrecy.entry.i.pv`
* `in(C, Cinit_conf(Ssskm, Spsk, Sspkt, ic));`
* ^
* rewrite marzipan.awk into Python/LARK
* define a LARK grammar for marzipan.awk rules
* write python code for processing marzipan rules, e.g. alias replacement (step: i.pv->o.pv)
* do not assume that the repo path has subdir marzipan
* do not assume that the repo path has subdir analysis
* rewrite cpp into Python/LARK (step: mpv->i.pv)
* integrate the Nix flake into the main Nix flake
* pull the gawk dependency into the Nix flake
* think about next steps
* integrate this upstream, into the CI?
* “make it beautiful” steps? more resiliency to working directory?
* rewrite our awk usages into Python/…?
* yes, possibly as extension to the LARK grammar
* and rewrite the AST within Python
* reconstruct ProVerif input file for ProVerif
* rewrite our CPP usages into Python/…?
* low priority: nested comments in ProVerif code
“it replaces the Bash script and is idiomatic Python code”
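One of the next steps above, alias replacement for the i.pv -> o.pv step, can be sketched token-wise in plain Python. The alias table and identifier pattern below are illustrative assumptions, not the actual marzipan.awk rule format:

```python
import re

# Hypothetical alias table; the real marzipan.awk syntax differs.
ALIASES = {"sk": "secret_key", "pk": "public_key"}

# Identifier pattern roughly following the ProVerif lexer.
IDENT = re.compile(r"[a-zA-Z][a-zA-Z0-9'_]*")

def replace_aliases(source: str, aliases: dict[str, str]) -> str:
    # re.sub with a callable replaces each identifier atomically, so the
    # alias "sk" does not also rewrite the longer identifier "skey".
    return IDENT.sub(lambda m: aliases.get(m.group(0), m.group(0)), source)

print(replace_aliases("out(c, enc(sk, pk, skey))", ALIASES))
# → out(c, enc(secret_key, public_key, skey))
```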

marzipan/flake.lock generated Normal file

@@ -0,0 +1,190 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1731533236,
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"flake-utils_2": {
"inputs": {
"systems": "systems_2"
},
"locked": {
"lastModified": 1726560853,
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nix-github-actions": {
"inputs": {
"nixpkgs": [
"poetry2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1729742964,
"narHash": "sha256-B4mzTcQ0FZHdpeWcpDYPERtyjJd/NIuaQ9+BV1h+MpA=",
"owner": "nix-community",
"repo": "nix-github-actions",
"rev": "e04df33f62cdcf93d73e9a04142464753a16db67",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "nix-github-actions",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1736166416,
"narHash": "sha256-U47xeACNBpkSO6IcCm0XvahsVXpJXzjPIQG7TZlOToU=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b30f97d8c32d804d2d832ee837d0f1ca0695faa5",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1730157240,
"narHash": "sha256-P8wF4ag6Srmpb/gwskYpnIsnspbjZlRvu47iN527ABQ=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "75e28c029ef2605f9841e0baa335d70065fe7ae2",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable-small",
"repo": "nixpkgs",
"type": "github"
}
},
"poetry2nix": {
"inputs": {
"flake-utils": "flake-utils_2",
"nix-github-actions": "nix-github-actions",
"nixpkgs": "nixpkgs_2",
"systems": "systems_3",
"treefmt-nix": "treefmt-nix"
},
"locked": {
"lastModified": 1736280331,
"narHash": "sha256-mkVHnky9h/s2EA+t9eEC8qxgcNTE3V+vb/9XgG4fCig=",
"owner": "nix-community",
"repo": "poetry2nix",
"rev": "4d260d908f3d95fa4b3ef6a98781ff64e1eede22",
"type": "github"
},
"original": {
"owner": "nix-community",
"repo": "poetry2nix",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs",
"poetry2nix": "poetry2nix"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_2": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"systems_3": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
},
"treefmt-nix": {
"inputs": {
"nixpkgs": [
"poetry2nix",
"nixpkgs"
]
},
"locked": {
"lastModified": 1730120726,
"narHash": "sha256-LqHYIxMrl/1p3/kvm2ir925tZ8DkI0KA10djk8wecSk=",
"owner": "numtide",
"repo": "treefmt-nix",
"rev": "9ef337e492a5555d8e17a51c911ff1f02635be15",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "treefmt-nix",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

marzipan/flake.nix Normal file

@@ -0,0 +1,18 @@
{
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
inputs.poetry2nix.url = "github:nix-community/poetry2nix";
inputs.flake-utils.url = "github:numtide/flake-utils";
outputs = (inputs:
let scoped = (scope: scope.result);
in scoped rec {
inherit (builtins) removeAttrs;
result = (import ./nix/init.nix) {
scoped = scoped;
flake.self = inputs.self;
flake.inputs = removeAttrs inputs ["self"];
};
}
);
}

marzipan/nix/hyuga/poetry.lock generated Normal file
File diff suppressed because it is too large

marzipan/nix/hyuga/pyproject.toml Normal file

@@ -0,0 +1,16 @@
[tool.poetry]
name = "hyuga-language-server-installer"
version = "0.1.0"
description = ""
authors = []
[tool.poetry.dependencies]
python = ">=3.12,<3.13"
[tool.poetry.group.dev.dependencies]
hyuga = "^1.0.0"
poetry = "^2.0.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

marzipan/nix/init.nix Normal file

@@ -0,0 +1,32 @@
outer_ctx: outer_ctx.scoped rec {
inherit (builtins) trace;
ctx = outer_ctx // { inherit config; };
inherit (ctx) scoped;
inherit (ctx.flake.inputs) nixpkgs flake-utils;
inherit (nixpkgs.lib) genAttrs zipAttrsWith;
inherit (nixpkgs.lib.debug) traceVal;
inherit (flake-utils.lib) allSystems eachSystem;
result = {
devShells = eachSupportedSystem (system: (setupSystem system).devShells);
packages = eachSupportedSystem (system: (setupSystem system).packages);
apps = eachSupportedSystem (system: (setupSystem system).apps);
};
setupSystem = (system_name: scoped rec {
result = (import ./system.nix) (ctx // {
system.name = system_name;
system.pkgs = nixpkgs.legacyPackages.${system_name};
});
});
config = {
supportedSystems = allSystems;
poetry.projectDir = ctx.flake.self;
};
eachSupportedSystem = genAttrs config.supportedSystems;
}

marzipan/nix/system.nix Normal file

@@ -0,0 +1,47 @@
ctx: ctx.scoped rec {
inherit (ctx.system) pkgs;
inherit (ctx.flake.inputs) poetry2nix flake-utils;
inherit (pkgs) mkShellNoCC writeShellApplication;
inherit (flake-utils.lib) mkApp;
poetryCtx = poetry2nix.lib.mkPoetry2Nix { inherit pkgs; };
inherit (poetryCtx) mkPoetryEnv mkPoetryApplication;
deps = [poetryEnv];
dev-deps = []
++ deps
++ [poetryHyugaEnv]
++ (with pkgs; [poetry]);
poetryCfg = ctx.config.poetry // { overrides = poetryOverrides; };
poetryEnv = mkPoetryEnv poetryCfg;
poetryHyugaCfg = poetryCfg // { projectDir = ./hyuga; };
poetryHyugaEnv = mkPoetryEnv poetryHyugaCfg;
poetryOverrides = poetryCtx.defaultPoetryOverrides.extend (final: prev: {
hyuga = prev.hyuga.overridePythonAttrs (old: {
buildInputs = []
++ (old.buildInputs or [ ])
++ [ final.poetry-core ];
preferWheel = true;
}
);
});
result.packages.default = mkPoetryApplication poetryCfg;
result.devShells.default = mkShellNoCC {
packages = dev-deps;
};
result.apps.replPython = mkShellApp "python-repl" ''python'';
result.apps.replHy = mkShellApp "hy-repl" ''hy'';
mkShellApp = (name: script: mkApp {
drv = writeShellApplication {
inherit name;
text = script;
runtimeInputs = dev-deps;
};
});
}

marzipan/poetry.lock generated Normal file
File diff suppressed because it is too large

marzipan/pyproject.toml Normal file

@@ -0,0 +1,31 @@
[tool.poetry]
name = "rosenpass-marzipan"
version = "0.1.0"
description = ""
authors = ["Author Name <author@example.com>"]
# readme = "README.md"
# license = "BSD"
packages = [
{ include = "**/*.[hp]y", from = "src", to = "rosenpass_marzipan" },
{ include = "**/*.sh", from = "src", to = "rosenpass_marzipan" },
#{ include = "**/*.lark", from = "src", to = "rosenpass_marzipan" },
]
[tool.poetry.scripts]
rosenpass-marzipan = 'rosenpass_marzipan:main'
[tool.poetry.dependencies]
python = ">=3.12,<3.13"
hy = "^1.0.0"
lark = "^1.2.2"
hyrule = "^0.8.0"
ipython = "^8.32.0"
click = "^8.1.8"
rich = "^13.9.4"
[tool.poetry.group.dev.dependencies]
poetry = "^2.0.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

marzipan/src/__init__.py Normal file

@@ -0,0 +1,281 @@
from .util import pkgs, setup_exports, export, rename
from .parser import *
# from rich.console import Console
import click
target_subdir = "target/proverif"
(__all__, export) = setup_exports()
export(setup_exports)
console = pkgs.rich.console.Console()
logger = pkgs.logging.getLogger(__name__)
@click.group()
def main():
pkgs.logging.basicConfig(level=pkgs.logging.DEBUG)
def eprint(*args, **kwargs):
print(*args, **{"file": pkgs.sys.stderr, **kwargs})
def exc(argv, **kwargs):
eprint("$", *argv)
command = pkgs.subprocess.run(argv, **kwargs)
if command.returncode != 0:
logger.error("subprocess terminated with a non-zero return code.")
eprint("", *argv)
exit(command.returncode)
if command.stdout is not None:
return command.stdout.decode("utf-8")
return ""
def exc_piped(argv, **kwargs):
eprint("$", *argv)
return pkgs.subprocess.Popen(argv, **kwargs)
def clean_line(prev_line, line):
line = line.rstrip()
if pkgs.re.match(r"^Warning: identifier \w+ rebound.$", line) or prev_line is None:
return None
return prev_line
def run_proverif(file, extra_args=[]):
params = ["proverif", "-test", *extra_args, file]
logger.debug(params)
process = exc_piped(
params,
stderr=pkgs.subprocess.PIPE,
stdout=pkgs.subprocess.PIPE,
text=True,
bufsize=1,
)
try:
prev_line = None
for line in process.stdout:
cleaned_line = clean_line(prev_line, line)
prev_line = line
if cleaned_line is not None:
yield cleaned_line
if prev_line is not None:
yield prev_line
except Exception as e:
# When does this happen? Should the error even be ignored? Metaverif should probably just abort here, right? --karo
logger.error(f"Proverif generated an exception with {params}: {e}")
exit(1)
finally:
process.stdout.close()
return_code = process.wait()
if return_code != 0:
logger.error(
f"ProVerif exited with non-zero return code {return_code}: {params}"
)
exit(return_code)
def cpp(file, cpp_prep):
logger.debug(f"_cpp: {file}, {cpp_prep}")
file_path = pkgs.pathlib.Path(file)
dirname = file_path.parent
cwd = pkgs.pathlib.Path.cwd()
params = ["cpp", "-P", f"-I{dirname}", file, "-o", cpp_prep]
return exc(params, stderr=pkgs.sys.stderr)
def awk(repo_path, cpp_prep, awk_prep):
params = [
"awk",
"-f",
str(pkgs.os.path.join(repo_path, "marzipan/marzipan.awk")),
cpp_prep,
]
with open(awk_prep, "w") as file:
exc(params, stderr=pkgs.sys.stderr, stdout=file)
file.write("\nprocess main")
def pretty_output_line(prefix, mark, color, text):
content = f"{mark} {text}"
console.print(prefix, style="grey42", end="", no_wrap=True)
console.print(content, style=color)
def pretty_output_init(file_path):
expected = []
descs = []
with open(file_path, "r") as file:
content = file.read()
# Process lemmas first
result = pkgs.re.findall(r"@(lemma)(?=\s+\"([^\"]*)\")", content)
if result:
# The regex only returns lemmas. For lemmas, we always expect the result 'true' from ProVerif.
expected.extend([True for _ in range(len(result))])
descs.extend([e[1] for e in result])
# Then process regular queries
result = pkgs.re.findall(r'@(query|reachable)(?=\s+"[^\"]*")', content)
if result:
# For queries, we expect 'true' from ProVerif, for reachable, we expect 'false'.
expected.extend([e == "@query" for e in result])
reachable_result = pkgs.re.findall(
r'@(query|reachable)\s+"([^\"]*)"', content
)
descs.extend([e[1] for e in reachable_result])
ta = pkgs.time.time()
res = 0
ctr = 0
return (ta, res, ctr, expected, descs)
def pretty_output_step(file_path, line, expected, descs, res, ctr, ta):
tz = pkgs.time.time()
# Output from ProVerif contains a trailing newline, which we do not have in the expected output. Remove it for meaningful matching.
outp_clean_raw = line.rstrip()
if outp_clean_raw == "true":
outp_clean = True
elif outp_clean_raw == "false":
outp_clean = False
else:
outp_clean = outp_clean_raw
if outp_clean == expected[ctr]:
pretty_output_line(f"{int(tz - ta)}s ", "", "green", descs[ctr])
else:
res = 1
pretty_output_line(f"{int(tz - ta)}s ", "", "red", descs[ctr])
ctr += 1
ta = tz
return (res, ctr, ta)
def pretty_output(file_path):
(ta, res, ctr, expected, descs) = pretty_output_init(file_path)
for line in pkgs.sys.stdin:
(res, ctr, ta) = pretty_output_step(
file_path, line, expected, descs, res, ctr, ta
)
def get_target_dir(path, output):
if output is not None and not output == "":
return pkgs.pathlib.Path(output)
else:
return pkgs.os.path.join(path, target_subdir)
@main.command()
@click.option("--output", "output", required=False)
@click.argument("repo_path")
def analyze(repo_path, output):
target_dir = get_target_dir(repo_path, output)
pkgs.os.makedirs(target_dir, exist_ok=True)
entries = []
analysis_dir = pkgs.os.path.join(repo_path, "analysis")
entries.extend(sorted(pkgs.glob.glob(str(analysis_dir) + "/*.entry.mpv")))
with pkgs.concurrent.futures.ProcessPoolExecutor() as executor:
futures = {
executor.submit(metaverif, repo_path, target_dir, entry): entry
for entry in entries
}
for future in pkgs.concurrent.futures.as_completed(futures):
cmd = futures[future]
logger.info(f"Metaverif {cmd} finished.")
print("all processes finished.")
@main.command()
@click.option("--output", "output", required=False)
@click.argument("repo_path")
def clean(repo_path, output):
cleans_failed = 0
target_dir = get_target_dir(repo_path, output)
if pkgs.os.path.isdir(target_dir):
for filename in pkgs.os.listdir(target_dir):
file_path = pkgs.os.path.join(target_dir, filename)
if pkgs.os.path.isfile(file_path) and pkgs.os.path.splitext(file_path)[
1
] in [".pv", ".log"]:
try:
pkgs.os.remove(file_path)
except Exception as e:
print(f"Error deleting {file_path}: {str(e)}")
cleans_failed += 1
if cleans_failed > 0:
print(f"{cleans_failed} files could not be deleted.")
exit(1)
def metaverif(repo_path, tmpdir, file):
print(f"Start metaverif on {file}")
# Extract the name using regex
name_match = pkgs.re.search(r"([^/]*)(?=\.mpv)", file)
if name_match:
name = name_match.group(0) # Get the matched name
# Create the file paths
cpp_prep = pkgs.os.path.join(tmpdir, f"{name}.i.pv")
awk_prep = pkgs.os.path.join(tmpdir, f"{name}.o.pv")
# Output the results
print(f"Name: {name}")
print(f"CPP Prep Path: {cpp_prep}")
print(f"AWK Prep Path: {awk_prep}")
cpp(file, cpp_prep)
awk(repo_path, cpp_prep, awk_prep)
log_file = pkgs.os.path.join(tmpdir, f"{name}.log")
ta, res, ctr, expected, descs = pretty_output_init(cpp_prep)
with open(log_file, "a") as log:
generator = run_proverif(awk_prep)
for line in generator:
log.write(line)
# parse-result-line:
match = pkgs.re.search(r"^RESULT .* \b(true|false)\b\.$", line)
if match:
result = match.group(1)
# pretty-output:
res, ctr, ta = pretty_output_step(
cpp_prep, result, expected, descs, res, ctr, ta
)
else:
logger.error(
f"No match found for the filename {file}: extension should be .mpv"
)
exit(1)
@main.command()
@click.argument("file_path")
def parse(file_path):
parse_main(file_path)
if __name__ == "__main__":
main()

marzipan/src/analyze.sh Executable file

@@ -0,0 +1,104 @@
#!/usr/bin/env bash
exc() {
echo >&2 "\$" "$@"
"$@"
}
run_proverif() {
local file; file="$1"; shift
local log; log="$1"; shift # intentionally unused
exc rosenpass-marzipan run-proverif "${file}" "${@}"
}
clean_warnings() {
exc rosenpass-marzipan clean-warnings
}
color_red='red'
color_green='green'
color_gray='gray'
color_clear=''
checkmark="✔"
cross="❌"
pretty_output() {
exc rosenpass-marzipan pretty-output "${@}"
}
metaverif() {
local file; file="$1"; shift
local name; name="$(echo "${file}" | grep -Po '[^/]*(?=\.mpv)')"
local cpp_prep; cpp_prep="${tmpdir}/${name}.i.pv"
local awk_prep; awk_prep="${tmpdir}/${name}.o.pv"
exc rosenpass-marzipan cpp ${file} ${cpp_prep}
exc rosenpass-marzipan awk-prep ${cpp_prep} ${awk_prep}
local log; log="${tmpdir}/${name}.log"
{
run_proverif "${awk_prep}" "$@" \
| clean_warnings \
| tee "${log}" \
| exc rosenpass-marzipan parse-result-line \
| pretty_output "${cpp_prep}"
} || {
echo "TODO: Commented out some debug output"
#if ! grep -q "^Verification summary" "${log}"; then
# echo -ne "\033[0\r"
# cat "${log}"
#fi
}
}
analyze() {
mkdir -p "${tmpdir}"
entries=()
readarray -t -O "${#entries[@]}" entries < <(
find analysis -iname '*.entry.mpv' | sort)
local entry
local procs; procs=()
for entry in "${entries[@]}"; do
echo "call metaverif"
# TODO: commented out for testing
#exc rosenpass-marzipan metaverif "${tmpdir}" "${entry}" >&2 & procs+=("$!")
exc rosenpass-marzipan metaverif "${tmpdir}" "${entry}" >&2
done
# TODO: commented out for testing
# for entry in "${procs[@]}"; do
# exc wait -f "${entry}"
# done
}
err_usage() {
echo >&2 "USAGE: ${0} analyze PATH"
echo >&2 "The script will cd into PATH and continue there."
exit 1
}
main() {
set -e -o pipefail
local cmd="$1"; shift || err_usage
local dir="$1"; shift || err_usage
cd -- "${dir}"
tmpdir="target/proverif"
echo "call main"
case "${cmd}" in
analyze) analyze ;;
clean_warnings) clean_warnings ;;
*) err_usage
esac
}
# Do not execute main if sourced
(return 0 2>/dev/null) || main "$@"

marzipan/src/parser.py Normal file

@@ -0,0 +1,467 @@
import sys
from lark import Lark, Token, Transformer, exceptions, tree
# taken from page 17 of the ProVerif manual
# At the moment, we do not reject a ProVerif model that uses reserved words as
# identifiers, because this caused problems with the LARK grammar. We plan to
# check this in a later processing step.
reserved_words = [
"among",
"axiom",
"channel",
"choice",
"clauses",
"const",
"def",
"diff",
"do",
"elimtrue",
"else",
"equation",
"equivalence", # no rule yet (this is CryptoVerif-specific)
"event",
"expand",
"fail",
"for",
"forall",
"foreach",
"free",
"fun",
"get",
"if",
"implementation", # no rule yet (this is CryptoVerif-specific)
"in",
"inj-event",
"insert",
"lemma",
"let",
"letfun",
"letproba",
"new",
"noninterf",
"noselect",
"not",
"nounif",
"or",
"otherwise",
"out",
"param",
"phase",
"pred",
"proba",
"process",
"proof",
"public_vars",
"putbegin",
"query",
"reduc",
"restriction",
"secret",
"select",
"set",
"suchthat",
"sync",
"table",
"then",
"type",
"weaksecret",
"yield",
]
ident_regex = (
"/^" + "".join(f"(?!{w}$)" for w in reserved_words) + "[a-zA-Z][a-zA-Z0-9À-ÿ'_]*/"
)
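The later processing step mentioned in the comment above could be a post-parse pass over the IDENT tokens. A minimal sketch (the helper name and tuple shape are assumptions for illustration, not part of this module):

```python
# Hypothetical post-parse check: flag identifiers that are reserved words.
# This short list mirrors the full `reserved_words` list in parser.py.
reserved_words = ["among", "axiom", "channel", "free", "fun", "let", "new", "out"]
RESERVED = frozenset(reserved_words)

def check_reserved(idents):
    """Return the identifiers (with positions) that collide with reserved words.

    `idents` is an iterable of (name, line, column) tuples, e.g. collected
    from the IDENT tokens of a parse tree.
    """
    return [(name, line, col) for (name, line, col) in idents if name in RESERVED]

# Example: "fun" used as a variable name would be flagged.
offenders = check_reserved([("msg", 1, 4), ("fun", 2, 9), ("skey", 3, 1)])
print(offenders)  # [('fun', 2, 9)]
```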
proverif_grammar = Lark(
grammar="""
PROCESS: "process"
start: decl* PROCESS process
YIELD: "yield"
channel: CHANNEL
CHANNEL: "channel"
"""
+ "IDENT: /[a-zA-Z][a-zA-Z0-9À-ÿ'_]*/"
+ """
ZERO: "0"
INFIX: "||"
| "&&"
| "="
| "<>"
| "<="
| ">="
| "<"
| ">"
typeid: channel
| IDENT
_non_empty_seq{x}: x ("," x)*
_maybe_empty_seq{x}: [ _non_empty_seq{x} ]
OPTIONS_FUN_CONST: "data" | "private" | "typeConverter"
OPTIONS_FUN: OPTIONS_FUN_CONST
OPTIONS_CONST: OPTIONS_FUN_CONST
OPTIONS_FREE_REDUC: "private"
OPTIONS_PRED: "memberOptim" | "block"
OPTIONS_PROCESS: "precise"
OPTIONS_QUERY_LEMMA_AXIOM: "noneSat" | "discardSat" | "instantiateSat" | "fullSat" | "noneVerif" | "discardVerif" | "instantiateVerif" | "fullVerif"
OPTIONS_AXIOM: OPTIONS_QUERY_LEMMA_AXIOM
OPTIONS_QUERY_LEMMA: OPTIONS_QUERY_LEMMA_AXIOM | "induction" | "noInduction"
OPTIONS_LEMMA: OPTIONS_QUERY_LEMMA_AXIOM | "maxSubset"
OPTIONS_QUERY: OPTIONS_QUERY_LEMMA_AXIOM | "proveAll"
OPTIONS_QUERY_SECRET: "reachability" | "pv_reachability" | "real_or_random" | "pv_real_or_random" | "/cv_[a-zA-Z0-9À-ÿ'_]*/"
OPTIONS_RESTRICTION: "removeEvents" | "keepEvents" | "keep" # transl_option_lemma_query in pitsyntax.ml
OPTIONS_EQUATION: "convergent" | "linear" # check_equations in pitsyntax.ml
OPTIONS_TYPE: "fixed" | "bounded" # TODO(blipp): complete this. These are only for compatibility with CryptoVerif and are ignored
options{idents}: [ "[" _non_empty_seq{idents} "]" ]
process: ZERO
| YIELD
| IDENT [ "(" _maybe_empty_seq{pterm} ")" ]
| bracketed_process
| piped_process
| replicated_process
| replicated_process_bounds
| sample_process
| if_process
| in_process
| out_process
| let_process
| insert_process
| get_process
| event_process
| phase
| sync
bracketed_process: "(" process ")"
piped_process: process "|" process
replicated_process: "!" process
replicated_process_bounds: "!" IDENT "<=" IDENT process
| "foreach" IDENT "<=" IDENT "do" process
sample_process: "new" IDENT [ "[" _maybe_empty_seq{IDENT} "]" ] ":" typeid [";" process]
| IDENT "<-R" typeid [";" process]
let_process: "let" pattern "=" pterm ["in" process [ "else" process ]]
| IDENT [":" typeid] "<-" pterm [";" process]
| "let" typedecl "suchthat" pterm options{OPTIONS_PROCESS} [ "in" process [ "else" process ] ]
if_process: "if" pterm "then" process [ "else" process ]
in_process: "in" "(" pterm "," pattern ")" options{OPTIONS_PROCESS} [ ";" process ]
get_process: IDENT "(" _maybe_empty_seq{pattern} ")" [ "suchthat" pterm ] options{OPTIONS_PROCESS} [ "in" process [ "else" process ] ]
out_process: "out" "(" pterm "," pterm ")" [ ";" process ]
insert_process: "insert" IDENT "(" _maybe_empty_seq{pterm} ")" [ ";" process ]
event_process: "event" IDENT [ "(" _maybe_empty_seq{pterm} ")" ] [ ";" process ]
term: IDENT
| NAT
| "(" _maybe_empty_seq{term} ")"
| IDENT "(" _maybe_empty_seq{term} ")"
| term ( "+" | "-" ) NAT
| NAT "+" term
| term INFIX term
| "not" "(" term ")"
query: gterm ["public_vars" _non_empty_seq{IDENT}] [";" query]
| "secret" IDENT ["public_vars" _non_empty_seq{IDENT}] options{OPTIONS_QUERY_SECRET} [";" query]
| "putbegin" "event" ":" _non_empty_seq{IDENT} [";" query] // We allow whitespace between "event" and ":"; ProVerif itself might not accept it with spaces.
| "putbegin" "inj-event" ":" _non_empty_seq{IDENT} [";" query]
lemma: gterm [";" lemma]
| gterm "for" "{" "public_vars" _non_empty_seq{IDENT} "}" [";" lemma]
| gterm "for" "{" "secret" IDENT [ "public_vars" _non_empty_seq{IDENT}] "[real_or_random]" "}" [";" lemma]
gterm: ident_gterm
| fun_gterm
| choice_gterm
| infix_gterm
| arith_gterm
| arith2_gterm
| event_gterm
| injevent_gterm
| implies_gterm
| paren_gterm
| sample_gterm
| let_gterm
ident_gterm: IDENT
fun_gterm: IDENT "(" _maybe_empty_seq{gterm} ")" ["phase" NAT] ["@" IDENT]
choice_gterm: "choice" "[" gterm "," gterm "]"
infix_gterm: gterm INFIX gterm
arith_gterm: gterm ( "+" | "-" ) NAT
arith2_gterm: NAT "+" gterm
event_gterm: "event" "(" _maybe_empty_seq{gterm} ")" ["@" IDENT]
injevent_gterm: "inj-event" "(" _maybe_empty_seq{gterm} ")" ["@" IDENT]
implies_gterm: gterm "==>" gterm
paren_gterm: "(" _maybe_empty_seq{gterm} ")"
sample_gterm: "new" IDENT [ "[" [ gbinding ] "]" ]
let_gterm: "let" IDENT "=" gterm "in" gterm
gbinding: "!" NAT "=" gterm [";" gbinding]
| IDENT "=" gterm [";" gbinding]
nounifdecl: "let" IDENT "=" gformat "in" nounifdecl
| IDENT ["(" _maybe_empty_seq{gformat} ")" ["phase" NAT]]
gformat: IDENT
| "*" IDENT
| IDENT "(" _maybe_empty_seq{gformat} ")"
| "choice" "[" gformat "," gformat "]"
| "not" "(" _maybe_empty_seq{gformat} ")"
| "new" IDENT [ "[" [ fbinding ] "]" ]
| "let" IDENT "=" gformat "in" gformat
fbinding: "!" NAT "=" gformat [";" fbinding]
| IDENT "=" gformat [";" fbinding]
nounifoption: "hypothesis"
| "conclusion"
| "ignoreAFewTimes"
| "inductionOn" "=" IDENT
| "inductionOn" "=" "{" _non_empty_seq{IDENT} "}"
pterm: IDENT
| NAT
| "(" _maybe_empty_seq{pterm} ")"
| IDENT "(" _maybe_empty_seq{pterm} ")"
| choice_pterm
| pterm ("+" | "-") NAT
| NAT "+" pterm
| pterm INFIX pterm
| not_pterm
| sample_pterm
| if_pterm
| let_pterm
| insert_pterm
| get_pterm
| event_pterm
choice_pterm: "choice[" pterm "," pterm "]"
if_pterm: "if" pterm "then" pterm [ "else" pterm ]
not_pterm: "not" "(" pterm ")"
let_pterm: "let" pattern "=" pterm "in" pterm [ "else" pterm ]
| IDENT [":" typeid] "<-" pterm ";" pterm
| "let" typedecl "suchthat" pterm "in" pterm [ "else" pterm ]
sample_pterm: "new" IDENT [ "[" _maybe_empty_seq{IDENT} "]" ] ":" typeid [";" pterm]
| IDENT "<-R" typeid [";" pterm]
insert_pterm: "insert" IDENT "(" _maybe_empty_seq{pterm} ")" ";" pterm
event_pterm: "event" IDENT [ "(" _maybe_empty_seq{pterm} ")" ] ";" pterm
get_pterm: "get" IDENT "(" _maybe_empty_seq{pattern} ")" [ "suchthat" pterm ] options{OPTIONS_PROCESS} [ "in" pterm [ "else" pterm ] ]
pattern: IDENT [":" typeid]
| "_" [ ":" typeid ]
| NAT
| pattern "+" NAT
| NAT "+" pattern
| "(" _maybe_empty_seq{pattern} ")"
| IDENT "(" _maybe_empty_seq{pattern} ")"
| "=" pterm
mayfailterm: term
| "fail"
mayfailterm_seq: "(" _non_empty_seq{mayfailterm} ")"
typedecl: _non_empty_seq{IDENT} ":" typeid [ "," typedecl ]
failtypedecl: _non_empty_seq{IDENT} ":" typeid [ "or fail" ] [ "," failtypedecl ]
decl: type_decl
| channel_decl
| free_decl
| const_decl
| fun_decl
| letfun_decl
| reduc_decl
| fun_reduc_decl
| equation_decl
| pred_decl
| table_decl
| let_decl
| set_settings_decl
| event_decl
| query_decl
| axiom_decl
| restriction_decl
| lemma_decl
| noninterf_decl
| weaksecret_decl
| not_decl
| select_decl
| noselect_decl
| nounif_decl
| elimtrue_decl
| clauses_decl
| module_decl
#| param_decl
#| proba_decl
#| letproba_decl
#| proof_decl
#| def_decl
#| expand_decl
type_decl: "type" IDENT options{OPTIONS_TYPE} "."
channel_decl: "channel" _non_empty_seq{IDENT} "."
free_decl: "free" _non_empty_seq{IDENT} ":" typeid options{OPTIONS_FREE_REDUC} "."
const_decl: "const" _non_empty_seq{IDENT} ":" typeid options{OPTIONS_FUN_CONST} "."
fun_decl: "fun" IDENT "(" _maybe_empty_seq{typeid} ")" ":" typeid options{OPTIONS_FUN_CONST} "."
letfun_decl: "letfun" IDENT [ "(" [ typedecl ] ")" ] "=" pterm "."
reduc_decl: "reduc" eqlist options{OPTIONS_FREE_REDUC} "."
fun_reduc_decl: "fun" IDENT "(" _maybe_empty_seq{typeid} ")" ":" typeid "reduc" mayfailreduc options{OPTIONS_FUN_CONST} "."
equation_decl: "equation" eqlist options{OPTIONS_EQUATION} "."
pred_decl: "pred" IDENT [ "(" [ _maybe_empty_seq{typeid} ] ")" ] options{OPTIONS_PRED} "."
table_decl: "table" IDENT "(" _maybe_empty_seq{typeid} ")" "."
let_decl: "let" IDENT [ "(" [ typedecl ] ")" ] "=" process "."
BOOL : "true" | "false"
NONE: "none"
FULL: "full"
ALL: "all"
FUNC: IDENT
ignoretype_options: BOOL | ALL | NONE | "attacker"
boolean_settings_names: "privateCommOnPublicTerms"
| "rejectChoiceTrueFalse"
| "rejectNoSimplif"
| "allowDiffPatterns"
| "inductionQueries"
| "inductionLemmas"
| "movenew"
| "movelet"
| "stopTerm"
| "removeEventsForLemma"
| "simpEqAll"
| "eqInNames"
| "preciseLetExpand"
| "expandSimplifyIfCst"
| "featureFuns"
| "featureNames"
| "featurePredicates"
| "featureEvents"
| "featureTables"
| "featureDepth"
| "featureWidth"
| "simplifyDerivation"
| "abbreviateDerivation"
| "explainDerivation"
| "unifyDerivation"
| "reconstructDerivation"
| "displayDerivation"
| "traceBacktracking"
| "interactiveSwapping"
| "color"
| "verboseLemmas"
| "abbreviateClauses"
| "removeUselessClausesBeforeDisplay"
| "verboseEq"
| "verboseDestructors"
| "verboseTerm"
| "verboseStatistics"
| "verboseRules"
| "verboseBase"
| "verboseRedundant"
| "verboseCompleted"
| "verboseGoalReachable"
_decl_pair{name, value}: "set" name "=" value "."
set_settings_boolean_decl: _decl_pair{boolean_settings_names, BOOL}
ignore_types_values: BOOL | "all" | "none" | "attacker"
simplify_process_values: BOOL | "interactive"
precise_actions_values: BOOL | "trueWithoutArgsInNames"
redundant_hyp_elim_values: BOOL | "beginOnly"
reconstruct_trace_values: BOOL | NAT
attacker_values: "active" | "passive"
key_compromise_values: "none" | "approx" | "strict"
predicates_implementable: "check" | "nocheck"
application_values: "instantiate" | "full" | "none" | "discard"
max_values: "none" | NAT
sel_fun_values: "TermMaxsize" | "Term" | "NounifsetMaxsize" | "Nounifset"
redundancy_elim_values: "best" | "simple" | "no"
nounif_ignore_a_few_times_values: "none" | "auto" | "all"
nounif_ignore_ntimes_values: NAT
trace_display_values: "short" | "long" | "none"
verbose_clauses_values: "none" | "explained" | "short"
set_settings_decl: set_settings_boolean_decl
| _decl_pair{"ignoreTypes", ignore_types_values}
| _decl_pair{"simplifyProcess", simplify_process_values}
| _decl_pair{"preciseActions", precise_actions_values}
| _decl_pair{"redundantHypElim", redundant_hyp_elim_values}
| _decl_pair{"reconstructTrace", reconstruct_trace_values}
| _decl_pair{"attacker", attacker_values}
| _decl_pair{"keyCompromise", key_compromise_values}
| _decl_pair{"predicatesImplementable", predicates_implementable}
| _decl_pair{"saturationApplication", application_values}
| _decl_pair{"verificationApplication", application_values}
| _decl_pair{"maxDepth", max_values}
| _decl_pair{"maxHyp", max_values}
| _decl_pair{"selFun", sel_fun_values}
| _decl_pair{"redundancyElim", redundancy_elim_values}
| _decl_pair{"nounifIgnoreAFewTimes", nounif_ignore_a_few_times_values}
| _decl_pair{"nounifIgnoreNtimes", nounif_ignore_ntimes_values}
| _decl_pair{"traceDisplay", trace_display_values}
| _decl_pair{"verboseClauses", verbose_clauses_values}
| set_strategy
| set_symb_order
_swap_strategy_seq{x}: x ("->" x)*
set_strategy: "set" "swapping" "=" _swap_strategy_seq{TAG} "."
_symb_ord_seq{x}: x (">" x)*
set_symb_order: "set" "symbOrder" "=" _symb_ord_seq{FUNC} "."
event_decl: "event" IDENT ["(" _maybe_empty_seq{typeid} ")"] "."
query_decl: "query" [ typedecl ";"] query options{OPTIONS_QUERY} "."
axiom_decl: "axiom" [ typedecl ";"] lemma options{OPTIONS_AXIOM} "."
restriction_decl: "restriction" [ typedecl ";"] lemma options{OPTIONS_RESTRICTION} "."
lemma_decl: "lemma" [ typedecl ";"] lemma options{OPTIONS_LEMMA} "."
noninterf_decl: "noninterf" [ typedecl ";"] _maybe_empty_seq{nidecl} "."
weaksecret_decl: "weaksecret" IDENT "."
not_decl: "not" [ typedecl ";"] gterm "."
INT: NAT | "-" NAT
select_decl: "select" [ typedecl ";"] nounifdecl [ "/" INT ] [ "[" _non_empty_seq{nounifoption} "]" ] "."
noselect_decl: "noselect" [ typedecl ";"] nounifdecl [ "/" INT ] [ "[" _non_empty_seq{nounifoption} "]" ] "."
nounif_decl: "nounif" [ typedecl ";"] nounifdecl [ "/" INT ] [ "["_non_empty_seq{nounifoption} "]" ] "."
elimtrue_decl: "elimtrue" [ failtypedecl ";" ] term "."
clauses_decl: "clauses" clauses "."
module_decl: "@module" " " IDENT
# TODO: finish defining these (comes from Cryptoverif)
#param_decl: "param" _non_empty_seq{IDENT} options "."
#proba_decl: "proba" IDENT ["(...)"] options "."
#letproba_decl: "letproba" IDENT ["(...)"] "= ..." "."
#proof_decl: "proof" "{" proof "}"
#def_decl: "def" IDENT "(" _maybe_empty_seq{typeid} ")" "{" decl* "}"
#expand_decl: "expand" IDENT "(" _maybe_empty_seq{typeid} ")" "."
nidecl: IDENT [ "among" "(" _non_empty_seq{term} ")" ]
equality: term "=" term
| "let" IDENT "=" term "in" equality
mayfailequality: IDENT mayfailterm_seq "=" mayfailterm
eqlist: [ "forall" typedecl ";" ] equality [ ";" eqlist ]
clause: term
| term "->" term
| term "<->" term
| term "<=>" term
clauses: [ "forall" failtypedecl ";" ] clause [ ";" clauses ]
mayfailreduc: [ "forall" failtypedecl ";" ] mayfailequality [ "otherwise" mayfailreduc ]
NAT: DIGIT+
phase: "phase" NAT [";" process]
TAG: IDENT
sync: "sync" NAT ["[" TAG "]"] [";" process]
COMMENT: /\(\*(\*(?!\))|[^*])*\*\)/
%import common (WORD, DIGIT, NUMBER, WS) // imports from terminal library
%ignore WS // Disregard spaces in text
%ignore COMMENT
""",
debug=True,
# lexer_callbacks={"COMMENT": comments.append},
)
# COMMENT: /\(\*(\*(?!\))|[^*])*\*\)/
# COMMENT: "(*" /(\*(?!\))|[^*])*/ "*)"
# comment: /\(\*(?:(?!\(\*|\*\)).|(?R))*\*\)/
# TODO Open ProVerif compatibility questions
# TODO * does it allow leading zeros for NAT?
# TODO * tag is not defined? is it ident?
# TODO * are spaces between "event" and ":" allowed?
# TODO * spaces between "nat" and "("? "choice" and "["?
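# A quick, hedged sanity check of the COMMENT terminal above (assumption:
# Python's `re` mirrors Lark's regex semantics here): it accepts ordinary
# (* ... *) comments, tolerates inner '*', and rejects nested comments,
# matching the stated "without nesting" limitation.
import re
_COMMENT_RE = re.compile(r"\(\*(\*(?!\))|[^*])*\*\)")
assert _COMMENT_RE.fullmatch("(* a comment *)")
assert _COMMENT_RE.fullmatch("(* stars ** are fine *)")
assert not _COMMENT_RE.fullmatch("(* outer (* nested *) *)")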
def parsertest(input):
parsetree = proverif_grammar.parse(input)
# tree.pydot__tree_to_png(parsetree, name + ".png")
return parsetree
def parse_main(file_path):
with open(file_path, "r") as f:
content = f.read()
# print(content)
parsertest(content)

marzipan/src/util.py (new file, 130 lines)
@@ -0,0 +1,130 @@
from typing import Callable, Any, Tuple, List, TypeVar
from types import ModuleType as Module
from importlib import import_module
from dataclasses import dataclass
T = TypeVar('T')
def setup_exports() -> Tuple[List[str], Callable[[T], T]]:
"""
Helper to provide an export() function with little boilerplate.
```
from marzipan.util import setup_exports
(__all__, export) = setup_exports()
```
"""
__all__ = []
def export(what: T) -> T:
match what:
case str():
__all__.append(what)
case object(__name__ = name):
__all__.append(name)
case _:
raise TypeError(
f"Unsupported export type `{what}`: Export is neither `str` nor has it an attribute named `__name__`.")
return what
return (__all__, export)
(__all__, export) = setup_exports()
export(setup_exports)
@export
def rename(name: str) -> Callable[[T], T]:
def rename_impl(v: T) -> T:
v.__name__ = name
return v
return rename_impl
@export
def attempt(fn):
# TODO: Documentation tests
"""
Call a function returning a tuple of (result, exception).
The following example uses attempt to implement a checked_divide
function that returns None if the division by zero is caught.
```python
try_divide = attempt(lambda a, b: a/b)
def checked_divide(a, b):
match try_divide(a, b):
case (result, None):
return result
case (None, ZeroDivisionError()):
return None
case _:
raise RuntimeError("Unreachable")
assert(checked_divide(1, 0) == None)
assert(checked_divide(0, 1) == 0)
assert(checked_divide(1, 1) == 1)
```
"""
def retfn(*args, **kwargs):
try:
return (fn(*args, **kwargs), None)
except Exception as e:
return (None, e)
retfn.__name__ = f"try_{fn.__name__}"
return retfn
@export
def scoped(fn: Callable[[], Any]) -> Any:
"""
Scoped variable assignment.
Use as a decorator to immediately call a function,
assigning the return value to the function name.
"""
return fn()
@export
def try_import(name : str) -> Tuple[Module | None, Exception | None]:
return attempt(import_module)(name)
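# Self-contained illustration of the (value, error) convention that `attempt`
# and `try_import` follow (the local helper `_try_import` mirrors `try_import`
# above so this snippet runs on its own): a successful import yields
# (module, None), a missing module yields (None, ModuleNotFoundError).
def _try_import(name: str):
    try:
        return (import_module(name), None)
    except Exception as e:
        return (None, e)
_mod, _err = _try_import("json")
assert _err is None and _mod.dumps({"a": 1}) == '{"a": 1}'
assert isinstance(_try_import("no_such_module_xyz")[1], ModuleNotFoundError)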
@dataclass(frozen=True)
class Pkgs:
__mod__: Module | None
__prefix__: str | None
def __get__(self, k: str):
return getattr(self, k)
def __getattribute__(self, k: str):
match k:
case "__mod__" | "__prefix__" | "__class__":
# Access the underlying module value
return super().__getattribute__(k)
match self:
case Pkgs(None, None):
# Import package from root
return Pkgs(import_module(k), k)
# Try importing a subpackage
name = f"{self.__prefix__}.{k}"
match try_import(name):
case (child, None):
# Imported subpackage
return Pkgs(child, name)
case (_, ModuleNotFoundError()):
# No such module; access module property instead
return getattr(self.__mod__, k)
case (_, err):
# Unknown error, pass error on
raise err
@scoped
@export
def pkgs() -> Pkgs:
"""
Global package scope.
`pkgs.marzipan` imports the package `marzipan`
"""
return Pkgs(None, None)

marzipan/test-gpt-oss-2.py (new file, 265 lines)
@@ -0,0 +1,265 @@
#!/usr/bin/env python3
# Below is a **more "Pythonic"** rewrite of the original AWK-to-Python translator.
# The logic is exactly the same (same error messages, line numbers and exit
# codes) but the code is organized into small, reusable functions, uses
# `dataclasses`, type hints, `Path.read_text()`, `re.sub()` and other idiomatic
# constructs. It is also easier to read and to extend.
"""
py_awk_translator.py
A line-by-line preprocessor that implements the same behaviour as the
original AWK script you posted (handling @module, @alias, @long-alias,
private-variable expansion, @query/@reachable/@lemma checks and token-wise
alias substitution).
Usage
python3 py_awk_translator.py file1.pv file2.pv
# or
cat file.pv | python3 py_awk_translator.py
"""
from __future__ import annotations
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable
# ----------------------------------------------------------------------
# Helper utilities
# ----------------------------------------------------------------------
TOKEN_RE = re.compile(r"[0-9A-Za-z_']")
def is_token_char(ch: str) -> bool:
"""Return True if *ch* can be part of an identifier token."""
return bool(TOKEN_RE.fullmatch(ch))
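# Sanity check of the token alphabet above (self-contained copy of TOKEN_RE):
# identifiers may contain digits, letters, "_" and the prime "'", so aliases
# never fire in the middle of names like key' or my_key.
_tok = re.compile(r"[0-9A-Za-z_']")
assert all(_tok.fullmatch(c) for c in "Ab9_'")
assert not any(_tok.fullmatch(c) for c in "-~ (")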
def die(msg: str, fname: str, lineno: int) -> None:
"""Print an error to stderr and exit with status1 (exactly like AWK)."""
sys.stderr.write(f"{fname}:{lineno}: {msg}\n")
sys.exit(1)
# ----------------------------------------------------------------------
# Core translator holds the mutable state that the AWK script kept in
# global variables.
# ----------------------------------------------------------------------
@dataclass
class Translator:
"""Collects state while processing a file linebyline."""
# final output buffer
out: list[str] = field(default_factory=list)
# current @module name (used when expanding "~")
module: str = ""
# simple one-line aliases: name → replacement text
aliases: Dict[str, str] = field(default_factory=dict)
# multi-line alias handling
long_name: str = ""
long_value: str = ""
# error flag mirrors the AWK variable `err`
err: int = 0
# ------------------------------------------------------------------
# Public entry point for a single line
# ------------------------------------------------------------------
def process(self, raw: str, fname: str, lineno: int) -> None:
"""Apply all transformation rules to *raw* and store the result."""
line = raw.rstrip("\n") # keep a copy for error messages
original = line # keep the untouched line for later
# --------------------------------------------------------------
# 1) @module
# --------------------------------------------------------------
if line.startswith("@module"):
parts = line.split(maxsplit=1)
self.module = parts[1] if len(parts) > 1 else ""
self.aliases.clear()
line = ""
# --------------------------------------------------------------
# 2) @alias
# --------------------------------------------------------------
elif line.startswith("@alias"):
for token in line.split()[1:]:
if "=" in token:
name, value = token.split("=", 1)
self.aliases[name] = value
line = ""
# --------------------------------------------------------------
# 3) @long-alias-end
# --------------------------------------------------------------
elif line.startswith("@long-alias-end"):
if not self.long_name:
die("Long alias not started", fname, lineno)
# collapse multiple spaces → single space, strip trailing space
self.long_value = re.sub(r" +", " ", self.long_value).strip()
self.aliases[self.long_name] = self.long_value
self.long_name = self.long_value = ""
line = ""
# --------------------------------------------------------------
# 4) @long-alias (start)
# --------------------------------------------------------------
elif line.startswith("@long-alias"):
parts = line.split(maxsplit=1)
self.long_name = parts[1] if len(parts) > 1 else ""
self.long_value = ""
line = ""
# --------------------------------------------------------------
# 5) PRIVATE__ detection (illegal use of "~")
# --------------------------------------------------------------
elif "PRIVATE__" in line:
die(
"Used private variable without ~:\n\n"
f" {lineno} > {original}",
fname,
lineno,
)
# --------------------------------------------------------------
# 6) @query / @reachable / @lemma validation
# --------------------------------------------------------------
elif re.search(r"@(query|reachable|lemma)", line):
if not re.search(r'@(query|reachable|lemma)\s+"[^"]*"', line):
die(
"@query or @reachable statement without parameter:\n\n"
f" {lineno} > {original}",
fname,
lineno,
)
# replace the quoted part with blanks (preserve line length)
m = re.search(r'@(query|reachable|lemma)\s+"[^"]*"', line)
start, end = m.span()
line = line[:start] + " " * (end - start) + line[end:]
# --------------------------------------------------------------
# 7) Expand "~" to the private-variable prefix
# --------------------------------------------------------------
if "~" in line:
line = line.replace("~", f"PRIVATE__{self.module}__")
# --------------------------------------------------------------
# 8) Token-wise alias substitution (the long AWK loop)
# --------------------------------------------------------------
line = self._expand_aliases(line)
# --------------------------------------------------------------
# 9) Accumulate a multi-line alias, if we are inside one
# --------------------------------------------------------------
if self.long_name:
self.long_value += line + " "
line = "" # the line itself must not appear in output
# --------------------------------------------------------------
# 10) Store the (possibly empty) line for final output
# --------------------------------------------------------------
self.out.append(line + "\n")
# ------------------------------------------------------------------
# Helper that implements the token-wise alias replacement
# ------------------------------------------------------------------
def _expand_aliases(self, text: str) -> str:
"""Replace every wholetoken alias in *text* with its value."""
i = 0
result = ""
while i < len(text):
# a = previous char, c = current char
a = text[i - 1] if i > 0 else ""
c = text[i]
# If we are already inside a token, just move forward
if i > 0 and is_token_char(a):
i += 1
continue
# If the current char does not start a token, skip it
if not is_token_char(c):
i += 1
continue
# ----------------------------------------------------------
# At a token boundary, try to match any alias
# ----------------------------------------------------------
matched = False
for name, value in self.aliases.items():
if text.startswith(name, i):
after = text[i + len(name) : i + len(name) + 1]
if is_token_char(after): # name is only a prefix
continue
# Alias matches; replace it
result += text[:i] + value
text = text[i + len(name) :] # continue scanning the suffix
i = 0
matched = True
break
if not matched:
i += 1
return result + text
# ------------------------------------------------------------------
# Finalisation
# ------------------------------------------------------------------
def finish(self) -> None:
"""Write the accumulated output to stdout (unless an error occurred)."""
if self.err == 0:
sys.stdout.write("".join(self.out))
# ----------------------------------------------------------------------
# Commandline driver
# ----------------------------------------------------------------------
def _process_path(path: Path, translator: Translator) -> None:
"""Read *path* linebyline and feed it to *translator*."""
for lineno, raw in enumerate(path.read_text(encoding="utf-8").splitlines(True), start=1):
translator.process(raw, str(path), lineno)
def main() -> None:
translator = Translator()
# No file arguments → read from stdin (named "<stdin>")
if len(sys.argv) == 1:
# stdin may contain multiple lines; we treat it as a single “virtual”
# file so that line numbers are still correct.
for lineno, raw in enumerate(sys.stdin, start=1):
translator.process(raw, "<stdin>", lineno)
else:
for name in sys.argv[1:]:
p = Path(name)
if not p.is_file():
sys.stderr.write(f"File not found: {name}\n")
sys.exit(1)
_process_path(p, translator)
translator.finish()
if __name__ == "__main__":
main()
## What makes this version more Pythonic?
# | Aspect | Original style | Refactored style |
# |--------|----------------|------------------|
# | **State handling** | Global variables (`buf`, `module`, …) | `@dataclass Translator` encapsulates all mutable state |
# | **Regular-expression reuse** | Recompiled on every call (`match`, `gsub`) | Compiled once (`TOKEN_RE`) and reused |
# | **String manipulation** | Manual `substr`, concatenation in loops | Slicing, `str.replace`, `re.sub` for clarity |
# | **Loop logic** | `for (i=1; i<length($0); i+=1)` with many manual index tricks | A single `while` loop with early-continue guards; the inner alias-search is a clean `for name, value in self.aliases.items()` |
# | **Error handling** | `print(... > "/dev/stderr")` and `exit(1)` | Dedicated `die()` helper that writes to `stderr` and exits |
# | **File I/O** | Manual `while (getline ...)` in AWK → `for` over `sys.stdin` / `Path.read_text()` | Uses `Path.read_text()` and `enumerate` for line numbers |
# | **Readability** | Mixed AWK-style comments, one-liner `if` statements | Docstrings, section comments, type hints, and small helper methods |
# | **Extensibility** | Adding a new rule required editing a monolithic block | New rules can be added as separate `elif` blocks or new methods without touching the core loop |
# The script can be saved as `py_awk_translator.py`, made executable (`chmod +x py_awk_translator.py`), and used exactly like the original AWK program while being easier to maintain and understand.
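# Self-contained illustration of the whole-token rule that `_expand_aliases`
# implements: an alias only fires at token boundaries. The regex variant below
# (hypothetical `_expand`) is a simplification that agrees with the scanning
# loop on these inputs; it is not a drop-in replacement.
import re
def _expand(text: str, aliases: dict) -> str:
    tok = r"[0-9A-Za-z_']"
    for name, value in aliases.items():
        # replace `name` only where it is not embedded in a longer token
        text = re.sub(rf"(?<!{tok}){re.escape(name)}(?!{tok})", value, text)
    return text
assert _expand("enc(key, msg)", {"key": "K0"}) == "enc(K0, msg)"
assert _expand("mykey", {"key": "K0"}) == "mykey"  # only a suffix, untouched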

marzipan/test-gpt-oss.py (new file, 292 lines)
@@ -0,0 +1,292 @@
#!/usr/bin/env python3
# **Python 3 translation of the AWK script**
# Below is a drop-in replacement that can be used the same way as the original
# `awk` program (give it one or more file names, or let it read from *stdin*).
# All the logic of the AWK version is kept; the only difference is that the
# code is now ordinary, readable Python 3.
"""
translate_awk_to_py.py
A line-by-line translator for the “@module / @alias / @long-alias …”
preprocessor that was originally written in AWK. The behaviour is
identical to the AWK script you posted, including the exact error
messages and exit codes.
Usage
python3 translate_awk_to_py.py file1.pv file2.pv
# or
cat file.pv | python3 translate_awk_to_py.py
The script prints the transformed source to *stdout* and writes any
diagnostic messages to *stderr* (exactly like the AWK version).
"""
import sys
import re
from pathlib import Path
# ----------------------------------------------------------------------
# Helper functions
# ----------------------------------------------------------------------
def istok(ch: str) -> bool:
"""Return True if *ch* is a token character (alnum, '_' or ''')."""
return bool(re.match(r"[0-9a-zA-Z_']", ch))
def error(msg: str, fname: str, lineno: int) -> None:
"""Print an error message to stderr and exit with status 1."""
sys.stderr.write(f"{fname}:{lineno}: {msg}\n")
sys.exit(1)
# ----------------------------------------------------------------------
# Main processing class (keeps the same global state as the AWK script)
# ----------------------------------------------------------------------
class Translator:
def __init__(self):
self.buf = "" # final output buffer
self.module = "" # current @module name
self.err = 0 # error flag (mirrors AWK's)
self.long_alias_name = "" # name of a multi-line alias
self.long_alias_value = "" # accumulated value of that alias
self.aliases: dict[str, str] = {} # simple one-line aliases
# ------------------------------------------------------------------
# Line-by-line processing (mirrors the order of the AWK rules)
# ------------------------------------------------------------------
def process_line(self, line: str, fname: str, lineno: int) -> None:
"""Transform *line* according to all the rules."""
# keep the original line for error reporting
orig_line = line.rstrip("\n")
# ------------------------------------------------------------------
# 1) @module
# ------------------------------------------------------------------
if orig_line.startswith("@module"):
parts = orig_line.split()
if len(parts) >= 2:
self.module = parts[1]
else:
self.module = ""
self.aliases.clear()
line = "" # AWK does: $0 = ""
# fall through; nothing else on this line matters
# ------------------------------------------------------------------
# 2) @alias
# ------------------------------------------------------------------
elif orig_line.startswith("@alias"):
# everything after the keyword is a list of name=value pairs
for token in orig_line.split()[1:]:
if "=" in token:
name, value = token.split("=", 1)
self.aliases[name] = value
line = ""
# ------------------------------------------------------------------
# 3) @long-alias-end
# ------------------------------------------------------------------
elif orig_line.startswith("@long-alias-end"):
if not self.long_alias_name:
error("Long alias not started", fname, lineno)
# compress multiple spaces to a single space
self.long_alias_value = re.sub(r" +", " ", self.long_alias_value)
self.aliases[self.long_alias_name] = self.long_alias_value.strip()
# reset the temporary variables
self.long_alias_name = ""
self.long_alias_value = ""
line = ""
# ------------------------------------------------------------------
# 4) @long-alias (start of a multi-line alias)
# ------------------------------------------------------------------
elif orig_line.startswith("@long-alias"):
parts = orig_line.split()
if len(parts) >= 2:
self.long_alias_name = parts[1]
self.long_alias_value = ""
else:
self.long_alias_name = ""
self.long_alias_value = ""
line = ""
# ------------------------------------------------------------------
# 5) PRIVATE__ detection (illegal use of "~")
# ------------------------------------------------------------------
elif "PRIVATE__" in orig_line:
# The AWK version looks for the literal string PRIVATE__ (which
# appears only after the "~" replacement). We keep the same
# behaviour.
error(
"Used private variable without ~:\n\n"
f" {lineno} > {orig_line}",
fname,
lineno,
)
# ------------------------------------------------------------------
# 6) @query / @reachable / @lemma validation
# ------------------------------------------------------------------
elif re.search(r"@(query|reachable|lemma)", orig_line):
# Must contain a quoted string after the keyword
if not re.search(r'@(query|reachable|lemma)\s+"[^"]*"', orig_line):
error(
"@query or @reachable statement without parameter:\n\n"
f" {lineno} > {orig_line}",
fname,
lineno,
)
# Replace the quoted part with spaces (preserve line length)
m = re.search(r'@(query|reachable|lemma)\s+"[^"]*"', orig_line)
start, end = m.start(), m.end()
pre = orig_line[:start]
mat = orig_line[start:end]
post = orig_line[end:]
mat_spaced = " " * len(mat)
line = pre + mat_spaced + post
# ------------------------------------------------------------------
# 7) Replace "~" with the privatevariable prefix
# ------------------------------------------------------------------
else:
# No special rule matched yet; we keep the line as-is for now.
line = orig_line
# ------------------------------------------------------------------
# 8) Insert the private-variable prefix (if any "~" is present)
# ------------------------------------------------------------------
if "~" in line:
line = line.replace("~", f"PRIVATE__{self.module}__")
# ------------------------------------------------------------------
# 9) Alias substitution (token-wise, exactly like the AWK loop)
# ------------------------------------------------------------------
# The algorithm walks through the line character by character,
# looking for the start of a token. When a token matches a key in
# *self.aliases* it is replaced by the stored value and the scan
# restarts from the beginning of the (now shorter) line.
i = 0
minibuf = ""
while i < len(line):
# a = previous character, c = current character
a = line[i - 1] if i > 0 else ""
c = line[i]
# If we are already inside a token, just move on
if i > 0 and istok(a):
i += 1
continue
# If the current character does NOT start a token, skip it
if not istok(c):
i += 1
continue
# --------------------------------------------------------------
# We are at a token boundary; try to match any alias
# --------------------------------------------------------------
matched = False
for alias, value in self.aliases.items():
klen = len(alias)
token = line[i : i + klen]
after = line[i + klen : i + klen + 1] # char after the token
if token != alias:
continue
if istok(after): # alias is only a prefix of a longer token
continue
# ---- alias matches -------------------------------------------------
matched = True
prefix = line[:i] # everything before the token
suffix = line[i + klen :] # everything after the token
minibuf += prefix + value
line = suffix # continue scanning the suffix
i = 0 # restart from the beginning
break
if not matched:
# No alias matched; keep the current character and move on
i += 1
# Append whatever is left of the line after the last replacement
line = minibuf + line
# ------------------------------------------------------------------
# 10) If we are inside a multi-line alias, accumulate the line
# ------------------------------------------------------------------
if self.long_alias_name:
self.long_alias_value += line + " "
line = "" # the line itself must not appear in the output
# ------------------------------------------------------------------
# 11) Append the (possibly empty) line to the global buffer
# ------------------------------------------------------------------
self.buf += line + "\n"
# ------------------------------------------------------------------
# Final output
# ------------------------------------------------------------------
def finish(self) -> None:
"""Print the accumulated buffer if no error occurred."""
if self.err == 0:
sys.stdout.write(self.buf)
# ----------------------------------------------------------------------
# Entry point
# ----------------------------------------------------------------------
def main() -> None:
translator = Translator()
# If no file name is given we read from stdin (named "<stdin>")
if len(sys.argv) == 1:
for lineno, raw in enumerate(sys.stdin, start=1):
translator.process_line(raw, "<stdin>", lineno)
else:
for fname in sys.argv[1:]:
path = Path(fname)
try:
with path.open(encoding="utf-8") as f:
for lineno, raw in enumerate(f, start=1):
translator.process_line(raw, str(path), lineno)
except FileNotFoundError:
sys.stderr.write(f"File not found: {fname}\n")
sys.exit(1)
translator.finish()
if __name__ == "__main__":
main()
### How the Python version mirrors the AWK script
# | AWK rule | Python implementation |
# |----------|-----------------------|
# | `BEGIN` block: initialise variables | `Translator.__init__` |
# | `@module` line: set `module`, clear `aliases` | first `if` in `process_line` |
# | `@alias` line: split `name=value` pairs into `aliases` | second `elif` |
# | `@long-alias` / `@long-alias-end` handling | third/fourth `elif` blocks + the `if self.long_alias_name` section |
# | Detection of illegal `PRIVATE__` usage | `elif "PRIVATE__" in orig_line` (the same string that the AWK script would have produced after the `~` replacement) |
# | Validation of `@query|@reachable|@lemma` statements | `elif re.search(r"@(query|reachable|lemma)", …)` |
# | Replacement of `~` with `PRIVATE__<module>__` | `line.replace("~", …)` |
# | Token-wise alias substitution (the long `for (i=1; …)` loop) | the `while i < len(line): …` loop that restarts from the beginning after each successful replacement |
# | Accumulating the final output in `buf` | `self.buf += line + "\n"` |
# | `END` block: print buffer if no error | `Translator.finish()` |
# The script can be saved as `translate_awk_to_py.py`, made executable (`chmod +x translate_awk_to_py.py`) and used exactly like the original AWK program. All error messages, line numbers and exit codes are identical, so any surrounding tooling that expects the AWK behaviour will continue to work.
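# Standalone check of the quoted-parameter blanking in rule 6 above: the
# matched "@query ..." span is replaced by spaces of equal length, so later
# column positions in the line are preserved (names `_line`, `_m`, `_blanked`
# are illustrative only).
import re
_line = 'foo @query "secret(k)" bar'
_m = re.search(r'@(query|reachable|lemma)\s+"[^"]*"', _line)
_blanked = _line[:_m.start()] + " " * (_m.end() - _m.start()) + _line[_m.end():]
assert len(_blanked) == len(_line)
assert _blanked == "foo " + " " * 18 + " bar"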