It would be really cool if rezolva included a starter (non-performant) neural network example built using only the standard library. Here's an example:
import math
import random
from rezolva import Entity, EntityResolver, SimpleBlocker, SimplePreprocessor
from rezolva.preprocessors.preprocessing_functions import (lowercase,
remove_punctuation,
strip_whitespace)
def sigmoid(x):
    """Return the logistic sigmoid of ``x``, i.e. ``1 / (1 + e**-x)``.

    The computation is split by sign so that ``math.exp`` is only ever called
    with a non-positive argument. The single-formula version raises
    ``OverflowError`` for large negative ``x`` (roughly ``x < -709``), which
    can happen once weights grow during training.

    Args:
        x: A real number.

    Returns:
        A float in the closed interval [0, 1].
    """
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    # Equivalent form e**x / (1 + e**x): for very negative x the exponential
    # underflows harmlessly to 0.0 instead of overflowing.
    exp_x = math.exp(x)
    return exp_x / (1 + exp_x)
def sigmoid_derivative(x):
    """Return the sigmoid's slope expressed in terms of the sigmoid's output.

    Args:
        x: A value that is already the *result* of a sigmoid (an activation),
            not the raw pre-activation input.

    Returns:
        ``x * (1 - x)``.
    """
    complement = 1 - x
    return x * complement
