"""TRAMP: The RDF API in (Monty) Python An experimental RDF API which may or may not be like RDF 1.0.""" import urllib, string __version__ = 1003635899 __author__ = 'Aaron Swartz ' __license__ = 'GNU General Public License Version 2' class Namespace: """A class so namespaced URIs can be abbreviated (like dc.subject).""" def __init__(self, prefix): self.prefix = prefix def __getattr__(self,name): return self.prefix + name class Node: nodeList = {} def __init__(self, uri=None): if uri is not None: self.uri = uri def literalToURI(value): """Converts a literal into a data: URI.""" return 'data:,' + str(urllib.quote(value)) def URIToLiteral(uri): """Converts a data: URI into a literal.""" if uri[0:6] != "data:,": BadURI = "Only deals with URIs of the form: data:,somedata" raise BadURI return urllib.unquote(uri[6:]) def node(value=None): """Interns a node. Use this instead of the Node() creator.""" if isinstance(value, Node): return value # It's already a Node. if value == None: return Node() # Return a blank Node. if type(value) is type(u''): # It's a Literal! value = literalToURI(value) # Convert it to a URI. if type(value) != type(''): # It's not a URI yet... something is wrong. InputError = "Must be a u'literal' 'uri' or Node()" raise InputError, value if not Node.nodeList.has_key(value): # Node hasn't been interned... Node.nodeList[value] = Node(value) # ... so do it now. return Node.nodeList[value] # Now return the node class Triple (Node): """An RDF triple, the basic unit of association.""" def __init__(self, store, s, p, o): # Intern terms if not already nodes: self.subject = node(s) self.predicate = node(p) self.object = node(o) def __eq__(self, other): if not isinstance(other, Triple): return self is other if (self.subject == other.subject and self.predicate == other.predicate and self.object == other.object): return 1 else: return 0 class Store: """A generic RDF store.""" def __init__(self): self.tripleList = [] # triples in this store def triple(self, s, p, o): result = Triple(self, s, p, o) self.tripleList.append(result) return result def query(self, s, p, o): """Searches thru the store for triples that match the spo signature. None matches anything. TODO: implement callbacks """ # Intern terms: if s is not None: s = node(s) if p is not None: p = node(p) if o is not None: o = node(o) results = [] for t in self.tripleList: if ((s is None or t.subject is s) and (p is None or t.predicate is p) and (o is None or t.object is o) ): results.append(t) return results