Skip to content

Feature: Add DjangoInstanceField to allow reuse queryset on DjangoObjectType #1133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 10 additions & 20 deletions graphene_django/converter.py
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
from graphql.pyutils import register_description

from .compat import ArrayField, HStoreField, JSONField, PGJSONField, RangeField
from .fields import DjangoListField, DjangoConnectionField
from .fields import DjangoListField, DjangoConnectionField, DjangoInstanceField
from .settings import graphene_settings
from .utils.str_converters import to_const

@@ -230,23 +230,6 @@ def convert_time_to_string(field, registry=None):
)


@convert_django_field.register(models.OneToOneRel)
def convert_onetoone_field_to_djangomodel(field, registry=None):
model = field.related_model

def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return

# We do this for a bug in Django 1.8, where null attr
# is not available in the OneToOneRel instance
null = getattr(field, "null", True)
return Field(_type, required=not null)

return Dynamic(dynamic_type)


@convert_django_field.register(models.ManyToManyField)
@convert_django_field.register(models.ManyToManyRel)
@convert_django_field.register(models.ManyToOneRel)
@@ -289,6 +272,7 @@ def dynamic_type():

@convert_django_field.register(models.OneToOneField)
@convert_django_field.register(models.ForeignKey)
@convert_django_field.register(models.OneToOneRel)
def convert_field_to_djangomodel(field, registry=None):
model = field.related_model

@@ -297,10 +281,16 @@ def dynamic_type():
if not _type:
return

