Skip to content

Add sqlite_update_hook support #4234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

ruifigueira
Copy link

First of all, this started as an exercise, as I'm trying to learn a bit of cloudflare workers internals, so feel free to reject this PR.

Nevertheless, the use case I have in mind is to easily be able to react to SQLite changes. For instance, we can have a websocket sending every change that occurs in a table by just listening to update events that are triggered via sqlite_update_hook.

Those events are synchronous, so with as little latency as sqlite / workerd allows.

Here's an example on how to use it:

import { DurableObject } from "cloudflare:workers";

export class WebSocketServer extends DurableObject {

  sql: SqlStorage;

  constructor(ctx: DurableObjectState, env: Env) {
    super(ctx, env);
    this.sql = ctx.storage.sql;
    this.sql.exec(`
      DROP TABLE IF EXISTS messages;
      CREATE TABLE messages (id INTEGER PRIMARY KEY, content TEXT NOT NULL);`)
  }

  upsert(id: number, text: string) {
    this.sql.exec("INSERT INTO messages (id, content) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET content = ?", id, text, text);
  }

  delete(id: number) {
    this.sql.exec("DELETE FROM messages WHERE id = ?", id);
  }

  async fetch(request) {
    // Creates two ends of a WebSocket connection.
    const webSocketPair = new WebSocketPair();
    const [client, server] = Object.values(webSocketPair);

    const sendUpdates = ({ action, tableName, rowId }) => {
      server.send(JSON.stringify({ action, tableName, rowId }));
    };

    this.sql.addEventListener("update", sendUpdates);
    server.addEventListener("close", () => {
      this.sql.removeEventListener("update", sendUpdates);
    });

    server.accept();

    return new Response(null, {
      status: 101,
      webSocket: client,
    });
  }
}

export default {
  async fetch(request: Request, env: Env) {
    const url = new URL(request.url);
    const id = env.WEBSOCKET_SERVER.idFromName("websocket-server");
    const stub = env.WEBSOCKET_SERVER.get(id);
    if (url.pathname === "/ws") {
      return await stub.fetch(request);
    } else if (url.pathname === "/messages") {
      if (!url.searchParams.has("id"))
        return new Response("Bad Request: Missing 'id' parameter", { status: 400 });

      const text = url.searchParams.get("t");
      const id = parseInt(url.searchParams.get("id")!, 10);

      const method = request.method;

      if (method === "DELETE" && id) {
        await stub.delete(id);
        return new Response(null, { status: 204 });
      } else {
        if (!text)
          return new Response("Bad Request: Missing 't' parameter", { status: 400 });

        await stub.upsert(id, text);
        return new Response(null, { status: 204 });
      }
    }

    return new Response("Not Found", { status: 404 });
  }

} satisfies ExportedHandler<Env>;

I did some quick performance tests comparing my built workerd with and without this changes and, with no registered listener, performance is basically the same, and with listeners I only noticed a small increase, maybe 5%.

It's not possible to run sql queries inside the listener yet (it crashes because currentParseContext prevents nested sql calls).

Copy link

github-actions bot commented Jun 3, 2025

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

@ruifigueira
Copy link
Author

I have read the CLA Document and I hereby sign the CLA

github-actions bot added a commit that referenced this pull request Jun 3, 2025
@jasnell jasnell requested a review from kentonv June 3, 2025 14:45
@kentonv
Copy link
Member

kentonv commented Jun 3, 2025

Hi @ruifigueira, thanks for contributing!

To set expectations a bit, normally for a feature like this we would start with a design doc proposing an API and spend some time bikeshedding discussing it before moving on to implementation. Once we add an API we have to support it forever, so we want to be pretty careful about this. It may take some time before the right people are able to review it, as we all have a lot of other priorities. It also happens that some people on the team have done some thinking about how to support this particular feature already, so we'll have to compare notes. Ultimately we might decide we like the API you've implemented but we could also decide that we'd prefer something different.

Your PR description shows an example, which is great, but doesn't directly describe the new API is. Would you mind writing that down to make this easier to review? Just write a reply on this thread containing roughly what reference documentation for this API might look like...

@ruifigueira
Copy link
Author

ruifigueira commented Jun 3, 2025

@kentonv my expectations are very low, and I know this is a very basic approach to a very complex problem, but as I didn't see any reference in this repository to sqlite hooks, I just thought to share my exercise, which honestly was more for me to get confortable with workerd code base.

Neverthelesss, here is my initial proposal, extending it to all sqlite hooks (for instance, sqlite session extension uses sqlite3_preupdate_hook and its companion functions).

SQL Storage Events API Proposal

This API allows database operations monitoring and interception at various stages of execution.
It provides a 1:1 mapping to their corresponding sqlite hooks, but in a more event style API.

Types

// integer if Number.MIN_SAFE_INTEGER <= rowId <= Number.MAX_SAFE_INTEGER,
// or bigint otherwise
type RowId = number | bigint;

// See https://sqlite.org/c3ref/preupdate_blobwrite.html
interface SqlStoragePreUpdateEvent extends Event {
  action: 'insert' | 'update' | 'delete';
  databaseName: string;
  tableName: string;
  rowId: RowId;
  newRowId?:RowId;
  // from sqlite3_preupdate_count
  get columnsCount(): number;
  // from sqlite3_preupdate_depth
  get depth(): number;
  // sqlite3_preupdate_old for each updated column (update or delete).
  // Throws exception if called on an insert event
  getOldValues<T extends Record<string, SqlStorageValue>>(): T;
  // sqlite3_preupdate_new for each updated column (insert or update).
  // Throws exception if called on an delete event
  getNewValues<T extends Record<string, SqlStorageValue>>(): T;
}

// See https://sqlite.org/c3ref/preupdate_blobwrite.html
interface SqlStorageUpdateEvent extends Event {
  action: 'insert' | 'update' | 'delete';
  databaseName: string;
  tableName: string;
  rowId: RowId;
}

// See https://sqlite.org/c3ref/commit_hook.html
interface SqlStorageCommitEvent extends Event {
  // forces a rollback by returning a non-zero value
  // on sqlite3_commit_hook callback
  rollback(): void;
}

type SqlStorageEventMap = {
  // sqlite3_preupdate_hook
  preupdate: SqlStoragePreUpdateEvent;
  // sqlite3_update_hook
  update: SqlStorageUpdateEvent;
  // sqlite3_commit_hook
  commit: SqlStorageCommitEvent;
  // sqlite3_rollback_hook
  rollback: Event;
};

interface SqlStorage extends EventTarget<SqlStorageEventMap> {
  //...
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants