Compare commits

...

9 Commits

Author SHA1 Message Date
Dr. Kiji
b94dca52be [WIP] add type ignore 2025-01-02 20:28:58 +09:00
Dr. Kiji
6d7eb67787 Revert "[WIP]for test perpose only"
This reverts commit 7bdedbe645.
2025-01-02 20:26:44 +09:00
Dr. Kiji
7bdedbe645 [WIP]for test perpose only 2025-01-02 20:25:12 +09:00
Dr. Kiji
2f6bfe8e30 add type annotation 2025-01-02 20:24:57 +09:00
Dr. Kiji
4f5a4e7194 [WIP] before final test 2025-01-02 20:11:44 +09:00
Dr. Kiji
75dd8677b9 improve the consistancy 2025-01-02 19:54:34 +09:00
Dr. Kiji
610d069b69 fix lint 2025-01-02 19:15:59 +09:00
Dr. Kiji
77030d7581 linting 2025-01-02 19:14:16 +09:00
Dr. Kiji
81c5953fa5 add mecab keywords handler for japanese 2025-01-02 18:50:23 +09:00
7 changed files with 1218 additions and 0 deletions

View File

@@ -24,6 +24,10 @@ class Keyword:
                from core.rag.datasource.keyword.jieba.jieba import Jieba
                return Jieba
            case KeyWordType.MECAB:
                from core.rag.datasource.keyword.mecab.mecab import MeCab
                return MeCab
            case _:
                raise ValueError(f"Keyword store {keyword_type} is not supported.")

View File

@@ -3,3 +3,4 @@ from enum import StrEnum
class KeyWordType(StrEnum):
    JIEBA = "jieba"
    MECAB = "mecab"

View File

@@ -0,0 +1,339 @@
# MeCab Keyword Processor for Dify
A Japanese text keyword extraction module for Dify's RAG system, powered by the MeCab morphological analyzer.
## Overview
This module provides Japanese text keyword extraction capabilities using the MeCab morphological analyzer. It's designed to:
- Extract meaningful keywords from Japanese text
- Handle compound words and technical terms
- Support custom dictionaries
- Provide configurable scoring based on parts of speech
- Handle mixed Japanese-English text
## Components
### 1. MeCabKeywordTableHandler
The core component responsible for keyword extraction using MeCab:
```python
handler = MeCabKeywordTableHandler(
    dictionary_path="/path/to/dict",  # Optional custom dictionary
    user_dictionary_path="/path/to/user_dict"  # Optional user dictionary
)
keywords = handler.extract_keywords(text, max_keywords_per_chunk=10)
```
#### Features:
- **Part of Speech (POS) Weighting**:
```python
pos_weights = {
    '名詞': 1.0,   # Nouns
    '動詞': 0.8,   # Verbs
    '形容詞': 0.6,  # Adjectives
    '副詞': 0.4,   # Adverbs
    '連体詞': 0.3,  # Adnominal adjectives
    '感動詞': 0.2,  # Interjections
}
```
- **Special Term Handling**:
- Boosts scores for proper nouns (固有名詞)
- Boosts scores for technical terms (専門用語)
- Compound word detection (e.g., "機械学習", "自然言語処理")
- **Reading Normalization**:
- Handles different forms of the same word
- Normalizes compound terms using readings
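The part-of-speech weights and the special-term boosts combine into a per-token score; a minimal sketch of that scoring step (simplified from `MeCabKeywordTableHandler.extract_keywords` in this PR, not a drop-in replacement) looks like this:
```python
# Simplified scoring sketch: POS weight plus a 1.5x boost for proper nouns (固有名詞)
# and technical terms (専門用語); single-character tokens are dropped.
pos_weights = {"名詞": 1.0, "動詞": 0.8, "形容詞": 0.6}

def score_token(pos: str, pos_subtype: str, base_form: str) -> float:
    score = pos_weights.get(pos, 0.0)
    if pos == "名詞" and pos_subtype in ("固有名詞", "専門用語"):
        score *= 1.5
    return score if len(base_form) > 1 else 0.0

print(score_token("名詞", "固有名詞", "自然言語処理"))  # 1.5
print(score_token("名詞", "一般", "研究"))              # 1.0
```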
### 2. Configuration (MeCabConfig)
Configurable settings for the processor:
```python
class MeCabConfig(BaseModel):
    max_keywords_per_chunk: int = 10
    min_keyword_length: int = 2
    score_threshold: float = 0.3
    storage_type: str = "database"
    cache_timeout: int = 3600
    dictionary_path: str = ""
    user_dictionary_path: str = ""
    pos_weights: dict = {...}
```
### 3. Stopwords
Comprehensive Japanese stopword list including:
- Particles (は, が, の, etc.)
- Auxiliary verbs (です, ます, etc.)
- Pronouns (これ, それ, etc.)
- Common words
- Numbers and punctuation
- Common English stopwords for mixed text
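Assuming the module layout added in this PR, the list is applied as a plain membership check before a token is scored, for example:
```python
from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS

def is_candidate(base_form: str) -> bool:
    # Tokens in STOPWORDS (particles, auxiliaries, pronouns, punctuation, digits,
    # common English words) and single-character tokens are skipped before scoring.
    return base_form not in STOPWORDS and len(base_form) > 1

print(is_candidate("これ"))          # False: pronoun listed as a stopword
print(is_candidate("データベース"))   # True: content word, kept as a keyword candidate
```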
## Usage
### Basic Usage
```python
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.models.document import Document
from models.dataset import Dataset

# Initialize with KEYWORD_STORE = "mecab" in config
keyword_processor = Keyword(dataset)

# Process documents
documents = [
    Document(
        page_content="自然言語処理は人工知能の重要な分野です。",
        metadata={"doc_id": "1"}
    )
]
keyword_processor.create(documents)

# Search
results = keyword_processor.search("自然言語処理")
```
## Configuration
### Basic Settings
```python
# In your environment configuration:
KEYWORD_STORE = "mecab"
KEYWORD_DATA_SOURCE_TYPE = "database" # or other supported storage types
```
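With `KEYWORD_STORE = "mecab"`, the keyword factory resolves the processor class along the lines of the factory change in this compare (sketch only; the import path of `KeyWordType` is assumed here):
```python
from core.rag.datasource.keyword.keyword_type import KeyWordType  # path assumed

def get_keyword_class(keyword_type: str):
    # Mirrors the match added to the keyword factory in this PR.
    match keyword_type:
        case KeyWordType.MECAB:
            from core.rag.datasource.keyword.mecab.mecab import MeCab
            return MeCab
        case KeyWordType.JIEBA:
            from core.rag.datasource.keyword.jieba.jieba import Jieba
            return Jieba
        case _:
            raise ValueError(f"Keyword store {keyword_type} is not supported.")
```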
### Advanced Settings
```python
# MeCab-specific configuration
MECAB_CONFIG = {
    "max_keywords_per_chunk": 10,
    "score_threshold": 0.3,
    "dictionary_path": "/path/to/dict",  # Optional
    "user_dictionary_path": "/path/to/user_dict",  # Optional
    "pos_weights": {
        "名詞": 1.0,  # Nouns
        "動詞": 0.8,  # Verbs
        "形容詞": 0.6  # Adjectives
    }
}
```
## Key Features
### 1. Intelligent Keyword Extraction
- Part-of-speech based scoring
- Compound word detection
- Technical term recognition
- Reading normalization for variations
### 2. Storage Options
- Database storage (default)
- File-based storage
- Concurrent access support via Redis locking
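As a sketch of the storage side (mirroring `_save_dataset_keyword_table` in `mecab.py` from this PR), the keyword table is serialized with a set-aware JSON encoder and then written either to the `DatasetKeywordTable` row or to file storage, with all writes guarded by a per-dataset Redis lock:
```python
import json

class SetEncoder(json.JSONEncoder):
    """Serialize sets as lists, as the processor does when persisting the keyword table."""
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return super().default(obj)

# Hypothetical table with one keyword pointing at one segment.
table_dict = {
    "__type__": "keyword_table",
    "__data__": {"index_id": "dataset-id", "summary": None, "table": {"自然言語処理": {"doc-1"}}},
}
payload = json.dumps(table_dict, cls=SetEncoder, ensure_ascii=False)
# data_source_type == "database": stored on DatasetKeywordTable.keyword_table
# otherwise: written to keyword_files/<tenant_id>/<dataset_id>.txt via the storage extension
print(payload)
```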
### 3. Error Handling
- Comprehensive exception handling
- Detailed logging
- Graceful fallbacks
## Dependencies
```bash
# Ubuntu/Debian
apt-get install mecab mecab-ipadic-utf8 python3-mecab

# macOS
brew install mecab mecab-ipadic
pip install mecab-python3
```
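After installing, a quick way to confirm that the `mecab-python3` binding and a dictionary are usable is a throwaway snippet like the one below (not part of the module; assumes a default dictionary such as ipadic is configured):
```python
import MeCab

# Should print one token per line with POS features, ending with EOS.
tagger = MeCab.Tagger()
print(tagger.parse("形態素解析の動作確認"))
```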
## Best Practices
1. **Performance**
- Use batch processing for large datasets
- Configure appropriate cache timeouts
- Monitor memory usage
2. **Customization**
- Update dictionaries regularly
- Adjust POS weights for your use case
- Set appropriate thresholds
3. **Error Handling**
- Implement proper logging
- Handle dictionary loading errors
- Manage concurrent access
## Example Usage
### Basic Keyword Extraction
```python
# Extract keywords from text
text = "自然言語処理は人工知能の重要な分野です。"
keyword_processor.create([
    Document(page_content=text, metadata={"doc_id": "1"})
])
```
### Custom Dictionary
```python
# Use custom dictionary
config = MeCabConfig(
    dictionary_path="/path/to/dict",
    user_dictionary_path="/path/to/user.dic"
)
```
### Batch Processing
```python
# Process multiple documents
documents = [
    Document(page_content=text1, metadata={"doc_id": "1"}),
    Document(page_content=text2, metadata={"doc_id": "2"})
]
keyword_processor.create(documents)
```
## Integration with Dify
The MeCab processor integrates seamlessly with Dify's existing keyword system:
1. Implements the `BaseKeyword` interface
2. Works with the keyword factory system
3. Supports all standard operations:
- Document indexing
- Keyword extraction
- Search functionality
- Index management
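The operations listed above map onto the `BaseKeyword` methods implemented in `mecab.py`; a condensed usage sketch (assuming an existing `Dataset` bound to `dataset` and `KEYWORD_STORE = "mecab"`) looks like this:
```python
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.models.document import Document

processor = Keyword(dataset)  # resolves to the MeCab processor

# Document indexing and keyword extraction
processor.create([Document(page_content="自然言語処理の基礎", metadata={"doc_id": "seg-1"})])
processor.add_texts([Document(page_content="深層学習の応用", metadata={"doc_id": "seg-2"})])

# Search functionality
print(processor.text_exists("seg-1"))             # True once indexed
results = processor.search("自然言語処理", top_k=4)

# Index management
processor.delete_by_ids(["seg-2"])
processor.delete()  # removes the entire keyword index for the dataset
```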
## Common Issues
1. **Dictionary Loading**
```python
try:
    keyword_processor.create(documents)
except KeywordProcessorError as e:
    logger.error("Dictionary loading failed: %s", str(e))
```
2. **Memory Management**
```python
# Process in batches
batch_size = 100
for i in range(0, len(documents), batch_size):
    batch = documents[i:i + batch_size]
    keyword_processor.create(batch)
```
3. **Concurrent Access**
```python
# Handled automatically via Redis locks
keyword_processor.create(documents) # Safe for concurrent use
```
For more details, refer to the [Dify Documentation](https://docs.dify.ai).
## Text Processing Examples
### Compound Words
The MeCab processor intelligently handles compound words in Japanese text:
```python
text = "人工知能と機械学習の研究を行っています。"
keyword_processor.create([
    Document(page_content=text, metadata={"doc_id": "1"})
])
# Extracted keywords include:
# - "人工知能" (artificial intelligence - compound)
# - "機械学習" (machine learning - compound)
# - "研究" (research - single)
```
Complex technical terms are properly recognized:
```python
text = "自然言語処理における深層学習の応用"
# Extracts:
# - "自然言語処理" (natural language processing)
# - "深層学習" (deep learning)
# - "応用" (application)
```
### Stopwords Handling
Common particles and auxiliary words are automatically filtered:
```python
text = "私はデータベースの設計をしています。"
# Ignores:
# - "は" (particle)
# - "の" (particle)
# - "を" (particle)
# - "います" (auxiliary verb)
# Extracts:
# - "データベース" (database)
# - "設計" (design)
```
Mixed language text is also handled appropriately:
```python
text = "AIシステムのパフォーマンスを改善する。"
# Ignores:
# - "の" (particle)
# - "を" (particle)
# - "する" (auxiliary verb)
# Extracts:
# - "AI" (kept as is)
# - "システム" (system)
# - "パフォーマンス" (performance)
# - "改善" (improvement)
```
### Reading Variations
The processor normalizes different forms of the same word:
```python
text1 = "データベース設計" # カタカナ
text2 = "データベース設計" # with readings
# Both normalize to the same keywords:
# - "データベース"
# - "設計"
```
### Technical Term Boosting
Technical terms receive higher scores in keyword extraction:
```python
text = "機械学習モデルを用いた自然言語処理の研究"
# Prioritizes technical terms:
# High score:
# - "機械学習" (machine learning)
# - "自然言語処理" (natural language processing)
# Lower score:
# - "研究" (research)
# - "モデル" (model)
```

View File

@@ -0,0 +1,21 @@
from pydantic import BaseModel


class MeCabConfig(BaseModel):
    """Configuration for MeCab keyword processor."""

    max_keywords_per_chunk: int = 10
    min_keyword_length: int = 2
    score_threshold: float = 0.3
    storage_type: str = "database"
    cache_timeout: int = 3600

    # MeCab specific settings
    dictionary_path: str = ""  # Optional custom dictionary path
    user_dictionary_path: str = ""  # Optional user dictionary path
    pos_weights: dict = {
        "名詞": 1.0,  # Nouns
        "動詞": 0.8,  # Verbs
        "形容詞": 0.6,  # Adjectives
        "副詞": 0.4,  # Adverbs
    }

View File

@@ -0,0 +1,516 @@
import json
import logging
import os
from collections import defaultdict
from typing import Any, Optional
from core.rag.datasource.keyword.keyword_base import BaseKeyword
from core.rag.datasource.keyword.mecab.config import MeCabConfig
from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler
from core.rag.models.document import Document
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment
logger = logging.getLogger(__name__)
class KeywordProcessorError(Exception):
"""Base error for keyword processing."""
pass
class KeywordExtractionError(KeywordProcessorError):
"""Error during keyword extraction."""
pass
class KeywordStorageError(KeywordProcessorError):
"""Error during storage operations."""
pass
class SetEncoder(json.JSONEncoder):
"""JSON encoder that handles sets."""
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return super().default(obj)
class MeCab(BaseKeyword):
"""Japanese keyword processor using MeCab morphological analyzer."""
def __init__(self, dataset: Dataset):
super().__init__(dataset)
self._config = MeCabConfig()
self._keyword_handler: MeCabKeywordTableHandler = MeCabKeywordTableHandler()
self._init_handler()
def _init_handler(self) -> None:
"""Initialize MeCab handler with configuration."""
try:
self._keyword_handler = MeCabKeywordTableHandler(
dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path
)
if self._config.pos_weights:
self._keyword_handler.pos_weights = self._config.pos_weights
self._keyword_handler.min_score = self._config.score_threshold
except Exception as e:
logger.exception("Failed to initialize MeCab handler")
raise KeywordProcessorError("MeCab initialization failed: {}".format(str(e)))
def create(self, texts: list[Document], **kwargs: Any) -> BaseKeyword:
"""Create keyword index for documents.
Args:
texts: List of documents to index
**kwargs: Additional arguments
Returns:
BaseKeyword: Self for method chaining
Raises:
KeywordProcessorError: If indexing fails
KeywordExtractionError: If keyword extraction fails
KeywordStorageError: If storage operations fail
"""
if not texts:
return self
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
try:
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
keyword_table = {}
for text in texts:
if not text.page_content or not text.metadata or "doc_id" not in text.metadata:
logger.warning("Skipping invalid document: {}".format(text))
continue
try:
keywords = self._keyword_handler.extract_keywords(
text.page_content, self._config.max_keywords_per_chunk
)
self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords))
keyword_table = self._add_text_to_keyword_table(
keyword_table, text.metadata["doc_id"], list(keywords)
)
except Exception as e:
logger.exception("Failed to process document: {}".format(text.metadata.get("doc_id")))
raise KeywordExtractionError("Failed to extract keywords: {}".format(str(e)))
try:
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to save keyword table")
raise KeywordStorageError("Failed to save keyword table: {}".format(str(e)))
except Exception as e:
if not isinstance(e, (KeywordExtractionError, KeywordStorageError)):
logger.exception("Unexpected error during keyword indexing")
raise KeywordProcessorError("Keyword indexing failed: {}".format(str(e)))
raise
return self
def add_texts(self, texts: list[Document], **kwargs: Any) -> None:
"""Add new texts to existing index.
Args:
texts: List of documents to add
**kwargs: Additional arguments including optional keywords_list
Raises:
KeywordProcessorError: If indexing fails
KeywordStorageError: If storage operations fail
"""
if not texts:
return
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
try:
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
keyword_table = {}
keywords_list = kwargs.get("keywords_list")
for i, text in enumerate(texts):
if not text.page_content or not text.metadata or "doc_id" not in text.metadata:
logger.warning("Skipping invalid document: {}".format(text))
continue
try:
if keywords_list:
keywords = keywords_list[i]
if not keywords:
keywords = self._keyword_handler.extract_keywords(
text.page_content, self._config.max_keywords_per_chunk
)
else:
keywords = self._keyword_handler.extract_keywords(
text.page_content, self._config.max_keywords_per_chunk
)
self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords))
keyword_table = self._add_text_to_keyword_table(
keyword_table, text.metadata["doc_id"], list(keywords)
)
except Exception as e:
logger.exception("Failed to process document: {}".format(text.metadata.get("doc_id")))
continue
try:
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to save keyword table")
raise KeywordStorageError("Failed to save keyword table: {}".format(str(e)))
except Exception as e:
if not isinstance(e, KeywordStorageError):
logger.exception("Unexpected error during keyword indexing")
raise KeywordProcessorError("Keyword indexing failed: {}".format(str(e)))
raise
def text_exists(self, id: str) -> bool:
"""Check if text exists in index.
Args:
id: Document ID to check
Returns:
bool: True if text exists, False otherwise
Raises:
KeywordProcessorError: If check fails
"""
if not id:
return False
try:
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
return False
return id in set.union(*keyword_table.values()) if keyword_table else False
except Exception as e:
logger.exception("Failed to check text existence")
raise KeywordProcessorError("Failed to check text existence: {}".format(str(e)))
def delete_by_ids(self, ids: list[str]) -> None:
"""Delete texts by IDs.
Args:
ids: List of document IDs to delete
Raises:
KeywordStorageError: If deletion fails
"""
if not ids:
return
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
try:
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
if keyword_table is not None:
keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids)
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to delete documents")
raise KeywordStorageError("Failed to delete documents: {}".format(str(e)))
def delete(self) -> None:
"""Delete entire index.
Raises:
KeywordStorageError: If deletion fails
"""
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
try:
with redis_client.lock(lock_name, timeout=600):
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:
db.session.delete(dataset_keyword_table)
db.session.commit()
if dataset_keyword_table.data_source_type != "database":
file_key = os.path.join("keyword_files", self.dataset.tenant_id, self.dataset.id + ".txt")
storage.delete(file_key)
except Exception as e:
logger.exception("Failed to delete index")
raise KeywordStorageError("Failed to delete index: {}".format(str(e)))
def search(self, query: str, **kwargs: Any) -> list[Document]:
"""Search documents using keywords.
Args:
query: Search query string
**kwargs: Additional arguments including optional top_k
Returns:
List[Document]: List of matching documents
Raises:
KeywordProcessorError: If search fails
"""
if not query:
return []
try:
keyword_table = self._get_dataset_keyword_table()
k = kwargs.get("top_k", 4)
sorted_chunk_indices = self._retrieve_ids_by_query(keyword_table or {}, query, k)
if not sorted_chunk_indices:
return []
documents = []
for chunk_index in sorted_chunk_indices:
segment = (
db.session.query(DocumentSegment)
.filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index)
.first()
)
if segment:
documents.append(
Document(
page_content=segment.content,
metadata={
"doc_id": chunk_index,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
},
)
)
return documents
except Exception as e:
logger.exception("Failed to search documents")
raise KeywordProcessorError("Search failed: {}".format(str(e)))
def _get_dataset_keyword_table(self) -> Optional[dict[str, set[str]]]:
"""Get keyword table from storage."""
try:
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:
keyword_table_dict = dataset_keyword_table.keyword_table_dict
if keyword_table_dict:
return dict(keyword_table_dict["__data__"]["table"])
else:
# Create new dataset keyword table if it doesn't exist
from configs import dify_config
keyword_data_source_type = dify_config.KEYWORD_DATA_SOURCE_TYPE
dataset_keyword_table = DatasetKeywordTable(
dataset_id=self.dataset.id,
keyword_table="",
data_source_type=keyword_data_source_type,
)
if keyword_data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(
{
"__type__": "keyword_table",
"__data__": {"index_id": self.dataset.id, "summary": None, "table": {}},
},
cls=SetEncoder,
)
db.session.add(dataset_keyword_table)
db.session.commit()
return {}
except Exception as e:
logger.exception("Failed to get keyword table")
raise KeywordStorageError("Failed to get keyword table: {}".format(str(e)))
def _save_dataset_keyword_table(self, keyword_table: dict[str, set[str]]) -> None:
"""Save keyword table to storage."""
if keyword_table is None:
raise ValueError("Keyword table cannot be None")
table_dict = {
"__type__": "keyword_table",
"__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table},
}
try:
dataset_keyword_table = self.dataset.dataset_keyword_table
if not dataset_keyword_table:
raise KeywordStorageError("Dataset keyword table not found")
data_source_type = dataset_keyword_table.data_source_type
if data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder)
db.session.commit()
else:
file_key = os.path.join("keyword_files", self.dataset.tenant_id, self.dataset.id + ".txt")
if storage.exists(file_key):
storage.delete(file_key)
storage.save(file_key, json.dumps(table_dict, cls=SetEncoder).encode("utf-8"))
except Exception as e:
logger.exception("Failed to save keyword table")
raise KeywordStorageError("Failed to save keyword table: {}".format(str(e)))
def _add_text_to_keyword_table(
self, keyword_table: dict[str, set[str]], id: str, keywords: list[str]
) -> dict[str, set[str]]:
"""Add text keywords to table."""
if not id or not keywords:
return keyword_table
for keyword in keywords:
if keyword not in keyword_table:
keyword_table[keyword] = set()
keyword_table[keyword].add(id)
return keyword_table
def _delete_ids_from_keyword_table(self, keyword_table: dict[str, set[str]], ids: list[str]) -> dict[str, set[str]]:
"""Delete IDs from keyword table."""
if not keyword_table or not ids:
return keyword_table
node_idxs_to_delete = set(ids)
keywords_to_delete = set()
for keyword, node_idxs in keyword_table.items():
if node_idxs_to_delete.intersection(node_idxs):
keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete)
if not keyword_table[keyword]:
keywords_to_delete.add(keyword)
for keyword in keywords_to_delete:
del keyword_table[keyword]
return keyword_table
def _retrieve_ids_by_query(self, keyword_table: dict[str, set[str]], query: str, k: int = 4) -> list[str]:
"""Retrieve document IDs by query."""
if not query or not keyword_table:
return []
try:
keywords = self._keyword_handler.extract_keywords(query)
# Score documents based on matching keywords
chunk_indices_count: dict[str, int] = defaultdict(int)
keywords_list = [keyword for keyword in keywords if keyword in set(keyword_table.keys())]
for keyword in keywords_list:
for node_id in keyword_table[keyword]:
chunk_indices_count[node_id] += 1
# Sort by score in descending order
sorted_chunk_indices = sorted(
chunk_indices_count.keys(),
key=lambda x: chunk_indices_count[x],
reverse=True,
)
return sorted_chunk_indices[:k]
except Exception as e:
logger.exception("Failed to retrieve IDs by query")
raise KeywordExtractionError("Failed to retrieve IDs: {}".format(str(e)))
def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]) -> None:
"""Update segment keywords in database."""
if not dataset_id or not node_id or not keywords:
return
try:
document_segment = (
db.session.query(DocumentSegment)
.filter(DocumentSegment.dataset_id == dataset_id, DocumentSegment.index_node_id == node_id)
.first()
)
if document_segment:
document_segment.keywords = keywords
db.session.add(document_segment)
db.session.commit()
except Exception as e:
logger.exception("Failed to update segment keywords")
raise KeywordStorageError("Failed to update segment keywords: {}".format(str(e)))
def create_segment_keywords(self, node_id: str, keywords: list[str]) -> None:
"""Create keywords for a single segment.
Args:
node_id: The segment node ID
keywords: List of keywords to add
"""
if not node_id or not keywords:
return
try:
keyword_table = self._get_dataset_keyword_table()
self._update_segment_keywords(self.dataset.id, node_id, keywords)
keyword_table = self._add_text_to_keyword_table(keyword_table or {}, node_id, keywords)
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to create segment keywords")
raise KeywordProcessorError("Failed to create segment keywords: {}".format(str(e)))
def multi_create_segment_keywords(self, pre_segment_data_list: list[dict[str, Any]]) -> None:
"""Create keywords for multiple segments in batch."""
if not pre_segment_data_list:
return
try:
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
keyword_table = {}
for pre_segment_data in pre_segment_data_list:
segment = pre_segment_data["segment"]
if not segment:
continue
try:
if pre_segment_data.get("keywords"):
segment.keywords = pre_segment_data["keywords"]
keyword_table = self._add_text_to_keyword_table(
keyword_table, segment.index_node_id, pre_segment_data["keywords"]
)
else:
keywords = self._keyword_handler.extract_keywords(
segment.content, self._config.max_keywords_per_chunk
)
segment.keywords = list(keywords)
keyword_table = self._add_text_to_keyword_table(
keyword_table, segment.index_node_id, list(keywords)
)
except Exception as e:
logger.exception("Failed to process segment: {}".format(segment.index_node_id))
continue
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to create multiple segment keywords")
raise KeywordProcessorError("Failed to create multiple segment keywords: {}".format(str(e)))
def update_segment_keywords_index(self, node_id: str, keywords: list[str]) -> None:
"""Update keywords index for a segment.
Args:
node_id: The segment node ID
keywords: List of keywords to update
"""
if not node_id or not keywords:
return
try:
keyword_table = self._get_dataset_keyword_table()
keyword_table = self._add_text_to_keyword_table(keyword_table or {}, node_id, keywords)
self._save_dataset_keyword_table(keyword_table)
except Exception as e:
logger.exception("Failed to update segment keywords index")
raise KeywordStorageError("Failed to update segment keywords index: {}".format(str(e)))

View File

@@ -0,0 +1,147 @@
from collections import defaultdict
from operator import itemgetter
from typing import Optional
import MeCab # type: ignore
from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS
class MeCabKeywordTableHandler:
"""Japanese keyword extraction using MeCab morphological analyzer."""
def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""):
"""Initialize MeCab tokenizer.
Args:
dictionary_path: Path to custom system dictionary
user_dictionary_path: Path to user dictionary
"""
try:
# Build MeCab argument string
mecab_args = ["-Ochasen"] # Use ChaSen format for detailed POS info
if dictionary_path:
mecab_args.append(f"-d {dictionary_path}")
if user_dictionary_path:
mecab_args.append(f"-u {user_dictionary_path}")
self.tagger = MeCab.Tagger(" ".join(mecab_args))
self.tagger.parse("") # Force initialization to catch dictionary errors
except RuntimeError as e:
raise RuntimeError(f"Failed to initialize MeCab: {str(e)}")
# POS weights for scoring
self.pos_weights = {
"名詞": 1.0, # Nouns
"動詞": 0.8, # Verbs
"形容詞": 0.6, # Adjectives
"副詞": 0.4, # Adverbs
"連体詞": 0.3, # Adnominal adjectives
"感動詞": 0.2, # Interjections
}
self.min_score = 0.3
def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> set[str]:
"""Extract keywords from Japanese text using MeCab.
Args:
text: Input text to extract keywords from
max_keywords_per_chunk: Maximum number of keywords to extract
Returns:
Set of extracted keywords
"""
if not text or not text.strip():
return set()
try:
# Parse text with MeCab
self.tagger.parse("") # Clear tagger state
node = self.tagger.parseToNode(text)
# Calculate term frequencies and scores
term_scores: defaultdict[str, float] = defaultdict(float)
while node:
features = node.feature.split(",")
if len(features) > 0:
pos = features[0] # Part of speech
pos_subtype = features[1] if len(features) > 1 else ""
base_form = features[6] if len(features) > 6 else node.surface
# Score the term based on its POS
if pos in self.pos_weights and base_form not in STOPWORDS:
score = self.pos_weights[pos]
# Boost proper nouns and technical terms
if pos == "名詞" and pos_subtype in ["固有名詞", "専門用語"]:
score *= 1.5
if len(base_form) > 1: # Filter out single characters
term_scores[base_form] += score
node = node.next
# Get top scoring terms, filter by minimum score, and take the top N in score order
sorted_terms = sorted(term_scores.items(), key=itemgetter(1), reverse=True)
scored_terms = [term for term, score in sorted_terms if score >= self.min_score]
if max_keywords_per_chunk:
scored_terms = scored_terms[:max_keywords_per_chunk]
keywords = set(scored_terms)
# Expand with compound terms
expanded_keywords = self._expand_tokens_with_compounds(keywords, text)
return expanded_keywords
except Exception as e:
raise RuntimeError(f"Failed to extract keywords: {str(e)}")
def _expand_tokens_with_compounds(self, keywords: set[str], text: str) -> set[str]:
"""Expand keywords with compound terms.
This method looks for adjacent keywords in the original text to capture
compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing).
"""
results = set(keywords)
try:
# Parse again to find compounds
node = self.tagger.parseToNode(text)
compound = []
compound_readings = [] # For handling different forms of the same compound
while node:
features = node.feature.split(",")
if len(features) > 6:
base_form = features[6]
reading = features[7] if len(features) > 7 else None
else:
base_form = node.surface
reading = None
if base_form in keywords:
compound.append(base_form)
if reading:
compound_readings.append(reading)
else:
if len(compound) > 1:
# Add the compound term
compound_term = "".join(compound)
if len(compound_term) > 1:
results.add(compound_term)
# If readings are available, add normalized form
if compound_readings:
normalized_term = "".join(compound_readings)
if normalized_term != compound_term:
results.add(normalized_term)
compound = []
compound_readings = []
node = node.next
return results
except Exception as e:
# If compound expansion fails, return original keywords
return keywords

View File

@@ -0,0 +1,190 @@
STOPWORDS = {
# Japanese particles and basic stopwords
"",
"",
"",
"",
"",
"",
"",
"",
"から",
"より",
"まで",
"によって",
"あそこ",
"あっ",
"あの",
"あのかた",
"あの人",
"あり",
"あります",
"ある",
"あれ",
"",
"いう",
"います",
"いる",
"",
"うち",
"",
"",
"および",
"おり",
"おります",
"",
"かつて",
"",
"ここ",
"こちら",
"こと",
"この",
"これ",
"これら",
"",
"さらに",
"",
"しかし",
"する",
"",
"",
"せる",
"そこ",
"そして",
"その",
"その他",
"その後",
"それ",
"それぞれ",
"それで",
"",
"ただし",
"たち",
"ため",
"たり",
"",
"だっ",
"だれ",
"",
"",
"でき",
"できる",
"です",
"では",
"でも",
"という",
"といった",
"とき",
"ところ",
"として",
"とともに",
"とも",
"と共に",
"どこ",
"どの",
"",
"ない",
"なお",
"なかっ",
"ながら",
"なく",
"なっ",
"など",
"なに",
"なら",
"なり",
"なる",
"なん",
"において",
"における",
"について",
"にて",
"により",
"による",
"に対して",
"に対する",
"に関する",
"ので",
"のみ",
"",
"ほか",
"ほとんど",
"ほど",
"ます",
"また",
"または",
"",
"もの",
"ものの",
"",
"よう",
"",
"られ",
"られる",
"",
"れる",
"",
"",
"及び",
"",
"彼女",
"我々",
"特に",
"",
"私達",
"貴方",
"貴方方",
# Japanese auxiliary verbs
"でした",
"ました",
"である",
"だった",
# Japanese pronouns
# Japanese common words
"おる",
"いく",
"くる",
# Numbers
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"0",
# Punctuation
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
# Common English stopwords (for mixed text)
"the",
"is",
"at",
"which",
"on",
"in",
"and",
"or",
"a",
"an",
}