class SimpleNeuralNetwork:
    """A tiny feed-forward network: one sigmoid hidden layer, one sigmoid output.

    Every weight and bias starts as an independent ``random.random()`` draw
    and is adjusted one sample at a time by ``train``.
    """

    def __init__(self, input_size, hidden_size=4, learning_rate=0.1):
        """Allocate randomly initialised parameters.

        Args:
            input_size: Number of values in each input vector.
            hidden_size: Number of units in the hidden layer.
            learning_rate: Step size applied to every parameter update.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = 1
        self.learning_rate = learning_rate

        # Draw order matters for reproducibility under a fixed seed:
        # hidden weights (row by row), output weights, hidden biases, then
        # the output bias.
        self.hidden_weights = []
        for _ in range(hidden_size):
            self.hidden_weights.append([random.random() for _ in range(input_size)])
        self.output_weights = [random.random() for _ in range(hidden_size)]
        self.hidden_bias = [random.random() for _ in range(hidden_size)]
        self.output_bias = random.random()

    def forward(self, inputs):
        """Run one forward pass.

        Args:
            inputs: Sequence of numbers fed to the hidden layer.

        Returns:
            A ``(output, hidden)`` tuple: the scalar network output and the
            list of hidden-layer activations that produced it.
        """
        hidden = []
        for row, bias in zip(self.hidden_weights, self.hidden_bias):
            weighted = sum(value * weight for value, weight in zip(inputs, row))
            hidden.append(sigmoid(weighted + bias))

        combined = sum(act * weight for act, weight in zip(hidden, self.output_weights))
        return sigmoid(combined + self.output_bias), hidden

    def train(self, inputs, target):
        """Apply one gradient step that moves the output for ``inputs`` toward ``target``.

        Args:
            inputs: Sequence of numbers; indexed up to ``self.input_size``.
            target: Desired network output for ``inputs``.
        """
        prediction, activations = self.forward(inputs)
        out_delta = (target - prediction) * sigmoid_derivative(prediction)

        # Back-propagate through the output weights as they are *before*
        # this step's update, so compute the hidden deltas first.
        hid_deltas = [
            (weight * out_delta) * sigmoid_derivative(act)
            for weight, act in zip(self.output_weights, activations)
        ]

        out_step = self.learning_rate * out_delta
        for k in range(self.hidden_size):
            self.output_weights[k] += out_step * activations[k]
        self.output_bias += out_step

        for k in range(self.hidden_size):
            hid_step = self.learning_rate * hid_deltas[k]
            row = self.hidden_weights[k]
            for j in range(self.input_size):
                row[j] += hid_step * inputs[j]
            self.hidden_bias[k] += hid_step
class SimpleNNModelBuilder:
    """Turn entities into bag-of-words count vectors over a learned vocabulary.

    The vocabulary is the set of whitespace-separated tokens found in the
    configured attributes of the entities passed to ``build``.
    """

    def __init__(self, attributes):
        """Remember which attributes to tokenise.

        Args:
            attributes: Names of the entity attributes whose text is used.
        """
        self.attributes = attributes
        self.vocabulary = set()
        # Defined up front so ``transform`` can be called before ``build``
        # (it then yields an empty vector) instead of raising AttributeError.
        self.vocab_to_index = {}

    def build(self, entities):
        """Learn the vocabulary from ``entities`` and vectorise each of them.

        Args:
            entities: Iterable of objects exposing ``id`` and a subscriptable
                ``attributes`` mapping of attribute name to string.

        Returns:
            A dict with ``"vectors"`` (entity id -> count vector) and
            ``"entities"`` (entity id -> entity).

        Raises:
            KeyError: If an entity lacks one of the configured attributes.
        """
        entities = list(entities)  # iterated several times below
        for entity in entities:
            self.vocabulary.update(self._tokens(entity))

        # Sort before numbering: iteration order of a set of strings can
        # differ from one interpreter run to the next, which would silently
        # reshuffle the feature positions.
        self.vocab_to_index = {
            word: index for index, word in enumerate(sorted(self.vocabulary))
        }

        return {
            "vectors": {entity.id: self.transform(entity) for entity in entities},
            "entities": {entity.id: entity for entity in entities},
        }

    def transform(self, entity):
        """Return the count vector of ``entity`` over the current vocabulary.

        Tokens that are not in the vocabulary are ignored.

        Args:
            entity: Object exposing a subscriptable ``attributes`` mapping.

        Returns:
            A list of ints, one slot per vocabulary word.
        """
        vector = [0] * len(self.vocab_to_index)
        for word in self._tokens(entity):
            index = self.vocab_to_index.get(word)
            if index is not None:
                vector[index] += 1
        return vector

    def _tokens(self, entity):
        """Yield every whitespace-separated token of the configured attributes."""
        for attr in self.attributes:
            yield from entity.attributes[attr].split()
class SimpleNNMatcher:
    """Score pairs of feature vectors with a ``SimpleNeuralNetwork``.

    A pair is represented by concatenating its two vectors, so the network's
    input width is twice the width of a single vector.
    """

    def __init__(self, input_size, hidden_size=4, threshold=0.5):
        """Create the matcher and its network.

        Args:
            input_size: Width of a single (un-paired) feature vector. It may
                be a placeholder; ``train`` resizes the network to fit the
                model it is given.
            hidden_size: Number of hidden units in the network.
            threshold: Minimum score for ``match`` to report a candidate.
        """
        self.nn = SimpleNeuralNetwork(input_size * 2, hidden_size)
        self.threshold = threshold

    def train(self, model, epochs=1000):
        """Train the network to output 1 for an entity paired with itself, else 0.

        Args:
            model: Dict with ``"entities"`` (id -> entity) and ``"vectors"``
                (id -> feature vector).
            epochs: Number of full passes over all ordered entity pairs.
        """
        entities = list(model["entities"].values())
        if not entities:
            return
        vectors = model["vectors"]

        # The vector width is only known once a model exists, so make sure
        # the network's weights actually fit a concatenated pair.
        self._ensure_network(2 * len(vectors[entities[0].id]))

        for _ in range(epochs):
            for i, first in enumerate(entities):
                for j, second in enumerate(entities):
                    pair = vectors[first.id] + vectors[second.id]
                    self.nn.train(pair, 1.0 if i == j else 0.0)

    def match(self, query_entity, candidate_entities):
        """Return the candidates scoring at least ``threshold``, best first.

        Args:
            query_entity: The query's feature vector. It is concatenated
                directly with each candidate, so it must be a list-like
                vector rather than an entity object.
            candidate_entities: Iterable of candidate feature vectors.
                NOTE(review): the surrounding demo reads ``.id`` from
                returned matches, which suggests the resolver may pass
                entities here - confirm against the resolver's contract.

        Returns:
            A list of ``(candidate, score)`` tuples sorted by descending score.
        """
        matches = []
        for candidate in candidate_entities:
            score, _ = self.nn.forward(query_entity + candidate)
            if score >= self.threshold:
                matches.append((candidate, score))
        return sorted(matches, key=lambda pair: pair[1], reverse=True)

    def _ensure_network(self, pair_width):
        """Replace the network if its weights do not fit ``pair_width`` inputs."""
        current = self.nn
        # Check the real weight rows, not just the ``input_size`` attribute:
        # the attribute can be reassigned without the weights being resized.
        fits = current.input_size == pair_width and all(
            len(row) == pair_width for row in current.hidden_weights
        )
        if not fits:
            self.nn = SimpleNeuralNetwork(
                pair_width, current.hidden_size, current.learning_rate
            )
# Set up components
preprocessor = SimplePreprocessor([lowercase, strip_whitespace, remove_punctuation])
model_builder = SimpleNNModelBuilder(["title", "description", "brand"])
# The vocabulary size is unknown until the model is built, so start with a
# placeholder network and replace it once the vocabulary exists (see below).
matcher = SimpleNNMatcher(input_size=0)
blocker = SimpleBlocker(lambda e: e.attributes["brand"].lower())

# Create resolver
resolver = EntityResolver(preprocessor, model_builder, matcher, blocker)

# Training data
training_entities = [
    Entity("1", {"title": "iPhone 12", "description": "Latest Apple smartphone with A14 Bionic chip", "brand": "Apple"}),
    Entity("2", {"title": "iPhone 12 Pro", "description": "Premium Apple smartphone with LiDAR scanner", "brand": "Apple"}),
    Entity("3", {"title": "Galaxy S21", "description": "Samsung's flagship phone with Exynos 2100", "brand": "Samsung"}),
    Entity("4", {"title": "Pixel 5", "description": "Google's latest smartphone with 5G support", "brand": "Google"}),
]

# Train the resolver
resolver.train(training_entities)

# Size the matcher's network now that the vocabulary is known. Assigning only
# `matcher.nn.input_size` would keep the zero-width weight rows created by the
# placeholder above, and training would then fail with IndexError.
matcher.nn = SimpleNeuralNetwork(
    len(model_builder.vocabulary) * 2,
    matcher.nn.hidden_size,
    matcher.nn.learning_rate,
)
matcher.train(resolver.model)

# New entities to resolve
new_entities = [
    Entity("5", {"title": "iPhone 12 Pro Max", "description": "Apple's largest premium smartphone with A14 chip", "brand": "Apple"}),
    Entity("6", {"title": "Galaxy S21 Ultra", "description": "Samsung's premium flagship with Exynos 2100 and S Pen support", "brand": "Samsung"}),
]

# Resolve entities
results = resolver.resolve(new_entities)

# Print results
for entity, matches in results:
    print(f"Top matches for {entity.id} - {entity.attributes['title']}:")
    for match, score in matches[:2]:  # Limiting to top 2 matches
        print(f" Match: {match.id} - {match.attributes['title']} (Score: {score:.2f})")
    print()
It would be really cool if rezolva included a starter (non-performant) neural network example built using only the standard library. Here's an example:
Key points about this implementation: