I spent WAYYY too long trying to build a more accurate RAG retrieval system.
With Context Mesh Lite, I combined hybrid vector search, SQL search (agentic text-to-SQL), and graph search (a shallow graph built from dependent tables).
The result was a significantly more accurate (albeit slower) RAG system.
How does it work?
- SQL Functions do most of the heavy lifting, creating tables and table dependencies.
- Then Edge Functions call Gemini (embedding-001 and 2.5 Flash) to create vector embeddings and extract graph entities/predicates.
REQUIREMENTS: This system was built to live inside a Supabase instance. It also requires a Gemini API key (set as a secret in your Edge Functions settings).
I also connected the system to n8n workflows and it works like a charm. Anyway, I'm gonna give it to you. Maybe it'll be useful. Maybe you can improve on it.
So, first, go to your Supabase dashboard (the entire end-to-end system lives there...only the interfaces for document upsert and chat are external).
Step 1. Go to the SQL editor and paste this master query:
-- ===============================================================
-- CONTEXT MESH V9.0: GOLDEN MASTER (COMPOSITE RETRIEVAL)
-- UPDATED: Nov 25, 2025
-- ===============================================================
-- PART 1: EXTENSIONS
-- PART 2: STORAGE CONFIGURATION
-- PART 3: CORE TABLES (Docs, Graph, Queue, Config, Registry)
-- PART 4: INDEXES
-- PART 5: HELPER FUNCTIONS & TRIGGERS
-- PART 6: INGESTION FUNCTIONS (Universal V8 Logic)
-- PART 7: SEARCH FUNCTIONS (V9: FTS, Enrichment, Detection, Peek)
-- PART 8: CONFIGURATION LOGIC (V9 Weights)
-- PART 9: GRAPH CONTEXT RERANKER
-- PART 10: WORKER SETUP
-- PART 11: PERMISSIONS & REPAIRS
-- ===============================================================
-- ===============================================================
-- PART 1: EXTENSIONS
-- ===============================================================
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE EXTENSION IF NOT EXISTS pg_net; -- Required for Async Worker
-- ===============================================================
-- PART 2: STORAGE CONFIGURATION
-- ===============================================================
INSERT INTO storage.buckets (id, name, public)
VALUES ('raw_uploads', 'raw_uploads', false)
ON CONFLICT (id) DO NOTHING;
DROP POLICY IF EXISTS "Service Role Full Access" ON storage.objects;
CREATE POLICY "Service Role Full Access"
ON storage.objects FOR ALL
USING ( auth.role() = 'service_role' )
WITH CHECK ( auth.role() = 'service_role' );
-- ===============================================================
-- PART 3: CORE TABLES
-- ===============================================================
-- 3a. The Async Queue
CREATE TABLE IF NOT EXISTS public.ingestion_queue (
id uuid default gen_random_uuid() primary key,
uri text not null,
title text not null,
chunk_index int not null,
chunk_text text not null,
status text default 'pending',
error_log text,
created_at timestamptz default now()
);
-- 3b. RAG Tables
CREATE TABLE IF NOT EXISTS public.document (
id BIGSERIAL PRIMARY KEY,
uri TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
doc_type TEXT NOT NULL DEFAULT 'document',
meta JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS public.chunk (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT NOT NULL REFERENCES public.document(id) ON DELETE CASCADE,
ordinal INT NOT NULL,
text TEXT NOT NULL,
tsv TSVECTOR,
UNIQUE (document_id, ordinal)
);
-- PERFORMANCE: Using halfvec(768) for Gemini embeddings
CREATE TABLE IF NOT EXISTS public.chunk_embedding (
chunk_id BIGINT PRIMARY KEY REFERENCES public.chunk(id) ON DELETE CASCADE,
embedding halfvec(768) NOT NULL
);
-- 3c. Graph Tables
CREATE TABLE IF NOT EXISTS public.node (
id BIGSERIAL PRIMARY KEY,
key TEXT UNIQUE NOT NULL,
labels TEXT[] NOT NULL DEFAULT '{}',
props JSONB NOT NULL DEFAULT '{}'::jsonb
);
CREATE TABLE IF NOT EXISTS public.edge (
src BIGINT NOT NULL REFERENCES public.node(id) ON DELETE CASCADE,
dst BIGINT NOT NULL REFERENCES public.node(id) ON DELETE CASCADE,
type TEXT NOT NULL,
props JSONB NOT NULL DEFAULT '{}'::jsonb,
PRIMARY KEY (src, dst, type)
);
CREATE TABLE IF NOT EXISTS public.chunk_node (
chunk_id BIGINT NOT NULL REFERENCES public.chunk(id) ON DELETE CASCADE,
node_id BIGINT NOT NULL REFERENCES public.node(id) ON DELETE CASCADE,
rel TEXT NOT NULL DEFAULT 'MENTIONS',
PRIMARY KEY (chunk_id, node_id, rel)
);
-- 3d. Structured Data Registry (V8 Updated)
CREATE TABLE IF NOT EXISTS public.structured_table (
id BIGSERIAL PRIMARY KEY,
table_name TEXT NOT NULL UNIQUE,
document_id BIGINT REFERENCES public.document(id) ON DELETE CASCADE,
schema_def JSONB NOT NULL,
row_count INT DEFAULT 0,
-- V8 Metadata Columns
description TEXT,
column_semantics JSONB DEFAULT '{}'::jsonb,
graph_hints JSONB DEFAULT '[]'::jsonb,
sample_row JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- 3e. Configuration Table
CREATE TABLE IF NOT EXISTS public.app_config (
id INT PRIMARY KEY DEFAULT 1,
settings JSONB NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT single_row CHECK (id = 1)
);
-- ===============================================================
-- PART 4: INDEXES
-- ===============================================================
CREATE INDEX IF NOT EXISTS idx_queue_status ON public.ingestion_queue(status);
CREATE INDEX IF NOT EXISTS document_type_idx ON public.document(doc_type);
CREATE INDEX IF NOT EXISTS document_uri_idx ON public.document(uri);
CREATE INDEX IF NOT EXISTS chunk_tsv_gin ON public.chunk USING GIN (tsv);
CREATE INDEX IF NOT EXISTS chunk_doc_idx ON public.chunk(document_id);
-- Embedding Index (HNSW)
CREATE INDEX IF NOT EXISTS emb_hnsw_cos ON public.chunk_embedding USING HNSW (embedding halfvec_cosine_ops);
-- Graph Indexes
CREATE INDEX IF NOT EXISTS edge_src_idx ON public.edge (src);
CREATE INDEX IF NOT EXISTS edge_dst_idx ON public.edge (dst);
CREATE INDEX IF NOT EXISTS node_labels_gin ON public.node USING GIN (labels);
CREATE INDEX IF NOT EXISTS node_props_gin ON public.node USING GIN (props);
CREATE INDEX IF NOT EXISTS chunknode_node_idx ON public.chunk_node (node_id);
CREATE INDEX IF NOT EXISTS chunknode_chunk_idx ON public.chunk_node (chunk_id);
-- Registry Index
CREATE INDEX IF NOT EXISTS idx_structured_table_active ON public.structured_table(table_name) WHERE row_count > 0;
-- ===============================================================
-- PART 5: HELPER FUNCTIONS & TRIGGERS
-- ===============================================================
-- 5a. Full Text Search Update Trigger
CREATE OR REPLACE FUNCTION public.chunk_tsv_update()
RETURNS trigger LANGUAGE plpgsql AS $$
DECLARE doc_title text;
BEGIN
SELECT d.title INTO doc_title FROM public.document d WHERE d.id = NEW.document_id;
NEW.tsv :=
setweight(to_tsvector('english', coalesce(doc_title, '')), 'D') ||
setweight(to_tsvector('english', coalesce(NEW.text, '')), 'A');
RETURN NEW;
END $$;
DROP TRIGGER IF EXISTS chunk_tsv_trg ON public.chunk;
CREATE TRIGGER chunk_tsv_trg
BEFORE INSERT OR UPDATE OF text, document_id ON public.chunk
FOR EACH ROW EXECUTE FUNCTION public.chunk_tsv_update();
-- 5b. Sanitize Table Names
CREATE OR REPLACE FUNCTION public.sanitize_table_name(name TEXT)
RETURNS TEXT LANGUAGE sql IMMUTABLE AS $$
SELECT 'tbl_' || regexp_replace(lower(trim(name)), '[^a-z0-9_]', '_', 'g');
$$;
-- 5c. Data Extraction Helpers
CREATE OR REPLACE FUNCTION public.extract_numeric(text TEXT, key TEXT)
RETURNS NUMERIC LANGUAGE sql IMMUTABLE AS $$
-- Strip thousands separators so values like "$1,234.56" cast cleanly
SELECT replace((regexp_match(text, key || '\s*:\s*\$?([0-9,]+\.?[0-9]*)', 'i'))[1], ',', '')::numeric;
$$;
-- 5d. Polymorphic Date Extraction (Supports Excel, ISO, US formats)
-- 1. Text
CREATE OR REPLACE FUNCTION public.extract_date(text TEXT)
RETURNS DATE LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN
IF text ~ '^\d{5}$' THEN RETURN '1899-12-30'::date + (text::int); END IF;
IF text ~ '\d{4}-\d{2}-\d{2}' THEN RETURN (regexp_match(text, '(\d{4}-\d{2}-\d{2})'))[1]::date; END IF;
IF text ~ '\d{1,2}/\d{1,2}/\d{4}' THEN RETURN to_date((regexp_match(text, '(\d{1,2}/\d{1,2}/\d{4})'))[1], 'MM/DD/YYYY'); END IF;
RETURN NULL;
EXCEPTION WHEN OTHERS THEN RETURN NULL;
END $$;
-- 2. Numeric (Excel Serial)
CREATE OR REPLACE FUNCTION public.extract_date(val NUMERIC)
RETURNS DATE LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN RETURN '1899-12-30'::date + (val::int); EXCEPTION WHEN OTHERS THEN RETURN NULL; END $$;
-- 3. Integer
CREATE OR REPLACE FUNCTION public.extract_date(val INTEGER)
RETURNS DATE LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN RETURN '1899-12-30'::date + val; EXCEPTION WHEN OTHERS THEN RETURN NULL; END $$;
-- 4. Date (Pass-through)
CREATE OR REPLACE FUNCTION public.extract_date(val DATE)
RETURNS DATE LANGUAGE sql IMMUTABLE AS $$ SELECT val; $$;
CREATE OR REPLACE FUNCTION public.extract_keywords(p_text TEXT)
RETURNS TEXT[] LANGUAGE sql IMMUTABLE AS $$
-- LIMIT sits inside the subquery so the 10-keyword cap actually applies
SELECT array_agg(word) FROM (
SELECT DISTINCT word FROM unnest(tsvector_to_array(to_tsvector('english', p_text))) AS word
WHERE length(word) > 2 LIMIT 10
) words;
$$;
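-- Usage sketch (illustrative values; commented out so the master query stays inert):
-- SELECT public.extract_date('45123');      -- Excel serial as text
-- SELECT public.extract_date(45123);        -- Excel serial as integer
-- SELECT public.extract_date('2025-11-25'); -- ISO string
-- SELECT public.extract_date('11/25/2025'); -- US format
-- SELECT public.extract_keywords('The warehouse manager approved the shipment');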
-- 5e. Column Type Inference
CREATE OR REPLACE FUNCTION public.infer_column_type(sample_values TEXT[])
RETURNS TEXT LANGUAGE plpgsql IMMUTABLE AS $$
DECLARE
val TEXT;
numeric_count INT := 0;
date_count INT := 0;
boolean_count INT := 0;
total_non_null INT := 0;
BEGIN
FOR val IN SELECT unnest(sample_values) LOOP
IF val IS NOT NULL AND val != '' THEN
total_non_null := total_non_null + 1;
IF lower(val) IN ('true', 'false', 'yes', 'no', 't', 'f', 'y', 'n', '1', '0') THEN boolean_count := boolean_count + 1; END IF;
IF val ~ '\d+\s*x\s*\d+' THEN RETURN 'TEXT'; END IF;
IF val ~ '\d+\s*(cm|mm|m|km|in|ft|yd|kg|g|mg|lb|oz|ml|l|gal)' THEN RETURN 'TEXT'; END IF;
IF val ~ '^\$?-?[0-9,]+\.?[0-9]*$' THEN numeric_count := numeric_count + 1; END IF;
IF val ~ '^\d{4}-\d{2}-\d{2}' OR val ~ '^\d{1,2}/\d{1,2}/\d{4}' OR val ~ '^\d{5}$' THEN date_count := date_count + 1; END IF;
END IF;
END LOOP;
IF total_non_null = 0 THEN RETURN 'TEXT'; END IF;
IF boolean_count::float / total_non_null > 0.8 THEN RETURN 'BOOLEAN'; END IF;
IF numeric_count::float / total_non_null > 0.8 THEN RETURN 'NUMERIC'; END IF;
IF date_count::float / total_non_null > 0.8 THEN RETURN 'DATE'; END IF;
RETURN 'TEXT';
END $$;
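-- Sanity-check sketch (made-up samples; each call returns the inferred type):
-- SELECT public.infer_column_type(ARRAY['$1,200', '950', '3,400.50']);  -- NUMERIC
-- SELECT public.infer_column_type(ARRAY['2024-01-05', '45123']);        -- DATE
-- SELECT public.infer_column_type(ARRAY['10 x 20', '5 x 8']);           -- TEXT (dimension strings)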
-- ===============================================================
-- PART 6: INGESTION FUNCTIONS (RPC)
-- ===============================================================
-- 6a. Document Ingest (Standard)
CREATE OR REPLACE FUNCTION public.ingest_document_chunk(
p_uri TEXT, p_title TEXT, p_doc_meta JSONB,
p_chunk JSONB, p_nodes JSONB, p_edges JSONB, p_mentions JSONB
)
RETURNS JSONB LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pg_temp AS $$
DECLARE
v_doc_id BIGINT; v_chunk_id BIGINT; v_node JSONB; v_edge JSONB; v_mention JSONB;
v_src_id BIGINT; v_dst_id BIGINT;
BEGIN
INSERT INTO public.document(uri, title, doc_type, meta)
VALUES (p_uri, p_title, 'document', COALESCE(p_doc_meta, '{}'::jsonb))
ON CONFLICT (uri) DO UPDATE SET title = EXCLUDED.title, meta = public.document.meta || EXCLUDED.meta
RETURNING id INTO v_doc_id;
INSERT INTO public.chunk(document_id, ordinal, text)
VALUES (v_doc_id, (p_chunk->>'ordinal')::INT, p_chunk->>'text')
ON CONFLICT (document_id, ordinal) DO UPDATE SET text = EXCLUDED.text
RETURNING id INTO v_chunk_id;
IF (p_chunk ? 'embedding') THEN
INSERT INTO public.chunk_embedding(chunk_id, embedding)
VALUES (v_chunk_id, (SELECT array_agg((e)::float4 ORDER BY ord) FROM jsonb_array_elements_text(p_chunk->'embedding') WITH ORDINALITY t(e, ord))::halfvec(768))
ON CONFLICT (chunk_id) DO UPDATE SET embedding = EXCLUDED.embedding;
END IF;
FOR v_node IN SELECT * FROM jsonb_array_elements(COALESCE(p_nodes, '[]'::jsonb)) LOOP
INSERT INTO public.node(key, labels, props)
VALUES (v_node->>'key', COALESCE((SELECT array_agg(l::TEXT) FROM jsonb_array_elements_text(v_node->'labels') l), '{}'), COALESCE(v_node->'props', '{}'::jsonb))
ON CONFLICT (key) DO UPDATE SET props = public.node.props || EXCLUDED.props;
END LOOP;
FOR v_edge IN SELECT * FROM jsonb_array_elements(COALESCE(p_edges, '[]'::jsonb)) LOOP
SELECT id INTO v_src_id FROM public.node WHERE key = v_edge->>'src_key';
SELECT id INTO v_dst_id FROM public.node WHERE key = v_edge->>'dst_key';
IF v_src_id IS NOT NULL AND v_dst_id IS NOT NULL THEN
INSERT INTO public.edge(src, dst, type, props)
VALUES (v_src_id, v_dst_id, v_edge->>'type', COALESCE(v_edge->'props', '{}'::jsonb))
ON CONFLICT (src, dst, type) DO UPDATE SET props = public.edge.props || EXCLUDED.props;
END IF;
END LOOP;
FOR v_mention IN SELECT * FROM jsonb_array_elements(COALESCE(p_mentions, '[]'::jsonb)) LOOP
SELECT id INTO v_src_id FROM public.node WHERE key = v_mention->>'node_key';
IF v_chunk_id IS NOT NULL AND v_src_id IS NOT NULL THEN
INSERT INTO public.chunk_node(chunk_id, node_id, rel)
VALUES (v_chunk_id, v_src_id, COALESCE(v_mention->>'rel', 'MENTIONS'))
ON CONFLICT (chunk_id, node_id, rel) DO NOTHING;
END IF;
END LOOP;
RETURN jsonb_build_object('ok', true);
END $$;
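-- Example of the per-chunk RPC the ingest worker issues (a sketch with dummy
-- values; real calls pass a 768-float embedding array inside p_chunk):
-- SELECT public.ingest_document_chunk(
--   'docs/handbook.md', 'Employee Handbook', '{"source":"upload"}'::jsonb,
--   '{"ordinal": 0, "text": "Alice manages the Denver warehouse."}'::jsonb,
--   '[{"key":"person:alice","labels":["Person"],"props":{"name":"Alice"}},
--     {"key":"wh:denver","labels":["Warehouse"],"props":{"name":"Denver"}}]'::jsonb,
--   '[{"src_key":"person:alice","dst_key":"wh:denver","type":"MANAGES"}]'::jsonb,
--   '[{"node_key":"person:alice"}]'::jsonb
-- );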
-- 6b. Universal Spreadsheet Ingest (V8 Updated: Description Support)
DROP FUNCTION IF EXISTS public.ingest_spreadsheet(text, text, text, jsonb, jsonb, jsonb, jsonb);
CREATE OR REPLACE FUNCTION public.ingest_spreadsheet(
p_uri TEXT, p_title TEXT, p_table_name TEXT,
p_description TEXT, -- V8 Addition
p_rows JSONB, p_schema JSONB, p_nodes JSONB, p_edges JSONB
)
RETURNS JSONB LANGUAGE plpgsql SECURITY DEFINER SET search_path = public, pg_temp AS $$
DECLARE
v_doc_id BIGINT; v_safe_name TEXT; v_col_name TEXT; v_inferred_type TEXT;
v_cols TEXT[]; v_sample_values TEXT[]; v_row JSONB; v_node JSONB; v_edge JSONB;
v_src_id BIGINT; v_dst_id BIGINT; v_table_exists BOOLEAN; v_all_columns TEXT[];
v_schema_def JSONB;
BEGIN
INSERT INTO public.document(uri, title, doc_type, meta)
VALUES (p_uri, p_title, 'spreadsheet', jsonb_build_object('table_name', p_table_name))
ON CONFLICT (uri) DO UPDATE SET title = EXCLUDED.title RETURNING id INTO v_doc_id;
v_safe_name := public.sanitize_table_name(p_table_name);
SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = v_safe_name) INTO v_table_exists;
IF NOT v_table_exists THEN
-- Table Creation Logic
SELECT array_agg(DISTINCT key ORDER BY key) INTO v_all_columns FROM jsonb_array_elements(p_rows) AS r, jsonb_object_keys(r) AS key;
v_cols := ARRAY['id BIGSERIAL PRIMARY KEY'];
FOREACH v_col_name IN ARRAY v_all_columns LOOP
-- LIMIT inside the subquery so type inference samples at most 100 values
SELECT array_agg(val) INTO v_sample_values FROM (SELECT kv.value::text AS val FROM jsonb_array_elements(p_rows) r, jsonb_each_text(r) kv WHERE kv.key = v_col_name LIMIT 100) s;
v_inferred_type := public.infer_column_type(COALESCE(v_sample_values, ARRAY[]::TEXT[]));
v_cols := v_cols || format('%I %s', v_col_name, v_inferred_type);
-- Record the inferred type so the text-to-SQL layer sees real column types
v_schema_def := COALESCE(v_schema_def, '{}'::jsonb) || jsonb_build_object(v_col_name, v_inferred_type);
END LOOP;
EXECUTE format('CREATE TABLE public.%I (%s)', v_safe_name, array_to_string(v_cols, ', '));
-- Permissions
EXECUTE format('GRANT ALL ON TABLE public.%I TO service_role', v_safe_name);
EXECUTE format('GRANT SELECT ON TABLE public.%I TO authenticated, anon', v_safe_name);
END IF;
-- Insert Rows
FOR v_row IN SELECT * FROM jsonb_array_elements(p_rows) LOOP
DECLARE v_k TEXT; v_v TEXT; v_cl TEXT[] := ARRAY[]::TEXT[]; v_vl TEXT[] := ARRAY[]::TEXT[];
BEGIN
FOR v_k, v_v IN SELECT * FROM jsonb_each_text(v_row) LOOP v_cl := v_cl || quote_ident(v_k); v_vl := v_vl || quote_literal(v_v); END LOOP;
IF array_length(v_cl, 1) > 0 THEN EXECUTE format('INSERT INTO public.%I (%s) VALUES (%s)', v_safe_name, array_to_string(v_cl, ', '), array_to_string(v_vl, ', ')); END IF;
END;
END LOOP;
-- Upsert Registry (V8 with Description)
INSERT INTO public.structured_table(table_name, document_id, schema_def, row_count, description)
VALUES (v_safe_name, v_doc_id, COALESCE(v_schema_def, '{}'::jsonb), jsonb_array_length(p_rows), p_description)
ON CONFLICT (table_name) DO UPDATE SET updated_at = now(), description = EXCLUDED.description;
-- Upsert Graph Nodes
FOR v_node IN SELECT * FROM jsonb_array_elements(COALESCE(p_nodes, '[]'::jsonb)) LOOP
INSERT INTO public.node(key, labels, props)
VALUES (v_node->>'key', COALESCE((SELECT array_agg(l::TEXT) FROM jsonb_array_elements_text(v_node->'labels') l), '{}'), COALESCE(v_node->'props', '{}'::jsonb))
ON CONFLICT (key) DO UPDATE SET props = public.node.props || EXCLUDED.props;
END LOOP;
-- Upsert Graph Edges
FOR v_edge IN SELECT * FROM jsonb_array_elements(COALESCE(p_edges, '[]'::jsonb)) LOOP
SELECT id INTO v_src_id FROM public.node WHERE key = v_edge->>'src_key';
SELECT id INTO v_dst_id FROM public.node WHERE key = v_edge->>'dst_key';
IF v_src_id IS NOT NULL AND v_dst_id IS NOT NULL THEN
INSERT INTO public.edge(src, dst, type, props)
VALUES (v_src_id, v_dst_id, v_edge->>'type', COALESCE(v_edge->'props', '{}'::jsonb))
ON CONFLICT (src, dst, type) DO UPDATE SET props = public.edge.props || EXCLUDED.props;
END IF;
END LOOP;
RETURN jsonb_build_object('ok', true, 'table_name', v_safe_name);
END $$;
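-- Example call (a sketch; 'orders' is sanitized to tbl_orders and one dummy row is inserted):
-- SELECT public.ingest_spreadsheet(
--   'uploads/orders.csv', 'Orders', 'orders',
--   'Customer orders with warehouse and carrier references',
--   '[{"order_id":"O10001","customer_id":"CU001","order_date":"2025-01-15"}]'::jsonb,
--   '{}'::jsonb, '[]'::jsonb, '[]'::jsonb
-- );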
-- ===============================================================
-- PART 7: SEARCH FUNCTIONS (UPDATED V9 - COMPOSITE RETRIEVAL)
-- ===============================================================
-- 7a. Vector Search
CREATE OR REPLACE FUNCTION public.search_vector(
p_embedding VECTOR(768),
p_limit INT,
p_threshold FLOAT8 DEFAULT 0.65
)
RETURNS TABLE(chunk_id BIGINT, content TEXT, score FLOAT8)
LANGUAGE sql STABLE AS $$
SELECT
ce.chunk_id,
c.text as content,
1.0 / (1.0 + (ce.embedding <=> p_embedding)) AS score
FROM public.chunk_embedding ce
JOIN public.chunk c ON c.id = ce.chunk_id
WHERE (1.0 / (1.0 + (ce.embedding <=> p_embedding))) >= p_threshold
ORDER BY score DESC
LIMIT p_limit;
$$;
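-- Usage sketch (assumes a 768-dim query embedding produced by the chat Edge
-- Function; the literal here is truncated and illustrative only):
-- SELECT chunk_id, left(content, 80) AS preview, score
-- FROM public.search_vector('[0.01, 0.02, ...]'::vector(768), 10, 0.65);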
-- 7b. Multi-Strategy Full-Text Search (V9)
CREATE OR REPLACE FUNCTION public.search_fulltext(
p_query text,
p_limit integer
)
RETURNS TABLE(
chunk_id bigint,
content text,
score double precision
)
LANGUAGE sql STABLE AS $$
WITH query_variants AS (
SELECT
websearch_to_tsquery('english', p_query) AS tsq_websearch,
plainto_tsquery('english', p_query) AS tsq_plain,
to_tsquery('english', regexp_replace(p_query, '\s+', ' | ', 'g')) AS tsq_or
),
results AS (
-- Strategy 1: Websearch (most precise)
SELECT
c.id AS chunk_id,
c.text AS content,
ts_rank_cd(c.tsv, q.tsq_websearch)::float8 AS score,
1 as strategy
FROM public.chunk c
CROSS JOIN query_variants q
WHERE c.tsv @@ q.tsq_websearch
UNION ALL
-- Strategy 2: Plain text (more flexible)
SELECT
c.id AS chunk_id,
c.text AS content,
ts_rank_cd(c.tsv, q.tsq_plain)::float8 * 0.8 AS score,
2 as strategy
FROM public.chunk c
CROSS JOIN query_variants q
WHERE c.tsv @@ q.tsq_plain
AND NOT EXISTS (
SELECT 1 FROM public.chunk c2
CROSS JOIN query_variants q2
WHERE c2.id = c.id AND c2.tsv @@ q2.tsq_websearch
)
UNION ALL
-- Strategy 3: OR query (most flexible)
SELECT
c.id AS chunk_id,
c.text AS content,
ts_rank_cd(c.tsv, q.tsq_or)::float8 * 0.6 AS score,
3 as strategy
FROM public.chunk c
CROSS JOIN query_variants q
WHERE c.tsv @@ q.tsq_or
AND NOT EXISTS (
SELECT 1 FROM public.chunk c2
CROSS JOIN query_variants q2
WHERE c2.id = c.id
AND (c2.tsv @@ q2.tsq_websearch OR c2.tsv @@ q2.tsq_plain)
)
)
SELECT chunk_id, content, score
FROM results
ORDER BY score DESC
LIMIT p_limit;
$$;
GRANT EXECUTE ON FUNCTION public.search_fulltext TO anon, authenticated, service_role;
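-- Usage sketch: the three strategies fall back from websearch syntax to plain
-- matching to OR-matching, with discounted scores.
-- SELECT chunk_id, left(content, 80) AS preview, score
-- FROM public.search_fulltext('warehouse manager denver', 10);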
-- 7c. Table Peeking (V9: GENERIC FK DETECTION + REVERSE LOOKUP)
CREATE OR REPLACE FUNCTION public.get_table_context(p_table_name TEXT)
RETURNS JSONB
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
v_safe_name TEXT;
v_columns TEXT;
v_sample_row JSONB;
v_description TEXT;
v_semantics JSONB;
v_categorical_values JSONB := '{}'::jsonb;
v_related_tables JSONB := '[]'::jsonb;
v_cat_col TEXT;
v_cat_values JSONB;
v_distinct_count INT;
v_fk_col TEXT;
v_ref_table TEXT;
v_ref_table_exists BOOLEAN;
v_join_col TEXT;
v_ref_has_id BOOLEAN;
v_ref_has_name BOOLEAN;
v_reverse_rec RECORD;
BEGIN
v_safe_name := quote_ident(p_table_name);
-- Get schema
SELECT string_agg(column_name || ' (' || data_type || ')', ', ')
INTO v_columns
FROM information_schema.columns
WHERE table_name = p_table_name AND table_schema = 'public';
-- Get sample row
EXECUTE format('SELECT to_jsonb(t) FROM (SELECT * FROM public.%I LIMIT 1) t', v_safe_name)
INTO v_sample_row;
-- Get semantic metadata from structured_table
SELECT
description,
column_semantics
INTO v_description, v_semantics
FROM public.structured_table
WHERE table_name = p_table_name;
-- Get categorical values for columns with status/type/category in name
FOR v_cat_col IN
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = p_table_name
AND (
column_name LIKE '%status%' OR
column_name LIKE '%type%' OR
column_name LIKE '%category%' OR
column_name LIKE '%state%' OR
column_name LIKE '%priority%' OR
column_name LIKE '%level%'
)
LOOP
BEGIN
-- Check if column has reasonable number of distinct values
EXECUTE format('SELECT COUNT(DISTINCT %I) FROM public.%I', v_cat_col, v_safe_name)
INTO v_distinct_count;
-- Only include if 20 or fewer distinct values
IF v_distinct_count <= 20 THEN
EXECUTE format('SELECT jsonb_agg(DISTINCT %I ORDER BY %I) FROM public.%I',
v_cat_col, v_cat_col, v_safe_name)
INTO v_cat_values;
-- Add to categorical_values object
v_categorical_values := v_categorical_values || jsonb_build_object(v_cat_col, v_cat_values);
END IF;
EXCEPTION WHEN OTHERS THEN
-- Skip this column if any error
CONTINUE;
END;
END LOOP;
-- ========================================================================
-- PART 1: FORWARD FK DETECTION (this table references other tables)
-- ========================================================================
FOR v_fk_col IN
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = p_table_name
AND (
column_name LIKE '%\_id' OR -- customer_id, warehouse_id, manager_id
column_name LIKE '%\_name' -- manager_name, customer_name
)
AND column_name != 'id' -- Skip primary key
LOOP
-- Infer referenced table name
v_ref_table := regexp_replace(v_fk_col, '_(id|name)$', '');
-- Handle pluralization
-- carriers → carrier, employees → employee, warehouses → warehouse
IF v_ref_table ~ '(ss|us|ch|sh|x|z)es$' THEN
v_ref_table := regexp_replace(v_ref_table, 'es$', '');
ELSIF v_ref_table ~ 'ies$' THEN
v_ref_table := regexp_replace(v_ref_table, 'ies$', 'y');
ELSIF v_ref_table ~ 's$' THEN
v_ref_table := regexp_replace(v_ref_table, 's$', '');
END IF;
-- Add tbl_ prefix
v_ref_table := 'tbl_' || v_ref_table;
-- Check if referenced table exists
SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = 'public' AND tablename = v_ref_table
) INTO v_ref_table_exists;
IF v_ref_table_exists THEN
-- Determine join column in referenced table
v_join_col := NULL;
v_ref_has_id := FALSE;
v_ref_has_name := FALSE;
-- Check what columns the referenced table has
IF v_fk_col LIKE '%\_id' THEN
-- For FK columns ending in _id, look for matching ID column in ref table
-- customer_id → look for customer_id in tbl_customers
-- manager_id → look for employee_id in tbl_employees (special case)
SELECT bool_or(
column_name = v_fk_col OR
column_name = regexp_replace(v_ref_table, '^tbl_', '') || '_id'
)
INTO v_ref_has_id
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = v_ref_table;
IF v_ref_has_id THEN
-- Find the actual ID column name
SELECT column_name INTO v_join_col
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = v_ref_table
AND (column_name = v_fk_col OR
column_name = regexp_replace(v_ref_table, '^tbl_', '') || '_id')
LIMIT 1;
END IF;
END IF;
IF v_fk_col LIKE '%\_name' THEN
-- For FK columns ending in _name, look for 'name' column in ref table
SELECT bool_or(column_name = 'name')
INTO v_ref_has_name
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = v_ref_table;
IF v_ref_has_name THEN
v_join_col := 'name';
END IF;
END IF;
-- If we found a valid join column, add to related tables
IF v_join_col IS NOT NULL THEN
v_related_tables := v_related_tables || jsonb_build_array(
jsonb_build_object(
'table', v_ref_table,
'fk_column', v_fk_col,
'join_on', format('%I.%I = %I.%I',
p_table_name, v_fk_col,
v_ref_table, v_join_col),
'useful_columns', 'Details from ' || v_ref_table,
'use_when', format('Query mentions %s or asks about %s details',
regexp_replace(v_ref_table, '^tbl_', ''),
regexp_replace(v_fk_col, '_(id|name)$', ''))
)
);
END IF;
END IF;
END LOOP;
-- ========================================================================
-- PART 2: REVERSE FK DETECTION (other tables reference this table)
-- ========================================================================
-- Example: tbl_employees should know that tbl_warehouses.manager_name references it
FOR v_reverse_rec IN
SELECT DISTINCT
c.table_name,
c.column_name,
c.data_type
FROM information_schema.columns c
WHERE c.table_schema = 'public'
AND c.table_name LIKE 'tbl_%'
AND c.table_name != p_table_name
AND (
c.column_name LIKE '%\_id' OR
c.column_name LIKE '%\_name'
)
LOOP
v_ref_table := v_reverse_rec.table_name;
v_fk_col := v_reverse_rec.column_name;
v_join_col := NULL;
-- Extract the base entity name from the foreign key column
-- manager_id → manager, customer_name → customer
DECLARE
v_base_entity TEXT;
v_current_table_entity TEXT;
BEGIN
v_base_entity := regexp_replace(v_fk_col, '_(id|name)$', '');
v_current_table_entity := regexp_replace(p_table_name, '^tbl_', '');
-- Normalize pluralization for comparison
IF v_current_table_entity ~ 's$' THEN
v_current_table_entity := regexp_replace(v_current_table_entity, 's$', '');
END IF;
-- Check if this FK might reference the current table
-- Examples:
-- manager → employee (tbl_warehouses.manager_name → tbl_employees.name)
-- customer → customer (tbl_orders.customer_id → tbl_customers.customer_id)
-- employee → employee (tbl_employees.manager_id → tbl_employees.employee_id)
IF v_base_entity = v_current_table_entity OR
(v_base_entity = 'manager' AND v_current_table_entity = 'employee') OR
(v_base_entity = 'employee' AND v_current_table_entity = 'employee') THEN
-- Determine what column in current table this FK should join to
IF v_fk_col LIKE '%\_id' THEN
-- Look for matching ID column in current table
SELECT column_name INTO v_join_col
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = p_table_name
AND (
column_name = v_fk_col OR
column_name = p_table_name || '_id' OR
column_name = regexp_replace(p_table_name, '^tbl_', '') || '_id'
)
LIMIT 1;
ELSIF v_fk_col LIKE '%\_name' THEN
-- Look for 'name' column in current table
SELECT column_name INTO v_join_col
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = p_table_name
AND column_name = 'name'
LIMIT 1;
END IF;
-- If we found a matching join column, add to related tables
IF v_join_col IS NOT NULL THEN
-- Check if this relationship already exists (avoid duplicates from forward pass)
IF NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(v_related_tables) elem
WHERE elem->>'table' = v_ref_table
AND elem->>'fk_column' = v_fk_col
) THEN
v_related_tables := v_related_tables || jsonb_build_array(
jsonb_build_object(
'table', v_ref_table,
'fk_column', v_fk_col,
'join_on', format('%I.%I = %I.%I',
p_table_name, v_join_col,
v_ref_table, v_fk_col),
'useful_columns', 'Details from ' || v_ref_table,
'use_when', format('Query asks about %s that reference %s',
regexp_replace(v_ref_table, '^tbl_', ''),
regexp_replace(p_table_name, '^tbl_', ''))
)
);
END IF;
END IF;
END IF;
END;
END LOOP;
RETURN jsonb_build_object(
'table', p_table_name,
'schema', v_columns,
'sample', COALESCE(v_sample_row, '{}'::jsonb),
'description', v_description,
'column_semantics', COALESCE(v_semantics, '{}'::jsonb),
'categorical_values', v_categorical_values,
'related_tables', v_related_tables
);
EXCEPTION WHEN OTHERS THEN
RETURN jsonb_build_object('error', SQLERRM);
END $$;
GRANT EXECUTE ON FUNCTION public.get_table_context(text) TO service_role, authenticated, anon;
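-- Usage sketch (feeds the text-to-SQL prompt with schema, a sample row,
-- categorical values, and the inferred FK joins):
-- SELECT jsonb_pretty(public.get_table_context('tbl_orders'));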
-- 7d. Hybrid Graph Search
CREATE OR REPLACE FUNCTION public.search_graph_hybrid(
p_entities TEXT[],
p_actions TEXT[],
p_limit INT DEFAULT 20
)
RETURNS TABLE(chunk_id BIGINT, content TEXT, score FLOAT8, strategy TEXT)
LANGUAGE plpgsql STABLE AS $$
DECLARE
v_has_actions BOOLEAN;
BEGIN
v_has_actions := (array_length(p_actions, 1) > 0);
RETURN QUERY
WITH relevant_nodes AS (
SELECT id FROM public.node
WHERE EXISTS (
SELECT 1 FROM unnest(p_entities) entity
WHERE public.node.key ILIKE '%' || entity || '%'
OR public.node.props->>'name' ILIKE '%' || entity || '%'
)
),
relevant_edges AS (
SELECT e.src, e.dst, e.type,
CASE
WHEN s.id IS NOT NULL AND d.id IS NOT NULL THEN 'entity-entity'
ELSE 'entity-action'
END as match_strategy,
CASE
WHEN s.id IS NOT NULL AND d.id IS NOT NULL THEN 2.0
ELSE 1.5
END as base_score
FROM public.edge e
LEFT JOIN relevant_nodes s ON e.src = s.id
LEFT JOIN relevant_nodes d ON e.dst = d.id
WHERE
(s.id IS NOT NULL AND d.id IS NOT NULL)
OR
(v_has_actions AND (s.id IS NOT NULL OR d.id IS NOT NULL) AND
EXISTS (SELECT 1 FROM unnest(p_actions) act WHERE e.type ILIKE act || '%')
)
),
hits AS (
SELECT
cn.chunk_id,
count(*) as mention_count,
max(base_score) as max_strategy_score,
string_agg(DISTINCT match_strategy, ', ') as strategies
FROM relevant_edges re
JOIN public.chunk_node cn ON cn.node_id = re.src
GROUP BY cn.chunk_id
)
SELECT
h.chunk_id,
c.text as content,
(log(h.mention_count + 1) * h.max_strategy_score)::float8 AS score,
h.strategies::text as strategy
FROM hits h
JOIN public.chunk c ON c.id = h.chunk_id
ORDER BY score DESC
LIMIT p_limit;
END $$;
-- 7e. Legacy Targeted Graph Search (RESTORED FOR COMPLETENESS)
CREATE OR REPLACE FUNCTION public.search_graph_targeted(
p_entities TEXT[],
p_actions TEXT[],
p_limit INT DEFAULT 20
)
RETURNS TABLE(chunk_id BIGINT, content TEXT, score FLOAT8)
LANGUAGE plpgsql STABLE AS $$
DECLARE
v_has_actions BOOLEAN;
BEGIN
v_has_actions := (array_length(p_actions, 1) > 0);
RETURN QUERY
WITH relevant_nodes AS (
SELECT id, props->>'name' as name
FROM public.node
WHERE EXISTS (
SELECT 1 FROM unnest(p_entities) entity
WHERE public.node.key ILIKE '%' || entity || '%'
OR public.node.props->>'name' ILIKE '%' || entity || '%'
)
),
relevant_edges AS (
SELECT e.src, e.dst, e.type
FROM public.edge e
JOIN relevant_nodes rn ON e.src = rn.id
),
hits AS (
SELECT
cn.chunk_id,
count(*) as mention_count
FROM relevant_edges re
JOIN public.chunk_node cn ON cn.node_id = re.src
GROUP BY cn.chunk_id
)
SELECT
h.chunk_id,
c.text as content,
(log(h.mention_count + 1) * 1.5)::float8 AS score
FROM hits h
JOIN public.chunk c ON c.id = h.chunk_id
ORDER BY score DESC
LIMIT p_limit;
END $$;
-- 7f. Graph Neighborhood (Update 4: Security Definer / Case Insensitive)
DROP FUNCTION IF EXISTS public.get_graph_neighborhood(text[]);
CREATE OR REPLACE FUNCTION public.get_graph_neighborhood(
p_entity_names TEXT[]
)
RETURNS TABLE(subject TEXT, action TEXT, object TEXT, context JSONB)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
BEGIN
RETURN QUERY
WITH target_nodes AS (
SELECT id, key, props->>'name' as name
FROM public.node
WHERE
-- 1. Direct Key Match
key = ANY(p_entity_names)
-- 2. Name Match (Case Insensitive, Trimmed)
OR lower(trim(props->>'name')) = ANY(
SELECT lower(trim(x)) FROM unnest(p_entity_names) x
)
-- 3. Fuzzy Match (Substring)
OR EXISTS (
SELECT 1 FROM unnest(p_entity_names) term
WHERE length(term) > 3
AND public.node.props->>'name' ILIKE '%' || term || '%'
)
)
-- Outgoing Edges
SELECT n1.props->>'name', e.type, n2.props->>'name', e.props
FROM target_nodes tn
JOIN public.edge e ON tn.id = e.src
JOIN public.node n1 ON e.src = n1.id
JOIN public.node n2 ON e.dst = n2.id
UNION ALL
-- Incoming Edges
SELECT n1.props->>'name', e.type, n2.props->>'name', e.props
FROM target_nodes tn
JOIN public.edge e ON tn.id = e.dst
JOIN public.node n1 ON e.src = n1.id
JOIN public.node n2 ON e.dst = n2.id;
END $$;
GRANT EXECUTE ON FUNCTION public.get_graph_neighborhood(text[]) TO service_role, authenticated, anon;
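-- Usage sketch (entity names come from the router's extraction step):
-- SELECT * FROM public.get_graph_neighborhood(ARRAY['Alice', 'Denver']);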
-- 7g. Structured Search (Safe V6.1)
DROP FUNCTION IF EXISTS public.search_structured(text, int);
CREATE OR REPLACE FUNCTION public.search_structured(p_query_sql TEXT, p_limit INT DEFAULT 20)
RETURNS TABLE(table_name TEXT, row_data JSONB, score FLOAT8, rank INT)
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = public, pg_temp
AS $$
DECLARE
v_sql TEXT;
BEGIN
IF p_query_sql IS NULL OR length(trim(p_query_sql)) = 0 THEN RETURN; END IF;
v_sql := p_query_sql;
-- Sanitization: LLM-generated SQL tends to alias tbl_orders as "to", a reserved word.
-- Rewrite that alias to t_orders and strip trailing semicolons.
v_sql := regexp_replace(v_sql, '(\W)to\.([a-zA-Z0-9_]+)', '\1t_orders.\2', 'g');
v_sql := regexp_replace(v_sql, '\s+to\s+ON\s+', ' t_orders ON ', 'gi');
v_sql := regexp_replace(v_sql, '\s+AS\s+to\s+', ' AS t_orders ', 'gi');
v_sql := regexp_replace(v_sql, 'tbl_orders\s+to\s+', 'tbl_orders t_orders ', 'gi');
v_sql := regexp_replace(v_sql, '[;\s]+$', '');
RETURN QUERY EXECUTE format(
'WITH user_query AS (%s)
SELECT
''result''::text AS table_name,
to_jsonb(user_query.*) AS row_data,
1.0::float8 AS score,
(row_number() OVER ())::int AS rank
FROM user_query LIMIT %s',
v_sql, p_limit
);
EXCEPTION WHEN OTHERS THEN
RETURN QUERY SELECT 'ERROR'::text, jsonb_build_object('msg', SQLERRM, 'sql', v_sql), 1.0::float8, 1;
END $$;
GRANT EXECUTE ON FUNCTION public.search_structured(text, int) TO service_role, authenticated, anon;
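-- Usage sketch (the inner SQL normally comes from the text-to-SQL model; the
-- function wraps it in a CTE and caps the row count):
-- SELECT * FROM public.search_structured(
--   'SELECT order_id, order_status FROM tbl_orders WHERE customer_id = ''CU001''', 20);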
-- 7h. Smart Entity Detection (V9 - Composite Retrieval)
CREATE OR REPLACE FUNCTION public.detect_query_entities(
p_query TEXT
)
RETURNS TABLE(
entity_type TEXT,
table_name TEXT,
key_column TEXT,
key_value TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
-- Detect ORDER IDs (O followed by 5 digits)
IF p_query ~* 'O\d{5}' THEN
RETURN QUERY
SELECT
'order'::TEXT,
'tbl_orders'::TEXT,
'order_id'::TEXT,
(regexp_match(p_query, '(O\d{5})', 'i'))[1]::TEXT;
END IF;
-- Detect CUSTOMER IDs (CU followed by 3 digits)
IF p_query ~* 'CU\d{3}' THEN
RETURN QUERY
SELECT
'customer'::TEXT,
'tbl_customers'::TEXT,
'customer_id'::TEXT,
(regexp_match(p_query, '(CU\d{3})', 'i'))[1]::TEXT;
END IF;
-- Detect EMPLOYEE IDs (E followed by 3 digits)
IF p_query ~* 'E\d{3}' THEN
RETURN QUERY
SELECT
'employee'::TEXT,
'tbl_employees'::TEXT,
'employee_id'::TEXT,
(regexp_match(p_query, '(E\d{3})', 'i'))[1]::TEXT;
END IF;
-- Detect WAREHOUSE IDs (WH followed by 3 digits)
IF p_query ~* 'WH\d{3}' THEN
RETURN QUERY
SELECT
'warehouse'::TEXT,
'tbl_warehouses'::TEXT,
'warehouse_id'::TEXT,
(regexp_match(p_query, '(WH\d{3})', 'i'))[1]::TEXT;
END IF;
-- Detect CARRIER IDs (CR followed by 3 digits)
IF p_query ~* 'CR\d{3}' THEN
RETURN QUERY
SELECT
'carrier'::TEXT,
'tbl_carriers'::TEXT,
'carrier_id'::TEXT,
(regexp_match(p_query, '(CR\d{3})', 'i'))[1]::TEXT;
END IF;
RETURN;
END;
$$;
GRANT EXECUTE ON FUNCTION public.detect_query_entities TO anon, authenticated, service_role;
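-- Usage sketch: pattern-matches the demo ID formats out of a raw user query.
-- SELECT * FROM public.detect_query_entities('What is the status of order O10001?');
-- Expected row: ('order', 'tbl_orders', 'order_id', 'O10001')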
-- 7i. Context Enrichment (V9 - Composite Retrieval)
CREATE OR REPLACE FUNCTION public.enrich_query_context(
p_primary_table TEXT,
p_primary_key TEXT,
p_primary_value TEXT
)
RETURNS TABLE(
enrichment_type TEXT,
table_name TEXT,
row_data JSONB,
relationship TEXT
)
LANGUAGE plpgsql
AS $$
DECLARE
v_customer_id TEXT;
v_warehouse_id TEXT;
v_employee_id TEXT;
v_carrier_id TEXT;
v_primary_row JSONB;
BEGIN
-- ====================================================
-- STEP 1: GET PRIMARY ROW
-- ====================================================
EXECUTE format(
'SELECT to_jsonb(t.*) FROM public.%I t WHERE %I = $1 LIMIT 1',
p_primary_table,
p_primary_key
) INTO v_primary_row USING p_primary_value;
IF v_primary_row IS NULL THEN
RETURN;
END IF;
RETURN QUERY SELECT
'primary'::TEXT,
p_primary_table,
v_primary_row,
'direct_match'::TEXT;
-- ====================================================
-- STEP 2: FOLLOW FOREIGN KEYS (ENRICH!)
-- ====================================================
-- ORDERS TABLE
IF p_primary_table = 'tbl_orders' THEN
v_customer_id := v_primary_row->>'customer_id';
v_warehouse_id := v_primary_row->>'warehouse_id';
v_carrier_id := v_primary_row->>'carrier_id';
-- Customer
IF v_customer_id IS NOT NULL THEN
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_customers'::TEXT, to_jsonb(c.*), 'order_customer'::TEXT
FROM public.tbl_customers c WHERE c.customer_id = v_customer_id;
-- Customer History
RETURN QUERY
SELECT 'related_orders'::TEXT, 'tbl_orders'::TEXT, to_jsonb(o.*), 'customer_history'::TEXT
FROM public.tbl_orders o WHERE o.customer_id = v_customer_id AND o.order_id != p_primary_value
ORDER BY o.order_date DESC LIMIT 5;
END IF;
-- Warehouse
IF v_warehouse_id IS NOT NULL THEN
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_warehouses'::TEXT, to_jsonb(w.*), 'order_warehouse'::TEXT
FROM public.tbl_warehouses w WHERE w.warehouse_id = v_warehouse_id;
END IF;
-- Carrier
IF v_carrier_id IS NOT NULL THEN
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_carriers'::TEXT, to_jsonb(cr.*), 'order_carrier'::TEXT
FROM public.tbl_carriers cr WHERE cr.carrier_id = v_carrier_id;
END IF;
END IF;
-- CUSTOMERS TABLE
IF p_primary_table = 'tbl_customers' THEN
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_orders'::TEXT, to_jsonb(o.*), 'customer_orders'::TEXT
FROM public.tbl_orders o WHERE o.customer_id = p_primary_value
ORDER BY o.order_date DESC LIMIT 10;
END IF;
-- EMPLOYEES TABLE
IF p_primary_table = 'tbl_employees' THEN
v_employee_id := v_primary_row->>'employee_id';
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_employees'::TEXT, to_jsonb(e.*), 'direct_reports'::TEXT
FROM public.tbl_employees e WHERE e.manager_id = v_employee_id;
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_employees'::TEXT, to_jsonb(m.*), 'manager'::TEXT
FROM public.tbl_employees m WHERE m.employee_id = (v_primary_row->>'manager_id');
END IF;
-- WAREHOUSES TABLE
IF p_primary_table = 'tbl_warehouses' THEN
RETURN QUERY
SELECT 'enrichment'::TEXT, 'tbl_orders'::TEXT, to_jsonb(o.*), 'warehouse_orders'::TEXT
FROM public.tbl_orders o WHERE o.warehouse_id = p_primary_value
ORDER BY o.order_date DESC LIMIT 10;
END IF;
RETURN;
END;
$$;
GRANT EXECUTE ON FUNCTION public.enrich_query_context TO anon, authenticated, service_role;
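-- Usage sketch: fans out from one matched row to its FK neighborhood
-- (customer, order history, warehouse, carrier).
-- SELECT enrichment_type, table_name, relationship
-- FROM public.enrich_query_context('tbl_orders', 'order_id', 'O10001');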
-- ===============================================================
-- PART 8: CONFIGURATION SYSTEM
-- ===============================================================
INSERT INTO public.app_config (id, settings)
VALUES (1, '{
"chunk_size": 500,
"chunk_overlap": 100,
"graph_sample_rate": 5,
"worker_batch_size": 5,
"model_router": "gemini-2.5-flash",
"model_reranker": "gemini-2.5-flash-lite",
"model_sql": "gemini-2.5-flash",
"model_extraction": "gemini-2.5-flash",
"rrf_weight_enrichment": 15.0,
"rrf_weight_sql": 10.0,
"rrf_weight_graph": 5.0,
"rrf_weight_fts": 3.0,
"rrf_weight_vector": 1.0,
"rerank_depth": 15,
"min_vector_score": 0.01,
"search_limit": 10
}'::jsonb)
ON CONFLICT (id) DO NOTHING;
CREATE OR REPLACE FUNCTION public.configure_system(
p_chunk_size INT DEFAULT NULL,
p_chunk_overlap INT DEFAULT NULL,
p_graph_sample_rate INT DEFAULT NULL,
p_worker_batch_size INT DEFAULT NULL,
p_model_router TEXT DEFAULT NULL,
p_model_reranker TEXT DEFAULT NULL,
p_model_sql TEXT DEFAULT NULL,
p_model_extraction TEXT DEFAULT NULL,
p_search_limit INT DEFAULT NULL,
p_rerank_depth INT DEFAULT NULL,
p_rrf_weight_vector NUMERIC DEFAULT NULL,
p_rrf_weight_graph NUMERIC DEFAULT NULL,
p_rrf_weight_sql NUMERIC DEFAULT NULL,
p_rrf_weight_fts NUMERIC DEFAULT NULL,
p_rrf_weight_enrichment NUMERIC DEFAULT NULL -- covers every setting the Step 3 script touches
)
RETURNS TEXT LANGUAGE plpgsql SECURITY DEFINER AS $$
DECLARE
current_settings JSONB;
new_settings JSONB;
BEGIN
SELECT settings INTO current_settings FROM public.app_config WHERE id = 1;
new_settings := current_settings;
IF p_chunk_size IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{chunk_size}', to_jsonb(p_chunk_size)); END IF;
IF p_chunk_overlap IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{chunk_overlap}', to_jsonb(p_chunk_overlap)); END IF;
IF p_graph_sample_rate IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{graph_sample_rate}', to_jsonb(p_graph_sample_rate)); END IF;
IF p_worker_batch_size IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{worker_batch_size}', to_jsonb(p_worker_batch_size)); END IF;
IF p_model_router IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{model_router}', to_jsonb(p_model_router)); END IF;
IF p_model_reranker IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{model_reranker}', to_jsonb(p_model_reranker)); END IF;
IF p_model_sql IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{model_sql}', to_jsonb(p_model_sql)); END IF;
IF p_model_extraction IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{model_extraction}', to_jsonb(p_model_extraction)); END IF;
IF p_search_limit IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{search_limit}', to_jsonb(p_search_limit)); END IF;
IF p_rerank_depth IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rerank_depth}', to_jsonb(p_rerank_depth)); END IF;
IF p_rrf_weight_vector IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rrf_weight_vector}', to_jsonb(p_rrf_weight_vector)); END IF;
IF p_rrf_weight_graph IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rrf_weight_graph}', to_jsonb(p_rrf_weight_graph)); END IF;
IF p_rrf_weight_sql IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rrf_weight_sql}', to_jsonb(p_rrf_weight_sql)); END IF;
IF p_rrf_weight_fts IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rrf_weight_fts}', to_jsonb(p_rrf_weight_fts)); END IF;
IF p_rrf_weight_enrichment IS NOT NULL THEN new_settings := jsonb_set(new_settings, '{rrf_weight_enrichment}', to_jsonb(p_rrf_weight_enrichment)); END IF;
UPDATE public.app_config SET settings = new_settings, updated_at = now() WHERE id = 1;
RETURN 'System configuration updated successfully.';
END;
$$;
-- ===============================================================
-- PART 9: GRAPH CONTEXT RERANKER
-- ===============================================================
DROP FUNCTION IF EXISTS public.get_graph_context(bigint[], text[]);
CREATE OR REPLACE FUNCTION public.get_graph_context(
p_chunk_ids BIGINT[],
p_keywords TEXT[] DEFAULT '{}',
p_actions TEXT[] DEFAULT '{}'
)
RETURNS TABLE (chunk_id BIGINT, graph_data JSONB)
LANGUAGE sql STABLE AS $$
WITH raw_edges AS (
SELECT
cn_src.chunk_id,
jsonb_build_object(
'subject', n1.props->>'name',
'action', e.type,
'object', n2.props->>'name',
'context', e.props
) as edge_json,
(
0
+ (SELECT COALESCE(MAX(CASE WHEN n1.props->>'name' ILIKE '%' || kw || '%' THEN 10 ELSE 0 END),0) FROM unnest(p_keywords) kw)
+ (SELECT COALESCE(MAX(CASE WHEN n2.props->>'name' ILIKE '%' || kw || '%' THEN 10 ELSE 0 END),0) FROM unnest(p_keywords) kw)
+ (SELECT COALESCE(MAX(CASE WHEN array_length(p_actions, 1) > 0 AND e.type ILIKE act || '%' THEN 20 ELSE 0 END),0) FROM unnest(p_actions) act)
) as relevance_score
FROM public.chunk_node cn_src
JOIN public.edge e ON cn_src.node_id = e.src
JOIN public.chunk_node cn_tgt ON e.dst = cn_tgt.node_id AND cn_src.chunk_id = cn_tgt.chunk_id
JOIN public.node n1 ON e.src = n1.id
JOIN public.node n2 ON e.dst = n2.id
WHERE cn_src.chunk_id = ANY(p_chunk_ids)
),
ranked_edges AS (
SELECT
chunk_id,
edge_json,
relevance_score,
ROW_NUMBER() OVER (PARTITION BY chunk_id ORDER BY relevance_score DESC, length(edge_json::text) ASC) as rn
FROM raw_edges
)
SELECT
chunk_id,
jsonb_agg(edge_json) as graph_data
FROM ranked_edges
WHERE rn <= 5
GROUP BY chunk_id;
$$;
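-- Usage sketch: attach up to 5 scored triples to each candidate chunk before
-- reranking (chunk IDs and terms here are placeholders).
-- SELECT * FROM public.get_graph_context(ARRAY[101, 102]::bigint[], ARRAY['alice'], ARRAY['manage']);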
-- ===============================================================
-- PART 10: WORKER SETUP
-- ===============================================================
CREATE OR REPLACE FUNCTION public.setup_worker(project_url TEXT, service_role_key TEXT)
RETURNS TEXT LANGUAGE plpgsql SECURITY DEFINER AS $$
BEGIN
EXECUTE format(
$f$
CREATE OR REPLACE FUNCTION public.trigger_ingest_worker()
RETURNS TRIGGER LANGUAGE plpgsql SECURITY DEFINER AS $func$
BEGIN
PERFORM net.http_post(
url := '%s/functions/v1/ingest-worker',
headers := jsonb_build_object('Content-Type', 'application/json', 'Authorization', 'Bearer %s'),
body := jsonb_build_object('type', 'INSERT', 'table', 'objects', 'schema', 'storage', 'record', row_to_json(NEW))
);
RETURN NEW;
END;
$func$;
$f$,
project_url, service_role_key
);
DROP TRIGGER IF EXISTS "trigger-ingest-worker" ON storage.objects;
CREATE TRIGGER "trigger-ingest-worker"
AFTER INSERT ON storage.objects
FOR EACH ROW
WHEN (NEW.bucket_id = 'raw_uploads')
EXECUTE FUNCTION public.trigger_ingest_worker();
RETURN 'Worker configured successfully!';
END;
$$;
-- ===============================================================
-- PART 11: PERMISSIONS & REPAIRS
-- ===============================================================
GRANT USAGE ON SCHEMA public TO service_role, authenticated, anon;
GRANT ALL ON ALL TABLES IN SCHEMA public TO service_role, authenticated;
GRANT ALL ON ALL SEQUENCES IN SCHEMA public TO service_role, authenticated;
GRANT ALL ON TABLE public.ingestion_queue TO service_role, postgres, anon, authenticated;
GRANT SELECT ON TABLE public.app_config TO anon, authenticated, service_role;
GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA public TO authenticated, service_role;
-- 🚨 REPAIR SCRIPT: Fix permissions for any EXISTING "tbl_" spreadsheets
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN (SELECT tablename FROM pg_tables WHERE schemaname = 'public' AND tablename LIKE 'tbl_%')
LOOP
EXECUTE format('GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE public.%I TO service_role', r.tablename);
EXECUTE format('GRANT SELECT ON TABLE public.%I TO anon, authenticated', r.tablename);
END LOOP;
END $$;
Step 2. This is minor but important: if you upload a BIG file, it needs to process in the background. To enable that, set up the worker. In a new query, paste this, fill in your project URL and service role key, then run it:
-- insert your project URL and service role key
SELECT public.setup_worker(
'https://YOUR-PROJECT.supabase.co',
'ey**YOUR SERVICE ROLE KEY***Tc'
);
Step 3. If you want to adjust weighting and other parameters, run any of these queries with your desired adjustments:
-- ===============================================================
-- CONTEXT MESH: SIMPLE CONFIGURATION SCRIPT
-- ===============================================================
-- Instructions: Replace the default values below and run any statement.
-- Each parameter can be updated independently.
-- ===============================================================
-- ============================================================
-- INGESTION PARAMETERS
-- ============================================================
SELECT public.configure_system(p_chunk_size => 500);
-- What it does: Number of characters per document chunk
-- Suggestions:
-- Higher (800-1000) = Better context, slower processing
-- Lower (300-400) = Faster processing, more precise retrieval
-- Default (500) = Balanced performance
SELECT public.configure_system(p_chunk_overlap => 100);
-- What it does: Character overlap between adjacent chunks
-- Suggestions:
-- Higher (150-200) = Better continuity, more redundancy
-- Lower (50-75) = Less redundancy, faster processing
-- Default (100) = Balanced overlap
SELECT public.configure_system(p_graph_sample_rate => 5);
-- What it does: Extract graph relationships from every Nth chunk
-- Suggestions:
-- Higher (10-15) = Faster ingestion, sparser graph
-- Lower (1-3) = Dense graph, slower ingestion
-- Default (5) = Balanced graph density
SELECT public.configure_system(p_worker_batch_size => 5);
-- What it does: Queue items processed per worker batch
-- Suggestions:
-- Higher (10-20) = Faster bulk ingestion, higher memory
-- Lower (1-3) = Slower ingestion, lower memory
-- Default (5) = Balanced throughput
-- ============================================================
-- MODEL SELECTION
-- ============================================================
SELECT public.configure_system(p_model_router => 'gemini-2.5-flash');
-- What it does: LLM for query routing and entity extraction
-- Suggestions:
-- 'gemini-2.5-flash' = Fast, cost-effective (default)
-- 'gemini-2.0-flash-exp' = Experimental, free tier
-- 'gemini-2.5-pro' = Most accurate, expensive
SELECT public.configure_system(p_model_reranker => 'gemini-2.5-flash-lite');
-- What it does: LLM for reranking documents by relevance
-- Suggestions:
-- 'gemini-2.5-flash-lite' = Ultra-fast, cheap (default)
-- 'gemini-2.5-flash' = More accurate, slightly slower
-- 'gemini-2.0-flash-exp' = Free tier option
SELECT public.configure_system(p_model_sql => 'gemini-2.5-flash');
-- What it does: LLM for generating SQL queries
-- Suggestions:
-- 'gemini-2.5-flash' = Good balance (default)
-- 'gemini-2.5-pro' = Complex queries, better accuracy
-- 'gemini-2.0-flash-exp' = Free tier option
-- ============================================================
-- SEARCH WEIGHTS (RRF Fusion)
-- ============================================================
-- Higher weight = More influence in final ranking
-- ============================================================
SELECT public.configure_system(p_rrf_weight_enrichment => 15.0);
-- What it does: Weight for composite enrichment (SQL + FK relationships)
-- Suggestions:
-- Higher (20-30) = Prioritize structured data context
-- Lower (10-12) = Balance with documents
-- Default (15.0) = Highest priority (recommended)
SELECT public.configure_system(p_rrf_weight_sql => 10.0);
-- What it does: Weight for direct SQL query results
-- Suggestions:
-- Higher (15-20) = Prioritize exact matches
-- Lower (5-8) = More exploratory results
-- Default (10.0) = Strong influence
SELECT public.configure_system(p_rrf_weight_graph => 5.0);
-- What it does: Weight for knowledge graph relationships
-- Suggestions:
-- Higher (8-12) = More relationship context
-- Lower (2-4) = Focus on direct matches
-- Default (5.0) = Moderate context
SELECT public.configure_system(p_rrf_weight_fts => 3.0);
-- What it does: Weight for full-text keyword search
-- Suggestions:
-- Higher (5-7) = Better keyword matching
-- Lower (1-2) = Favor semantic search
-- Default (3.0) = Balanced keyword influence
SELECT public.configure_system(p_rrf_weight_vector => 1.0);
-- What it does: Weight for vector similarity search
-- Suggestions:
-- Higher (2-5) = More semantic similarity
-- Lower (0.5-0.8) = Favor exact matches
-- Default (1.0) = Lowest priority (as designed)
-- ============================================================
-- SEARCH THRESHOLDS
-- ============================================================
SELECT public.configure_system(p_rerank_depth => 15);
-- What it does: Number of documents sent to LLM reranker
-- Suggestions:
-- Higher (20-30) = Better accuracy, slower, more expensive
-- Lower (5-10) = Faster, cheaper, less accurate
-- Default (15) = Good balance
SELECT public.configure_system(p_search_limit => 10);
-- What it does: Maximum results returned to user
-- Suggestions:
-- Higher (20-30) = More comprehensive results
-- Lower (5-8) = Faster response, focused results
-- Default (10) = Standard result count
-- ============================================================
-- BATCH UPDATE (Update multiple at once)
-- ============================================================
-- Example: Update all weights at once
SELECT public.configure_system(
p_rrf_weight_enrichment => 15.0,
p_rrf_weight_sql => 10.0,
p_rrf_weight_graph => 5.0,
p_rrf_weight_fts => 3.0,
p_rrf_weight_vector => 1.0
);
-- Example: Update all models at once
SELECT public.configure_system(
p_model_router => 'gemini-2.5-flash',
p_model_reranker => 'gemini-2.5-flash-lite',
p_model_sql => 'gemini-2.5-flash',
p_model_extraction => 'gemini-2.5-flash'
);
-- ============================================================
-- VIEW CURRENT SETTINGS
-- ============================================================
SELECT jsonb_pretty(settings) FROM public.app_config WHERE id = 1;
-- ============================================================
-- RESET TO DEFAULTS
-- ============================================================
UPDATE public.app_config
SET settings = '{
"chunk_size": 500,
"chunk_overlap": 100,
"graph_sample_rate": 5,
"worker_batch_size": 5,
"model_router": "gemini-2.5-flash",
"model_reranker": "gemini-2.5-flash-lite",
"model_sql": "gemini-2.5-flash",
"model_extraction": "gemini-2.5-flash",
"rrf_weight_enrichment": 15.0,
"rrf_weight_sql": 10.0,
"rrf_weight_graph": 5.0,
"rrf_weight_fts": 3.0,
"rrf_weight_vector": 1.0,
"rerank_depth": 15,
"min_vector_score": 0.01,
"search_limit": 10
}'::jsonb
WHERE id = 1;
Step 4. Now go over to Edge Functions and create the first one. Make sure you name it 'ingest-intelligent'. PLEASE NOTE: YOU MUST SET A GOOGLE_API_KEY SECRET CONTAINING YOUR GEMINI API KEY OR THESE EDGE FUNCTIONS WON'T WORK:
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
const corsHeaders = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};
// --- CONFIG LOADER ---
async function getConfig(supabase) {
const { data } = await supabase.from('app_config').select('settings').single();
const defaults = {
chunk_size: 600,
chunk_overlap: 100
};
return {
...defaults,
...(data && data.settings ? data.settings : {})
};
}
// --- HELPER 1: SEMANTIC CHUNKER ---
function semanticChunker(text, maxSize = 600, overlap = 50) {
const cleanText = text.replace(/\r\n/g, "\n");
const separators = [
"\n\n",
"\n",
". ",
"? ",
"! ",
" "
];
function splitRecursive(input) {
if (input.length <= maxSize) return [
input
];
let splitBy = "";
for (const sep of separators) {
if (input.includes(sep)) {
splitBy = sep;
break;
}
}
if (!splitBy) {
const chunks = [];
for (let i = 0; i < input.length; i += maxSize) {
chunks.push(input.slice(i, i + maxSize));
}
return chunks;
}
const parts = input.split(splitBy);
const finalChunks = [];
let current = "";
for (const part of parts) {
const p = splitBy.trim() === "" ? part : part + splitBy;
if (current.length + p.length > maxSize) {
if (current.trim()) finalChunks.push(current.trim());
if (p.length > maxSize) {
finalChunks.push(...splitRecursive(p));
current = ""; // reset after flushing, otherwise the flushed text is duplicated into the next chunk
} else {
current = p;
}
} else {
current += p;
}
}
if (current.trim()) finalChunks.push(current.trim());
return finalChunks;
}
let chunks = splitRecursive(cleanText);
if (overlap > 0 && chunks.length > 1) {
const overlapped = [
chunks[0]
];
for (let i = 1; i < chunks.length; i++) {
const prev = chunks[i - 1];
const tail = prev.length > overlap ? prev.slice(-overlap) : prev;
const snap = tail.indexOf(" ");
const cleanTail = snap > -1 ? tail.slice(snap + 1) : tail;
overlapped.push(cleanTail + " ... " + chunks[i]);
}
return overlapped;
}
return chunks;
}
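// Usage sketch (illustrative): chunk a document with config-driven sizes.
// const cfg = await getConfig(supabase);
// const chunks = semanticChunker(rawText, cfg.chunk_size, cfg.chunk_overlap);
// Each chunk after the first starts with an overlap tail from its
// predecessor, joined by " ... ".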
// --- GEMINI CALLER ---
async function callGemini(prompt, apiKey, model = "gemini-2.5-flash") {
try {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
contents: [
{
parts: [
{
text: prompt
}
]
}
],
generationConfig: {
temperature: 0.1,
responseMimeType: "application/json"
}
})
});
const data = await response.json();
const text = data?.candidates?.[0]?.content?.parts?.[0]?.text ?? "{}";
return JSON.parse(text);
} catch (e) {
console.error("Gemini Error:", e);
return {
nodes: [],
edges: []
};
}
}
// --- SMART GRAPH BUILDER ---
async function buildGraphGeneric(rows, tableName, apiKey) {
console.log(`[Graph] Building graph for ${tableName} (${rows.length} rows)`);
const hints = await analyzeRelationships(rows, tableName, apiKey);
if (hints && hints.length > 0) {
console.log(`[Graph] Using AI hints (${hints.length} relationships found)`);
return buildFromHints(rows, hints);
}
console.log(`[Graph] No AI hints, using generic heuristics`);
return buildFromHeuristics(rows, tableName);
}
// --- AI Analysis ---
async function analyzeRelationships(rows, tableName, apiKey) {
const sample = rows.slice(0, 5);
const headers = Object.keys(sample[0] || {});
const prompt = `You are a data relationship analyzer.
Table: ${tableName}
Columns: ${headers.join(', ')}
Sample Data: ${JSON.stringify(sample, null, 2)}
TASK: Identify HIGH-VALUE relationships ONLY.
✅ PRIORITIZE (High-Value Relationships):
1. **Person-to-Person**: Manager-employee, mentor-mentee, colleague relationships
- employee_id → manager_id = "REPORTS_TO"
- manager_id → employee_id = "MANAGES"
2. **Business Process**: Order-customer, shipment-warehouse, payment-account
- order_id → customer_id = "PLACED_BY"
- order_id → warehouse_id = "FULFILLED_FROM"
- shipment_id → carrier_id = "SHIPPED_BY"
3. **Ownership/Assignment**: Asset-owner, project-lead, task-assignee
- warehouse_id → manager_name = "MANAGED_BY"
- project_id → owner_id = "OWNED_BY"
❌ IGNORE (Low-Value Relationships):
1. **Generic Attributes**: HAS_STATUS, HAS_TYPE, HAS_CATEGORY
- order_id → order_status (this is an attribute, not a relationship)
- item_id → item_type (this is classification, not a relationship)
2. **Carrier/Infrastructure**: Unless directly person-related
- order_id → carrier_id (weak relationship, often just logistics)
3. **Self-References**: Same entity on both sides
- employee_id → employee_id (invalid)
RELATIONSHIP QUALITY CRITERIA:
- **High**: Connects two different entities with meaningful business relationship
- **Medium**: Connects entities but relationship is transactional
- **Low**: Just describes an attribute or status
OUTPUT RULES:
- Only return relationships with confidence "high" or "medium"
- Skip any relationship that just describes an attribute
- Focus on relationships between ENTITIES, not entity-to-attribute
Output ONLY valid JSON array:
[
{
"from_col": "source_column",
"to_col": "target_column",
"relationship": "VERB_DESCRIBING_RELATIONSHIP",
"confidence": "high",
"explanation": "Why this relationship is valuable"
}
]
If no HIGH-VALUE relationships exist, return empty array [].`;
try {
const result = await callGemini(prompt, apiKey, "gemini-2.5-flash");
if (Array.isArray(result)) return result;
if (result.relationships) return result.relationships;
return [];
} catch (e) {
console.error("[Graph] AI analysis failed:", e);
return [];
}
}
// --- V10: GRAPH EDGE QUALITY VALIDATOR ---
function validateGraphEdge(srcNode, dstNode, edgeType, context) {
const validation = {
valid: true,
reason: null,
priority: 'normal'
};
// Get clean names for comparison
const srcName = srcNode.props?.name || srcNode.key || '';
const dstName = dstNode.props?.name || dstNode.key || '';
// Rule 1: Reject self-referential edges
if (srcName === dstName) {
validation.valid = false;
validation.reason = `Self-referential: ${srcName} → ${srcName}`;
return validation;
}
// Rule 2: Reject if both nodes have same key prefix (duplicates)
const srcPrefix = srcNode.key?.split(':')[0];
const dstPrefix = dstNode.key?.split(':')[0];
if (srcPrefix === dstPrefix && srcName === dstName) {
validation.valid = false;
validation.reason = `Duplicate nodes: ${srcNode.key} → ${dstNode.key}`;
return validation;
}
// Rule 3: Define relationship priorities
const VALUABLE_RELATIONSHIPS = new Set([
'REPORTS_TO',
'MANAGES',
'WORKS_WITH',
'ASSIGNED_TO',
'PLACED_BY',
'FULFILLED_FROM',
'SHIPPED_BY'
]);
const LOW_VALUE_RELATIONSHIPS = new Set([
'HAS_CARRIER',
'HAS_STATUS',
'HAS_TYPE',
'HAS_CATEGORY'
]);
// Rule 4: Reject generic "HAS_*" relationships unless high priority
if (edgeType.startsWith('HAS_') && !VALUABLE_RELATIONSHIPS.has(edgeType)) {
if (LOW_VALUE_RELATIONSHIPS.has(edgeType)) {
validation.valid = false;
validation.reason = `Low-value relationship: ${edgeType}`;
return validation;
}
}
// Rule 5: Reject if context suggests it's inferred from ID column only
const contextStr = context?.context || context?.explanation || '';
if (contextStr.includes('Inferred from carrier_id') || contextStr.includes('Inferred from status') || contextStr.includes('Inferred from type')) {
validation.valid = false;
validation.reason = `Low-confidence inference: ${contextStr}`;
return validation;
}
// Rule 6: Boost person-to-person relationships
const isPersonToPerson = (srcNode.labels?.includes('Person') || srcNode.labels?.includes('Employee')) && (dstNode.labels?.includes('Person') || dstNode.labels?.includes('Employee'));
if (isPersonToPerson && VALUABLE_RELATIONSHIPS.has(edgeType)) {
validation.priority = 'high';
}
// Rule 7: Reject edges with missing names
if (!srcName || !dstName || srcName === 'undefined' || dstName === 'undefined') {
validation.valid = false;
validation.reason = `Missing names: src="${srcName}", dst="${dstName}"`;
return validation;
}
return validation;
}
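// Illustrative call (hypothetical nodes, tracing the rules above):
// validateGraphEdge(
//   { key: "entity:sarah_brooks", labels: ["Person"], props: { name: "Sarah Brooks" } },
//   { key: "entity:nicholas_cooper", labels: ["Person"], props: { name: "Nicholas Cooper" } },
//   "REPORTS_TO", { context: "employee_id → manager_id" }
// ) // → { valid: true, reason: null, priority: 'high' } via the Rule 6 boost;
// the same pair with "HAS_STATUS" fails Rule 4 as a low-value relationship.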
// --- BUILD FROM HINTS (DEDUPLICATED) ---
function buildFromHints(rows, hints) {
const nodeMap = new Map();
const edgeMap = new Map();
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
// Build ID maps
const idMaps = {};
for (const hint of hints) {
if (hint.from_col.includes('_id') || hint.to_col.includes('_id')) {
const idCol = hint.from_col.includes('_id') ? hint.from_col : hint.to_col;
const nameCol = findNameColumn(rows[0], idCol);
if (nameCol) {
idMaps[idCol] = {};
rows.forEach((r) => {
if (r[idCol] && r[nameCol]) {
idMaps[idCol][r[idCol]] = r[nameCol];
}
});
}
}
}
// Process each row
for (const row of rows) {
for (const hint of hints) {
const fromVal = row[hint.from_col];
const toVal = row[hint.to_col];
if (!fromVal || !toVal) continue;
// ✅ FIXED: Removed optional chaining
const fromIdMap = idMaps[hint.from_col];
const toIdMap = idMaps[hint.to_col];
const resolvedFrom = fromIdMap && fromIdMap[fromVal] || fromVal;
const resolvedTo = toIdMap && toIdMap[toVal] || toVal;
const fromKey = `entity:${clean(String(resolvedFrom))}`;
const toKey = `entity:${clean(String(resolvedTo))}`;
if (!nodeMap.has(fromKey)) {
nodeMap.set(fromKey, {
key: fromKey,
labels: [
inferEntityType(hint.from_col)
],
props: {
name: String(resolvedFrom)
}
});
}
if (!nodeMap.has(toKey)) {
nodeMap.set(toKey, {
key: toKey,
labels: [
inferEntityType(hint.to_col)
],
props: {
name: String(resolvedTo)
}
});
}
// ✅ V10: VALIDATE EDGE BEFORE ADDING
const srcNode = nodeMap.get(fromKey);
const dstNode = nodeMap.get(toKey);
const edgeType = hint.relationship || 'RELATES_TO';
const edgeContext = {
context: hint.explanation || `${hint.from_col} → ${hint.to_col}`,
confidence: hint.confidence || 'medium'
};
const validation = validateGraphEdge(srcNode, dstNode, edgeType, edgeContext);
if (validation.valid) {
const edgeKey = `${fromKey}-${edgeType}-${toKey}`;
if (!edgeMap.has(edgeKey)) {
edgeMap.set(edgeKey, {
src_key: fromKey,
dst_key: toKey,
type: edgeType,
props: edgeContext
});
}
} else {
console.log(`[Graph Quality] REJECTED edge: ${validation.reason}`);
}
}
}
const nodes = Array.from(nodeMap.values());
const edges = Array.from(edgeMap.values());
console.log(`[Graph] Deduplicated: ${nodes.length} unique nodes, ${edges.length} unique edges`);
return {
nodes,
edges
};
}
// --- V10: NODE KEY NORMALIZER (PREVENTS DUPLICATES) ---
function normalizeNodeKey(tableName, entityName) {
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
// Always use tbl_ prefix for consistency
let normalizedTable = tableName;
if (!normalizedTable.startsWith('tbl_')) {
normalizedTable = `tbl_${tableName}`;
}
return `${normalizedTable}:${clean(String(entityName))}`;
}
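// Example: normalizeNodeKey("employees", "Sarah Brooks") → "tbl_employees:sarah_brooks",
// and normalizeNodeKey("tbl_employees", "Sarah Brooks") returns the same key —
// so callers passing either the raw or tbl_-prefixed table name land on one node.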
// --- BUILD FROM HEURISTICS (DEDUPLICATED) ---
function buildFromHeuristics(rows, tableName) {
const nodeMap = new Map();
const edgeMap = new Map();
const entityRegistry = new Map();
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
const firstRow = rows[0] || {};
const columns = Object.keys(firstRow);
const idColumns = columns.filter((c) => {
return c.endsWith('_id') || c === 'id' || c.includes('identifier');
});
const nameColumns = columns.filter((c) => {
return c.includes('name') || c === 'title' || c === 'label';
});
if (nameColumns.length > 0) {
const primaryName = nameColumns[0];
rows.forEach((row) => {
const entityName = row[primaryName];
if (entityName) {
// ✅ V10: Use normalized key and check entity registry
const normalizedName = clean(String(entityName));
const key = normalizeNodeKey(tableName, entityName);
if (!entityRegistry.has(normalizedName)) {
entityRegistry.set(normalizedName, key); // Track this entity
nodeMap.set(key, {
key,
labels: [
tableName
],
props: {
name: String(entityName),
source: tableName
}
});
console.log(`[Graph Dedup] Created primary node: ${key}`);
} else {
console.log(`[Graph Dedup] Skipped duplicate primary: ${entityName} (already exists as ${entityRegistry.get(normalizedName)})`);
}
}
});
}
for (const col of idColumns) {
if (col === 'id') continue;
const referencedTable = col.replace(/_id$/, '');
const correspondingName = findNameColumn(firstRow, col);
if (correspondingName) {
rows.forEach((row) => {
const fkValue = row[col];
const fkName = row[correspondingName];
if (fkValue && fkName) {
// ✅ V10: Check if entity already exists before creating node
const normalizedFkName = clean(String(fkName));
let fkKey;
if (entityRegistry.has(normalizedFkName)) {
// Entity already exists, use existing key
fkKey = entityRegistry.get(normalizedFkName);
console.log(`[Graph Dedup] Reusing existing node: ${fkKey} for FK ${fkName}`);
} else {
// Create new node with normalized key
fkKey = normalizeNodeKey(referencedTable, fkName);
entityRegistry.set(normalizedFkName, fkKey);
nodeMap.set(fkKey, {
key: fkKey,
labels: [
referencedTable
],
props: {
name: String(fkName)
}
});
console.log(`[Graph Dedup] Created FK node: ${fkKey}`);
}
if (nameColumns.length > 0) {
const primaryName = row[nameColumns[0]];
if (primaryName) {
const primaryKey = normalizeNodeKey(tableName, primaryName); // ✅ keep keys consistent with the primary nodes created above
// ✅ V10: VALIDATE HEURISTIC EDGE
const edgeType = `HAS_${referencedTable.toUpperCase()}`;
const srcNode = nodeMap.get(primaryKey);
const dstNode = nodeMap.get(fkKey);
const edgeContext = {
context: `Inferred from ${col}`
};
const validation = validateGraphEdge(srcNode, dstNode, edgeType, edgeContext);
if (validation.valid) {
const edgeKey = `${primaryKey}-${edgeType}-${fkKey}`;
if (!edgeMap.has(edgeKey)) {
edgeMap.set(edgeKey, {
src_key: primaryKey,
dst_key: fkKey,
type: edgeType,
props: edgeContext
});
}
} else {
console.log(`[Graph Quality] REJECTED heuristic edge: ${validation.reason}`);
}
}
}
}
});
}
}
const nodes = Array.from(nodeMap.values());
const edges = Array.from(edgeMap.values());
console.log(`[Graph] Heuristic mode: ${nodes.length} unique nodes, ${edges.length} unique edges`);
return {
nodes,
edges
};
}
// --- HELPER FUNCTIONS ---
function findNameColumn(row, idColumn) {
const keys = Object.keys(row);
const baseName = idColumn.replace(/_id$/, '');
const candidates = [
`${baseName}_name`,
`${baseName}_title`,
`${baseName}`,
'name',
'title',
'label'
];
for (const candidate of candidates) {
if (keys.includes(candidate)) return candidate;
}
return null;
}
function inferEntityType(columnName) {
if (columnName.includes('employee') || columnName.includes('person')) return 'Person';
if (columnName.includes('customer') || columnName.includes('client')) return 'Customer';
if (columnName.includes('warehouse') || columnName.includes('location')) return 'Location';
if (columnName.includes('product') || columnName.includes('item')) return 'Product';
if (columnName.includes('order')) return 'Order';
if (columnName.includes('company') || columnName.includes('organization')) return 'Organization';
return 'Entity';
}
async function analyzeTableSchema(tableName, rows, apiKey) {
const sample = rows.slice(0, 3);
const headers = Object.keys(sample[0] || {});
const prompt = `You are a data schema analyst. Analyze this table:
Table Name: ${tableName}
Columns: ${headers.join(', ')}
Sample Data: ${JSON.stringify(sample, null, 2)}
Provide:
1. description: One sentence explaining what this data represents
2. semantics: For each column, identify its semantic type. Options:
- person_name, company_name, location_name
- currency_amount, percentage, count
- date, datetime, duration
- identifier, category, description, status
3. graph_hints: Relationships that could form knowledge graph edges. Format:
[{"from_col": "manager_id", "to_col": "employee_id", "edge_type": "MANAGES", "confidence": "high"}]
Output ONLY valid JSON:
{
"description": "...",
"semantics": {"col1": "type", ...},
"graph_hints": [...]
}`;
try {
const result = await callGemini(prompt, apiKey, "gemini-2.5-flash");
return {
description: result.description || `Data table: ${tableName}`,
semantics: result.semantics || {},
graphHints: result.graph_hints || []
};
} catch (e) {
console.error("Schema analysis failed:", e);
return {
description: `Data table: ${tableName}`,
semantics: {},
graphHints: []
};
}
}
// --- MAIN LOGIC: SPREADSHEET ---
async function processSpreadsheetCore(uri, title, rows, env) {
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_ROLE_KEY);
console.log(`[Spreadsheet] Processing ${rows.length} rows for: ${title}`);
// 1. Clean Rows
const cleanRows = rows.map((row) => {
const newRow = {};
Object.keys(row).forEach((k) => {
const cleanKey = k.toLowerCase().trim().replace(/[^a-z0-9]/g, '_');
newRow[cleanKey] = row[k];
});
return newRow;
});
// 2. Schema Inference
const firstRow = cleanRows[0] || {};
const schema = {};
Object.keys(firstRow).forEach((k) => schema[k] = typeof firstRow[k]);
const safeName = title.toLowerCase().replace(/[^a-z0-9]/g, '_');
const tableName = `tbl_${safeName}`;
// 3. BUILD ID MAP
const idMap = {};
cleanRows.forEach((r) => {
if (r.employee_id && r.name) {
idMap[r.employee_id] = r.name;
}
});
// 4. ANALYZE SCHEMA
console.log(`[V8] Analyzing schema for: ${title}`);
const schemaAnalysis = await analyzeTableSchema(title, cleanRows, env.GOOGLE_API_KEY);
console.log(`[V8] Analysis complete:`, schemaAnalysis);
// 5. GENERATE GRAPH
console.log(`[Graph] Building graph using generic approach...`);
const { nodes: allNodes, edges: allEdges } = await buildGraphGeneric(cleanRows, tableName, env.GOOGLE_API_KEY);
console.log(`[Graph] Generated ${allNodes.length} nodes and ${allEdges.length} edges.`);
// 6. DB Insert
const BATCH_SIZE = 500;
for (let i = 0; i < cleanRows.length; i += BATCH_SIZE) {
const rowBatch = cleanRows.slice(i, i + BATCH_SIZE);
const nodesBatch = i === 0 ? allNodes : [];
const edgesBatch = i === 0 ? allEdges : [];
const { error } = await supabase.rpc('ingest_spreadsheet', {
p_uri: uri,
p_title: title,
p_table_name: safeName,
p_description: null,
p_rows: rowBatch,
p_schema: schema,
p_nodes: nodesBatch,
p_edges: edgesBatch
});
if (error) throw new Error(`Batch ${i} error: ${error.message}`);
}
// 7. SAVE METADATA
console.log(`[V8] Saving metadata for ${tableName}...`);
const { error: metaError } = await supabase.from('structured_table').update({
description: schemaAnalysis.description,
column_semantics: schemaAnalysis.semantics,
graph_hints: schemaAnalysis.graphHints,
sample_row: cleanRows[0]
}).eq('table_name', tableName);
if (metaError) {
console.error(`[V8] Metadata save failed:`, metaError);
} else {
console.log(`[V8] Metadata saved successfully`);
}
return {
success: true,
rows: cleanRows.length,
graph_nodes: allNodes.length,
metadata: schemaAnalysis
};
}
// --- MAIN HANDLER ---
serve(async (req) => {
if (req.method === 'OPTIONS') {
return new Response('ok', {
headers: corsHeaders
});
}
try {
const env = Deno.env.toObject();
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_ROLE_KEY);
const body = await req.json();
const { uri, title, text, data } = body;
if (!uri || !title) throw new Error("Missing 'uri' or 'title'");
if (!text && !data) throw new Error("Must provide 'text' or 'data'");
// DOC PATH
if (text) {
const config = await getConfig(supabase);
const chunks = semanticChunker(text, config.chunk_size, config.chunk_overlap);
const rows = chunks.map((chunk, idx) => ({
uri,
title,
chunk_index: idx,
chunk_text: chunk,
status: 'pending'
}));
for (let i = 0; i < rows.length; i += 100) {
const { error } = await supabase.from('ingestion_queue').insert(rows.slice(i, i + 100));
if (error) throw error;
}
fetch(`${env.SUPABASE_URL}/functions/v1/ingest-worker`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${env.SUPABASE_SERVICE_ROLE_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
action: 'start_processing'
})
}).catch((e) => console.error("Worker trigger failed", e));
return new Response(JSON.stringify({
success: true,
message: `Queued ${chunks.length} chunks.`
}), {
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
// DATA PATH
if (data) {
const payloadSize = JSON.stringify(data).length;
if (payloadSize < 40000) {
const result = await processSpreadsheetCore(uri, title, data, env);
return new Response(JSON.stringify(result), {
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
} else {
const fileName = `${Date.now()}_${uri.replace(/[^a-z0-9]/gi, '_')}.json`;
const { error } = await supabase.storage.from('raw_uploads').upload(fileName, JSON.stringify(body), {
contentType: 'application/json'
});
if (error) throw error;
return new Response(JSON.stringify({
status: "queued",
message: "Large file uploaded to background queue."
}), {
status: 202,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
}
} catch (error) {
return new Response(JSON.stringify({
error: error.message
}), {
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
});
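Quick sanity check before moving on: you can hit this function straight from any HTTP client. Everything below is a placeholder — swap in your project URL, a service role (or anon) key, and whatever you named this Edge Function in the previous step:
fetch("https://YOUR_PROJECT.supabase.co/functions/v1/YOUR_INGEST_FUNCTION", {
method: "POST",
headers: {
"Authorization": "Bearer YOUR_SERVICE_ROLE_KEY",
"Content-Type": "application/json"
},
body: JSON.stringify({ uri: "doc://handbook", title: "Employee Handbook", text: "..." })
});
Pass text to exercise the chunk-queue path, or data (an array of row objects) to exercise the spreadsheet path; payloads over ~40KB get parked in raw_uploads for the worker.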
Step 5. Create another Edge Function. Name this one 'ingest-worker':
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
const corsHeaders = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};
// --- CONFIG LOADER ---
async function getConfig(supabase) {
const { data } = await supabase.from('app_config').select('settings').single();
const defaults = {
graph_sample_rate: 5,
worker_batch_size: 5,
model_extraction: "gemini-2.5-flash"
};
return {
...defaults,
...data?.settings || {}
};
}
// --- GEMINI CALLER ---
async function callGemini(prompt, apiKey, model) {
console.log(`[Gemini] Calling ${model}...`);
try {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
contents: [
{
parts: [
{
text: prompt
}
]
}
],
generationConfig: {
temperature: 0.1,
responseMimeType: "application/json"
}
})
});
if (!response.ok) {
console.error(`[Gemini] HTTP ${response.status}: ${response.statusText}`);
return {
description: "Analysis unavailable",
semantics: {},
graph_hints: []
};
}
const data = await response.json();
const text = data.candidates?.[0]?.content?.parts?.[0]?.text || "{}";
console.log(`[Gemini] Response received: ${text.substring(0, 100)}...`);
return JSON.parse(text);
} catch (e) {
console.error("[Gemini] Error:", e);
return {
description: "Analysis failed",
semantics: {},
graph_hints: []
};
}
}
// --- EMBEDDING HELPER ---
async function getEmbedding(text, apiKey) {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/gemini-embedding-001:embedContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
model: "models/gemini-embedding-001",
content: {
parts: [
{
text
}
]
},
outputDimensionality: 768
})
});
const data = await response.json();
return data.embedding?.values || [];
}
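// Sanity check (illustrative): a successful call resolves to a 768-float array,
// since outputDimensionality is pinned to 768 to match the halfvec(768) column:
// (await getEmbedding("warehouse manager duties", apiKey)).length === 768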
// --- HELPER: SMART GRAPH BUILDER (LEGACY - FALLBACK) ---
async function buildGraphGeneric(rows, tableName, apiKey) {
console.log(`[Graph] Building graph for ${tableName} (${rows.length} rows)`);
// Step 1: Try AI Analysis
const hints = await analyzeRelationships(rows, tableName, apiKey);
if (hints && hints.length > 0) {
console.log(`[Graph] Using AI hints (${hints.length} relationships found)`);
return buildFromHints(rows, hints);
}
// Step 2: Generic Heuristic Fallback
console.log(`[Graph] No AI hints, using generic heuristics`);
return buildFromHeuristics(rows, tableName);
}
// --- AI Analysis (Enhanced Prompt) ---
async function analyzeRelationships(rows, tableName, apiKey) {
const sample = rows.slice(0, 5); // More samples = better analysis
const headers = Object.keys(sample[0] || {});
const prompt = `You are a data relationship analyzer.
Table: ${tableName}
Columns: ${headers.join(', ')}
Sample Data: ${JSON.stringify(sample, null, 2)}
TASK: Identify HIGH-VALUE relationships ONLY.
✅ PRIORITIZE (High-Value Relationships):
1. **Person-to-Person**: Manager-employee, mentor-mentee, colleague relationships
- employee_id → manager_id = "REPORTS_TO"
- manager_id → employee_id = "MANAGES"
2. **Business Process**: Order-customer, shipment-warehouse, payment-account
- order_id → customer_id = "PLACED_BY"
- order_id → warehouse_id = "FULFILLED_FROM"
- shipment_id → carrier_id = "SHIPPED_BY"
3. **Ownership/Assignment**: Asset-owner, project-lead, task-assignee
- warehouse_id → manager_name = "MANAGED_BY"
- project_id → owner_id = "OWNED_BY"
❌ IGNORE (Low-Value Relationships):
1. **Generic Attributes**: HAS_STATUS, HAS_TYPE, HAS_CATEGORY
- order_id → order_status (this is an attribute, not a relationship)
- item_id → item_type (this is classification, not a relationship)
2. **Carrier/Infrastructure**: Unless directly person-related
- order_id → carrier_id (weak relationship, often just logistics)
3. **Self-References**: Same entity on both sides
- employee_id → employee_id (invalid)
RELATIONSHIP QUALITY CRITERIA:
- **High**: Connects two different entities with meaningful business relationship
- **Medium**: Connects entities but relationship is transactional
- **Low**: Just describes an attribute or status
OUTPUT RULES:
- Only return relationships with confidence "high" or "medium"
- Skip any relationship that just describes an attribute
- Focus on relationships between ENTITIES, not entity-to-attribute
Output ONLY valid JSON array:
[
{
"from_col": "source_column",
"to_col": "target_column",
"relationship": "VERB_DESCRIBING_RELATIONSHIP",
"confidence": "high",
"explanation": "Why this relationship is valuable"
}
]
If no HIGH-VALUE relationships exist, return empty array [].`;
try {
const result = await callGemini(prompt, apiKey, "gemini-2.5-flash");
// Handle both array and object responses
if (Array.isArray(result)) return result;
if (result.relationships) return result.relationships;
return [];
} catch (e) {
console.error("[Graph] AI analysis failed:", e);
return [];
}
}
// --- V10: GRAPH EDGE QUALITY VALIDATOR ---
function validateGraphEdge(srcNode, dstNode, edgeType, context) {
const validation = {
valid: true,
reason: null,
priority: 'normal'
};
// Get clean names for comparison
const srcName = srcNode.props?.name || srcNode.key || '';
const dstName = dstNode.props?.name || dstNode.key || '';
// Rule 1: Reject self-referential edges
if (srcName === dstName) {
validation.valid = false;
validation.reason = `Self-referential: ${srcName} → ${srcName}`;
return validation;
}
// Rule 2: Reject if both nodes have same key prefix (duplicates)
const srcPrefix = srcNode.key?.split(':')[0];
const dstPrefix = dstNode.key?.split(':')[0];
if (srcPrefix === dstPrefix && srcName === dstName) {
validation.valid = false;
validation.reason = `Duplicate nodes: ${srcNode.key} → ${dstNode.key}`;
return validation;
}
// Rule 3: Define relationship priorities
const VALUABLE_RELATIONSHIPS = new Set([
'REPORTS_TO',
'MANAGES',
'WORKS_WITH',
'ASSIGNED_TO',
'PLACED_BY',
'FULFILLED_FROM',
'SHIPPED_BY'
]);
const LOW_VALUE_RELATIONSHIPS = new Set([
'HAS_CARRIER',
'HAS_STATUS',
'HAS_TYPE',
'HAS_CATEGORY'
]);
// Rule 4: Reject generic "HAS_*" relationships unless high priority
if (edgeType.startsWith('HAS_') && !VALUABLE_RELATIONSHIPS.has(edgeType)) {
if (LOW_VALUE_RELATIONSHIPS.has(edgeType)) {
validation.valid = false;
validation.reason = `Low-value relationship: ${edgeType}`;
return validation;
}
}
// Rule 5: Reject if context suggests it's inferred from ID column only
const contextStr = context?.context || context?.explanation || '';
if (contextStr.includes('Inferred from carrier_id') || contextStr.includes('Inferred from status') || contextStr.includes('Inferred from type')) {
validation.valid = false;
validation.reason = `Low-confidence inference: ${contextStr}`;
return validation;
}
// Rule 6: Boost person-to-person relationships
const isPersonToPerson = (srcNode.labels?.includes('Person') || srcNode.labels?.includes('Employee')) && (dstNode.labels?.includes('Person') || dstNode.labels?.includes('Employee'));
if (isPersonToPerson && VALUABLE_RELATIONSHIPS.has(edgeType)) {
validation.priority = 'high';
}
// Rule 7: Reject edges with missing names
if (!srcName || !dstName || srcName === 'undefined' || dstName === 'undefined') {
validation.valid = false;
validation.reason = `Missing names: src="${srcName}", dst="${dstName}"`;
return validation;
}
return validation;
}
// --- BUILD FROM HINTS (DEDUPLICATED) ---
function buildFromHints(rows, hints) {
const nodeMap = new Map();
const edgeMap = new Map();
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
// Build ID maps
const idMaps = {};
for (const hint of hints) {
if (hint.from_col.includes('_id') || hint.to_col.includes('_id')) {
const idCol = hint.from_col.includes('_id') ? hint.from_col : hint.to_col;
const nameCol = findNameColumn(rows[0], idCol);
if (nameCol) {
idMaps[idCol] = {};
rows.forEach((r) => {
if (r[idCol] && r[nameCol]) {
idMaps[idCol][r[idCol]] = r[nameCol];
}
});
}
}
}
// Process each row
for (const row of rows) {
for (const hint of hints) {
const fromVal = row[hint.from_col];
const toVal = row[hint.to_col];
if (!fromVal || !toVal) continue;
// ✅ FIXED: Removed optional chaining
const fromIdMap = idMaps[hint.from_col];
const toIdMap = idMaps[hint.to_col];
const resolvedFrom = fromIdMap && fromIdMap[fromVal] || fromVal;
const resolvedTo = toIdMap && toIdMap[toVal] || toVal;
const fromKey = `entity:${clean(String(resolvedFrom))}`;
const toKey = `entity:${clean(String(resolvedTo))}`;
if (!nodeMap.has(fromKey)) {
nodeMap.set(fromKey, {
key: fromKey,
labels: [
inferEntityType(hint.from_col)
],
props: {
name: String(resolvedFrom)
}
});
}
if (!nodeMap.has(toKey)) {
nodeMap.set(toKey, {
key: toKey,
labels: [
inferEntityType(hint.to_col)
],
props: {
name: String(resolvedTo)
}
});
}
// ✅ V10: VALIDATE EDGE BEFORE ADDING
const srcNode = nodeMap.get(fromKey);
const dstNode = nodeMap.get(toKey);
const edgeType = hint.relationship || 'RELATES_TO';
const edgeContext = {
context: hint.explanation || `${hint.from_col} → ${hint.to_col}`,
confidence: hint.confidence || 'medium'
};
const validation = validateGraphEdge(srcNode, dstNode, edgeType, edgeContext);
if (validation.valid) {
const edgeKey = `${fromKey}-${edgeType}-${toKey}`;
if (!edgeMap.has(edgeKey)) {
edgeMap.set(edgeKey, {
src_key: fromKey,
dst_key: toKey,
type: edgeType,
props: edgeContext
});
}
} else {
console.log(`[Graph Quality] REJECTED edge: ${validation.reason}`);
}
}
}
const nodes = Array.from(nodeMap.values());
const edges = Array.from(edgeMap.values());
console.log(`[Graph] Deduplicated: ${nodes.length} unique nodes, ${edges.length} unique edges`);
return {
nodes,
edges
};
}
// --- V10: NODE KEY NORMALIZER (PREVENTS DUPLICATES) ---
function normalizeNodeKey(tableName, entityName) {
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
// Always use tbl_ prefix for consistency
let normalizedTable = tableName;
if (!normalizedTable.startsWith('tbl_')) {
normalizedTable = `tbl_${tableName}`;
}
return `${normalizedTable}:${clean(String(entityName))}`;
}
// --- BUILD FROM HEURISTICS (DEDUPLICATED) ---
function buildFromHeuristics(rows, tableName) {
const nodeMap = new Map();
const edgeMap = new Map();
const entityRegistry = new Map();
const clean = (s) => String(s).toLowerCase().trim().replace(/[^a-z0-9]/g, "_");
const firstRow = rows[0] || {};
const columns = Object.keys(firstRow);
const idColumns = columns.filter((c) => {
return c.endsWith('_id') || c === 'id' || c.includes('identifier');
});
const nameColumns = columns.filter((c) => {
return c.includes('name') || c === 'title' || c === 'label';
});
if (nameColumns.length > 0) {
const primaryName = nameColumns[0];
rows.forEach((row) => {
const entityName = row[primaryName];
if (entityName) {
// ✅ V10: Use normalized key and check entity registry
const normalizedName = clean(String(entityName));
const key = normalizeNodeKey(tableName, entityName);
if (!entityRegistry.has(normalizedName)) {
entityRegistry.set(normalizedName, key); // Track this entity
nodeMap.set(key, {
key,
labels: [
tableName
],
props: {
name: String(entityName),
source: tableName
}
});
console.log(`[Graph Dedup] Created primary node: ${key}`);
} else {
console.log(`[Graph Dedup] Skipped duplicate primary: ${entityName} (already exists as ${entityRegistry.get(normalizedName)})`);
}
}
});
}
for (const col of idColumns) {
if (col === 'id') continue;
const referencedTable = col.replace(/_id$/, '');
const correspondingName = findNameColumn(firstRow, col);
if (correspondingName) {
rows.forEach((row) => {
const fkValue = row[col];
const fkName = row[correspondingName];
if (fkValue && fkName) {
// ✅ V10: Check if entity already exists before creating node
const normalizedFkName = clean(String(fkName));
let fkKey;
if (entityRegistry.has(normalizedFkName)) {
// Entity already exists, use existing key
fkKey = entityRegistry.get(normalizedFkName);
console.log(`[Graph Dedup] Reusing existing node: ${fkKey} for FK ${fkName}`);
} else {
// Create new node with normalized key
fkKey = normalizeNodeKey(referencedTable, fkName);
entityRegistry.set(normalizedFkName, fkKey);
nodeMap.set(fkKey, {
key: fkKey,
labels: [
referencedTable
],
props: {
name: String(fkName)
}
});
console.log(`[Graph Dedup] Created FK node: ${fkKey}`);
}
if (nameColumns.length > 0) {
const primaryName = row[nameColumns[0]];
if (primaryName) {
const primaryKey = normalizeNodeKey(tableName, primaryName); // ✅ keep keys consistent with the primary nodes created above
// ✅ V10: VALIDATE HEURISTIC EDGE
const edgeType = `HAS_${referencedTable.toUpperCase()}`;
const srcNode = nodeMap.get(primaryKey);
const dstNode = nodeMap.get(fkKey);
const edgeContext = {
context: `Inferred from ${col}`
};
const validation = validateGraphEdge(srcNode, dstNode, edgeType, edgeContext);
if (validation.valid) {
const edgeKey = `${primaryKey}-${edgeType}-${fkKey}`;
if (!edgeMap.has(edgeKey)) {
edgeMap.set(edgeKey, {
src_key: primaryKey,
dst_key: fkKey,
type: edgeType,
props: edgeContext
});
}
} else {
console.log(`[Graph Quality] REJECTED heuristic edge: ${validation.reason}`);
}
}
}
}
});
}
}
const nodes = Array.from(nodeMap.values());
const edges = Array.from(edgeMap.values());
console.log(`[Graph] Heuristic mode: ${nodes.length} unique nodes, ${edges.length} unique edges`);
return {
nodes,
edges
};
}
// --- Helper Functions ---
function findNameColumn(row, idColumn) {
const keys = Object.keys(row);
// Try exact match: employee_id → employee_name
const baseName = idColumn.replace(/_id$/, '');
const candidates = [
`${baseName}_name`,
`${baseName}_title`,
`${baseName}`,
'name',
'title',
'label'
];
for (const candidate of candidates) {
if (keys.includes(candidate)) return candidate;
}
return null;
}
function inferEntityType(columnName) {
// Infer entity type from column name
if (columnName.includes('employee') || columnName.includes('person')) return 'Person';
if (columnName.includes('customer') || columnName.includes('client')) return 'Customer';
if (columnName.includes('warehouse') || columnName.includes('location')) return 'Location';
if (columnName.includes('product') || columnName.includes('item')) return 'Product';
if (columnName.includes('order')) return 'Order';
if (columnName.includes('company') || columnName.includes('organization')) return 'Organization';
return 'Entity'; // Generic fallback
}
async function analyzeTableSchema(tableName, rows, apiKey) {
console.log(`[V8] Starting schema analysis for: ${tableName}`);
const sample = rows.slice(0, 3);
const headers = Object.keys(sample[0] || {});
const prompt = `You are a data schema analyst. Analyze this table:
Table Name: ${tableName}
Columns: ${headers.join(', ')}
Sample Data: ${JSON.stringify(sample, null, 2)}
Provide:
1. description: One sentence explaining what this data represents
2. semantics: For each column, identify its semantic type. Options:
- person_name, company_name, location_name
- currency_amount, percentage, count
- date, datetime, duration
- identifier, category, description, status
3. graph_hints: Relationships that could form knowledge graph edges. Format:
[{"from_col": "manager_id", "to_col": "employee_id", "edge_type": "MANAGES", "confidence": "high"}]
Output ONLY valid JSON:
{
"description": "...",
"semantics": {"col1": "type", ...},
"graph_hints": [...]
}`;
try {
const result = await callGemini(prompt, apiKey, "gemini-2.5-flash");
console.log(`[V8] Analysis complete for ${tableName}`);
return {
description: result.description || `Data table: ${tableName}`,
semantics: result.semantics || {},
graphHints: result.graph_hints || []
};
} catch (e) {
console.error("[V8] Schema analysis failed:", e);
return {
description: `Data table: ${tableName}`,
semantics: {},
graphHints: []
};
}
}
// --- SPREADSHEET PROCESSOR ---
async function processSpreadsheetCore(uri, title, rows, env) {
console.log(`[Spreadsheet] Starting processing: ${rows.length} rows for ${title}`);
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_ROLE_KEY);
// 1. Clean Keys
console.log(`[Spreadsheet] Cleaning row keys...`);
const cleanRows = rows.map((row) => {
const newRow = {};
Object.keys(row).forEach((k) => {
const cleanKey = k.toLowerCase().trim().replace(/[^a-z0-9]/g, '_');
newRow[cleanKey] = row[k];
});
return newRow;
});
// 2. Infer Schema
const firstRow = cleanRows[0] || {};
const schema = {};
Object.keys(firstRow).forEach((k) => schema[k] = typeof firstRow[k]);
console.log(`[Spreadsheet] Schema inferred: ${Object.keys(schema).length} columns`);
// *** V8: Generate table name early ***
const safeName = title.toLowerCase().replace(/[^a-z0-9]/g, '_');
const tableName = `tbl_${safeName}`;
console.log(`[Spreadsheet] Table name: ${tableName}`);
// 3. PRE-SCAN FOR ID MAP
const idMap = {};
cleanRows.forEach((r) => {
if (r.employee_id && r.name) {
idMap[r.employee_id] = r.name;
}
});
console.log(`[Spreadsheet] ID map built: ${Object.keys(idMap).length} entries`);
// *** V8: ANALYZE SCHEMA ***
console.log(`[V8] Analyzing schema for: ${title}`);
const schemaAnalysis = await analyzeTableSchema(title, cleanRows, env.GOOGLE_API_KEY);
console.log(`[V8] Analysis complete:`, schemaAnalysis);
// 4. GENERATE GRAPH (V8: Use Generic Builder)
console.log(`[Graph] Building graph using generic approach...`);
const { nodes: allNodes, edges: allEdges } = await buildGraphGeneric(cleanRows, tableName, env.GOOGLE_API_KEY);
console.log(`[Graph] Generated ${allNodes.length} nodes and ${allEdges.length} edges.`);
// 5. BATCH INSERT
console.log(`[DB] Starting batch insert...`);
const BATCH_SIZE = 500;
for (let i = 0; i < cleanRows.length; i += BATCH_SIZE) {
const rowBatch = cleanRows.slice(i, i + BATCH_SIZE);
const nodesBatch = i === 0 ? allNodes : [];
const edgesBatch = i === 0 ? allEdges : [];
console.log(`[DB] Inserting batch ${i / BATCH_SIZE + 1}: ${rowBatch.length} rows`);
const { error } = await supabase.rpc('ingest_spreadsheet', {
p_uri: uri,
p_title: title,
p_table_name: safeName,
p_description: null,
p_rows: rowBatch,
p_schema: schema,
p_nodes: nodesBatch,
p_edges: edgesBatch
});
if (error) {
console.error(`[DB] Batch ${i} ERROR:`, error);
throw new Error(`Batch ${i} error: ${error.message}`);
}
console.log(`[DB] Batch ${i / BATCH_SIZE + 1} completed successfully`);
}
// *** V8: SAVE METADATA (SKIP FOR NOW) ***
console.log(`[V8] SKIPPING metadata save to test basic ingestion`);
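// Heads-up: unlike the upsert function's step 7, this worker path never writes
// description/column_semantics/graph_hints back to structured_table — so large
// uploads land without V8 metadata unless you port that update() call over here.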
console.log(`[Spreadsheet] Processing complete!`);
return {
success: true,
rows: cleanRows.length,
graph_nodes: allNodes.length,
metadata: schemaAnalysis
};
}
// --- MAIN WORKER HANDLER ---
serve(async (req) => {
if (req.method === 'OPTIONS') return new Response('ok', {
headers: corsHeaders
});
console.log(`[Worker] Request received`);
try {
const env = Deno.env.toObject();
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_ROLE_KEY);
const payload = await req.json();
console.log(`[Worker] Payload parsed`);
// PATH A: STORAGE TRIGGER (Large Spreadsheets)
if (payload.record && payload.record.bucket_id === 'raw_uploads') {
console.log(`[Worker] Storage Trigger: ${payload.record.name}`);
console.log(`[Worker] Downloading file...`);
const { data: fileData, error: dlError } = await supabase.storage.from('raw_uploads').download(payload.record.name);
if (dlError) {
console.error(`[Worker] Download error:`, dlError);
throw dlError;
}
console.log(`[Worker] File downloaded, parsing JSON...`);
const contentStr = await fileData.text();
console.log(`[Worker] Content length: ${contentStr.length} bytes`);
const parsed = JSON.parse(contentStr);
const { uri, title, data } = parsed;
console.log(`[Worker] Parsed: uri=${uri}, title=${title}, rows=${data?.length || 0}`);
if (!data || data.length === 0) {
console.error(`[Worker] ERROR: No data in payload!`);
throw new Error("No data found in uploaded file");
}
console.log(`[Worker] Calling processSpreadsheetCore...`);
await processSpreadsheetCore(uri, title, data, env);
console.log(`[Worker] Processing complete, deleting file...`);
await supabase.storage.from('raw_uploads').remove([
payload.record.name
]);
console.log(`[Worker] SUCCESS!`);
return new Response(JSON.stringify({
success: true
}), {
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
// PATH B: QUEUE TRIGGER (Text Documents)
console.log(`[Worker] Queue trigger path...`);
const config = await getConfig(supabase);
const { data: batch, error } = await supabase.from('ingestion_queue').select('*').eq('status', 'pending').limit(config.worker_batch_size);
if (error || !batch || batch.length === 0) {
console.log(`[Worker] Queue empty or error:`, error);
return new Response(JSON.stringify({
msg: "Queue empty"
}), {
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
console.log(`[Worker] Processing ${batch.length} chunks...`);
await supabase.from('ingestion_queue').update({
status: 'processing'
}).in('id', batch.map((r) => r.id));
for (const row of batch) {
try {
const embedding = await getEmbedding(row.chunk_text, env.GOOGLE_API_KEY);
// GRAPH EXTRACTION FOR TEXT (Every Nth chunk)
let nodes = [], edges = [], mentions = [];
if (row.chunk_index % config.graph_sample_rate === 0) {
const extractPrompt = `
You are a Knowledge Graph Extractor.
Analyze this text. Identify:
1. **ROLES**: Job titles (e.g., "Customer Support Manager").
2. **RESPONSIBILITIES**: Key duties (e.g., "Refunds", "OSHA").
3. **SYSTEMS**: Tools (e.g., "OMS", "Chatbots").
Output JSON: {
"nodes": [{"key": "Role:Name", "labels": ["Role"], "props": {"name": "Name"}}],
"edges": [{"src_key": "...", "dst_key": "...", "type": "OWNS", "props": {"context": "..."}}]
}
Text: ${row.chunk_text}`;
const graphData = await callGemini(extractPrompt, env.GOOGLE_API_KEY, config.model_extraction);
nodes = graphData.nodes || [];
edges = graphData.edges || [];
mentions = nodes.map((n) => ({
node_key: n.key,
rel: "MENTIONS"
}));
}
await supabase.rpc('ingest_document_chunk', {
p_uri: row.uri,
p_title: row.title,
p_doc_meta: {
processed_at: new Date().toISOString()
},
p_chunk: {
ordinal: row.chunk_index,
text: row.chunk_text,
embedding
},
p_nodes: nodes,
p_edges: edges,
p_mentions: mentions
});
await supabase.from('ingestion_queue').delete().eq('id', row.id);
} catch (err) {
console.error(`Error processing chunk ${row.id}:`, err);
await supabase.from('ingestion_queue').update({
status: 'failed',
error_log: err.message
}).eq('id', row.id);
}
}
// RECURSION (Process next batch)
const { count } = await supabase.from('ingestion_queue').select('*', {
count: 'exact',
head: true
}).eq('status', 'pending');
if (count && count > 0) {
fetch(`${env.SUPABASE_URL}/functions/v1/ingest-worker`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${env.SUPABASE_SERVICE_ROLE_KEY}`,
'Content-Type': 'application/json'
},
body: JSON.stringify({
action: 'continue'
})
}).catch((e) => console.error("Daisy chain failed", e));
}
return new Response(JSON.stringify({
success: true,
processed: batch.length
}), {
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
} catch (e) {
console.error("[Worker] FATAL ERROR:", e);
return new Response(JSON.stringify({
error: e.message,
stack: e.stack
}), {
status: 500,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
});
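One wiring note: PATH A expects a Storage webhook that fires when a file lands in raw_uploads. If you want to test that path by hand, you can mimic the trigger payload yourself (URL, key, and file name below are placeholders):
fetch("https://YOUR_PROJECT.supabase.co/functions/v1/ingest-worker", {
method: "POST",
headers: {
"Authorization": "Bearer YOUR_SERVICE_ROLE_KEY",
"Content-Type": "application/json"
},
body: JSON.stringify({ record: { bucket_id: "raw_uploads", name: "1764000000000_doc_mydata.json" } })
});
The worker downloads that file, runs processSpreadsheetCore, and deletes the upload on success.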
Step 6. Create one last Edge Function. Name this one simply 'search':
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
const CORS = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "authorization, x-client-info, apikey, content-type"
};
// --- CONFIG LOADER ---
async function getConfig(supabase) {
const { data } = await supabase.from('app_config').select('settings').single();
return {
model_router: "gemini-2.5-flash",
model_reranker: "gemini-2.5-flash-lite",
model_sql: "gemini-2.5-flash",
rrf_weight_enrichment: 15.0,
rrf_weight_sql: 10.0,
rrf_weight_graph: 2.0,
rrf_weight_fts: 4.0,
rrf_weight_vector: 2.5,
rerank_depth: 15,
min_vector_score: 0.01,
...data && data.settings || {}
};
}
// --- V8: DYNAMIC SCHEMA LOADER ---
async function getAvailableSchemas(supabase) {
const { data, error } = await supabase.from('structured_table').select('table_name, description, schema_def, column_semantics').gt('row_count', 0);
if (error || !data || data.length === 0) {
return "No structured data available.";
}
return data.map((t)=>{
const cols = Object.keys(t.schema_def || {}).join(', ');
const desc = t.description || 'No description';
return `- ${t.table_name}: ${desc}\n Columns: ${cols}`;
}).join('\n');
}
// --- V9: COMPOSITE ENRICHMENT ---
async function enrichQueryContext(query, supabase, log) {
console.log("[V9] Detecting entities in query...");
const { data: detected, error: detectError } = await supabase.rpc('detect_query_entities', {
p_query: query
});
if (detectError || !detected || detected.length === 0) {
log("ENTITY_DETECTION", {
found: false
});
return [];
}
log("ENTITY_DETECTION", {
found: true,
entities: detected
});
const enrichments = [];
for (const entity of detected){
console.log(`[V9] Enriching ${entity.entity_type}: ${entity.key_value}`);
const { data: enriched, error: enrichError } = await supabase.rpc('enrich_query_context', {
p_primary_table: entity.table_name,
p_primary_key: entity.key_column,
p_primary_value: entity.key_value
});
if (enrichError) {
log("ENRICHMENT_ERROR", {
entity,
error: enrichError
});
continue;
}
if (enriched && enriched.length > 0) {
enrichments.push(...enriched.map((e)=>({
...e,
_source: `enrichment (${e.enrichment_type})`,
content: `[${e.enrichment_type.toUpperCase()}] ${e.table_name}: ${JSON.stringify(e.row_data)}`
})));
}
}
log("ENRICHMENT_RESULTS", {
count: enrichments.length
});
return enrichments;
}
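// Shape note (inferred from the calls above, not a formal spec):
// detect_query_entities returns rows like
//   { entity_type, table_name, key_column, key_value }
// and enrich_query_context returns rows like
//   { enrichment_type, table_name, row_data }
// where enrichment_type "primary" marks the matched row itself (the merge
// step in the main handler keys off that value).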
// --- GEMINI LLM RERANKER (FROM OLD SYSTEM) ---
async function rerankWithGemini(query, docs, apiKey, model, depth) {
if (!docs || docs.length === 0) return [];
const candidates = docs.slice(0, depth);
const docList = candidates.map((d)=>({
id: d.chunk_id,
text: (d.content || "").substring(0, 350)
}));
const prompt = `Role: Relevance Filter.
Task: Evaluate if chunks are RELEVANT to the User Query.
User Query: "${query}"
KEEP RULES:
✅ KEEP if chunk directly answers the query
✅ KEEP if chunk provides important context (definitions, procedures, policies)
✅ KEEP if chunk mentions key entities from the query (names, IDs, locations)
✅ KEEP if unsure - err on the side of inclusion
DISCARD RULES:
❌ ONLY discard if completely unrelated (different topic entirely)
❌ Discard "Table of Contents", "Index", or navigation elements
❌ Discard if chunk is just metadata without substance
EXAMPLES:
- Query: "return policy" → KEEP: "Customer Support: returns processing", "30-day return window", "refund procedures"
- Query: "Order O00062" → KEEP: order details, customer info, warehouse data, shipping info
- Query: "Who founded company?" → KEEP: company history, founder bio, origin story
Return JSON: { "kept_ids": [list of chunk IDs to keep] }
Docs: ${JSON.stringify(docList)}`;
try {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
contents: [
{
parts: [
{
text: prompt
}
]
}
],
generationConfig: {
responseMimeType: "application/json"
}
})
});
const data = await response.json();
const text = data.candidates?.[0]?.content?.parts?.[0]?.text || "{}";
const result = JSON.parse(text);
if (result.kept_ids && Array.isArray(result.kept_ids)) {
const keptDocs = [];
result.kept_ids.forEach((id)=>{
const doc = docs.find((d)=>d.chunk_id === id);
if (doc) keptDocs.push(doc);
});
// Safety net: If LLM discards everything, return top 1
if (keptDocs.length === 0 && docs.length > 0) {
return [
docs[0]
];
}
const MIN_KEPT = 10;
if (keptDocs.length < MIN_KEPT && docs.length >= MIN_KEPT) {
console.log(`[Rerank] Only kept ${keptDocs.length}, adding top ${MIN_KEPT - keptDocs.length} from original set`);
const keptIds = new Set(keptDocs.map((d)=>d.chunk_id));
const remaining = docs.filter((d)=>!keptIds.has(d.chunk_id));
const toAdd = remaining.slice(0, MIN_KEPT - keptDocs.length);
return [
...keptDocs,
...toAdd
];
}
return keptDocs;
}
return candidates;
} catch (e) {
console.error("[Rerank] Error:", e);
return candidates;
}
}
// --- V10: GRAPH RELEVANCE FILTER ---
function filterGraphByRelevance(graphEdges, query) {
if (!graphEdges || graphEdges.length === 0) return [];
const queryLower = query.toLowerCase();
const queryWords = queryLower.split(/\s+/).filter((w)=>w.length > 2);
const priorityRelations = [
'REPORTS_TO',
'MANAGES',
'PLACED_BY',
'FULFILLED_FROM',
'SHIPPED_BY',
'ASSIGNED_TO',
'WORKS_WITH'
];
const relevantEdges = [];
for (const edge of graphEdges){
const edgeText = `${edge.subject || ''} ${edge.action || ''} ${edge.object || ''}`.toLowerCase();
// Calculate keyword overlap
const matchedWords = queryWords.filter((w)=>edgeText.includes(w));
const overlapRatio = matchedWords.length / queryWords.length;
// Check for exact phrase matches (e.g., "customer onboarding" as a phrase)
const hasPhraseMatch = queryWords.some((word, idx)=>{
if (idx < queryWords.length - 1) {
const phrase = `${word} ${queryWords[idx + 1]}`;
return edgeText.includes(phrase);
}
return false;
});
// Keep if:
// 1. High keyword overlap (>40% of query words present in edge)
// 2. OR at least 2 keywords match
// 3. OR exact phrase match found
// 4. OR it's a high-priority relationship type
if (overlapRatio > 0.4 || matchedWords.length >= 2 || hasPhraseMatch || priorityRelations.includes(edge.action)) {
relevantEdges.push(edge);
}
}
console.log(`[Graph Filter] ${graphEdges.length} → ${relevantEdges.length} relevant edges`);
return relevantEdges;
}
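// Worked example (hypothetical edges): for query "who manages warehouse WH001",
// { subject: "Sarah Brooks", action: "MANAGES", object: "WH001" } survives twice
// over — "manages" and "wh001" both match (2+ keywords) and MANAGES is a priority
// relation — while { subject: "O00042", action: "HAS_STATUS", object: "shipped" }
// matches nothing and is dropped.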
// --- RRF FUSION (SIMPLIFIED) ---
async function performRRFFusion(rerankedDocs, graphResults, sqlResults, config) {
const K = 60;
const scores = {};
const addScore = (item, rank, weight, type)=>{
let key = item.chunk_id ? `chunk_${item.chunk_id}` : `sql_${JSON.stringify(item.row_data)}`;
if (!scores[key]) {
scores[key] = {
item: {
...item,
_source: type
},
score: 0
};
}
scores[key].score += 1.0 / (K + rank) * weight;
if (!scores[key].item._source.includes(type)) {
scores[key].item._source += `, ${type}`;
}
};
// Apply weights
rerankedDocs.forEach((item, idx)=>addScore(item, idx, config.rrf_weight_vector, 'vector/fts'));
if (graphResults) graphResults.forEach((item, idx)=>addScore(item, idx, config.rrf_weight_graph, 'graph'));
if (sqlResults) sqlResults.forEach((item, idx)=>addScore(item, idx, config.rrf_weight_sql, 'structured'));
// Convert to array & sort
let results = Object.values(scores).sort((a, b)=>b.score - a.score);
// ✅ NO CUTOFF - Let ranking + limit handle quality
// Reranking already filtered noise, cutoff was too aggressive
console.log(`[RRF_FUSION] Total results: ${results.length}`);
return results.map((s)=>s.item);
}
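// Scoring math, concretely: with K = 60 and the default weights, a chunk ranked
// first (rank 0) in the reranked list and third (rank 2) in SQL results scores
//   2.5/(60+0) + 10.0/(60+2) ≈ 0.0417 + 0.1613 ≈ 0.203
// — structured hits dominate ties, which is why rrf_weight_sql defaults to 4x
// rrf_weight_vector.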
// --- V10: STRICTER ROUTER PROMPT ---
function buildRouterPrompt(schemas, query) {
return `You are the Context Mesh Router.
User Query: "${query}"
Available Tables:
${schemas}
DECISION PROTOCOL:
1. **ENTITY EXTRACTION** - Extract ONLY specific identifiers and proper names:
✅ ALWAYS EXTRACT (ID Patterns):
- Order IDs: O00001, O00062, O\\d{5}
- Customer IDs: CU006, CU008, CU\\d{3}
- Employee IDs: E001, E002, E\\d{3}
- Warehouse IDs: WH001, WH002, WH\\d{3}
- Carrier IDs: CR001, CR005, CR\\d{3}
✅ ALWAYS EXTRACT (Proper Names):
- Full person names: "Nicholas Cooper", "Sarah Brooks", "Emily Chen"
- Company names: "CommerceFlow Solutions", "Brand 06"
- Specific location names: "California", "Texas", "New Jersey"
❌ NEVER EXTRACT:
- Action verbs: list, show, get, find, tell, display, give, provide
- Question words: what, who, where, when, how, why, which
- Generic nouns: employees, customers, orders, warehouses, people, items
- Plural table names: customers, orders, employees (these trigger SQL, not graph)
- Departments: Operations, Sales, Marketing (these are WHERE clauses, not entities)
- Roles/titles: manager, CEO, analyst (these are WHERE clauses, not entities)
- Determiners: the, a, an, this, that, these, those
- Prepositions: in, at, from, to, with, by
- Status words: active, inactive, pending, shipped
⚠️ CRITICAL VALIDATION:
- If word appears in common English dictionary → NOT an entity
- If word is lowercase in query → NOT an entity (unless it's an ID)
- If word describes a category/group → NOT an entity
- If word is a verb in any tense → NOT an entity
EXAMPLES:
Query: "List orders placed this year"
❌ BAD: entities: ["list", "orders", "placed", "year"]
✅ GOOD: entities: []
Reason: All are generic terms, no specific identifiers
Query: "Show me order O00062"
❌ BAD: entities: ["show", "me", "order", "O00062"]
✅ GOOD: entities: ["O00062"]
Reason: Only O00062 is a specific identifier
Query: "Who does Sarah Brooks report to?"
❌ BAD: entities: ["who", "Sarah", "Brooks", "report", "to"]
✅ GOOD: entities: ["Sarah Brooks"]
Reason: Full name is proper noun, rest are grammar/verbs
Query: "List employees in Operations department"
❌ BAD: entities: ["employees", "Operations", "department"]
✅ GOOD: entities: []
Reason: All are generic category terms, no specific names
Query: "Show orders for Brand 06"
❌ BAD: entities: ["show", "orders", "Brand", "06"]
✅ GOOD: entities: ["Brand 06"]
Reason: "Brand 06" is a specific customer name
Query: "Which carrier shipped order O00123?"
❌ BAD: entities: ["which", "carrier", "shipped", "order", "O00123"]
✅ GOOD: entities: ["O00123"]
Reason: Only O00123 is a specific identifier
2. **SQL TABLE DETECTION**:
- Use sql_tables for: counts, sums, averages, lists, filters
- Keywords: "how many", "total", "average", "list", "show all"
3. **KEYWORD EXTRACTION** (for FTS):
- Extract nouns ONLY (no verbs, no determiners)
- Max 3-5 keywords
- Focus on domain-specific terms
VALIDATION CHECKLIST (before returning entities):
1. Is it an ID pattern (letters + numbers)? → YES = keep, NO = next check
2. Is it a capitalized proper name (2+ words)? → YES = keep, NO = next check
3. Is it a common English word? → YES = REMOVE, NO = keep
4. Is it a verb (ends in -ing, -ed, -s)? → YES = REMOVE, NO = keep
5. Is it a plural noun (employees, orders)? → YES = REMOVE, NO = keep
DEFAULT BEHAVIOR:
- When in doubt → DO NOT extract as entity
- Empty entities array is CORRECT for most queries
- Entities should be rare (only 20-30% of queries have them)
Output JSON: { "sql_tables": [], "entities": [], "keywords": [] }`;
}
// --- V10: ENTITY VALIDATION FILTER ---
function validateRouterEntities(entities, query) {
if (!entities || entities.length === 0) return [];
const validated = [];
const queryLower = query.toLowerCase();
// Common English verbs and nouns to reject
const REJECT_PATTERNS = {
// Action verbs
verbs: new Set([
'list',
'show',
'get',
'find',
'tell',
'display',
'give',
'provide',
'placed',
'hired',
'shipped',
'ordered',
'delivered',
'returned',
'create',
'update',
'delete',
'search',
'filter',
'sort'
]),
// Question words
questions: new Set([
'what',
'who',
'where',
'when',
'why',
'how',
'which',
'whose'
]),
// Generic nouns (plural forms)
plurals: new Set([
'employees',
'customers',
'orders',
'warehouses',
'carriers',
'products',
'items',
'people',
'users',
'companies',
'brands'
]),
// Generic nouns (singular forms)
singulars: new Set([
'employee',
'customer',
'order',
'warehouse',
'carrier',
'product',
'item',
'person',
'user',
'company',
'brand',
'manager',
'analyst',
'director',
'supervisor',
'coordinator'
]),
// Departments and categories
departments: new Set([
'operations',
'sales',
'marketing',
'finance',
'logistics',
'hr',
'it',
'support',
'management',
'administration'
]),
// Status and descriptors
descriptors: new Set([
'active',
'inactive',
'pending',
'shipped',
'delivered',
'returned',
'new',
'old',
'current',
'previous',
'next',
'last',
'first'
])
};
for (const entity of entities){
const entityLower = entity.toLowerCase().trim();
// Rule 1: Reject if empty or too short
if (!entityLower || entityLower.length < 2) {
console.log(`[Router Validation] REJECTED (too short): "${entity}"`);
continue;
}
// Rule 2: Keep if matches ID pattern (letters + numbers)
// O00062, CU006, E001, WH003, CR005
if (entity.match(/^[A-Z]{1,3}\d{3,5}$/)) {
console.log(`[Router Validation] ACCEPTED (ID pattern): "${entity}"`);
validated.push(entity);
continue;
}
// Rule 3: Keep if proper name (2+ capitalized words)
// "Nicholas Cooper", "Sarah Brooks", "Brand 06"
if (entity.match(/^[A-Z][a-z]+(\s+[A-Z0-9][a-z0-9]*)+$/)) {
console.log(`[Router Validation] ACCEPTED (proper name): "${entity}"`);
validated.push(entity);
continue;
}
// Rule 4: Keep if single capitalized word (potential company/location name)
// "CommerceFlow", "California", "Texas"
if (entity.match(/^[A-Z][a-z]{2,}$/) && entityLower.length > 4) {
// But reject if it's a known category word
if (REJECT_PATTERNS.departments.has(entityLower) || REJECT_PATTERNS.singulars.has(entityLower) || REJECT_PATTERNS.descriptors.has(entityLower)) {
console.log(`[Router Validation] REJECTED (category word): "${entity}"`);
continue;
}
console.log(`[Router Validation] ACCEPTED (capitalized term): "${entity}"`);
validated.push(entity);
continue;
}
// Rule 5: Reject if it's a known verb
if (REJECT_PATTERNS.verbs.has(entityLower)) {
console.log(`[Router Validation] REJECTED (verb): "${entity}"`);
continue;
}
// Rule 6: Reject if it's a question word
if (REJECT_PATTERNS.questions.has(entityLower)) {
console.log(`[Router Validation] REJECTED (question word): "${entity}"`);
continue;
}
// Rule 7: Reject if it's a plural generic noun
if (REJECT_PATTERNS.plurals.has(entityLower)) {
console.log(`[Router Validation] REJECTED (plural noun): "${entity}"`);
continue;
}
// Rule 8: Reject if it's a singular generic noun
if (REJECT_PATTERNS.singulars.has(entityLower)) {
console.log(`[Router Validation] REJECTED (singular noun): "${entity}"`);
continue;
}
// Rule 9: Reject if it's a department/category
if (REJECT_PATTERNS.departments.has(entityLower)) {
console.log(`[Router Validation] REJECTED (department): "${entity}"`);
continue;
}
// Rule 10: Reject if it's a status/descriptor
if (REJECT_PATTERNS.descriptors.has(entityLower)) {
console.log(`[Router Validation] REJECTED (descriptor): "${entity}"`);
continue;
}
// Rule 11: Reject if word appears as-is in query (likely a query word)
// Exception: proper nouns (capitalized in both)
if (queryLower.includes(entityLower) && entity === entityLower) {
console.log(`[Router Validation] REJECTED (uncapitalized query word): "${entity}"`);
continue;
}
// If we got here, it passed all checks - keep it with warning
console.log(`[Router Validation] ACCEPTED (passed all checks): "${entity}"`);
validated.push(entity);
}
console.log(`[Router Validation] Final: ${entities.length} → ${validated.length}`);
return validated;
}
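// Example pass (tracing the rules above):
// validateRouterEntities(["show", "Sarah Brooks", "O00062", "employees"], query)
// → ["Sarah Brooks", "O00062"]: "O00062" hits the ID pattern (Rule 2),
// "Sarah Brooks" the proper-name pattern (Rule 3), "show" dies on Rule 5 (verb)
// and "employees" on Rule 7 (plural noun).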
// --- HELPERS ---
async function callGemini(prompt, apiKey, model = "gemini-2.5-flash") {
try {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:generateContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
contents: [
{
parts: [
{
text: prompt
}
]
}
]
})
});
const data = await response.json();
if (!data.candidates) return {};
const txt = data.candidates[0].content.parts[0].text;
return JSON.parse(txt.replace(/```json/g, "").replace(/```/g, "").trim());
} catch (e) {
console.error("Gemini error:", e);
return {};
}
}
async function getEmbedding(text, apiKey) {
const response = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/gemini-embedding-001:embedContent?key=${apiKey}`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
model: "models/gemini-embedding-001",
content: {
parts: [
{
text
}
]
},
outputDimensionality: 768
})
});
const data = await response.json();
return data.embedding && data.embedding.values || [];
}
function extractFastKeywords(query) {
const stopWords = new Set([
'the',
'and',
'for',
'with',
'that',
'this',
'what',
'where',
'when',
'who',
'how',
'show',
'list',
'tell'
]);
return query.replace(/[^\w\s]/g, '').split(/\s+/).filter((w)=>w.length > 3 && !stopWords.has(w.toLowerCase()));
}
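// Example: extractFastKeywords("Show me the warehouse orders for Texas")
// → ["warehouse", "orders", "Texas"] — "Show"/"the" are stopwords, "me"/"for"
// fail the length > 3 cut.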
// --- MAIN HANDLER ---
serve(async (req)=>{
if (req.method === 'OPTIONS') {
return new Response('ok', {
headers: CORS
});
}
// NEW: toggle debug output from request body
let debugEnabled = false;
let debugInfo = {
logs: []
};
const log = (msg, data)=>{
if (!debugEnabled) return; // no-op when debug is off
debugInfo.logs.push({
msg,
data
});
};
let resultLimit = 20;
try {
const body = await req.json();
const query = body.query;
// You can make this stricter/looser if you want
debugEnabled = body.debug === true;
if (body.limit) resultLimit = body.limit;
const env = Deno.env.toObject();
const supabase = createClient(env.SUPABASE_URL, env.SUPABASE_SERVICE_ROLE_KEY);
const config = await getConfig(supabase);
// V8: DYNAMIC SCHEMA LOADING
console.log("[V8] Loading available schemas...");
const schemas = await getAvailableSchemas(supabase);
log("AVAILABLE_SCHEMAS", schemas);
// V9: COMPOSITE ENRICHMENT
const enrichmentPromise = enrichQueryContext(query, supabase, log);
// ROUTER
const routerPrompt = buildRouterPrompt(schemas, query);
const embeddingPromise = getEmbedding(query, env.GOOGLE_API_KEY);
const routerPromise = callGemini(routerPrompt, env.GOOGLE_API_KEY, config.model_router);
const fastKeywords = extractFastKeywords(query);
const [embedding, routerRes, enrichedData] = await Promise.all([
embeddingPromise,
routerPromise,
enrichmentPromise
]);
// ✅ V10: VALIDATE ROUTER ENTITIES
const originalEntities = routerRes.entities || [];
const validatedEntities = validateRouterEntities(originalEntities, query);
// Update router response with validated entities
routerRes.entities = validatedEntities;
debugInfo.router = {
...routerRes,
entity_validation: {
before: originalEntities,
after: validatedEntities,
rejected: originalEntities.filter((e)=>!validatedEntities.includes(e))
}
};
log("ROUTER_DECISION", debugInfo.router);
// ✅ MERGE: Combine entity detection + router entities
const allEntities = [];
// Add entities from detection (O00062, CU006, etc.)
if (enrichedData && enrichedData.length > 0) {
// Entity detection already found these
const detectedIds = enrichedData.filter((e)=>e.enrichment_type === 'primary').map((e)=>e.row_data.order_id || e.row_data.customer_id || e.row_data.employee_id).filter(Boolean);
allEntities.push(...detectedIds);
}
// Add entities from router (person names, roles, etc.)
if (routerRes.entities && routerRes.entities.length > 0) {
allEntities.push(...routerRes.entities);
}
// Add keywords as fallback (for queries like "employees in warehouse")
if (allEntities.length === 0 && routerRes.keywords && routerRes.keywords.length > 0) {
allEntities.push(...routerRes.keywords.slice(0, 3)); // Top 3 keywords
}
log("MERGED_ENTITIES", {
count: allEntities.length,
entities: allEntities
});
// ✅ ENTITY VALIDATION: Filter out generic terms
const specificEntities = allEntities.filter((entity)=>{
const entityLower = entity.toLowerCase().trim();
// 1. Keep if matches ID pattern (O00062, CU006, E001, WH001, CR001)
if (entity.match(/^[A-Z]{1,3}\d{3,5}$/)) {
return true;
}
// 2. Keep if proper name (has space + capitalized words)
// "Sarah Brooks", "Nicholas Cooper", "Brand 06"
if (entity.match(/^[A-Z][a-z]+(\s+[A-Z][a-z0-9]+)+$/)) {
return true;
}
// 3. Reject if generic table name
const genericTableNames = [
'employee',
'employees',
'customer',
'customers',
'order',
'orders',
'warehouse',
'warehouses',
'carrier',
'carriers',
'product',
'products',
'item',
'items',
'user',
'users'
];
if (genericTableNames.includes(entityLower)) {
return false;
}
// 4. Reject if generic concept
const genericConcepts = [
'process',
'procedure',
'system',
'policy',
'onboarding',
'training',
'support',
'service',
'department',
'role',
'location',
'region',
'manager',
'staff',
'team',
'people'
];
if (genericConcepts.includes(entityLower)) {
return false;
}
// 5. Keep if single capitalized word (might be company name)
// "CommerceFlow", "California", "Texas"
if (entity.match(/^[A-Z][a-zA-Z]+$/) && entity.length > 3) {
return true;
}
// 6. Default: reject
return false;
});
log("ENTITY_VALIDATION", {
before: allEntities,
after: specificEntities,
filtered_out: allEntities.filter((e)=>!specificEntities.includes(e)),
count_before: allEntities.length,
count_after: specificEntities.length
});
const searchTasks = [];
// TASK A: VECTOR
if (embedding && embedding.length > 0) {
searchTasks.push(supabase.rpc('search_vector', {
p_embedding: embedding,
p_limit: 20,
p_threshold: config.min_vector_score
}));
} else {
searchTasks.push(Promise.resolve([]));
}
// TASK B: FTS
const ftsQuery = routerRes.keywords && routerRes.keywords.length > 0 ? routerRes.keywords.join(' ') : query;
console.log(`[V8] FTS search for: "${ftsQuery}"`);
searchTasks.push(supabase.rpc('search_fulltext', {
p_query: ftsQuery,
p_limit: 20
}));
// TASK C: GRAPH (Entity Neighborhood)
if (specificEntities.length > 0) {
searchTasks.push((async ()=>{
console.log(`[V8] Searching graph for entities:`, specificEntities);
const { data, error } = await supabase.rpc('get_graph_neighborhood', {
p_entity_names: specificEntities
});
log("GRAPH_RAW", {
count: data && data.length || 0,
sample: data && data[0],
error: error && error.message
});
if (error) {
log("GRAPH_ERROR", error);
return [];
}
// ✅ FILTER OUT GARBAGE EDGES
const cleanEdges = (data || []).filter((r)=>{
// Remove self-referential edges (subject == object)
if (r.subject && r.object && r.subject === r.object) {
console.log(`[GRAPH_FILTER] Removed self-referential: ${r.subject} ${r.action} ${r.object}`);
return false;
}
// Remove edges with "Inferred from carrier_id" context (low quality)
if (r.context && r.context.context && r.context.context.includes('Inferred from carrier_id')) {
console.log(`[GRAPH_FILTER] Removed low-quality inferred edge: ${r.subject} ${r.action} ${r.object}`);
return false;
}
return true;
});
log("GRAPH_FILTERED", {
before: data && data.length || 0,
after: cleanEdges.length,
removed: (data && data.length || 0) - cleanEdges.length
});
return cleanEdges.map((r)=>({
...r,
_source: `graph (${r.action})`,
content: `[GRAPH] ${r.subject} ${r.action} ${r.object || ''}. Context: ${r.context && r.context.context || ''}`
}));
})());
} else {
searchTasks.push(Promise.resolve([]));
}
// TASK D: SQL
let sqlEntityNames = []; // Track entity names for graph (generic)
let sqlEntityType = ''; // Track what type of entities
if (routerRes.sql_tables && routerRes.sql_tables.length > 0) {
searchTasks.push((async ()=>{
const targetTable = routerRes.sql_tables[0];
console.log(`[V8] Peeking at table: ${targetTable}`);
const allTableNames = [
targetTable,
...routerRes.sql_tables.filter((t)=>t !== targetTable)
];
const allContexts = await Promise.all(allTableNames.map((tableName)=>supabase.rpc('get_table_context', {
p_table_name: tableName
})));
const context = allContexts[0].data; // Primary table
const relatedSchemas = allContexts.slice(1).map((c)=>c.data).filter(Boolean);
if (!context || context.error) {
log("PEEK_ERROR", context?.error || "Failed to fetch table context");
return [];
}
log("TABLE_CONTEXT", context);
const sqlPrompt = `PostgreSQL Query Generator
Query: "${query}"
PRIMARY TABLE: ${context.table}
Schema: ${context.schema}
${context.description ? `Purpose: ${context.description}` : ''}
${relatedSchemas.length > 0 ? `
RELATED TABLES:
${relatedSchemas.map((s)=>`${s.table}: ${s.schema}`).join('\n')}
` : ''}
${context.related_tables && context.related_tables.length > 0 ? `
JOINS (copy exact syntax):
${context.related_tables.map((rt)=>`${rt.join_on}`).join('\n')}
` : ''}
🚨 MANDATORY RULES:
1. DATE CONVERSION (start_date, order_date are Excel serials):
✅ CORRECT: WHERE EXTRACT(YEAR FROM (DATE '1899-12-30' + start_date::int)) = 2022
❌ WRONG: WHERE LEFT(start_date::text, 4) = '2022'
❌ WRONG: WHERE TO_TIMESTAMP(order_date) >= '2024-01-01'
❌ WRONG: WHERE start_date < 2015
2. TYPE CASTING (all aggregates):
- SUM(col::numeric)::double precision
- AVG(col::numeric)::double precision
- COUNT(*)::double precision
- MIN/MAX(col::numeric)::double precision
3. EXACT COLUMN NAMES (from schemas above):
- order_status, warehouse_id, order_value_usd
- NOT: status, id, value
4. LIST QUERIES (contains "list", "show", "find", "display"):
✅ CORRECT: SELECT name, role, department, location, email, employee_id FROM...
❌ WRONG: SELECT name FROM...
- Return ALL identifying + descriptive columns
- For employees: name, role, department, location, email, employee_id
- For customers: customer_id, customer_name, status, industry, location
- For orders: order_id, customer_name, order_status, order_value_usd, order_date
5. INCLUDE FILTER COLUMNS (CRITICAL):
If query filters by a column, you MUST include it (or computed version) in SELECT.
✅ CORRECT Examples:
Query: "employees who started before 2015"
SELECT name, role, department, location, email, employee_id,
EXTRACT(YEAR FROM (DATE '1899-12-30' + start_date::int)) as start_year
FROM tbl_employees
WHERE EXTRACT(YEAR FROM (DATE '1899-12-30' + start_date::int)) < 2015
Query: "orders over $1000"
SELECT order_id, customer_name, order_status, order_value_usd
FROM tbl_orders
WHERE order_value_usd > 1000
Query: "customers in California"
SELECT customer_id, customer_name, status, location
FROM tbl_customers
WHERE location ILIKE '%California%'
WHY: Users need to verify results match the filter criteria.
RULE: WHERE clause column → must appear in SELECT
Output: {"sql": "..."}`;
const sqlGen = await callGemini(sqlPrompt, env.GOOGLE_API_KEY, config.model_sql);
log("SQL_GENERATED", sqlGen);
log("SQL_PROMPT_LENGTH", {
chars: sqlPrompt.length,
lines: sqlPrompt.split('\n').length
});
if (!sqlGen || !sqlGen.sql) {
log("SQL_GENERATION_FAILED", {
reason: "Gemini returned empty or invalid response",
response: sqlGen
});
}
if (sqlGen.sql) {
const cleanSql = sqlGen.sql.replace(/```sql/g, "").replace(/```/g, "").trim();
const { data, error } = await supabase.rpc('search_structured', {
p_query_sql: cleanSql,
p_limit: 20
});
if (error || data && data[0] && data[0].table_name === 'ERROR') {
log("SQL_ERROR", error || data && data[0]);
return [
{
_source: "structured_error",
error: error || data && data[0]
}
];
}
// ✅ V10: Validate SQL results have expected fields
if (data && data.length > 0 && data[0].row_data) {
const firstRow = data[0].row_data;
const columns = Object.keys(firstRow);
const queryLower = query.toLowerCase();
// Check for date fields in temporal queries
if (queryLower.match(/\b(hired|year|month|date|when|2022|2023|2024)\b/)) {
const hasDateField = columns.some((col)=>col.includes('date') || col.includes('hire') || col.includes('start'));
if (!hasDateField) {
log("SQL_VALIDATION_WARNING", {
issue: "temporal_query_missing_date",
query: query,
columns: columns,
suggestion: "SQL should include converted date field"
});
}
}
// Check for name fields in employee queries
if (targetTable.includes('employee') && !columns.includes('name')) {
log("SQL_VALIDATION_WARNING", {
issue: "employee_query_missing_name",
query: query,
columns: columns
});
}
// Check for status/value fields in order queries
if (targetTable.includes('order') && queryLower.match(/\b(status|value|price|amount)\b/)) {
const hasStatusOrValue = columns.some((col)=>col.includes('status') || col.includes('value') || col.includes('amount'));
if (!hasStatusOrValue) {
log("SQL_VALIDATION_WARNING", {
issue: "order_query_missing_status_or_value",
query: query,
columns: columns
});
}
}
log("SQL_VALIDATION_PASSED", {
columns: columns,
row_count: data.length
});
}
// ✅ GENERIC ENTITY EXTRACTION (works for ANY table/columns)
if (data && data.length > 0) {
const firstRow = data[0].row_data || {};
const columns = Object.keys(firstRow);
// Try common identifier columns in priority order
const identifierColumns = [
'name',
'customer_name',
'brand_name',
'order_id',
'customer_id',
'employee_id',
'product_id',
'warehouse_id',
'title',
'project_name' // Generic identifiers
];
// Find first column that exists
const identifierColumn = identifierColumns.find((col)=>columns.includes(col));
if (identifierColumn) {
sqlEntityNames = data.map((row)=>row.row_data && row.row_data[identifierColumn]).filter(Boolean);
sqlEntityType = identifierColumn.replace(/_id$/, '').replace(/_name$/, '');
log("SQL_ENTITY_NAMES", {
column: identifierColumn,
type: sqlEntityType,
count: sqlEntityNames.length,
sample: sqlEntityNames.slice(0, 3)
});
}
}
return data || [];
}
return [];
})());
} else {
searchTasks.push(Promise.resolve([]));
}
// WAIT FOR ALL SEARCHES
const [vectorRes, ftsRes, graphRes, sqlResults] = await Promise.all(searchTasks);
// ✅ SECOND GRAPH PASS: Use SQL-derived entity names
let entityGraphRes = [];
if (sqlEntityNames.length > 0) {
// Validate these are real names, not generic terms
const validSqlNames = sqlEntityNames.filter((name)=>{
return name && name.length > 2 && ![
'employee',
'customer',
'order',
'warehouse'
].includes(name.toLowerCase());
});
if (validSqlNames.length > 0) {
console.log(`[V8] Second graph pass with SQL ${sqlEntityType}:`, validSqlNames);
const { data: entityGraphData, error: entityGraphError } = await supabase.rpc('get_graph_neighborhood', {
p_entity_names: validSqlNames
});
if (!entityGraphError && entityGraphData && entityGraphData.length > 0) {
// Filter out garbage edges
const cleanEntityEdges = entityGraphData.filter((r)=>{
if (r.subject && r.object && r.subject === r.object) return false;
if (r.context && r.context.context && r.context.context.includes('Inferred from carrier_id')) return false;
return true;
});
entityGraphRes = cleanEntityEdges.map((r)=>({
...r,
_source: `graph (${r.action})`,
content: `[GRAPH] ${r.subject} ${r.action} ${r.object || ''}. Context: ${r.context && r.context.context || ''}`
}));
log("ENTITY_GRAPH", {
type: sqlEntityType,
count: entityGraphRes.length,
sample: entityGraphRes[0]
});
}
} else {
log("ENTITY_GRAPH_SKIPPED", {
reason: "no_valid_sql_names",
filtered_out: sqlEntityNames
});
}
}
// Log all search results
log("VECTOR_RAW", {
count: Array.isArray(vectorRes) ? vectorRes.length : vectorRes && vectorRes.data && vectorRes.data.length || 0
});
log("FTS_RAW", {
count: Array.isArray(ftsRes) ? ftsRes.length : ftsRes && ftsRes.data && ftsRes.data.length || 0
});
log("GRAPH_EDGES", {
count: Array.isArray(graphRes) ? graphRes.length : graphRes && graphRes.data && graphRes.data.length || 0,
sample: Array.isArray(graphRes) ? graphRes[0] : graphRes && graphRes.data && graphRes.data[0]
});
log("SQL_RAW", {
count: Array.isArray(sqlResults) ? sqlResults.length : 0
});
// ✅ COMBINE VECTOR + FTS & DEDUPLICATE
const vectorItems = Array.isArray(vectorRes) ? vectorRes : vectorRes && vectorRes.data || [];
const ftsItems = Array.isArray(ftsRes) ? ftsRes : ftsRes && ftsRes.data || [];
const combinedDocs = [
...vectorItems,
...ftsItems
];
const uniqueDocsMap = new Map();
combinedDocs.forEach((doc)=>{
if (doc.chunk_id && !uniqueDocsMap.has(doc.chunk_id)) {
uniqueDocsMap.set(doc.chunk_id, doc);
}
});
log("BEFORE_RERANK", {
count: uniqueDocsMap.size
});
log("BEFORE_RERANK_IDS", {
chunk_ids: Array.from(uniqueDocsMap.keys())
});
// ✅ V10: SELECTIVE RERANKING - Only rerank for policy/document queries
// Skip reranking when we have structured results (SQL/entities/enrichment)
const hasStructuredResults = routerRes.sql_tables && routerRes.sql_tables.length > 0 || enrichedData.length > 0 || Array.isArray(sqlResults) && sqlResults.length > 0;
let rerankedDocs;
if (hasStructuredResults) {
// Structured query: Keep top 15 docs without reranking (reranker often removes critical context)
rerankedDocs = Array.from(uniqueDocsMap.values()).slice(0, 15);
log("RERANK_SKIPPED", {
reason: "structured_query_detected",
keeping: rerankedDocs.length
});
} else {
// Policy/document query: Use reranker with safety net
// SAFETY NET: Always preserve top 5 by original score
const allDocs = Array.from(uniqueDocsMap.values());
const topByScore = allDocs.slice(0, 5);
// Send up to 30 docs to reranker
const rerankerResults = await rerankWithGemini(query, allDocs, env.GOOGLE_API_KEY, config.model_reranker, Math.min(uniqueDocsMap.size, 30));
// Merge: reranker results + top 5 safety net (deduplicated)
const merged = [
...rerankerResults
];
let safetyNetAdded = 0;
topByScore.forEach((doc)=>{
if (!merged.find((d)=>d.chunk_id === doc.chunk_id)) {
merged.push(doc);
safetyNetAdded++;
}
});
// Keep top 15 from merged results
rerankedDocs = merged.slice(0, 15);
log("RERANK_EXECUTED", {
reason: "policy_query_detected",
reranker_kept: rerankerResults.length,
safety_net_added: safetyNetAdded,
final_count: rerankedDocs.length,
fallback_used: rerankerResults.length === 0
});
}
log("AFTER_RERANK", {
count: rerankedDocs.length
});
// ✅ CONDITIONAL GRAPH: Use entity-specific OR generic, NOT both
const graphItems = Array.isArray(graphRes) ? graphRes : graphRes && graphRes.data || [];
// If we have entity-specific edges, DROP generic graph entirely
const allGraphItems = entityGraphRes.length > 0 ? entityGraphRes // ✅ Use ONLY entity-specific edges (high quality)
: graphItems; // Fallback to generic (if no entities)
log("GRAPH_STRATEGY", {
strategy: entityGraphRes.length > 0 ? "entity-specific" : "generic",
entity_type: sqlEntityType || 'none',
entity_count: entityGraphRes.length,
generic_count: graphItems.length,
using: allGraphItems.length
});
// ✅ V10: Relevance filter hook (currently a pass-through; edges were already
// cleaned upstream, so the before/after counts below will match)
const filteredGraphItems = allGraphItems;
log("GRAPH_RELEVANCE_FILTER", {
before: allGraphItems.length,
after: filteredGraphItems.length,
removed: allGraphItems.length - filteredGraphItems.length
});
const fusedResults = await performRRFFusion(rerankedDocs, filteredGraphItems, sqlResults, config);
log("AFTER_FUSION", {
count: fusedResults.length
});
// ✅ FILTER & DIVERSIFY GRAPH EDGES
// 1. Filter redundant attributes (don't repeat what SQL already shows)
// 2. Deduplicate by entity (show diverse entities, not 4 facts about 1 person)
const RELATIONSHIP_ACTIONS = [
'REPORTS_TO',
'MANAGES',
'ASSIGNED_TO',
'WORKS_WITH',
'PLACED_BY',
'FULFILLED_FROM',
'SHIPPED_BY'
];
const ATTRIBUTE_ACTIONS = [
'HAS_ROLE',
'BELONGS_TO',
'WORKS_AT',
'HAS_STATUS'
];
// Check what SQL already has
const sqlHasRole = sqlResults.some((r)=>r.row_data && r.row_data.role);
const sqlHasDepartment = sqlResults.some((r)=>r.row_data && (r.row_data.department || r.row_data.dept));
const sqlHasLocation = sqlResults.some((r)=>r.row_data && r.row_data.location);
// Filter redundant edges
const valuableEdges = filteredGraphItems.filter((edge)=>{
// Keep all relationship edges (these ADD new info)
if (RELATIONSHIP_ACTIONS.includes(edge.action)) return true;
// Filter redundant attributes
if (edge.action === 'HAS_ROLE' && sqlHasRole) return false;
if (edge.action === 'BELONGS_TO' && sqlHasDepartment) return false;
if (edge.action === 'WORKS_AT' && sqlHasLocation) return false;
// Keep everything else
return true;
});
log("FILTERED_REDUNDANT", {
before: filteredGraphItems.length,
after: valuableEdges.length,
removed: filteredGraphItems.length - valuableEdges.length,
sql_has: {
role: sqlHasRole,
department: sqlHasDepartment,
location: sqlHasLocation
}
});
// Group by entity (subject)
const edgesByEntity = new Map();
valuableEdges.forEach((edge)=>{
const subject = edge.subject || 'unknown';
if (!edgesByEntity.has(subject)) {
edgesByEntity.set(subject, []);
}
edgesByEntity.get(subject).push(edge);
});
// Take 1 edge per entity (prioritize relationships)
const diverseGraphEdges = [];
for (const [subject, edges] of edgesByEntity){
// Prioritize relationship edges over attributes
const sorted = edges.sort((a, b)=>{
const relPriority = {
'REPORTS_TO': 10,
'MANAGES': 9,
'ASSIGNED_TO': 8,
'PLACED_BY': 7,
'FULFILLED_FROM': 6,
'SHIPPED_BY': 5,
'BELONGS_TO': 3,
'WORKS_AT': 2,
'HAS_ROLE': 1
};
return (relPriority[b.action] || 0) - (relPriority[a.action] || 0);
});
diverseGraphEdges.push(sorted[0]); // Take best edge for this entity
if (diverseGraphEdges.length >= 10) break; // Max 10 diverse entities
}
// Score them for proper ranking
const scoredGraphEdges = diverseGraphEdges.map((e, idx)=>({
...e,
score: config.rrf_weight_graph + (10 - idx) * 0.05
}));
log("DIVERSIFIED_GRAPH", {
total_edges: filteredGraphItems.length,
after_filter: valuableEdges.length,
unique_entities: edgesByEntity.size,
selected: scoredGraphEdges.length,
sample: scoredGraphEdges[0]
});
// ✅ ADD ENRICHED DATA AT TOP
const finalResults = [
...enrichedData.map((e)=>({
...e,
score: config.rrf_weight_enrichment
})),
...scoredGraphEdges,
...fusedResults
];
// ✅ DEDUPLICATE by content (avoid showing same edge/doc twice)
const seenKeys = new Set();
const dedupedResults = finalResults.filter((item)=>{
// Generate unique key
const key = item.chunk_id ? `chunk_${item.chunk_id}` : item.subject && item.action && item.object ? `graph_${item.subject}_${item.action}_${item.object}` : JSON.stringify(item.row_data || item);
if (seenKeys.has(key)) return false;
seenKeys.add(key);
return true;
});
dedupedResults.sort((a, b)=>b.score - a.score);
const payload = {
success: true,
results: dedupedResults.slice(0, resultLimit),
enrichment_count: enrichedData.length
};
// Only include diagnostics when explicitly requested
if (debugEnabled) {
payload.debug = debugInfo;
}
return new Response(JSON.stringify(payload), {
headers: {
...CORS,
'Content-Type': 'application/json'
}
});
} catch (error) {
console.error("[V9] Search error:", error);
const errorPayload = {
error: error?.message || "Unknown error"
};
if (debugEnabled) {
errorPayload.debug = debugInfo;
}
return new Response(JSON.stringify(errorPayload), {
status: 500,
headers: {
...CORS,
'Content-Type': 'application/json'
}
});
}
});
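That's the whole search function. Before wiring anything up, you can smoke-test it with a plain fetch call from any TypeScript/JavaScript runtime. A minimal sketch (the project URL and key below are placeholders; depending on your function's JWT settings you'll pass either the anon key or the service role key):

const SUPABASE_URL = "https://YOUR-PROJECT.supabase.co"; // placeholder
const SUPABASE_KEY = "YOUR-ANON-OR-SERVICE-ROLE-KEY";    // placeholder

const res = await fetch(`${SUPABASE_URL}/functions/v1/search`, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${SUPABASE_KEY}`
  },
  // The handler reads query (required), limit (defaults to 20),
  // and debug (true returns the internal log trail in the payload)
  body: JSON.stringify({ query: "orders over $1000", limit: 10, debug: true })
});

const payload = await res.json();
console.log(payload.success, payload.enrichment_count);
console.log(payload.results.slice(0, 3)); // top fused results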
Step 7. Now you can interact with your Supabase functions through the API endpoints ingest-intelligent and search (ingest-worker is called internally by ingest-intelligent). Any REST client works. I've created a couple of n8n workflows that make this easy. First, here are two workflows for ingestion (one for documents, one for spreadsheets). If you have n8n, copy and paste this JSON into a canvas:
{
"name": "Context Mesh V2 - File Uploader",
"nodes": [
{
"parameters": {
"path": "doc-upload-v2-robust",
"formTitle": "Upload Document",
"formDescription": "Upload text files (PDF, TXT, or MD) to add to the knowledge base",
"formFields": {
"values": [
{
"fieldLabel": "data",
"fieldType": "file",
"requiredField": true
},
{
"fieldLabel": "Title (use logical title name)"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.formTrigger",
"typeVersion": 2,
"position": [
1232,
-80
],
"id": "da526682-dde5-4245-ac50-2a337c71dad6",
"name": "Document Upload Form",
"webhookId": "doc-upload-v2-robust"
},
{
"parameters": {
"operation": "text",
"options": {}
},
"type": "n8n-nodes-base.extractFromFile",
"typeVersion": 1,
"position": [
1456,
-80
],
"id": "8b526c4c-6773-43cd-b19a-ce70fb75f518",
"name": "Extract Text"
},
{
"parameters": {
"method": "POST",
"url": "=https://zbtqpvkaycnonaslwqfq.supabase.co/functions/v1/ingest-intelligent",
"authentication": "predefinedCredentialType",
"nodeCredentialType": "supabaseApi",
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "uri",
"value": "={{ $('Document Upload Form').item.json.data[0].filename }}"
},
{
"name": "title",
"value": "={{ $('Document Upload Form').item.json['Title (optional)'] }}"
},
{
"name": "text",
"value": "={{ $json.data }}"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [
1680,
-80
],
"id": "5e2c69f6-4312-4b23-8881-37ee9ad0c360",
"name": "Ingest Document Chunk",
"retryOnFail": true,
"waitBetweenTries": 5000,
"credentials": {
"supabaseApi": {
"id": "L1c6TGVJOHc8wt9H",
"name": "infoSupa_contentMesh"
}
}
},
{
"parameters": {
"content": "## This is for documents (txt, pdf, md) \n",
"width": 192
},
"type": "n8n-nodes-base.stickyNote",
"position": [
1120,
-192
],
"typeVersion": 1,
"id": "3e1eeb6f-fe73-4676-b714-e35008d2b27a",
"name": "Sticky Note"
},
{
"parameters": {
"content": "## This node extracts from txt files\n**Change this to extract from whatever file type you wish to upload. 'Extract from Text File' for .txt or .md. 'Extract from PDF' for .pdf.**\n",
"height": 208,
"color": 3
},
"type": "n8n-nodes-base.stickyNote",
"position": [
1376,
-256
],
"typeVersion": 1,
"id": "b82b51c2-aa31-4ec3-b262-4c175da20e26",
"name": "Sticky Note1"
},
{
"parameters": {
"content": "## Make sure your Supabase Credentials are saved in n8n\n",
"height": 176,
"color": 6
},
"type": "n8n-nodes-base.stickyNote",
"position": [
1856,
48
],
"typeVersion": 1,
"id": "58314655-fd7a-48d5-921d-11aba1b11333",
"name": "Sticky Note2"
},
{
"parameters": {
"content": "## This is for spreadsheets (csv, xls, xlsx) \n",
"width": 192
},
"type": "n8n-nodes-base.stickyNote",
"position": [
1088,
192
],
"typeVersion": 1,
"id": "04447f73-111d-45c7-a37b-d81e08cb5189",
"name": "Sticky Note3"
},
{
"parameters": {
"content": "## This node extracts from spreadsheet files\n**Change this to extract from whatever file type you wish to upload. 'Extract from CSV', 'Extract from XLS', etc**\n",
"height": 208,
"color": 3
},
"type": "n8n-nodes-base.stickyNote",
"position": [
1376,
112
],
"typeVersion": 1,
"id": "d23b51ce-557b-4811-82f4-137b6592f0ff",
"name": "Sticky Note4"
},
{
"parameters": {
"path": "sheet-upload-v3",
"formTitle": "Upload Spreadsheet (V3)",
"formDescription": "Upload CSV or Excel files. The system handles large files automatically.",
"formFields": {
"values": [
{
"fieldLabel": "data",
"fieldType": "file",
"requiredField": true
},
{
"fieldLabel": "Table Name (e.g. sales_data)",
"requiredField": true
}
]
},
"options": {}
},
"type": "n8n-nodes-base.formTrigger",
"typeVersion": 2,
"position": [
1216,
304
],
"id": "d6e7a240-e165-4ed4-9310-07d1a59944e1",
"name": "Spreadsheet Upload Form",
"webhookId": "sheet-upload-v3"
},
{
"parameters": {
"operation": "xlsx",
"options": {}
},
"type": "n8n-nodes-base.extractFromFile",
"typeVersion": 1,
"position": [
1440,
304
],
"id": "8e4447c8-22db-4cc3-a6a5-8faa8f245f4f",
"name": "Extract Spreadsheet"
},
{
"parameters": {
"method": "POST",
"url": "https://zbtqpvkaycnonaslwqfq.supabase.co/functions/v1/ingest-intelligent",
"authentication": "predefinedCredentialType",
"nodeCredentialType": "supabaseApi",
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "uri",
"value": "={{ $('Spreadsheet Upload Form').item.json.data[0].filename }}"
},
{
"name": "title",
"value": "={{ $('Spreadsheet Upload Form').item.json['Table Name (e.g. sales_data)'] }}"
},
{
"name": "data",
"value": "={{ $json.data }}"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [
1888,
304
],
"id": "c4130f58-b1a7-4d58-a338-7f481b89a695",
"name": "Send to Context Mesh",
"retryOnFail": true,
"waitBetweenTries": 5000,
"credentials": {
"supabaseApi": {
"id": "L1c6TGVJOHc8wt9H",
"name": "infoSupa_contentMesh"
}
}
},
{
"parameters": {
"aggregate": "aggregateAllItemData",
"options": {}
},
"type": "n8n-nodes-base.aggregate",
"typeVersion": 1,
"position": [
1664,
304
],
"id": "5840c145-0511-4cec-8b70-3d5d373e2556",
"name": "Aggregate"
}
],
"pinData": {},
"connections": {
"Document Upload Form": {
"main": [
[
{
"node": "Extract Text",
"type": "main",
"index": 0
}
]
]
},
"Extract Text": {
"main": [
[
{
"node": "Ingest Document Chunk",
"type": "main",
"index": 0
}
]
]
},
"Ingest Document Chunk": {
"main": [
[]
]
},
"Spreadsheet Upload Form": {
"main": [
[
{
"node": "Extract Spreadsheet",
"type": "main",
"index": 0
}
]
]
},
"Extract Spreadsheet": {
"main": [
[
{
"node": "Aggregate",
"type": "main",
"index": 0
}
]
]
},
"Aggregate": {
"main": [
[
{
"node": "Send to Context Mesh",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "dc8d82a5-eb54-446f-ba8a-9274469bb70e",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "1dbf32ab27f7926a258ac270fe5e9e15871cfb01059a55b25aa401186050b9b5"
},
"id": "P9zYEohLKCCgjkym",
"tags": []
}
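By the way, you don't need n8n for ingestion either. A direct call to ingest-intelligent mirrors the body shape the workflow above sends (uri, title, and text for documents; the spreadsheet branch sends data instead of text). A rough sketch with placeholder URL/key:

const SUPABASE_URL = "https://YOUR-PROJECT.supabase.co"; // placeholder
const SUPABASE_KEY = "YOUR-SERVICE-ROLE-KEY";            // placeholder

const res = await fetch(`${SUPABASE_URL}/functions/v1/ingest-intelligent`, {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${SUPABASE_KEY}`
  },
  body: JSON.stringify({
    uri: "employee_handbook.md",              // unique document URI
    title: "Employee Handbook",               // logical title
    text: "Full text of the document here..." // raw extracted text
  })
});
console.log(await res.json());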
Step 8. Here's a workflow for the 'search' endpoint. This one handles retrieval: I connected it as a tool to an AI agent, so you can just start chatting and reference your data directly:
{
"name": "Context Mesh V2 - Chat Interface",
"nodes": [
{
"parameters": {
"options": {}
},
"type": "@n8n/n8n-nodes-langchain.lmChatGoogleGemini",
"typeVersion": 1,
"position": [
304,
528
],
"id": "9da9a603-da31-460f-9f1a-96e0bb3e9e23",
"name": "Google Gemini Chat Model",
"credentials": {
"googlePalmApi": {
"id": "YEyGAyg7bHXHutrf",
"name": "sb_projects"
}
}
},
{
"parameters": {
"toolDescription": "composite_query: query Supabase using edge function that retrieves hybrid vector search, SQL, and knowledge graph all at once.",
"method": "POST",
"url": "https://zbtqpvkaycnonaslwqfq.supabase.co/functions/v1/search",
"authentication": "predefinedCredentialType",
"nodeCredentialType": "supabaseApi",
"sendHeaders": true,
"headerParameters": {
"parameters": [
{
"name": "Content-Type",
"value": "application/json"
}
]
},
"sendBody": true,
"bodyParameters": {
"parameters": [
{
"name": "query",
"value": "={{ /*n8n-auto-generated-fromAI-override*/ $fromAI('parameters0_Value', ``, 'string') }}"
},
{
"name": "limit",
"value": "20"
}
]
},
"options": {}
},
"type": "n8n-nodes-base.httpRequestTool",
"typeVersion": 4.2,
"position": [
560,
528
],
"id": "f02b140b-aaad-48b2-be6c-42ae55a1209f",
"name": "composite_query",
"credentials": {
"supabaseApi": {
"id": "L1c6TGVJOHc8wt9H",
"name": "infoSupa_contentMesh"
}
}
},
{
"parameters": {
"options": {
"systemMessage": "=You have access to a powerful search tool called `composite_query` that searches through a knowledge base using three search methods simultaneously:\n1. **Vector search** - semantic/meaning-based search\n2. **Graph search** - entity and relationship traversal \n3. **Structured search** - full-text filtering\n\n**When to use this tool:**\n- Whenever the user asks any question\n- When you need factual information to answer questions accurately\n- When the user requests specific filtering or analysis\n\n**How to use this tool:**\nOutput a query_text:\n\n**Required:**\n- `query_text` (string) - The user's question or search terms in natural language\n\n\n**What you'll receive:**\n- `context_block` - Formatted text with source, content, entities, and relationships\n- `entities` - JSON array of relevant entities (people, products, companies, etc.)\n- `relationships` - JSON array showing how entities are connected\n- `relevance` - Indicates which search methods found this result\n\n**Important:** Always use this tool before answering questions. Use the returned context to provide accurate, grounded answers. Reference entities and relationships when relevant.\n"
}
},
"type": "@n8n/n8n-nodes-langchain.agent",
"typeVersion": 2.2,
"position": [
352,
304
],
"id": "c32fa3a4-d62f-47dc-b0a2-a4c418a30a35",
"name": "AI Agent"
},
{
"parameters": {
"options": {}
},
"type": "@n8n/n8n-nodes-langchain.chatTrigger",
"typeVersion": 1.3,
"position": [
128,
304
],
"id": "350f2dcc-4f8b-4dc7-861a-fe661b06348f",
"name": "When chat message received",
"webhookId": "873bddf5-f2ee-4ead-afa3-0a09463389ea"
}
],
"pinData": {},
"connections": {
"Google Gemini Chat Model": {
"ai_languageModel": [
[
{
"node": "AI Agent",
"type": "ai_languageModel",
"index": 0
}
]
]
},
"composite_query": {
"ai_tool": [
[
{
"node": "AI Agent",
"type": "ai_tool",
"index": 0
}
]
]
},
"When chat message received": {
"main": [
[
{
"node": "AI Agent",
"type": "main",
"index": 0
}
]
]
},
"AI Agent": {
"main": [
[]
]
}
},
"active": false,
"settings": {
"executionOrder": "v1"
},
"versionId": "16316f80-7cac-4bd4-b05e-b0c4230a9e85",
"meta": {
"templateCredsSetupCompleted": true,
"instanceId": "1dbf32ab27f7926a258ac270fe5e9e15871cfb01059a55b25aa401186050b9b5"
},
"id": "4lKXwzK514XEOuiY",
"tags": []
}
That's it. That's the full Context Mesh Lite. Cheers!
P.S. If you want these in manageable files, use the form here to give me your email and I'll send them to you in a nice zip file:
https://vmat.fillout.com/context-mesh-lite