"""
High-level convenience API for common KST operations.
Designed for users who want simple function calls without needing
to construct intermediate objects. Accepts plain Python types
(lists, dicts, tuples) instead of library-specific classes.
This module re-exports its functions at the package level, so you
can write::
    import knowledgespaces as ks

    structure = ks.space_from_prerequisites(...)
instead of dealing with SurmiseRelation, KnowledgeStructure, etc.
"""
from __future__ import annotations
from collections.abc import Callable
import numpy as np
from knowledgespaces.assessment.adaptive import is_converged, select_item_eig
from knowledgespaces.assessment.blim import BLIM, BLIMParams, StatePosterior
from knowledgespaces.derivation.cbkst import derive_knowledge_structure as _derive_cbkst
from knowledgespaces.derivation.skill_map import SkillMap
from knowledgespaces.estimation.blim_em import ResponseMatrix, estimate_blim
from knowledgespaces.structures.knowledge_structure import KnowledgeStructure
from knowledgespaces.structures.relations import SurmiseRelation
# ------------------------------------------------------------------
# Structure creation (one call)
# ------------------------------------------------------------------
def space_from_prerequisites(
    items: list[str],
    prerequisites: list[tuple[str, str]],
) -> KnowledgeStructure:
    """Construct the ordinal knowledge structure induced by prerequisite pairs.

    The prerequisite relation is closed transitively before the structure
    is built, so only direct dependencies need to be listed.

    Parameters
    ----------
    items : list[str]
        Names of all items in the domain.
    prerequisites : list[tuple[str, str]]
        Pairs ``(a, b)`` meaning 'a is a prerequisite of b'.

    Returns
    -------
    KnowledgeStructure
        The resulting ordinal structure (closed under union and intersection).

    Examples
    --------
    >>> ks = space_from_prerequisites(
    ...     ["add", "sub", "mul"],
    ...     [("add", "sub"), ("sub", "mul")],
    ... )
    >>> ks.n_states
    4
    """
    relation = SurmiseRelation(items, prerequisites)
    closed_relation = relation.transitive_closure()
    return KnowledgeStructure.from_surmise_relation(closed_relation)
def structure_from_skill_map(
    skill_map: dict[str, list[str]],
    skill_prerequisites: list[tuple[str, str]] | None = None,
) -> KnowledgeStructure:
    """Derive a knowledge structure from a skill map (CB-KST).

    .. note::
       With conjunctive skill maps (items requiring several skills) the
       result need not be union-closed, i.e. it may fail to be a knowledge
       *space*. Check with ``is_knowledge_space`` if that matters.

    Parameters
    ----------
    skill_map : dict[str, list[str]]
        For each item, the skills it requires, e.g.
        ``{"q1": ["s_add"], "q2": ["s_add", "s_carry"]}``.
    skill_prerequisites : list[tuple[str, str]] or None
        Pairs ``(a, b)`` meaning 'skill a is a prerequisite of skill b'.
        ``None`` treats all skills as independent.

    Returns
    -------
    KnowledgeStructure
        The derived structure (not necessarily a space; see note above).

    Examples
    --------
    >>> ks = structure_from_skill_map(
    ...     {"q1": ["s1"], "q2": ["s1", "s2"]},
    ...     [("s1", "s2")],
    ... )
    """
    item_names = list(skill_map)
    # Collect every skill mentioned anywhere, sorted for determinism.
    skill_list = sorted({s for required in skill_map.values() for s in required})
    mapping = SkillMap(item_names, skill_list, skill_map)
    skill_order = SurmiseRelation(skill_list, skill_prerequisites or [])
    derivation = _derive_cbkst(mapping, skill_order)
    return derivation.knowledge_structure
