Qdrant Provider

Qdrant Provider

The Qdrant provider integrates with the Qdrant vector database for storing, searching, and managing vector embeddings.

Installation

go get github.com/resolute/resolute/providers/qdrant

Configuration

QdrantConfig

type QdrantConfig struct {
    Host     string // Qdrant server host (e.g., "localhost:6334")
    APIKey   string // API key for authentication (optional)
    UseTLS   bool   // Enable TLS connection
    Timeout  time.Duration // Request timeout
}

Environment Variables

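| Variable       | Description                | Default        |
|----------------|----------------------------|----------------|
| QDRANT_HOST    | Qdrant server address      | localhost:6334 |
| QDRANT_API_KEY | API key for authentication | -              |
| QDRANT_USE_TLS | Enable TLS                 | false          |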

Provider Constructor

NewProvider

func NewProvider(cfg QdrantConfig) *QdrantProvider

Creates a new Qdrant provider.

Parameters:

  • cfg - Qdrant configuration

Returns: *QdrantProvider implementing core.Provider

Example:

provider := qdrant.NewProvider(qdrant.QdrantConfig{
    Host:   os.Getenv("QDRANT_HOST"),
    APIKey: os.Getenv("QDRANT_API_KEY"),
    UseTLS: true,
})

Types

Point

type Point struct {
    ID      string            `json:"id"`
    Vector  []float32         `json:"vector"`
    Payload map[string]any    `json:"payload"`
}

ScoredPoint

type ScoredPoint struct {
    ID      string            `json:"id"`
    Score   float32           `json:"score"`
    Vector  []float32         `json:"vector"`
    Payload map[string]any    `json:"payload"`
}

Filter

type Filter struct {
    Must    []Condition `json:"must"`
    Should  []Condition `json:"should"`
    MustNot []Condition `json:"must_not"`
}

type Condition struct {
    Field string `json:"field"`
    Match Match  `json:"match"`
}

type Match struct {
    Value    any      `json:"value"`
    Values   []any    `json:"values"`
    Range    *Range   `json:"range"`
}

type Range struct {
    GT  *float64 `json:"gt"`
    GTE *float64 `json:"gte"`
    LT  *float64 `json:"lt"`
    LTE *float64 `json:"lte"`
}

CollectionConfig

type CollectionConfig struct {
    Name           string `json:"name"`
    VectorSize     int    `json:"vector_size"`
    Distance       string `json:"distance"` // "Cosine", "Euclid", "Dot"
    OnDiskPayload  bool   `json:"on_disk_payload"`
    ShardNumber    int    `json:"shard_number"`
    ReplicationFactor int `json:"replication_factor"`
}

Activities

Search

Performs vector similarity search.

Input:

type SearchInput struct {
    Collection  string    `json:"collection"`
    Vector      []float32 `json:"vector"`
    Limit       int       `json:"limit"`
    Filter      *Filter   `json:"filter"`
    WithPayload bool      `json:"with_payload"`
    WithVector  bool      `json:"with_vector"`
    ScoreThreshold float32 `json:"score_threshold"`
}

Output:

type SearchOutput struct {
    Results []ScoredPoint `json:"results"`
    Count   int           `json:"count"`
}

Node Factory:

func Search(input SearchInput) *core.Node[SearchInput, SearchOutput]

Example:

searchNode := qdrant.Search(qdrant.SearchInput{
    Collection:  "documents",
    Vector:      queryEmbedding,
    Limit:       10,
    WithPayload: true,
    Filter: &qdrant.Filter{
        Must: []qdrant.Condition{
            {Field: "category", Match: qdrant.Match{Value: "technical"}},
        },
    },
})

SearchBatch

Performs multiple vector searches in a single request.

Input:

type SearchBatchInput struct {
    Collection string      `json:"collection"`
    Searches   []SearchInput `json:"searches"`
}

Output:

type SearchBatchOutput struct {
    Results [][]ScoredPoint `json:"results"`
}

Node Factory:

func SearchBatch(input SearchBatchInput) *core.Node[SearchBatchInput, SearchBatchOutput]

Upsert

Inserts or updates points in a collection.

Input:

type UpsertInput struct {
    Collection string  `json:"collection"`
    Points     []Point `json:"points"`
    Wait       bool    `json:"wait"` // Wait for operation to complete
}

Output:

type UpsertOutput struct {
    Status    string `json:"status"`
    Upserted  int    `json:"upserted"`
}

Node Factory:

func Upsert(input UpsertInput) *core.Node[UpsertInput, UpsertOutput]

Example:

upsertNode := qdrant.Upsert(qdrant.UpsertInput{
    Collection: "documents",
    Points: []qdrant.Point{
        {
            ID:     "doc-1",
            Vector: embedding1,
            Payload: map[string]any{
                "title":    "Document Title",
                "category": "technical",
            },
        },
    },
    Wait: true,
})

UpsertBatch

Inserts points in batches for large datasets.

Input:

type UpsertBatchInput struct {
    Collection string  `json:"collection"`
    Points     []Point `json:"points"`
    BatchSize  int     `json:"batch_size"` // Points per batch (default: 100)
    Wait       bool    `json:"wait"`
}

Output:

type UpsertBatchOutput struct {
    Status   string `json:"status"`
    Upserted int    `json:"upserted"`
    Batches  int    `json:"batches"`
}

Node Factory:

func UpsertBatch(input UpsertBatchInput) *core.Node[UpsertBatchInput, UpsertBatchOutput]

Delete

Deletes points from a collection.

Input:

type DeleteInput struct {
    Collection string   `json:"collection"`
    IDs        []string `json:"ids"`
    Filter     *Filter  `json:"filter"`
    Wait       bool     `json:"wait"`
}

Output:

type DeleteOutput struct {
    Status  string `json:"status"`
    Deleted int    `json:"deleted"`
}

Node Factory:

func Delete(input DeleteInput) *core.Node[DeleteInput, DeleteOutput]

Example:

// Delete by IDs
deleteNode := qdrant.Delete(qdrant.DeleteInput{
    Collection: "documents",
    IDs:        []string{"doc-1", "doc-2"},
})

// Delete by filter
deleteNode := qdrant.Delete(qdrant.DeleteInput{
    Collection: "documents",
    Filter: &qdrant.Filter{
        Must: []qdrant.Condition{
            {Field: "expired", Match: qdrant.Match{Value: true}},
        },
    },
})

GetPoints

Retrieves specific points by ID.

Input:

type GetPointsInput struct {
    Collection  string   `json:"collection"`
    IDs         []string `json:"ids"`
    WithPayload bool     `json:"with_payload"`
    WithVector  bool     `json:"with_vector"`
}

Output:

type GetPointsOutput struct {
    Points []Point `json:"points"`
}

Node Factory:

func GetPoints(input GetPointsInput) *core.Node[GetPointsInput, GetPointsOutput]

Scroll

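Iterates through the points in a collection one page at a time. Each call returns up to Limit points; pass the returned NextOffset as Offset in the next call to continue, and stop when NextOffset is empty.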

Input:

type ScrollInput struct {
    Collection  string  `json:"collection"`
    Filter      *Filter `json:"filter"`
    Limit       int     `json:"limit"`
    Offset      string  `json:"offset"` // Point ID to start from
    WithPayload bool    `json:"with_payload"`
    WithVector  bool    `json:"with_vector"`
}

Output:

type ScrollOutput struct {
    Points     []Point `json:"points"`
    NextOffset string  `json:"next_offset"`
}

Node Factory:

func Scroll(input ScrollInput) *core.Node[ScrollInput, ScrollOutput]

CreateCollection

Creates a new collection.

Input:

type CreateCollectionInput struct {
    Config CollectionConfig `json:"config"`
}

Output:

type CreateCollectionOutput struct {
    Status string `json:"status"`
}

Node Factory:

func CreateCollection(input CreateCollectionInput) *core.Node[CreateCollectionInput, CreateCollectionOutput]

Example:

createNode := qdrant.CreateCollection(qdrant.CreateCollectionInput{
    Config: qdrant.CollectionConfig{
        Name:       "documents",
        VectorSize: 384,
        Distance:   "Cosine",
    },
})

DeleteCollection

Deletes a collection.

Input:

type DeleteCollectionInput struct {
    Name string `json:"name"`
}

Output:

type DeleteCollectionOutput struct {
    Status string `json:"status"`
}

Node Factory:

func DeleteCollection(input DeleteCollectionInput) *core.Node[DeleteCollectionInput, DeleteCollectionOutput]

CollectionInfo

Gets collection information.

Input:

type CollectionInfoInput struct {
    Name string `json:"name"`
}

Output:

type CollectionInfoOutput struct {
    Status       string `json:"status"`
    VectorsCount int64  `json:"vectors_count"`
    PointsCount  int64  `json:"points_count"`
    Config       CollectionConfig `json:"config"`
}

Node Factory:

func CollectionInfo(input CollectionInfoInput) *core.Node[CollectionInfoInput, CollectionInfoOutput]

Usage Patterns

Semantic Search

flow := core.NewFlow("semantic-search").
    TriggeredBy(core.Manual("api")).
    Then(ollama.Embed(ollama.EmbedInput{
        Model: "nomic-embed-text",
        Input: core.Output("input.query"),
    }).As("embedding")).
    Then(qdrant.Search(qdrant.SearchInput{
        Collection:  "documents",
        Vector:      core.Output("embedding.Embeddings[0]"),
        Limit:       10,
        WithPayload: true,
    }).As("results")).
    Build()

Embedding Pipeline with Qdrant Storage

flow := core.NewFlow("embedding-pipeline").
    TriggeredBy(core.Schedule("0 2 * * *")).
    Then(fetchDocumentsNode.As("docs")).
    Then(ollama.EmbedBatch(ollama.EmbedBatchInput{
        Model:     "nomic-embed-text",
        Texts:     core.Output("docs.texts"),
        BatchSize: 32,
    }).As("embeddings")).
    Then(preparePointsNode.As("points")).
    Then(qdrant.UpsertBatch(qdrant.UpsertBatchInput{
        Collection: "documents",
        Points:     core.Output("points.data"),
        BatchSize:  100,
        Wait:       true,
    })).
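    Build()

Filtered Search

Combines exact-match and range conditions in a single filter. Note: the ptr helper used below is not defined on this page; it is assumed to return a pointer to its argument (for example, func ptr[T any](v T) *T { return &v }).

searchNode := qdrant.Search(qdrant.SearchInput{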
    Collection: "products",
    Vector:     queryVector,
    Limit:      20,
    Filter: &qdrant.Filter{
        Must: []qdrant.Condition{
            {Field: "category", Match: qdrant.Match{Value: "electronics"}},
            {Field: "in_stock", Match: qdrant.Match{Value: true}},
            {Field: "price", Match: qdrant.Match{Range: &qdrant.Range{
                GTE: ptr(10.0),
                LTE: ptr(100.0),
            }}},
        },
    },
    WithPayload: true,
})

Collection Setup Flow

flow := core.NewFlow("setup-collection").
    TriggeredBy(core.Manual("setup")).
    Then(qdrant.CreateCollection(qdrant.CreateCollectionInput{
        Config: qdrant.CollectionConfig{
            Name:       "documents",
            VectorSize: 384,
            Distance:   "Cosine",
        },
    })).
    Build()

Scroll Through All Points

flow := core.NewFlow("export-collection").
    TriggeredBy(core.Manual("export")).
    Then(qdrant.Scroll(qdrant.ScrollInput{
        Collection:  "documents",
        Limit:       100,
        WithPayload: true,
    }).As("page")).
    While(func(s *core.FlowState) bool {
        page := core.Get[qdrant.ScrollOutput](s, "page")
        return page.NextOffset != ""
    }).
        Then(exportPageNode).
        Then(nextScrollNode).
    EndWhile().
    Build()

Complete Example

package main

import (
    "os"
    "time"

    "github.com/resolute/resolute/core"
    "github.com/resolute/resolute/providers/ollama"
    "github.com/resolute/resolute/providers/qdrant"
)

func main() {
    // Configure providers
    ollamaProvider := ollama.NewProvider(ollama.OllamaConfig{
        Host:    "http://localhost:11434",
        Timeout: 10 * time.Minute,
    })

    qdrantProvider := qdrant.NewProvider(qdrant.QdrantConfig{
        Host:   os.Getenv("QDRANT_HOST"),
        APIKey: os.Getenv("QDRANT_API_KEY"),
    })

    // Build RAG query flow
    flow := core.NewFlow("rag-query").
        TriggeredBy(core.Manual("query")).
        // Embed the query
        Then(ollama.Embed(ollama.EmbedInput{
            Model: "nomic-embed-text",
            Input: core.Output("input.query"),
        }).As("query-embedding")).
        // Search for relevant documents
        Then(qdrant.Search(qdrant.SearchInput{
            Collection:     "knowledge-base",
            Vector:         core.Output("query-embedding.Embeddings[0]"),
            Limit:          5,
            WithPayload:    true,
            ScoreThreshold: 0.7,
        }).As("context")).
        // Generate response
        Then(ollama.Chat(ollama.ChatInput{
            Model: "llama3.2",
            Messages: []ollama.Message{
                {Role: "system", Content: "Answer questions based on the provided context."},
                {Role: "user", Content: core.Output("context.formatted_query")},
            },
        }).As("response")).
        Build()

    // Run worker
    err := core.NewWorker().
        WithConfig(core.WorkerConfig{
            TaskQueue: "rag-queries",
        }).
        WithFlow(flow).
        WithProviders(ollamaProvider, qdrantProvider).
        Run()

    if err != nil {
        panic(err)
    }
}

See Also