return Field(
if isinstance(field, models.OneToOneRel):
description = get_django_field_description(field.field)
else:
description = get_django_field_description(field)

return DjangoInstanceField(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also lookup the primary field of the model and pass the result in as unique_fields? I could see a model having a different primary key field name and I think that would cause this to break

_type,
description=get_django_field_description(field),
description=description,
required=not field.null,
is_foreign_key=True,
)

return Dynamic(dynamic_type)
102 changes: 102 additions & 0 deletions graphene_django/fields.py
Original file line number Diff line number Diff line change
@@ -246,3 +246,105 @@ def wrap_resolve(self, parent_resolver):

def get_queryset_resolver(self):
return self.resolve_queryset


class DjangoInstanceField(Field):
def __init__(self, _type, *args, **kwargs):
from .types import DjangoObjectType

self.unique_fields = kwargs.pop("unique_fields", ("id",))
self.is_foreign_key = kwargs.pop("is_foreign_key", False)

assert not isinstance(
self.unique_fields, list
), "unique_fields argument needs to be a list"

if isinstance(_type, NonNull):
_type = _type.of_type

super(DjangoInstanceField, self).__init__(_type, *args, **kwargs)

assert issubclass(
self._underlying_type, DjangoObjectType
), "DjangoInstanceField only accepts DjangoObjectType types"

@property
def _underlying_type(self):
_type = self._type
while hasattr(_type, "of_type"):
_type = _type.of_type
return _type

@property
def model(self):
return self._underlying_type._meta.model

def get_manager(self):
return self.model._default_manager

@staticmethod
def instance_resolver(
django_object_type,
unique_fields,
resolver,
default_manager,
is_foreign_key,
root,
info,
**args
):

queryset = None
unique_filter = {}
if is_foreign_key:
pk_name = "{}_id".format(info.field_name)
pk = None
if hasattr(root, pk_name):
pk = getattr(root, pk_name)
else:
fk_obj = getattr(root, info.field_name)
if fk_obj:
pk = fk_obj.pk

if pk is not None:
unique_filter["pk"] = pk
unique_fields = ()
else:
return None
else:
queryset = maybe_queryset(resolver(root, info, **args))

if queryset is None:
queryset = maybe_queryset(default_manager)

if isinstance(queryset, QuerySet):
# Pass queryset to the DjangoObjectType get_queryset method
queryset = maybe_queryset(django_object_type.get_queryset(queryset, info))
for field in unique_fields:
key = field if field != "id" else "pk"
value = args.get(field)

if value is not None:
unique_filter[key] = value

assert len(unique_filter.keys()) > 0, (
"You need to model unique arguments. The declared unique fields are: {}."
).format(", ".join(unique_fields))

try:
return queryset.get(**unique_filter)
except django_object_type._meta.model.DoesNotExist:
return None

return queryset

def wrap_resolve(self, parent_resolver):
resolver = super(DjangoInstanceField, self).wrap_resolve(parent_resolver)
return partial(
self.instance_resolver,
self._underlying_type,
self.unique_fields,
resolver,
self.get_manager(),
self.is_foreign_key,
)
7 changes: 5 additions & 2 deletions graphene_django/tests/models.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,9 @@ class Person(models.Model):
class Pet(models.Model):
name = models.CharField(max_length=30)
age = models.PositiveIntegerField()
owner = models.ForeignKey(
"Person", on_delete=models.CASCADE, null=True, blank=True, related_name="pets"
)


class FilmDetails(models.Model):
@@ -91,8 +94,8 @@ class Meta:

class Article(models.Model):
headline = models.CharField(max_length=100)
pub_date = models.DateField()
pub_date_time = models.DateTimeField()
pub_date = models.DateField(auto_now_add=True)
pub_date_time = models.DateTimeField(auto_now_add=True)
reporter = models.ForeignKey(
Reporter, on_delete=models.CASCADE, related_name="articles"
)
145 changes: 144 additions & 1 deletion graphene_django/tests/test_fields.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@

from graphene import List, NonNull, ObjectType, Schema, String

from ..fields import DjangoListField
from ..fields import DjangoListField, DjangoInstanceField
from ..types import DjangoObjectType
from .models import Article as ArticleModel
from .models import Reporter as ReporterModel
@@ -302,6 +302,149 @@ def resolve_reporters(_, info):
assert not result.errors
assert result.data == {"reporters": [{"firstName": "Tara"}]}

def test_get_queryset_filter_instance(self):
"""Resolving prefilter list to get instance"""

class Reporter(DjangoObjectType):
class Meta:
model = ReporterModel
fields = ("first_name", "articles")

@classmethod
def get_queryset(cls, queryset, info):
# Only get reporters with at least 1 article
return queryset.annotate(article_count=Count("articles")).filter(
article_count__gt=0
)

class Query(ObjectType):
reporter = DjangoInstanceField(
Reporter,
unique_fields=("first_name",),
first_name=String(required=True),
)

schema = Schema(query=Query)

query = """
query {
reporter(firstName: "Tara") {
firstName
}
}
"""

r1 = ReporterModel.objects.create(first_name="Tara", last_name="West")
ReporterModel.objects.create(first_name="Debra", last_name="Payne")

ArticleModel.objects.create(
headline="Amazing news",
reporter=r1,
pub_date=datetime.date.today(),
pub_date_time=datetime.datetime.now(),
editor=r1,
)

result = schema.execute(query)

assert not result.errors
assert result.data == {"reporter": {"firstName": "Tara"}}

def test_get_queryset_filter_instance_null(self):
"""Resolving prefilter list with no results"""

class Reporter(DjangoObjectType):
class Meta:
model = ReporterModel
fields = ("first_name", "articles")

@classmethod
def get_queryset(cls, queryset, info):
# Only get reporters with at least 1 article
return queryset.annotate(article_count=Count("articles")).filter(
article_count__gt=0
)

class Query(ObjectType):
reporter = DjangoInstanceField(
Reporter,
unique_fields=("first_name",),
first_name=String(required=True),
)

schema = Schema(query=Query)

query = """
query {
reporter(firstName: "Debra") {
firstName
}
}
"""

r1 = ReporterModel.objects.create(first_name="Tara", last_name="West")
ReporterModel.objects.create(first_name="Debra", last_name="Payne")

ArticleModel.objects.create(
headline="Amazing news",
reporter=r1,
pub_date=datetime.date.today(),
pub_date_time=datetime.datetime.now(),
editor=r1,
)

result = schema.execute(query)

assert not result.errors
assert result.data == {"reporter": None}

def test_get_queryset_filter_instance_plain(self):
"""Resolving a plain object should work (and not call get_queryset)"""

class Reporter(DjangoObjectType):
class Meta:
model = ReporterModel
fields = ("first_name", "articles")

@classmethod
def get_queryset(cls, queryset, info):
# Only get reporters with at least 1 article
return queryset.annotate(article_count=Count("articles")).filter(
article_count__gt=0
)

class Query(ObjectType):
reporter = DjangoInstanceField(Reporter, first_name=String(required=True))

def resolve_reporter(_, info, first_name):
return ReporterModel.objects.get(first_name=first_name)

schema = Schema(query=Query)

query = """
query {
reporter(firstName: "Debra") {
firstName
}
}
"""

r1 = ReporterModel.objects.create(first_name="Tara", last_name="West")
ReporterModel.objects.create(first_name="Debra", last_name="Payne")

ArticleModel.objects.create(
headline="Amazing news",
reporter=r1,
pub_date=datetime.date.today(),
pub_date_time=datetime.datetime.now(),
editor=r1,
)

result = schema.execute(query)

assert not result.errors
assert result.data == {"reporter": {"firstName": "Debra"}}

def test_resolve_list(self):
"""Resolving a plain list should work (and not call get_queryset)"""

345 changes: 345 additions & 0 deletions graphene_django/tests/test_get_queryset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
import pytest

import graphene
from graphene.relay import Node

from graphql_relay import to_global_id

from ..fields import DjangoConnectionField, DjangoInstanceField
from ..types import DjangoObjectType

from .models import Article, Reporter


class TestShouldCallGetQuerySetOnForeignKey:
"""
Check that the get_queryset method is called in both forward and reversed direction
of a foreignkey on types.
(see issue #1111)
"""

@pytest.fixture(autouse=True)
def setup_schema(self):
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter

@classmethod
def get_queryset(cls, queryset, info):
if info.context and info.context.get("admin"):
return queryset
raise Exception("Not authorized to access reporters.")

class ArticleType(DjangoObjectType):
class Meta:
model = Article

@classmethod
def get_queryset(cls, queryset, info):
return queryset.exclude(headline__startswith="Draft")

class Query(graphene.ObjectType):
reporter = DjangoInstanceField(ReporterType, id=graphene.ID(required=True))
article = DjangoInstanceField(ArticleType, id=graphene.ID(required=True))

self.schema = graphene.Schema(query=Query)

self.reporter = Reporter.objects.create(
first_name="Jane", last_name="Doe", pk=2
)

self.articles = [
Article.objects.create(
headline="A fantastic article",
reporter=self.reporter,
editor=self.reporter,
),
Article.objects.create(
headline="Draft: My next best seller",
reporter=self.reporter,
editor=self.reporter,
),
]

def test_get_queryset_called_on_field(self):
# If a user tries to access an article it is fine as long as it's not a draft one
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
}
}
"""
# Non-draft
result = self.schema.execute(query, variables={"id": self.articles[0].id})
assert not result.errors
assert result.data["article"] == {
"headline": "A fantastic article",
}
# Draft
result = self.schema.execute(query, variables={"id": self.articles[1].id})
assert not result.errors
assert result.data["article"] is None

# If a non admin user tries to access a reporter they should get our authorization error
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
}
}
"""