def space_from_skill_map(
    skill_map: dict[str, list[str]],
    skill_prerequisites: list[tuple[str, str]] | None = None,
) -> KnowledgeStructure:
    """Deprecated alias — use :func:`structure_from_skill_map`.

    The function was renamed because, with conjunctive skill maps, the
    derived structure is not necessarily a knowledge space. Emits a
    ``DeprecationWarning`` pointing at the caller and delegates unchanged.
    """
    import warnings

    warnings.warn(
        "space_from_skill_map() is deprecated — use structure_from_skill_map(). "
        "With conjunctive skill maps the result may not be a knowledge space.",
        category=DeprecationWarning,
        stacklevel=2,
    )
    return structure_from_skill_map(skill_map, skill_prerequisites)
# ------------------------------------------------------------------
# Assessment (simplified)
# ------------------------------------------------------------------
def assess(
    structure: KnowledgeStructure,
    responses: dict[str, bool] | list[tuple[str, bool]],
    beta: float | dict[str, float] = 0.1,
    eta: float | dict[str, float] = 0.2,
    prior: dict[frozenset[str], float] | None = None,
) -> dict:
    """Infer a student's knowledge state from observed responses.

    Parameters
    ----------
    structure : KnowledgeStructure
        The knowledge structure to assess against.
    responses : dict[str, bool] or list[tuple[str, bool]]
        Observed responses, in one of two formats:

        - ``dict`` — one observation per item,
          e.g. ``{"add": True, "sub": False}``;
        - ``list of tuples`` — an item may appear several times
          (different instances), e.g.
          ``[("add", True), ("add", True), ("sub", False)]``.

        Each observation updates the posterior independently
        (local independence assumption).
    beta : float or dict[str, float]
        Slip parameter, scalar or per-item.
    eta : float or dict[str, float]
        Guess parameter, scalar or per-item.
    prior : dict[frozenset[str], float] or None
        Prior over states (e.g. from a previous EM fit); uniform if None.

    Returns
    -------
    dict
        With keys ``'state'`` (most likely state as a set),
        ``'probability'``, ``'mastery'`` (per-item mastery probabilities),
        ``'inner_fringe'`` and ``'outer_fringe'``.

    Examples
    --------
    One observation per item::

        result = assess(structure, {"add": True, "sub": True, "mul": False})

    Multiple instances of the same item::

        result = assess(structure, [("add", True), ("add", True), ("sub", False)])
    """
    model = BLIM(structure, BLIMParams(beta=beta, eta=eta))
    post = (
        StatePosterior.uniform(model)
        if prior is None
        else StatePosterior.from_prior(model, prior)
    )
    # Normalize both accepted formats to a flat (item, correct) sequence.
    if isinstance(responses, dict):
        observations = list(responses.items())
    else:
        observations = responses
    for item, correct in observations:
        post = post.update(item, correct)
    best_state, best_prob = post.most_likely_state
    return {
        "state": set(best_state),
        "probability": best_prob,
        "mastery": post.marginal_mastery(),
        "inner_fringe": set(structure.inner_fringe(best_state)),
        "outer_fringe": set(structure.outer_fringe(best_state)),
    }
_FIT_REQUIRED_KEYS = {"beta", "eta", "pi"}
def _validate_fit(fit: dict) -> None:
missing = _FIT_REQUIRED_KEYS - set(fit)
if missing:
raise ValueError(
f"fit dict is missing keys {missing}. "
f"Expected output of fit_blim() with keys {_FIT_REQUIRED_KEYS}."
)
def assess_from_fit(
    structure: KnowledgeStructure,
    fit: dict,
    responses: dict[str, bool] | list[tuple[str, bool]],
) -> dict:
    """Assess a student using parameters estimated by :func:`fit_blim`.

    Shorthand for calling :func:`assess` with ``beta``, ``eta`` and
    ``prior`` pulled out of the fit result.

    Parameters
    ----------
    structure : KnowledgeStructure
        The knowledge structure (the same one used for fitting).
    fit : dict
        Output of :func:`fit_blim`; must contain 'beta', 'eta' and 'pi'.
    responses : dict or list
        Observed responses, in either format accepted by :func:`assess`.
    """
    _validate_fit(fit)
    fitted = {"beta": fit["beta"], "eta": fit["eta"], "prior": fit["pi"]}
    return assess(structure, responses, **fitted)
