community[patch]: Make sql record manager fully compatible with async #20735


Merged: 39 commits into langchain-ai:master from pprados/fix_sql_record_manager, May 8, 2024

Conversation

@pprados (Contributor) commented Apr 22, 2024

The `_amake_session()` method does not allow modifying `self.session_factory` with anything other than `async_sessionmaker`. This prohibits advanced uses of `index()`.

In a RAG architecture, it is necessary to import document chunks. To keep track of the links between chunks and documents, we can use the `index()` API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import. Indeed, if a crash occurs during the import (a network problem, for example), there is an inconsistency between the SQL database and the vector database.

With the [PR](langchain-ai/langchain-postgres#32) we are proposing for `langchain-postgres`, it is now possible to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```

The same thing is possible asynchronously, but a bug in `sql_record_manager.py`, in `_amake_session()`, must first be fixed.

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: remove `if not isinstance(self.session_factory, async_sessionmaker):`
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
```

Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

pprados added 2 commits April 22, 2024 12:01
Add RunnableWithKwargs
Add first optimization

@pprados marked this pull request as ready for review April 22, 2024 11:05
@dosubot (bot) added the size:XS (0-9 lines changed, ignoring generated files), Ɑ: vector store, 🔌: postgres, and 🤖:improvement labels Apr 22, 2024
@eyurtsev (Collaborator) commented

This code is mutating attributes on the classes in place:

```python
record_manager.session_factory = session_maker
pgvector.session_maker = session_maker
```

While these attributes are not explicitly marked as private, it's safest to still treat them as such.


Are you able to pass the required information via the initializer? If so, what information do you need?

```python
    def __init__(
        self,
        namespace: str,
        *,
        connection: Optional[Union[??]],  # <-- Is there something that can be provided here that will accommodate your use case?
        engine: Optional[Union[Engine, AsyncEngine]] = None,
        db_url: Union[None, str, URL] = None,
        engine_kwargs: Optional[Dict[str, Any]] = None,
        async_mode: bool = False,
    ) -> None:
```

@eyurtsev self-assigned this Apr 22, 2024
@pprados (Contributor, Author) commented Apr 25, 2024

The `SQLRecordManager` cannot be initialized with a connection, and that alone would not be enough.

The `session_maker = scoped_session(sessionmaker(bind=connection))` and `async_scoped_session(async_sessionmaker(bind=connection), scopefunc=current_task)` must be used by all SQL tools. Perhaps it's possible to add a parameter carrying the current `session_maker` to `PGVector` and `SQLRecordManager` (see the sketch below).

It's a complex process. The `session_maker` MUST be initialized with the CURRENT connection (one connection per thread or async job; it's not possible to share a connection between different threads, as it is not reentrant). Only with this construction is it possible to join all the sub-transactions to the outer transaction and guarantee full execution of the import, whatever happens.

The LCEL approach proposes using a singleton to describe the chain. So you must use an `Engine`, not a `Connection`. The connection is opened only when it's needed, one per job.

The attributes `record_manager.session_factory` and `pgvector.session_maker` may be updated for that.
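For instance, it might look like this (a purely hypothetical sketch reusing the objects from the example in the description; these `session_factory`/`session_maker` constructor parameters do not exist today):

```python
with engine.connect() as connection:
    session_maker = scoped_session(sessionmaker(bind=connection))

    # Hypothetical: pass the shared session factory at construction time,
    # instead of patching public attributes after the fact.
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        session_factory=session_maker,  # hypothetical parameter
    )
    pgvector = PGVector(
        embeddings=embeddings,
        connection=engine,
        session_maker=session_maker,  # hypothetical parameter
    )
```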

@pprados (Contributor, Author) commented Apr 26, 2024

Hello from Paris 😄

To fully grasp the difficulty here, one must understand that `scoped_session()` is a class which holds, as an attribute, a `threading.local()` for variables associated with the current thread. Therefore, within the same thread, multiple instances of `scoped_session()` can exist, each keeping its session alive in its own thread-local variable. Hence, if `scoped_session()` is used in both the `PGVector()` and `SQLRecordManager()` classes, they will not share the same session, leading to inconsistency in case of a crash during import.
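A minimal sketch of this behavior (plain SQLAlchemy with an in-memory SQLite engine; the two registries stand in for the ones `PGVector` and `SQLRecordManager` would each create):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("sqlite://")

# Two independent registries, as if created by two different classes.
registry_a = scoped_session(sessionmaker(bind=engine))
registry_b = scoped_session(sessionmaker(bind=engine))

# Within one thread, each registry always returns its own session...
assert registry_a() is registry_a()
# ...but two registries never share a session, so each component runs in
# its own transaction and a crash cannot roll both back together.
assert registry_a() is not registry_b()
```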

One could have a global variable `global_scoped_session` holding an instance of `scoped_session()` for langchain, and ensure that all SQLAlchemy APIs use this global variable for session creation. Here too, it's the caller, responsible for creating the first session, who can decide how to create it.

However, this doesn't work for our use case because, during the initialization of the `global_scoped_session` variable, a `sessionmaker(bind=connection)` would need to be created, and it's not possible to use a connection for a global variable. Possibly a `sessionmaker(bind=engine)`, but it would be strange to have an engine imposed globally by langchain.

One could imagine proposing an alternative to `scoped_session` that uses a global variable for its `local()`: `global_scoped_session`. Then, if both classes default to using `global_scoped_session()` and the invocation is made directly, a session is created. There's no longer a need to modify `session_(maker|factory)`.

If, before the call, `global_scoped_session()` is invoked, `PGVector` and `SQLRecordManager` use this session. Therefore, it's the caller who decides how the original session is created (see the sketch below).
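A sketch of that idea (the names `global_scoped_session` and `configure_session` are purely illustrative, not an existing langchain API):

```python
from sqlalchemy.orm import scoped_session, sessionmaker

# One module-level registry shared by every SQL component.
global_scoped_session = scoped_session(sessionmaker())

def configure_session(bind) -> None:
    """The caller decides how sessions are built (bind an engine or a connection)."""
    global_scoped_session.configure(bind=bind)

def record_manager_write() -> None:
    session = global_scoped_session()  # same session as below, on this thread
    ...

def vector_store_write() -> None:
    session = global_scoped_session()  # shares the outer transaction
    ...
```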

So, to conclude, the only viable solution, in my opinion, is the one I propose. By offering the ability to modify `session_factory` and `session_maker`, my PR can be validated 😄.

baskaryan and others added 16 commits April 26, 2024 09:42
* Updating the provider docs page. 
The RAG example was meant to be moved to cookbook, but was merged by
mistake.

* Fix bug in Groundedness Check

---------

Co-authored-by: JuHyung-Son <[email protected]>
Co-authored-by: Erick Friis <[email protected]>
…langchain-ai#19743)

OpenAI API compatible server may not support `safe_len_embedding`, 

use `disable_safe_len_embeddings=True` to disable it.

---------

Co-authored-by: Bagatur <[email protected]>
…ames method (langchain-ai#20467)

Add the remove_unwanted_classnames method to the
BeautifulSoupTransformer class, which can filter more effectively.

---------

Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Bagatur <[email protected]>
api docs build doesn't tolerate them
…i#19406)

- **Description:** Adapt JinaEmbeddings to run with the new Jina AI
Rerank API
- **Twitter handle:** https://twitter.com/JinaAI_



---------

Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Bagatur <[email protected]>

- **PR title**: Fix misplaced zep cloud example links
- **Description:** Fixes misplaced links for vector store and memory zep cloud examples



**Description:** In VoyageAI text-embedding examples, use the voyage-law-2 model.


…of RecursiveUrlLoader document (langchain-ai#20875)

**Description:** 
The `RecursiveUrlLoader` offers a `link_regex` parameter that can filter out URLs. However, this filtering capability is limited, and if the internal links of the website change, unexpected resources may be loaded. These resources, such as font files, can cause problems in subsequent embedding processing.

> https://blog.langchain.dev/assets/fonts/source-sans-pro-v21-latin-ext_latin-regular.woff2?v=0312715cbf

We can add the Content-Type from the HTTP response headers to the document metadata, so developers can choose which resources to use (a filtering sketch follows the lists below).

For example, the following may be a good choice for text knowledge.

- text/plain - simple text file
- text/html - HTML web page
- text/xml - XML format file
- text/json - JSON format data
- application/pdf - PDF file
- application/msword - Word document

and ignore the following

- text/css - CSS stylesheet
- text/javascript - JavaScript script
- application/octet-stream - binary data
- image/jpeg - JPEG image
- image/png - PNG image
- image/gif - GIF image
- image/svg+xml - SVG image
- audio/mpeg - MPEG audio files
- video/mp4 - MP4 video file
- application/font-woff - WOFF font file
- application/font-ttf - TTF font file
- application/zip - ZIP compressed file
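A hypothetical post-load filter using this metadata (the `content_type` key and the helper are illustrative assumptions, not part of the loader's documented API):

```python
from langchain_core.documents import Document

TEXT_TYPES = {
    "text/plain", "text/html", "text/xml", "text/json",
    "application/pdf", "application/msword",
}

def keep_text_documents(docs: list[Document]) -> list[Document]:
    """Keep only documents whose Content-Type looks like text knowledge."""
    return [
        doc for doc in docs
        # Strip any "; charset=..." suffix before comparing.
        if doc.metadata.get("content_type", "").split(";")[0].strip() in TEXT_TYPES
    ]
```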

**Twitter handle:** @coolbeevip

---------

Co-authored-by: Bagatur <[email protected]>
…angchain-ai#20876)

For driver connection reuse, we introduce passing the graph object to Neo4j integrations.
**Description:** 
This PR removes an unnecessary code snippet from the documentation. The
snippet in question is not relevant to the content and does not
contribute to the overall understanding of the topic. It contained
redundant imports and unused code, potentially causing confusion for
readers.

**Issue:** 
There is no specific issue number associated with this change.

**Dependencies:** 
No additional dependencies are required for this change.

---------

Co-authored-by: Bagatur <[email protected]>
Co-authored-by: Bagatur <[email protected]>
dristysrivastava and others added 5 commits April 26, 2024 09:42
…angchain-ai#20269)

**Description**:
_PebbloSafeLoader_: Add support for pebblo server and client version


**Documentation:** NA
**Unit test:** NA
**Issue:** NA
**Dependencies:**  None

---------

Co-authored-by: Bagatur <[email protected]>
…gchain-ai#17663)

**Description:** AzureSearch vector store has no tests. This PR adds
initial tests to validate the code can be imported and used.
**Issue:** N/A
**Dependencies:** azure-search-documents and azure-identity are added as
optional dependencies for testing

---------

Co-authored-by: Matt Gotteiner <[email protected]>
Co-authored-by: Bagatur <[email protected]>
…hain-ai#20905)

`langchain_pinecone.Pinecone` is deprecated in favor of
`PineconeVectorStore`, and is currently a subclass of
`PineconeVectorStore`.
```python
@deprecated(since="0.0.3", removal="0.2.0", alternative="PineconeVectorStore")
class Pinecone(PineconeVectorStore):
    """Deprecated. Use PineconeVectorStore instead."""

    pass
```
@efriis added the partner label Apr 26, 2024
@efriis self-assigned this Apr 26, 2024
@dosubot (bot) added the size:XXL label (1000+ lines changed, ignoring generated files) and removed the size:XS label Apr 26, 2024
@dosubot (bot) added the size:XS label and removed the size:XXL label Apr 26, 2024
@pprados marked this pull request as draft May 6, 2024 11:31
@pprados marked this pull request as ready for review May 6, 2024 11:39
@dosubot (bot) added the size:S label (10-29 lines changed, ignoring generated files) and removed the size:XS label May 6, 2024
@pprados (Contributor, Author) commented May 6, 2024

Hello @eyurtsev

Can you reconsider this small but very important PR? Without it, it's not possible to have a resilient RAG in production.

```diff
@@ -60,6 +60,10 @@ def parse_result(
         json_object = super().parse_result(result)
         return self._parse_obj(json_object)

+    def parse_result(
```
Collaborator: Shouldn't be included

```diff
@@ -175,7 +175,7 @@ def _make_session(self) -> Generator[Session, None, None]:
     async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
         """Create a session and close it after use."""

-        if not isinstance(self.session_factory, async_sessionmaker):
+        if not isinstance(self.engine, AsyncEngine):
```
Collaborator: Why doesn't the original code work?

@dosubot (bot) added the lgtm label (PR looks good; use to confirm that a PR is ready for merging) May 8, 2024
@eyurtsev changed the title from "Make sql record manager fully compatible with async" to "community[patch]: Make sql record manager fully compatible with async" May 8, 2024
@eyurtsev merged commit 7be6822 into langchain-ai:master May 8, 2024
60 checks passed
kyle-cassidy pushed a commit to kyle-cassidy/langchain that referenced this pull request May 10, 2024
…langchain-ai#20735)

eyurtsev pushed a commit to langchain-ai/langchain-postgres that referenced this pull request Jun 10, 2024
I made the mistake of doing a rebase while a review was in progress, which seems to block the process: there's a "1 change requested" review that I can't resolve. So I propose another PR, identical to the [previous one](#32).

This PR adds the **async** approach for pgvector.

Some remarks:
- We use `assert` to check invocations rather than `if`. Thus, in production, it is possible to remove these checks with `python -O ...`
- We propose a public `session_maker` attribute. This is very important for resilient scenarios.

In a RAG architecture, it is necessary to import document chunks.

To keep track of the links between chunks and documents, we can use the [index()](https://python.langchain.com/docs/modules/data_connection/indexing/) API. This API proposes to use an SQL-type record manager.

In a classic use case, using `SQLRecordManager` and a vector database, it is impossible to guarantee the consistency of the import.

Indeed, if a crash occurs during the import, there is an inconsistency between the SQL database and the vector database.

**PGVector is the solution to this problem.**

Indeed, it is possible to use a single database (and not a two-phase commit across two technologies, if they are even both compatible). But, for this, it is necessary to be able to combine the transactions between the use of `SQLRecordManager` and `PGVector` as a vector database.

This is only possible if it is possible to intervene on the `session_maker`.

This is why we propose to make this attribute public. By unifying the `session_maker` of `SQLRecordManager` and `PGVector`, it is possible to guarantee that all processes will be executed in a single transaction.

This is, moreover, the only solution we know of to guarantee the consistency of the import of chunks into a vector database. It's possible only if the outer session is built with the connection.

```python
def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )

    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
    )
    record_manager.create_schema()

    with engine.connect() as connection:
        session_maker = scoped_session(sessionmaker(bind=connection))
        # NOTE: Update session_factories
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = index(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)
```
The same thing is possible asynchronously, but a bug in `sql_record_manager.py`, in `_amake_session()`, must first be fixed (see the [PR](langchain-ai/langchain#20735)).

```python
    async def _amake_session(self) -> AsyncGenerator[AsyncSession, None]:
        """Create a session and close it after use."""

        # FIXME: remove `if not isinstance(self.session_factory, async_sessionmaker):`
        if not isinstance(self.engine, AsyncEngine):
            raise AssertionError("This method is not supported for sync engines.")

        async with self.session_factory() as session:
            yield session
``` 
Then, it is possible to do the same thing asynchronously:

```python
async def main():
    db_url = "postgresql+psycopg://postgres:password_postgres@localhost:5432/"
    engine = create_async_engine(db_url, echo=True)
    embeddings = FakeEmbeddings()
    pgvector: VectorStore = PGVector(
        embeddings=embeddings,
        connection=engine,
    )
    record_manager = SQLRecordManager(
        namespace="namespace",
        engine=engine,
        async_mode=True,
    )
    await record_manager.acreate_schema()

    async with engine.connect() as connection:
        session_maker = async_scoped_session(
            async_sessionmaker(bind=connection),
            scopefunc=current_task)
        record_manager.session_factory = session_maker
        pgvector.session_maker = session_maker
        async with connection.begin():
            loader = CSVLoader(
                "data/faq/faq.csv",
                source_column="source",
                autodetect_encoding=True,
            )
            result = await aindex(
                source_id_key="source",
                docs_source=loader.load()[:1],
                cleanup="incremental",
                vector_store=pgvector,
                record_manager=record_manager,
            )
            print(result)


asyncio.run(main())
```

The promise of the constructor, with the `create_extension` parameter, is to guarantee that the extension is added before the APIs are used. Since this promise cannot be kept in an `async` scenario, there is an alternative:

- Remove this parameter, since the promise cannot be kept. Otherwise, an `async` method would be needed to install the extension before the APIs are used, and each API would have to check that this method has been invoked at its start.
- Use a lazy approach as suggested, which simply respects the constructor's promise (a sketch follows).
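For example, the lazy approach could look like this (a minimal sketch with hypothetical names, not the actual langchain-postgres implementation):

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine

class LazyExtensionVectorStore:
    """Sketch: defer CREATE EXTENSION until the first API call."""

    def __init__(self, engine: AsyncEngine, create_extension: bool = True) -> None:
        self._engine = engine
        self._create_extension = create_extension
        self._extension_ready = False

    async def _aensure_extension(self) -> None:
        # Lazily keep the constructor's promise: the extension exists
        # before any API actually touches the tables.
        if self._extension_ready or not self._create_extension:
            return
        async with self._engine.begin() as conn:
            await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
        self._extension_ready = True

    async def aadd_documents(self, docs) -> None:
        await self._aensure_extension()  # checked at the start of each API
        ...  # actual insert logic would follow
```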
@pprados deleted the pprados/fix_sql_record_manager branch June 18, 2024 06:36