result = self.schema.execute(query, variables={"id": self.reporter.id})
assert len(result.errors) == 1
assert result.errors[0].message == "Not authorized to access reporters."

# An admin user should be able to get reporters
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
}
}
"""

result = self.schema.execute(
query, variables={"id": self.reporter.id}, context_value={"admin": True},
)
assert not result.errors
assert result.data == {"reporter": {"firstName": "Jane"}}

def test_get_queryset_called_on_foreignkey(self):
# If a user tries to access a reporter through an article they should get our authorization error
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
reporter {
firstName
}
}
}
"""

result = self.schema.execute(query, variables={"id": self.articles[0].id})
assert len(result.errors) == 1
assert result.errors[0].message == "Not authorized to access reporters."

# An admin user should be able to get reporters through an article
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
reporter {
firstName
}
}
}
"""

result = self.schema.execute(
query, variables={"id": self.articles[0].id}, context_value={"admin": True},
)
assert not result.errors
assert result.data["article"] == {
"headline": "A fantastic article",
"reporter": {"firstName": "Jane"},
}

# An admin user should not be able to access draft article through a reporter
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
articles {
headline
}
}
}
"""

result = self.schema.execute(
query, variables={"id": self.reporter.id}, context_value={"admin": True},
)
assert not result.errors
assert result.data["reporter"] == {
"firstName": "Jane",
"articles": [{"headline": "A fantastic article"}],
}


class TestShouldCallGetQuerySetOnForeignKeyNode:
"""
Check that the get_queryset method is called in both forward and reversed direction
of a foreignkey on types using a node interface.
(see issue #1111)
"""

@pytest.fixture(autouse=True)
def setup_schema(self):
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)

@classmethod
def get_queryset(cls, queryset, info):
if info.context and info.context.get("admin"):
return queryset
raise Exception("Not authorized to access reporters.")

class ArticleType(DjangoObjectType):
class Meta:
model = Article
interfaces = (Node,)

@classmethod
def get_queryset(cls, queryset, info):
return queryset.exclude(headline__startswith="Draft")

class Query(graphene.ObjectType):
reporter = Node.Field(ReporterType)
article = Node.Field(ArticleType)

self.schema = graphene.Schema(query=Query)

self.reporter = Reporter.objects.create(first_name="Jane", last_name="Doe")

self.articles = [
Article.objects.create(
headline="A fantastic article",
reporter=self.reporter,
editor=self.reporter,
),
Article.objects.create(
headline="Draft: My next best seller",
reporter=self.reporter,
editor=self.reporter,
),
]

def test_get_queryset_called_on_node(self):
# If a user tries to access an article it is fine as long as it's not a draft one
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
}
}
"""
# Non-draft
result = self.schema.execute(
query, variables={"id": to_global_id("ArticleType", self.articles[0].id)}
)
assert not result.errors
assert result.data["article"] == {
"headline": "A fantastic article",
}
# Draft
result = self.schema.execute(
query, variables={"id": to_global_id("ArticleType", self.articles[1].id)}
)
assert not result.errors
assert result.data["article"] is None

# If a non admin user tries to access a reporter they should get our authorization error
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
}
}
"""

result = self.schema.execute(
query, variables={"id": to_global_id("ReporterType", self.reporter.id)}
)
assert len(result.errors) == 1
assert result.errors[0].message == "Not authorized to access reporters."

# An admin user should be able to get reporters
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
}
}
"""

result = self.schema.execute(
query,
variables={"id": to_global_id("ReporterType", self.reporter.id)},
context_value={"admin": True},
)
assert not result.errors
assert result.data == {"reporter": {"firstName": "Jane"}}

def test_get_queryset_called_on_foreignkey(self):
# If a user tries to access a reporter through an article they should get our authorization error
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
reporter {
firstName
}
}
}
"""

result = self.schema.execute(
query, variables={"id": to_global_id("ArticleType", self.articles[0].id)}
)
assert len(result.errors) == 1
assert result.errors[0].message == "Not authorized to access reporters."

# An admin user should be able to get reporters through an article
query = """
query getArticle($id: ID!) {
article(id: $id) {
headline
reporter {
firstName
}
}
}
"""

result = self.schema.execute(
query,
variables={"id": to_global_id("ArticleType", self.articles[0].id)},
context_value={"admin": True},
)
assert not result.errors
assert result.data["article"] == {
"headline": "A fantastic article",
"reporter": {"firstName": "Jane"},
}

# An admin user should not be able to access draft article through a reporter
query = """
query getReporter($id: ID!) {
reporter(id: $id) {
firstName
articles {
edges {
node {
headline
}
}
}
}
}
"""

result = self.schema.execute(
query,
variables={"id": to_global_id("ReporterType", self.reporter.id)},
context_value={"admin": True},
)
assert not result.errors
assert result.data["reporter"] == {
"firstName": "Jane",
"articles": {"edges": [{"node": {"headline": "A fantastic article"}}]},
}
77 changes: 74 additions & 3 deletions graphene_django/tests/test_query.py
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
from ..fields import DjangoConnectionField
from ..types import DjangoObjectType
from ..utils import DJANGO_FILTER_INSTALLED
from .models import Article, CNNReporter, Film, FilmDetails, Reporter
from .models import Article, CNNReporter, Film, FilmDetails, Person, Pet, Reporter


def test_should_query_only_fields():
@@ -251,8 +251,8 @@ def resolve_reporter(self, info):


def test_should_query_onetoone_fields():
film = Film(id=1)
film_details = FilmDetails(id=1, film=film)
film = Film.objects.create(id=1)
film_details = FilmDetails.objects.create(id=1, film=film)

class FilmNode(DjangoObjectType):
class Meta:
@@ -1251,6 +1251,7 @@ class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)
fields = "__all__"

class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
@@ -1455,6 +1456,7 @@ class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)
fields = "__all__"

class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
@@ -1494,6 +1496,7 @@ class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)
fields = "__all__"

class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
@@ -1527,6 +1530,7 @@ class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)
fields = "__all__"

class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
@@ -1561,6 +1565,7 @@ class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node,)
fields = "__all__"

class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
@@ -1586,3 +1591,69 @@ class Query(graphene.ObjectType):
"allReporters": {"edges": [{"node": {"firstName": "Jane", "lastName": "Roe"}},]}
}
assert result.data == expected


def test_should_query_nullable_foreign_key():
class PetType(DjangoObjectType):
class Meta:
model = Pet
fields = "__all__"

class PersonType(DjangoObjectType):
class Meta:
model = Person
fields = "__all__"

class Query(graphene.ObjectType):
pet = graphene.Field(PetType, name=graphene.String(required=True))
person = graphene.Field(PersonType, name=graphene.String(required=True))

def resolve_pet(self, info, name):
return Pet.objects.filter(name=name).first()

def resolve_person(self, info, name):
return Person.objects.filter(name=name).first()

schema = graphene.Schema(query=Query)

person = Person.objects.create(name="Jane")
pets = [
Pet.objects.create(name="Stray dog", age=1),
Pet.objects.create(name="Jane's dog", owner=person, age=1),
]

query_pet = """
query getPet($name: String!) {
pet(name: $name) {
owner {
name
}
}
}
"""
result = schema.execute(query_pet, variables={"name": "Stray dog"})
assert not result.errors
assert result.data["pet"] == {
"owner": None,
}

result = schema.execute(query_pet, variables={"name": "Jane's dog"})
assert not result.errors
assert result.data["pet"] == {
"owner": {"name": "Jane"},
}

query_owner = """
query getOwner($name: String!) {
person(name: $name) {
pets {
name
}
}
}
"""
result = schema.execute(query_owner, variables={"name": "Jane"})
assert not result.errors
assert result.data["person"] == {
"pets": [{"name": "Jane's dog"}],
}