Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pgcli/key_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def pgcli_bindings(pgcli):
def _(event):
"""Enable/Disable SmartCompletion Mode."""
_logger.debug("Detected F2 key.")
pgcli.completer.smart_completion = not pgcli.completer.smart_completion
new_state = not pgcli.completer.smart_completion
pgcli.completer.set_smart_completion(new_state)

@kb.add("f3")
def _(event):
Expand Down
189 changes: 189 additions & 0 deletions pgcli/packages/history_freq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import sqlite3
import os
import platform
import re
import sqlparse
from sqlparse.tokens import Name
from os.path import expanduser
from .pgliterals.main import get_literals


white_space_regex = re.compile("\\s+", re.MULTILINE)


def _compile_regex(keyword):
# Surround the keyword with word boundaries and replace interior whitespace
# with whitespace wildcards
pattern = "\\b" + white_space_regex.sub(r"\\s+", keyword) + "\\b"
return re.compile(pattern, re.MULTILINE | re.IGNORECASE)


keywords = get_literals("keywords")
keyword_regexs = {kw: _compile_regex(kw) for kw in keywords}


def history_freq_location():
"""Return the path to the history frequency database location."""
if "XDG_DATA_HOME" in os.environ:
return "%s/pgcli/history_freq.db" % expanduser(os.environ["XDG_DATA_HOME"])
elif platform.system() == "Windows":
return os.getenv("USERPROFILE") + "\\AppData\\Local\\dbcli\\pgcli\\history_freq.db"
else:
return expanduser("~/.local/share/pgcli/history_freq.db")


class HistoryFrequency:
def __init__(self, db_path=None):
"""Initialize the history frequency tracker.
:param db_path: path to the SQLite database file.
"""
self.db_path = db_path or history_freq_location()
self.conn = None

# For in-memory databases, we need to keep the connection open
if self.db_path == ":memory:":
self.conn = sqlite3.connect(self.db_path)
self._create_table(self.conn)
else:
self._create_table()

def _create_table(self, conn=None):
"""Create the frequency tables if they don't exist."""
# Ensure directory exists (skip for in-memory databases)
if self.db_path != ":memory:":
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)

if conn is None:
conn = sqlite3.connect(self.db_path)
with conn:
self._create_tables_on_conn(conn)
conn.close()
else:
self._create_tables_on_conn(conn)

def _create_tables_on_conn(self, conn):
"""Create tables on the given connection."""
conn.execute("""
CREATE TABLE IF NOT EXISTS keyword_frequency (
keyword TEXT PRIMARY KEY,
frequency INTEGER DEFAULT 1,
last_used TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS name_frequency (
name TEXT PRIMARY KEY,
frequency INTEGER DEFAULT 1,
last_used TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")

def _get_connection(self):
"""Get a database connection."""
if self.db_path == ":memory:":
return self.conn
return sqlite3.connect(self.db_path)

def _close_connection(self, conn):
"""Close the connection unless it's an in-memory database."""
if self.db_path != ":memory:":
conn.close()

def increment_keyword(self, keyword):
"""Increment the frequency count for a keyword."""
keyword = keyword.lower()
conn = self._get_connection()
with conn:
conn.execute("""
INSERT OR REPLACE INTO keyword_frequency (keyword, frequency, last_used)
VALUES (
?,
COALESCE((SELECT frequency FROM keyword_frequency WHERE keyword = ?), 0) + 1,
CURRENT_TIMESTAMP
)
""", (keyword, keyword))
self._close_connection(conn)

def increment_name(self, name):
"""Increment the frequency count for a name/identifier."""
name = name.lower()
conn = self._get_connection()
with conn:
conn.execute("""
INSERT OR REPLACE INTO name_frequency (name, frequency, last_used)
VALUES (
?,
COALESCE((SELECT frequency FROM name_frequency WHERE name = ?), 0) + 1,
CURRENT_TIMESTAMP
)
""", (name, name))
self._close_connection(conn)

def get_keyword_frequency(self, keyword):
"""Get the frequency count for a keyword."""
keyword = keyword.lower()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT frequency FROM keyword_frequency WHERE keyword = ?", (keyword,))
result = cursor.fetchone()
self._close_connection(conn)
return result[0] if result else 0

def get_name_frequency(self, name):
"""Get the frequency count for a name/identifier."""
name = name.lower()
conn = self._get_connection()
cursor = conn.cursor()
cursor.execute("SELECT frequency FROM name_frequency WHERE name = ?", (name,))
result = cursor.fetchone()
self._close_connection(conn)
return result[0] if result else 0

def update_from_text(self, text):
"""Update frequencies from SQL text by extracting keywords and names."""
self.update_keywords(text)
self.update_names(text)

def update_keywords(self, text):
"""Update keyword frequencies from SQL text."""
for keyword, regex in keyword_regexs.items():
count = len(list(regex.finditer(text)))
if count > 0:
keyword_lower = keyword.lower()
conn = self._get_connection()
with conn:
conn.execute("""
INSERT OR REPLACE INTO keyword_frequency (keyword, frequency, last_used)
VALUES (
?,
COALESCE((SELECT frequency FROM keyword_frequency WHERE keyword = ?), 0) + ?,
CURRENT_TIMESTAMP
)
""", (keyword_lower, keyword_lower, count))
self._close_connection(conn)

def update_names(self, text):
"""Update name/identifier frequencies from SQL text."""
for parsed in sqlparse.parse(text):
for token in parsed.flatten():
if token.ttype in Name:
name_lower = token.value.lower()
conn = self._get_connection()
with conn:
conn.execute("""
INSERT OR REPLACE INTO name_frequency (name, frequency, last_used)
VALUES (
?,
COALESCE((SELECT frequency FROM name_frequency WHERE name = ?), 0) + 1,
CURRENT_TIMESTAMP
)
""", (name_lower, name_lower))
self._close_connection(conn)

def clear(self):
"""Clear all frequency data."""
conn = self._get_connection()
with conn:
conn.execute("DELETE FROM keyword_frequency")
conn.execute("DELETE FROM name_frequency")
self._close_connection(conn)
16 changes: 14 additions & 2 deletions pgcli/packages/prioritization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from sqlparse.tokens import Name
from collections import defaultdict
from .pgliterals.main import get_literals
from .history_freq import HistoryFrequency


white_space_regex = re.compile("\\s+", re.MULTILINE)
Expand All @@ -20,9 +21,11 @@ def _compile_regex(keyword):


class PrevalenceCounter:
def __init__(self):
def __init__(self, use_history=False):
self.keyword_counts = defaultdict(int)
self.name_counts = defaultdict(int)
self.use_history = use_history
self.history_freq = HistoryFrequency() if use_history else None

def update(self, text):
self.update_keywords(text)
Expand All @@ -32,7 +35,10 @@ def update_names(self, text):
for parsed in sqlparse.parse(text):
for token in parsed.flatten():
if token.ttype in Name:
self.name_counts[token.value] += 1
value = token.value
self.name_counts[value] += 1
if self.use_history and self.history_freq:
self.history_freq.increment_name(value)

def clear_names(self):
self.name_counts = defaultdict(int)
Expand All @@ -43,9 +49,15 @@ def update_keywords(self, text):
for keyword, regex in keyword_regexs.items():
for _ in regex.finditer(text):
self.keyword_counts[keyword] += 1
if self.use_history and self.history_freq:
self.history_freq.increment_keyword(keyword)

def keyword_count(self, keyword):
if self.use_history and self.history_freq:
return self.keyword_counts[keyword] + self.history_freq.get_keyword_frequency(keyword)
return self.keyword_counts[keyword]

def name_count(self, name):
if self.use_history and self.history_freq:
return self.name_counts[name] + self.history_freq.get_name_frequency(name)
return self.name_counts[name]
4 changes: 3 additions & 1 deletion pgcli/pgclirc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

# Enables context sensitive auto-completion. If this is disabled, all
# possible completions will be listed.
smart_completion = True
# Also enables history-based frequency sorting when enabled (uses SQLite database
# to persist completion frequency data across sessions in ~/.config/pgcli/history_freq.db)
smart_completion = False

# Display the completions in several columns. (More completions will be
# visible.)
Expand Down
10 changes: 9 additions & 1 deletion pgcli/pgcompleter.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ def __init__(self, smart_completion=True, pgspecial=None, settings=None):
super().__init__()
self.smart_completion = smart_completion
self.pgspecial = pgspecial
self.prioritizer = PrevalenceCounter()
settings = settings or {}
# Use history-based frequency sorting when smart_completion is enabled
# History frequency data is persisted across sessions in ~/.config/pgcli/history_freq.db
self.prioritizer = PrevalenceCounter(use_history=self.smart_completion)
self.signature_arg_style = settings.get("signature_arg_style", "{arg_name} {arg_type}")
self.call_arg_style = settings.get("call_arg_style", "{arg_name: <{max_arg_len}} := {arg_default}")
self.call_arg_display_style = settings.get("call_arg_display_style", "{arg_name}")
Expand Down Expand Up @@ -307,6 +309,12 @@ def extend_query_history(self, text, is_init=False):
def set_search_path(self, search_path):
self.search_path = self.escaped_names(search_path)

def set_smart_completion(self, enabled):
"""Enable or disable smart completion (including history-based sorting)."""
self.smart_completion = enabled
# Recreate prioritizer with the new setting
self.prioritizer = PrevalenceCounter(use_history=self.smart_completion)

def reset_completions(self):
self.databases = []
self.special_commands = []
Expand Down