working from campus

This commit is contained in:
Spencer Killen 2022-06-29 16:06:09 -06:00
parent 181c53dfb0
commit 1f380c8619
No known key found for this signature in database
GPG Key ID: F5DA11602FA5BE3D
10 changed files with 126 additions and 34 deletions

0
Makefile Normal file
View File

Binary file not shown.

30
example.py Normal file
View File

@ -0,0 +1,30 @@
from mimetypes import init
import clingo
PROGRAM_A = """
#theory o {
kterm {- : 0, unary };
&o/0 : kterm, any
}.
a, b.
a :- &o{a}.
b :- &o{b}.
"""
class CheckAtoms:
def init(self, init: clingo.PropagateInit):
theory_lits = set(init.solver_literal(atom.literal) for atom in init.theory_atoms)
symbolic_lits = set(init.solver_literal(atom.literal) for atom in init.symbolic_atoms)
if len(theory_lits & symbolic_lits) == 0:
print("There is no solver literal overlap")
else:
print("There is overlap")
control = clingo.Control(["0"])
control.register_propagator(CheckAtoms())
control.add("base", [], PROGRAM_A)
control.ground([("base", [])])
control.solve()

View File

@ -1,4 +1,4 @@
from typing import Iterable, Set from typing import Iterable, Set, Union
models = None models = None
@ -13,13 +13,16 @@ def set_models(alphabet: Iterable[str], v: Iterable[Iterable[str]]):
def propagate(atoms: Set[str]) -> Set[str]: def propagate(atoms: Set[str]) -> Set[str]:
atoms = atoms.intersection(DL_ATOMS) atoms = atoms.intersection(DL_ATOMS)
sub_models = (model for model in models if atoms <= model) sup_models = (model for model in models if atoms <= model)
first: set = next(sub_models, None) first: set = next(sup_models, None)
if first is None: if first is None:
return set() return set()
return first.intersection(*sub_models) return first.intersection(*sup_models)
def check(atoms: Set[str]) -> bool: def check(atoms: Set[str]) -> Union[Set[str], None]:
atoms = atoms.intersection(DL_ATOMS) atoms = atoms.intersection(DL_ATOMS)
return atoms in models if atoms not in models:
return None
sub_models = (model for model in models if model < atoms)
return atoms - atoms.intersection(*sub_models)

View File

@ -0,0 +1,4 @@
a :- not b.
b :- not a.

View File

@ -1,5 +1,6 @@
"ab" "ab"
( (
"",
"a", "a",
"b", "b",
"ab", "ab",

2
programs/b.lp Normal file
View File

@ -0,0 +1,2 @@
a :- not b.
b :- not a.

6
programs/b.py Normal file
View File

@ -0,0 +1,6 @@
"ab"
(
"a",
"b",
"ab",
)

View File

@ -1,7 +1,7 @@
from itertools import chain from itertools import chain
from typing import Iterable, List, Set, Tuple from typing import Iterable, Iterator, List, Set, Tuple
import clingo import clingo
from more_itertools import partition from more_itertools import partition, unique_everseen
from clingo import ( from clingo import (
PropagateControl, PropagateControl,
PropagateInit, PropagateInit,
@ -10,7 +10,10 @@ from clingo import (
Assignment, Assignment,
Symbol, Symbol,
) )
from functools import partial
from sys import stderr
eprint = partial(print, file=stderr)
""" """
API notes: API notes:
@ -51,12 +54,14 @@ class OntologyPropagator:
# Could add these with additional rules, but I think that will change the semantics of the O atoms. # Could add these with additional rules, but I think that will change the semantics of the O atoms.
# An ontology atom must be true if it's regular counterpart is also true # An ontology atom must be true if it's regular counterpart is also true
# The opposite direction is already enforced by rules. # The opposite direction is already enforced by rules.
for theory_atom in self.theory_atoms_inv: # for theory_atom in self.theory_atoms_inv:
theory_lit = self.theory_atoms_inv[theory_atom] # theory_lit = self.theory_atoms_inv[theory_atom]
symbolic_lit = self.symbolic_atoms_inv[theory_atom] # symbolic_lit = self.symbolic_atoms_inv[theory_atom]
# This is already implied if the symbolic and theory literals are the same # # This is already implied if the symbolic and theory literals are the same
init.add_clause((-symbolic_lit, theory_lit)) # init.add_clause((-symbolic_lit, theory_lit))
assert len(set(self.symbolic_atoms) & set(self.theory_atoms)) == 0
def truthy_atoms_text(self): def truthy_atoms_text(self):
return ( return (
@ -80,19 +85,19 @@ class OntologyPropagator:
) )
def print_nogood(self, nogood: Tuple[int, ...]): def print_nogood(self, nogood: Tuple[int, ...]):
print("adding nogood: ", end="") eprint("adding nogood: ", end="")
names = ( names = (
self.symbolic_atoms.get(abs(lit), self.theory_atoms.get(abs(lit))) self.symbolic_atoms.get(abs(lit), self.theory_atoms.get(abs(lit)))
for lit in nogood for lit in nogood
) )
print( eprint(
" ".join( " ".join(
f"(not {name})" if lit < 0 else f"({name})" f"(not {name})" if lit < 0 else f"({name})"
for lit, name in zip(nogood, names) for lit, name in zip(nogood, names)
) )
) )
def assign_nogood_true(self, pcontrol: PropagateControl, lits: Iterable[int]): def assign_nogood(self, pcontrol: PropagateControl, lits: Iterable[int]):
not_lits = set(-lit for lit in lits) not_lits = set(-lit for lit in lits)
assert len(not_lits) assert len(not_lits)
assignment = set(self.assignment_lits()) assignment = set(self.assignment_lits())
@ -108,7 +113,7 @@ class OntologyPropagator:
return (lit if is_pos else -lit for lit, is_pos in self.assignment.items()) return (lit if is_pos else -lit for lit, is_pos in self.assignment.items())
def conflict(self, pcontrol: PropagateControl): def conflict(self, pcontrol: PropagateControl):
print("conflict: ", end="") eprint("conflict: ", end="")
self.print_assignment() self.print_assignment()
pcontrol.add_nogood(self.assignment_lits()) pcontrol.add_nogood(self.assignment_lits())
@ -120,10 +125,12 @@ class OntologyPropagator:
in_atoms = set(self.truthy_atoms_text()) in_atoms = set(self.truthy_atoms_text())
print("made truthy: ", " ".join(in_atoms)) eprint("made truthy: ", " ".join(in_atoms))
out_atoms = set(self.atom_text_to_dl_atom(O.propagate(in_atoms))) out_atoms = set(O.propagate(in_atoms))
# This is sort of special in that regular atoms won't propagate their ontology counterparts.
new_atoms = out_atoms - in_atoms new_atoms = out_atoms - in_atoms
new_atoms = set(self.atom_text_to_dl_atom(new_atoms))
l = lambda atom: atom in self.assignment l = lambda atom: atom in self.assignment
new_atoms, prev_new_atoms = map(tuple, partition(l, new_atoms)) new_atoms, prev_new_atoms = map(tuple, partition(l, new_atoms))
@ -131,11 +138,11 @@ class OntologyPropagator:
if any(not self.assignment[atom] for atom in prev_new_atoms): if any(not self.assignment[atom] for atom in prev_new_atoms):
self.conflict(pcontrol) self.conflict(pcontrol)
elif len(new_atoms): elif len(new_atoms):
self.assign_nogood_true(pcontrol, new_atoms) self.assign_nogood(pcontrol, new_atoms)
else: else:
return return
pcontrol.propagate() pcontrol.propagate()
print("propagate: ", end="") eprint("propagate: ", end="")
self.print_assignment() self.print_assignment()
def undo(self, thread_id: int, assignment: Assignment, changes: List[int]): def undo(self, thread_id: int, assignment: Assignment, changes: List[int]):
@ -143,19 +150,38 @@ class OntologyPropagator:
atom = abs(change) atom = abs(change)
del self.assignment[atom] del self.assignment[atom]
def lits_to_text(self, lits: Iterable[int]) -> Iterable[str]:
return (
self.symbolic_atoms.get(abs(lit), self.theory_atoms.get(abs(lit)))
for lit in lits
)
def check(self, pcontrol: PropagateControl) -> None: def check(self, pcontrol: PropagateControl) -> None:
print("check: ", end="") eprint("check: ", end="")
self.print_assignment() self.print_assignment()
in_atoms = set(self.truthy_atoms_text()) in_atoms = set(self.truthy_atoms_text())
if not O.check(in_atoms):
shrink = O.check(in_atoms)
if shrink is None:
self.conflict(pcontrol) self.conflict(pcontrol)
return
shrink = tuple(self.theory_atoms_inv[atom] for atom in shrink)
if any(self.assignment.get(abs(lit)) for lit in shrink):
self.conflict(pcontrol)
return
eprint("shrink with: ", " ".join(self.lits_to_text(shrink)))
for lit in shrink:
self.assign_nogood(pcontrol, (-lit,))
def print_assignment(self): def print_assignment(self):
print("assignment: ", end="") eprint("assignment: ", end="")
for lit in self.assignment: for lit in self.assignment:
lit = -lit if not self.assignment[lit] else lit lit = -lit if not self.assignment[lit] else lit
print(self.solver_lit_text(lit), end=" ") eprint(self.solver_lit_text(lit), end=" ")
print() eprint()
def is_theory_atom(self, lit: int): def is_theory_atom(self, lit: int):
_, _, a = self.lookup_solver_lit(lit) _, _, a = self.lookup_solver_lit(lit)
@ -204,11 +230,11 @@ def add_external_atoms(program: str) -> str:
def solve(program: str, O_alphabet: Iterable[str], O_models: Iterable[Iterable[str]]): def solve(program: str, O_alphabet: Iterable[str], O_models: Iterable[Iterable[str]]):
O.set_models(O_alphabet, O_models) O.set_models(O_alphabet, O_models)
program = add_external_atoms(program) program = add_external_atoms(program)
print("USING ONTOLOGY WITH ALPHABET AND MODELS:") eprint("USING ONTOLOGY WITH ALPHABET AND MODELS:")
print(O_alphabet) eprint(O_alphabet)
print(O_models) eprint(O_models)
print("USING PROGRAM:") eprint("USING PROGRAM:")
print(program) eprint(program)
control = clingo.Control(["0"]) control = clingo.Control(["0"])
propagator = OntologyPropagator() propagator = OntologyPropagator()
control.register_propagator(propagator) control.register_propagator(propagator)
@ -219,13 +245,13 @@ def solve(program: str, O_alphabet: Iterable[str], O_models: Iterable[Iterable[s
with control.solve(yield_=True) as solve_handle: with control.solve(yield_=True) as solve_handle:
for model in solve_handle: for model in solve_handle:
print("answer set:", model) eprint("answer set:", model)
answer_sets.append(str(model)) answer_sets.append(str(model))
print() eprint()
if len(answer_sets): if len(answer_sets):
print("ALL FINISHED, ALL ANSWER SETS:") print("ALL FINISHED, ALL ANSWER SETS:")
print(*answer_sets, sep="\n") print(*unique_everseen(answer_sets), sep="\n")
else: else:
print("ALL DONE, NO ANSWER SETS") print("ALL DONE, NO ANSWER SETS")

20
tests.py Normal file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from glob import glob
from os import system
from functools import partial
from sys import argv, stderr
from os.path import splitext
eprint = partial(print, file=stderr)
if len(argv) >= 2 and argv[1] == "-a":
accept = True
else:
accept = False
system("mkdir -p tests")
for lp in glob("programs/*.lp"):
base = splitext(lp)[0]