def adaptive_assess(
    structure: KnowledgeStructure,
    ask_fn: Callable[[str], bool],
    *,
    instances: dict[str, list[str]] | None = None,
    beta: float | dict[str, float] = 0.1,
    eta: float | dict[str, float] = 0.2,
    prior: dict[frozenset[str], float] | None = None,
    threshold: float = 0.85,
    max_questions: int = 25,
) -> dict:
    """Run a complete adaptive assessment loop.

    Parameters
    ----------
    structure : KnowledgeStructure
        The knowledge structure.
    ask_fn : callable
        Question callback. Receives an item name when ``instances`` is
        None, otherwise an instance ID; returns the response as a bool.
    instances : dict[str, list[str]] or None
        Optional ``{item: [instance_id, ...]}`` mapping for
        multi-instance assessment. When given, the engine picks the best
        un-asked *instance* and passes its ID to ``ask_fn``; all instances
        of one item are treated as equivalent by the BLIM.
    beta, eta : float or dict[str, float]
        BLIM slip/guess parameters, scalar or per-item.
    prior : dict[frozenset[str], float] or None
        Prior over states; uniform if None.
    threshold : float
        Stop once the most likely state reaches this posterior probability.
    max_questions : int
        Hard cap on the number of questions asked.

    Returns
    -------
    dict
        With keys ``'state'``, ``'probability'``, ``'mastery'``,
        ``'inner_fringe'``, ``'outer_fringe'``,
        ``'questions_asked'`` (int) and ``'history'`` — a list of
        ``(instance_or_item, item, response)`` tuples.

    Examples
    --------
    Simple (one instance per item)::

        result = adaptive_assess(structure, lambda item: item in {"add", "sub"})

    With multiple instances::

        result = adaptive_assess(
            structure,
            lambda inst_id: ask_student(inst_id),
            instances={
                "addition": ["3+2", "7+5", "12+9"],
                "subtraction": ["8-3", "15-7"],
            },
        )
    """
    from knowledgespaces.assessment.instances import InstancePool, select_instance_eig

    model = BLIM(structure, BLIMParams(beta=beta, eta=eta))
    post = (
        StatePosterior.uniform(model)
        if prior is None
        else StatePosterior.from_prior(model, prior)
    )
    log: list[tuple[str, str, bool]] = []
    if instances is None:
        # Simple mode: items are their own instances and are never re-asked.
        asked: set[str] = set()
        for _ in range(max_questions):
            if is_converged(post, threshold=threshold):
                break
            if len(asked) >= structure.n_items:
                # Every item has been asked — nothing left to gain.
                break
            choice = select_item_eig(post, exclude=asked)
            answer = bool(ask_fn(choice.item))
            post = post.update(choice.item, answer)
            asked.add(choice.item)
            # History uses the item for both slots so the tuple shape
            # matches the instance-aware mode.
            log.append((choice.item, choice.item, answer))
    else:
        # Instance-aware mode: validate the pool up front, then pick
        # the most informative un-asked instance each round.
        pool = InstancePool.from_dict(instances)
        pool.validate_domain(structure.domain)
        used: set[str] = set()
        for _ in range(max_questions):
            if is_converged(post, threshold=threshold):
                break
            try:
                choice = select_instance_eig(post, pool, asked=used)
            except ValueError:
                # Selector signals all instances are exhausted.
                break
            answer = bool(ask_fn(choice.instance_id))
            post = post.update(choice.item, answer)
            used.add(choice.instance_id)
            log.append((choice.instance_id, choice.item, answer))
    final_state, final_prob = post.most_likely_state
    return {
        "state": set(final_state),
        "probability": final_prob,
        "mastery": post.marginal_mastery(),
        "inner_fringe": set(structure.inner_fringe(final_state)),
        "outer_fringe": set(structure.outer_fringe(final_state)),
        "questions_asked": len(log),
        "history": log,
    }
def adaptive_assess_from_fit(
    structure: KnowledgeStructure,
    fit: dict,
    ask_fn: Callable[[str], bool],
    *,
    instances: dict[str, list[str]] | None = None,
    threshold: float = 0.85,
    max_questions: int = 25,
) -> dict:
    """Run adaptive assessment with parameters estimated by :func:`fit_blim`.

    Shorthand for :func:`adaptive_assess` with ``beta``, ``eta`` and
    ``prior`` taken from the fit result.

    Parameters
    ----------
    structure : KnowledgeStructure
        The knowledge structure (the same one used for fitting).
    fit : dict
        Output of :func:`fit_blim`; must contain 'beta', 'eta' and 'pi'.
    ask_fn : callable
        Question callback, as in :func:`adaptive_assess`.
    instances, threshold, max_questions
        Forwarded unchanged to :func:`adaptive_assess`.
    """
    _validate_fit(fit)
    fitted = {"beta": fit["beta"], "eta": fit["eta"], "prior": fit["pi"]}
    return adaptive_assess(
        structure,
        ask_fn,
        instances=instances,
        threshold=threshold,
        max_questions=max_questions,
        **fitted,
    )
# ------------------------------------------------------------------
# Estimation (simplified)
# ------------------------------------------------------------------
def fit_blim(
    structure: KnowledgeStructure,
    items: list[str],
    responses: np.ndarray | list[list[int]],
    counts: np.ndarray | list[int] | None = None,
) -> dict:
    """Fit BLIM parameters to response data via EM and return plain dicts.

    Parameters
    ----------
    structure : KnowledgeStructure
        The knowledge structure to fit.
    items : list[str]
        Item names labelling the columns of ``responses``.
    responses : array-like
        Binary response matrix of shape (n_patterns, n_items).
    counts : array-like or None
        Optional frequency of each response pattern.

    Returns
    -------
    dict
        With keys ``'beta'`` (dict item→float), ``'eta'`` (dict item→float),
        ``'pi'`` (dict frozenset→float, state prior probabilities),
        ``'states'`` (list of frozensets, same order as ``pi``),
        ``'log_likelihood'`` (float), ``'converged'`` (bool),
        ``'n_iterations'`` (int) and ``'gof'`` (dict with G2, df,
        p_value, npar, AIC, BIC, BIC_N).

    Examples
    --------
    >>> result = fit_blim(structure, ["a","b","c"], [[1,1,1],[1,1,0],[1,0,0],[0,0,0]])
    >>> result["converged"]
    True
    """
    pattern_matrix = np.asarray(responses, dtype=int)
    freq = None if counts is None else np.asarray(counts, dtype=float)
    data = ResponseMatrix(items=items, patterns=pattern_matrix, counts=freq)
    est = estimate_blim(structure, data)
    # States sorted by (size, sorted members).
    # NOTE(review): this assumes est.pi is ordered by the same
    # (size, lexicographic) convention — confirm against estimate_blim.
    ordered_states = sorted(structure.states, key=lambda st: (len(st), sorted(st)))
    gof = est.gof
    return {
        "beta": dict(zip(est.items, est.beta.tolist(), strict=True)),
        "eta": dict(zip(est.items, est.eta.tolist(), strict=True)),
        "pi": dict(zip(ordered_states, est.pi.tolist(), strict=True)),
        "states": ordered_states,
        "log_likelihood": est.log_likelihood,
        "converged": est.converged,
        "n_iterations": est.n_iterations,
        "gof": {
            "G2": gof.G2,
            "df": gof.df,
            "p_value": gof.p_value,
            "npar": gof.npar,
            "AIC": gof.AIC,
            "BIC": gof.BIC,
            "BIC_N": gof.BIC_N,
        },
    }