community[patch]: Make sql record manager fully compatible with async #20735
Conversation
This code is mutating attributes on the classes in place. While the attributes are not marked explicitly as private, it's safest to still treat them as such. Are you able to pass the required information via the initializer? If so, what information do you need?

```python
def __init__(
    self,
    namespace: str,
    *,
    connection: Optional[Union[??]],  # <-- Is there something that can be provided here that will accommodate your use case?
    engine: Optional[Union[Engine, AsyncEngine]] = None,
    db_url: Union[None, str, URL] = None,
    engine_kwargs: Optional[Dict[str, Any]] = None,
    async_mode: bool = False,
) -> None:
```
The `SQLRecordManager` cannot be initialized with a connection, and even if it could, that would not be enough; it's a complex process. The LCEL approach proposes using a singleton to describe the chain, so you must use an `Engine`, not a `Connection`. A connection is opened only when it's needed, one for each job. That is why the attributes must be adjustable after construction.
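A minimal sketch of that lifecycle (the connection string and job body are hypothetical): the `Engine` lives in the singleton chain description, while each job opens and closes its own connection.

```python
from sqlalchemy import create_engine

# The Engine is created once, when the singleton chain is described.
engine = create_engine("postgresql+psycopg://postgres:password@localhost:5432/db")

def run_job() -> None:
    # A connection is opened only when a job needs it, one per job,
    # and returned to the pool when the job completes.
    with engine.connect() as connection:
        with connection.begin():  # the job's own transaction
            ...  # do the job's work here
```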
Hello from Paris 😄 To fully grasp the difficulty of this, one must understand the lifecycle involved. One could have a global variable […], but this doesn't work for our use case because, during the initialization of the […], no connection exists yet. One could imagine proposing an alternative to […]; if, before the call, […]. So, to conclude, the only viable solution, in my opinion, is the one I propose: by offering the ability to modify the `session_factory`, the caller can place every component inside the same transaction.
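Concretely, the capability being argued for looks like this (a sketch only; `engine`, `record_manager`, and `pgvector` are assumed to exist as in the full examples further down):

```python
from sqlalchemy.orm import scoped_session, sessionmaker

with engine.connect() as connection:
    # Bind one session factory to the shared connection and hand it to both
    # components, so the record manager and the vector store join the same
    # transaction.
    session_maker = scoped_session(sessionmaker(bind=connection))
    record_manager.session_factory = session_maker
    pgvector.session_maker = session_maker
```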
* Updating the provider docs page. The RAG example was meant to be moved to cookbook, but was merged by mistake.
* Fix bug in Groundedness Check

---------

Co-authored-by: JuHyung-Son <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
…angchain-ai#20849) mv StructuredQuery to core
…langchain-ai#19743) An OpenAI API compatible server may not support `safe_len_embedding`; use `disable_safe_len_embeddings=True` to disable it. --------- Co-authored-by: Bagatur <[email protected]>
…ames method (langchain-ai#20467) Add the remove_unwanted_classnames method to the BeautifulSoupTransformer class, which can filter more effectively. --------- Co-authored-by: Bagatur <[email protected]> Co-authored-by: Bagatur <[email protected]>
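A hedged usage sketch of that filter (the keyword name `unwanted_classnames` is taken from the commit title, and `docs` is a hypothetical list of HTML documents):

```python
from langchain_community.document_transformers import BeautifulSoupTransformer

# Sketch: strip elements whose CSS class matches, in addition to the usual
# unwanted tags, before the documents reach the embedding pipeline.
transformer = BeautifulSoupTransformer()
cleaned = transformer.transform_documents(
    docs,
    unwanted_classnames=["navbar", "footer", "ad-banner"],
)
```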
api docs build doesn't tolerate them
…i#19406)

- **Description:** Adapt JinaEmbeddings to run with the new Jina AI Rerank API
- **Twitter handle:** https://twitter.com/JinaAI_
- [ ] **Add tests and docs**: If you're adding a new integration, please include 1. a test for the integration, preferably unit tests that do not rely on network access, 2. an example notebook showing its use. It lives in the `docs/docs/integrations` directory.
- [ ] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified. See contribution guidelines for more: https://python.langchain.com/docs/contributing/

---------

Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Thank you for contributing to LangChain!

- [x] **PR title**: Fix misplaced zep cloud example links
- [x] **PR message**:
  - **Description:** Fixes misplaced links for vector store and memory zep cloud examples
- [x] **Add tests and docs**: If you're adding a new integration, please include 1. a test for the integration, preferably unit tests that do not rely on network access, 2. an example notebook showing its use. It lives in the `docs/docs/integrations` directory.
- [x] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified. See contribution guidelines for more: https://python.langchain.com/docs/contributing/

Additional guidelines:
- Make sure optional dependencies are imported within a function.
- Please do not add dependencies to pyproject.toml files (even optional ones) unless they are required for unit tests.
- Most PRs should not touch more than one package.
- Changes should be backwards compatible.
- If you are adding something to community, do not re-import it in langchain.

If no one reviews your PR within a few days, please @-mention one of baskaryan, efriis, eyurtsev, hwchase17.
Thank you for contributing to LangChain! - [x] **PR title**: "package: description" - Where "package" is whichever of langchain, community, core, experimental, etc. is being modified. Use "docs: ..." for purely docs changes, "templates: ..." for template changes, "infra: ..." for CI changes. - Example: "community: add foobar LLM" **Description:** In VoyageAI text-embedding examples use voyage-law-2 model - [x] **Add tests and docs**: If you're adding a new integration, please include 1. a test for the integration, preferably unit tests that do not rely on network access, 2. an example notebook showing its use. It lives in `docs/docs/integrations` directory. - [x] **Lint and test**: Run `make format`, `make lint` and `make test` from the root of the package(s) you've modified. See contribution guidelines for more: https://python.langchain.com/docs/contributing/ Additional guidelines: - Make sure optional dependencies are imported within a function. - Please do not add dependencies to pyproject.toml files (even optional ones) unless they are required for unit tests. - Most PRs should not touch more than one package. - Changes should be backwards compatible. - If you are adding something to community, do not re-import it in langchain. If no one reviews your PR within a few days, please @-mention one of baskaryan, efriis, eyurtsev, hwchase17.
…of RecursiveUrlLoader document (langchain-ai#20875)

**Description:** The RecursiveUrlLoader offers a `link_regex` parameter that can filter out URLs. However, this filtering capability is limited, and if the internal links of the website change, unexpected resources may be loaded. These resources, such as font files, can cause problems in subsequent embedding processing.

> https://blog.langchain.dev/assets/fonts/source-sans-pro-v21-latin-ext_latin-regular.woff2?v=0312715cbf

We can add the Content-Type from the HTTP response headers to the document metadata so developers can choose which resources to use. For example, the following may be a good choice for text knowledge:

- text/plain - simple text file
- text/html - HTML web page
- text/xml - XML format file
- text/json - JSON format data
- application/pdf - PDF file
- application/msword - Word document

and the following can be ignored:

- text/css - CSS stylesheet
- text/javascript - JavaScript script
- application/octet-stream - binary data
- image/jpeg - JPEG image
- image/png - PNG image
- image/gif - GIF image
- image/svg+xml - SVG image
- audio/mpeg - MPEG audio file
- video/mp4 - MP4 video file
- application/font-woff - WOFF font file
- application/font-ttf - TTF font file
- application/zip - ZIP compressed file

**Twitter handle:** @coolbeevip

---------

Co-authored-by: Bagatur <[email protected]>
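A small filtering sketch built on this (assuming the loader stores the header under a `content_type` metadata key, and `docs` is the loader's output):

```python
# Keep only text-like resources, using the Content-Type now carried in metadata.
TEXT_TYPES = {
    "text/plain", "text/html", "text/xml", "text/json",
    "application/pdf", "application/msword",
}

text_docs = [
    doc for doc in docs
    if doc.metadata.get("content_type", "").split(";")[0].strip() in TEXT_TYPES
]
```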
…angchain-ai#20876) To allow driver connection reuse, we introduce passing the graph object to the Neo4j integrations.
**Description:** This PR removes an unnecessary code snippet from the documentation. The snippet in question is not relevant to the content and does not contribute to the overall understanding of the topic. It contained redundant imports and unused code, potentially causing confusion for readers. **Issue:** There is no specific issue number associated with this change. **Dependencies:** No additional dependencies are required for this change. --------- Co-authored-by: Bagatur <[email protected]> Co-authored-by: Bagatur <[email protected]>
Does not update docs.
…angchain-ai#20269) **Description**: _PebbloSafeLoader_: Add support for pebblo server and client version **Documentation:** NA **Unit test:** NA **Issue:** NA **Dependencies:** None --------- Co-authored-by: Bagatur <[email protected]>
…gchain-ai#17663) **Description:** AzureSearch vector store has no tests. This PR adds initial tests to validate the code can be imported and used. **Issue:** N/A **Dependencies:** azure-search-documents and azure-identity are added as optional dependencies for testing --------- Co-authored-by: Matt Gotteiner <[email protected]> Co-authored-by: Bagatur <[email protected]>
…hain-ai#20905) `langchain_pinecone.Pinecone` is deprecated in favor of `PineconeVectorStore`, and is currently a subclass of `PineconeVectorStore`.

```python
@deprecated(since="0.0.3", removal="0.2.0", alternative="PineconeVectorStore")
class Pinecone(PineconeVectorStore):
    """Deprecated. Use PineconeVectorStore instead."""

    pass
```
Hello @eyurtsev, can you reconsider this very important small PR? Without it, it's not possible to have a resilient RAG in production.
```diff
@@ -60,6 +60,10 @@ def parse_result(
         json_object = super().parse_result(result)
         return self._parse_obj(json_object)

+    def parse_result(
```
Shouldn't be included
```diff
@@ -175,7 +175,7 @@ def _make_session(self) -> Generator[Session, None, None]:
     async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
         """Create a session and close it after use."""

-        if not isinstance(self.session_factory, async_sessionmaker):
+        if not isinstance(self.engine, AsyncEngine):
```
Why doesn't the original code work?
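A minimal reproduction of the answer given below (the connection URL is hypothetical; the script never connects): once the factory is wrapped in `async_scoped_session`, the old `isinstance` check fails even though the factory still yields sessions bound to an `AsyncEngine`.

```python
from asyncio import current_task
from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine("postgresql+psycopg://postgres:password@localhost:5432/db")
factory = async_scoped_session(async_sessionmaker(bind=engine), scopefunc=current_task)

print(isinstance(factory, async_sessionmaker))  # False -> the old check raised
# Checking self.engine against AsyncEngine keeps the sync/async assertion
# while still allowing such replacement factories.
```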
…langchain-ai#20735) The `_amake_session()` method does not allow modifying the `self.session_factory` with anything other than `async_sessionmaker`. This prohibits advanced uses of `index()`.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import (problem with the network, ...), there is an inconsistency between the SQL database and the vector database.

With the [PR](langchain-ai/langchain-postgres#32) we are proposing for `langchain-postgres`, it is now possible to guarantee the consistency of the import of chunks into a vector database. This is possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()
    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```

The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed.
```python
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
    """Create a session and close it after use."""

    # Before the fix: if not isinstance(self.session_factory, async_sessionmaker):
    if not isinstance(self.engine, AsyncEngine):
        raise AssertionError("This method is not supported for sync engines.")

    async with self.session_factory() as session:
        yield session
```

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()
    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

asyncio.run(main())
```

---------

Signed-off-by: Rahul Tripathi <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Sean <[email protected]>
Co-authored-by: JuHyung-Son <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
Co-authored-by: YISH <[email protected]>
Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Jason_Chen <[email protected]>
Co-authored-by: Joan Fontanals <[email protected]>
Co-authored-by: Pavlo Paliychuk <[email protected]>
Co-authored-by: fzowl <[email protected]>
Co-authored-by: samanhappy <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Tomaz Bratanic <[email protected]>
Co-authored-by: merdan <[email protected]>
Co-authored-by: ccurme <[email protected]>
Co-authored-by: Andres Algaba <[email protected]>
Co-authored-by: davidefantiniIntel <[email protected]>
Co-authored-by: Jingpan Xiong <[email protected]>
Co-authored-by: kaka <[email protected]>
Co-authored-by: jingsi <[email protected]>
Co-authored-by: Eugene Yurtsev <[email protected]>
Co-authored-by: Rahul Triptahi <[email protected]>
Co-authored-by: Rahul Tripathi <[email protected]>
Co-authored-by: Shengsheng Huang <[email protected]>
Co-authored-by: Michael Schock <[email protected]>
Co-authored-by: Anish Chakraborty <[email protected]>
Co-authored-by: am-kinetica <[email protected]>
Co-authored-by: Dristy Srivastava <[email protected]>
Co-authored-by: Matt <[email protected]>
Co-authored-by: William FH <[email protected]>
I made the mistake of doing a rebase while a review was in progress. This seems to block the process: there's a "1 change requested" review that I can't resolve. I propose another PR identical to the [previous one](#32).

This PR adds the **async** approach for pgvector. Some remarks:

- We use `assert` to check invocations, not `if`. Thus, in production, it is possible to remove these checks with `python -O ...`
- We propose a public `session_maker` attribute. This is very important for resilient scenarios.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the [index()](https://python.langchain.com/docs/modules/data_connection/indexing/) API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import, there is an inconsistency between the SQL database and the vector database. **PGVector is the solution to this problem.** Indeed, it is possible to use a single database (and not a two-phase commit across two technologies, if they are both compatible). But, for this, it is necessary to be able to combine the transactions between the use of `SQLRecordManager` and `PGVector` as a vector database. This is only possible if it is possible to intervene on the `session_maker`. This is why we propose to make this attribute public. By unifying the `session_maker` of `SQLRecordManager` and `PGVector`, it is possible to guarantee that all processes will be executed in a single transaction. This is, moreover, the only solution we know of to guarantee the consistency of the import of chunks into a vector database. This is possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()
    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```

The same thing is possible asynchronously, but a bug in `sql_record_manager.py` in `_amake_session()` must first be fixed (see this [PR](langchain-ai/langchain#20735)).
```python
async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
    """Create a session and close it after use."""

    # Before the fix: if not isinstance(self.session_factory, async_sessionmaker):
    if not isinstance(self.engine, AsyncEngine):
        raise AssertionError("This method is not supported for sync engines.")

    async with self.session_factory() as session:
        yield session
```

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()
    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task,
        )
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)

asyncio.run(main())
```

The promise of the constructor, with the `create_extension` parameter, is to guarantee that the extension is added before the APIs are used. Since this promise cannot be kept in an `async` scenario, there are two alternatives (see the sketch below):

- Remove this parameter, since the promise cannot be kept. Otherwise, an `async` method is needed to install the extension before the APIs are used, and to check that this method has been invoked at the start of each API.
- Use a lazy approach as suggested, which simply respects the constructor's promise.
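A minimal sketch of that lazy approach (the class and attribute names are hypothetical, not the actual `PGVector` implementation): the extension is created on first use, so the constructor's promise also holds in the async case.

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine

class LazyExtensionStore:
    """Hypothetical store that defers CREATE EXTENSION until first use."""

    def __init__(self, engine: AsyncEngine, create_extension: bool = True):
        self._engine = engine
        self._create_extension = create_extension
        self._extension_ready = False

    async def _aensure_extension(self) -> None:
        # Called at the start of each async API method.
        if self._create_extension and not self._extension_ready:
            async with self._engine.begin() as connection:
                await connection.execute(
                    text("CREATE EXTENSION IF NOT EXISTS vector")
                )
            self._extension_ready = True
```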