Skip to content

Implementation of upsert, aupsert, get_by_ids, aget_by_ids #83

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 113 additions & 53 deletions langchain_postgres/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
Callable,
Dict,
Generator,
Iterable,
List,
Optional,
Sequence,
Expand All @@ -27,6 +26,7 @@
import sqlalchemy
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.indexing import UpsertResponse
from langchain_core.utils import get_from_dict_or_env
from langchain_core.vectorstores import VectorStore
from sqlalchemy import SQLColumnExpression, cast, create_engine, delete, func, select
Expand Down Expand Up @@ -714,7 +714,7 @@ async def __afrom(

def add_embeddings(
self,
texts: Iterable[str],
texts: Sequence[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
Expand Down Expand Up @@ -770,7 +770,7 @@ def add_embeddings(

async def aadd_embeddings(
self,
texts: Iterable[str],
texts: Sequence[str],
embeddings: List[List[float]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
Expand Down Expand Up @@ -824,56 +824,6 @@ async def aadd_embeddings(

return ids

def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.

Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids for the texts.
If not provided, will generate a new id for each text.
kwargs: vectorstore specific parameters

Returns:
List of ids from adding the texts into the vectorstore.
"""
assert not self._async_engine, "This method must be called without async_mode"
embeddings = self.embedding_function.embed_documents(list(texts))
return self.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)

async def aadd_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.

Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids for the texts.
If not provided, will generate a new id for each text.
kwargs: vectorstore specific parameters

Returns:
List of ids from adding the texts into the vectorstore.
"""
await self.__apost_init__() # Lazy async init
embeddings = await self.embedding_function.aembed_documents(list(texts))
return await self.aadd_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)

def similarity_search(
self,
query: str,
Expand Down Expand Up @@ -1014,6 +964,7 @@ def _results_to_docs_and_scores(self, results: Any) -> List[Tuple[Document, floa
docs = [
(
Document(
id=str(result.EmbeddingStore.id),
page_content=result.EmbeddingStore.document,
metadata=result.EmbeddingStore.cmetadata,
),
Expand Down Expand Up @@ -2178,3 +2129,112 @@ async def _make_async_session(self) -> AsyncGenerator[AsyncSession, None]:
)
async with self.session_maker() as session:
yield typing_cast(AsyncSession, session)

def upsert(self, items: Sequence[Document], /, **kwargs: Any) -> UpsertResponse:
"""Upsert documents into the vectorstore.

Args:
items: Sequence of documents to upsert.
kwargs: vectorstore specific parameters

Returns:
UpsertResponse
"""
if self._async_engine:
raise AssertionError("This method must be called in sync mode.")
texts = [item.page_content for item in items]
metadatas = [item.metadata for item in items]
ids = [item.id if item.id is not None else str(uuid.uuid4()) for item in items]
embeddings = self.embedding_function.embed_documents(list(texts))
added_ids = self.add_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
return {
"succeeded": added_ids,
"failed": [
item.id
for item in items
if item.id is not None and item.id not in added_ids
],
}

async def aupsert(
self, items: Sequence[Document], /, **kwargs: Any
) -> UpsertResponse:
"""Upsert documents into the vectorstore.

Args:
items: Sequence of documents to upsert.
kwargs: vectorstore specific parameters

Returns:
UpsertResponse
"""
if not self._async_engine:
raise AssertionError("This method must be called with async_mode")
texts = [item.page_content for item in items]
metadatas = [item.metadata for item in items]
ids = [item.id if item.id is not None else str(uuid.uuid4()) for item in items]
embeddings = await self.embedding_function.aembed_documents(list(texts))
added_ids = await self.aadd_embeddings(
texts=texts, embeddings=embeddings, metadatas=metadatas, ids=ids, **kwargs
)
return {
"succeeded": added_ids,
"failed": [
item.id
for item in items
if item.id is not None and item.id not in added_ids
],
}

def get_by_ids(self, ids: Sequence[str], /) -> List[Document]:
"""Get documents by ids."""
documents = []
with self._make_sync_session() as session:
collection = self.get_collection(session)
filter_by = [self.EmbeddingStore.collection_id == collection.uuid]
stmt = (
select(
self.EmbeddingStore,
)
.where(self.EmbeddingStore.id.in_(ids))
.filter(*filter_by)
)

for result in session.execute(stmt).scalars().all():
documents.append(
Document(
id=result.id,
page_content=result.document,
metadata=result.cmetadata,
)
)
return documents

async def aget_by_ids(self, ids: Sequence[str], /) -> List[Document]:
"""Get documents by ids."""
documents = []
async with self._make_async_session() as session:
collection = await self.aget_collection(session)
filter_by = [self.EmbeddingStore.collection_id == collection.uuid]

stmt = (
select(
self.EmbeddingStore,
)
.where(self.EmbeddingStore.id.in_(ids))
.filter(*filter_by)
)

results: Sequence[Any] = (await session.execute(stmt)).scalars().all()

for result in results:
documents.append(
Document(
id=str(result.id),
page_content=result.document,
metadata=result.cmetadata,
)
)
return documents
Loading
Loading