This commit is contained in:
Spencer Killen 2022-06-28 16:37:06 -06:00
parent 99a9a6e6a1
commit 181c53dfb0
No known key found for this signature in database
GPG Key ID: 750742B5BFA28418
7 changed files with 137 additions and 42 deletions

17
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"args": ["programs/a.lp"],
"console": "integratedTerminal",
"justMyCode": true
}
]
}

4
NOTES.md Normal file
View File

@ -0,0 +1,4 @@
# Technical issues
- Clingo may or may not choose to propagate DL atoms before the ontology tells it to.
- Symbols for atoms are not always present in the control object (E.g. a rule `a :- a`)
Current workaround is to use signature (which don't get removed). This may have implications about what can be grounded.

Binary file not shown.

View File

@ -1,10 +1,15 @@
from typing import Set
from typing import Iterable, Set
models = (
{"a", "b", "c"},
)
models = None
DL_ATOMS = "abc"
def set_models(alphabet: Iterable[str], v: Iterable[Iterable[str]]):
global models, DL_ATOMS
DL_ATOMS = "".join(alphabet)
models = tuple(set(model) for model in v)
DL_ATOMS = set.union(*models)
def propagate(atoms: Set[str]) -> Set[str]:
atoms = atoms.intersection(DL_ATOMS)
@ -14,7 +19,7 @@ def propagate(atoms: Set[str]) -> Set[str]:
return set()
return first.intersection(*sub_models)
def check(atoms: Set[str]) -> bool:
atoms = atoms.intersection(DL_ATOMS)
return atoms in models

0
programs/a.lp Normal file
View File

6
programs/a.py Normal file
View File

@ -0,0 +1,6 @@
"ab"
(
"a",
"b",
"ab",
)

View File

@ -1,9 +1,7 @@
from copy import deepcopy
from itertools import chain
from operator import getitem
from typing import Iterable, List, Tuple
from functools import partial
from typing import Iterable, List, Set, Tuple
import clingo
from more_itertools import partition
from clingo import (
PropagateControl,
PropagateInit,
@ -11,8 +9,6 @@ from clingo import (
PropagatorCheckMode,
Assignment,
Symbol,
Control,
TheoryAtom,
)
@ -28,7 +24,7 @@ No way to strict atoms except through adding nogoods/clauses?
import ontology as O
class Ontology:
class OntologyPropagator:
def init(self, init: PropagateInit) -> None:
init.check_mode = PropagatorCheckMode.Total
@ -38,6 +34,8 @@ class Ontology:
init.solver_literal(atom.literal): str(atom.symbol)
for atom in init.symbolic_atoms
}
# Note that Clingo will (sometimes?) probably use the same solver literals for theory and symbolic atoms
# But storing them separately is safer
self.theory_atoms = {
init.solver_literal(atom.literal): atom.elements[0].terms[0].name
for atom in init.theory_atoms
@ -57,6 +55,7 @@ class Ontology:
theory_lit = self.theory_atoms_inv[theory_atom]
symbolic_lit = self.symbolic_atoms_inv[theory_atom]
# This is already implied if the symbolic and theory literals are the same
init.add_clause((-symbolic_lit, theory_lit))
def truthy_atoms_text(self):
@ -80,30 +79,64 @@ class Ontology:
if not self.assignment[atom]
)
def print_nogood(self, nogood: Tuple[int, ...]):
print("adding nogood: ", end="")
names = (
self.symbolic_atoms.get(abs(lit), self.theory_atoms.get(abs(lit)))
for lit in nogood
)
print(
" ".join(
f"(not {name})" if lit < 0 else f"({name})"
for lit, name in zip(nogood, names)
)
)
def assign_nogood_true(self, pcontrol: PropagateControl, lits: Iterable[int]):
not_lits = (-lit for lit in lits)
assignment = (
lit if is_pos else -lit
for lit, is_pos in chain(
self.symbolic_atoms.items(), self.theory_atoms.items()
)
)
pcontrol.add_nogood(chain(assignment, not_lits))
not_lits = set(-lit for lit in lits)
assert len(not_lits)
assignment = set(self.assignment_lits())
a = set(map(abs, not_lits))
b = set(map(abs, assignment))
assert len(a | b) == (len(a) + len(b))
nogood = tuple(chain(assignment, not_lits))
self.print_nogood(nogood)
pcontrol.add_nogood(nogood)
def assignment_lits(self):
return (lit if is_pos else -lit for lit, is_pos in self.assignment.items())
def conflict(self, pcontrol: PropagateControl):
print("conflict: ", end="")
self.print_assignment()
pcontrol.add_nogood(self.assignment_lits())
def propagate(self, pcontrol: PropagateControl, changes) -> None:
print("propagate: ", end="")
self.print_assignment()
for change in changes:
atom = abs(change)
assert atom not in self.assignment
self.assignment[atom] = change >= 0
in_atoms = set(self.truthy_atoms_text())
print("made truthy: ", " ".join(in_atoms))
out_atoms = set(self.atom_text_to_dl_atom(O.propagate(in_atoms)))
new_atoms = out_atoms - in_atoms
l = lambda atom: atom in self.assignment
new_atoms, prev_new_atoms = map(tuple, partition(l, new_atoms))
if any(not self.assignment[atom] for atom in prev_new_atoms):
self.conflict(pcontrol)
elif len(new_atoms):
self.assign_nogood_true(pcontrol, new_atoms)
else:
return
pcontrol.propagate()
print("propagate: ", end="")
self.print_assignment()
def undo(self, thread_id: int, assignment: Assignment, changes: List[int]):
for change in changes:
@ -113,6 +146,9 @@ class Ontology:
def check(self, pcontrol: PropagateControl) -> None:
print("check: ", end="")
self.print_assignment()
in_atoms = set(self.truthy_atoms_text())
if not O.check(in_atoms):
self.conflict(pcontrol)
def print_assignment(self):
print("assignment: ", end="")
@ -142,11 +178,6 @@ class Ontology:
return None, False, False
program = """
a :- not b.
b :- not a.
"""
# Need to ground program before we can look up symbolic atoms and
# Having two waves of grounding might be having some weird effects
# So we parse and ground and then revert back to text then reparse
@ -161,25 +192,57 @@ def add_external_atoms(program: str) -> str:
&o/0 : kterm, any
}.
"""
# Using .signatures here because symbols is unreliable
# E.g. a rule with the single rule `a :- a.` will not generate a symbol for an atom
external_atoms = "\n".join(
f"{atom} :- &o{{{atom}}}."
for atom in (str(atom.symbol) for atom in control.symbolic_atoms)
for atom in (sig for sig, _, _ in control.symbolic_atoms.signatures)
)
return theory_grammar + program + external_atoms
return theory_grammar + program + "\n" + external_atoms
program = add_external_atoms(program)
print(program)
control = clingo.Control(["0"])
propagator = Ontology()
control.register_propagator(propagator)
control.add("base", [], program)
control.ground([("base", [])])
def solve(program: str, O_alphabet: Iterable[str], O_models: Iterable[Iterable[str]]):
O.set_models(O_alphabet, O_models)
program = add_external_atoms(program)
print("USING ONTOLOGY WITH ALPHABET AND MODELS:")
print(O_alphabet)
print(O_models)
print("USING PROGRAM:")
print(program)
control = clingo.Control(["0"])
propagator = OntologyPropagator()
control.register_propagator(propagator)
control.add("base", [], program)
control.ground([("base", [])])
# control.add("external", [], theory_grammar + external_atoms)
answer_sets = []
# control.ground([("external", [])])
with control.solve(yield_=True) as solve_handle:
with control.solve(yield_=True) as solve_handle:
for model in solve_handle:
print("answer set:", model)
answer_sets.append(str(model))
print()
if len(answer_sets):
print("ALL FINISHED, ALL ANSWER SETS:")
print(*answer_sets, sep="\n")
else:
print("ALL DONE, NO ANSWER SETS")
def main():
from sys import argv
from os.path import splitext
assert len(argv) == 2, "Please provide an .lp file as an argument"
lp_filename = argv[1]
models_filename = splitext(lp_filename)[0] + ".py"
with open(lp_filename, "rt", encoding="utf8") as lp_fo:
with open(models_filename, "rt", encoding="utf8") as models_fo:
alphabet = eval(models_fo.readline())
models = eval(models_fo.read())
solve(lp_fo.read(), alphabet, models)
if __name__ == "__main__":
main()