Source code for goblin.models.query

from __future__ import unicode_literals
import logging

from goblin._compat import float_types, print_
from goblin import connection
from goblin.exceptions import GoblinQueryError
from .element import Element
from goblin.constants import (EQUAL, NOT_EQUAL, GREATER_THAN,
                              GREATER_THAN_EQUAL, LESS_THAN,
                              LESS_THAN_EQUAL, WITHIN, INSIDE,
                              OUTSIDE, BETWEEN)
import copy
from goblin.properties.base import GraphProperty

logger = logging.getLogger(__name__)


[docs]class V(object): """ All query operations return a new query object, which currently deviates from blueprints. The blueprints query object modifies and returns the same object This method seems more flexible, and consistent w/ the rest of Gremlin. """ _limit = None def __init__(self, vertex): self._vertex = vertex self._steps = [] self._bindings = {}
[docs] def count(self, *args, **kwargs): """ :returns: number of matching vertices :rtype: int """ pass
[docs] def has(self, key, value, compare=EQUAL): """ :param key: key to lookup :type key: str | goblin.properties.GraphProperty :param value: value to compare :type value: str, float, int :param compare: comparison keyword :type compare: str :rtype: Query """ q = copy.copy(self) if issubclass(type(key), property): msg = "Use %s.get_property_by_name" % (self.__class__.__name__) logger.error(msg) raise GoblinQueryError(msg) binding = self._get_binding(value) if compare in [INSIDE, OUTSIDE, BETWEEN, WITHIN]: step = "has('{}', {}(*{}))".format(key, compare, binding) else: step = "has('{}', {}({}))".format(key, compare, binding) q._steps.append(step) return q
[docs] def has_label(self, *labels): labels = self._get_labels(labels) return self._unpack_step("hasLabel", labels)
[docs] def has_id(self, *ids): return self._unpack_step("hasId", ids)
# def has_key(self, *keys): # return self._unpack_step("hasKey", keys) # def has_value(self, *values): # return self._unpack_step("hasValue", values)
[docs] def out_step(self, *labels): labels = self._get_labels(labels) return self._unpack_step("out", labels)
[docs] def in_step(self, *labels): labels = self._get_labels(labels) return self._unpack_step("in", labels)
[docs] def both(self, *labels): labels = self._get_labels(labels) return self._unpack_step("both", labels)
[docs] def out_e(self, *labels): labels = self._get_labels(labels) return self._unpack_step("outE", labels)
[docs] def in_e(self, *labels): labels = self._get_labels(labels) return self._unpack_step("inE", labels)
[docs] def both_e(self, *labels): labels = self._get_labels(labels) return self._unpack_step("bothE", labels)
[docs] def out_v(self): return self._simple_step("outV")
[docs] def in_v(self): return self._simple_step("inV")
[docs] def both_v(self): return self._simple_step("bothV")
[docs] def other_v(self): return self._simple_step("otherV")
def _get_labels(self, labels): new_labels = [] for label in labels: try: label = label.get_label() new_labels.append(label) except: new_labels.append(label) return new_labels def _simple_step(self, func): q = copy.copy(self) step = '{}()'.format(func) q._steps.append(step) return q def _unpack_step(self, func, vals): q = copy.copy(self) binding = self._get_binding(vals) step = '{}(*{})'.format(func, binding) q._steps.append(step) return q def _get_binding(self, val): binding = 'b{}'.format(len(self._bindings)) self._bindings[binding] = val return binding
[docs] def limit(self, limit): pass
[docs] def get(self, deserialize=True, *args, **kwargs): script = "g.V(vid).{}".format(self._get()) self._bindings.update({"vid": self._vertex._id}) def process_results(results): if not results: results = [] if deserialize: results = [Element.deserialize(r) for r in results] return results future_results = connection.execute_query( script, bindings=self._bindings, handler=process_results, **kwargs) return future_results
def _get(self): return '.'.join(self._steps)