mirror of
https://github.com/langgenius/dify.git
synced 2024-11-16 19:59:50 +08:00
3241e4015b
Co-authored-by: jyong <718720800@qq.com>
135 lines
4.5 KiB
Python
135 lines
4.5 KiB
Python
import logging
|
|
import re
|
|
from typing import Optional, List, Tuple, cast
|
|
|
|
from langchain.document_loaders.base import BaseLoader
|
|
from langchain.document_loaders.helpers import detect_file_encodings
|
|
from langchain.schema import Document
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MarkdownLoader(BaseLoader):
|
|
"""Load md files.
|
|
|
|
|
|
Args:
|
|
file_path: Path to the file to load.
|
|
|
|
remove_hyperlinks: Whether to remove hyperlinks from the text.
|
|
|
|
remove_images: Whether to remove images from the text.
|
|
|
|
encoding: File encoding to use. If `None`, the file will be loaded
|
|
with the default system encoding.
|
|
|
|
autodetect_encoding: Whether to try to autodetect the file encoding
|
|
if the specified encoding fails.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
file_path: str,
|
|
remove_hyperlinks: bool = True,
|
|
remove_images: bool = True,
|
|
encoding: Optional[str] = None,
|
|
autodetect_encoding: bool = True,
|
|
):
|
|
"""Initialize with file path."""
|
|
self._file_path = file_path
|
|
self._remove_hyperlinks = remove_hyperlinks
|
|
self._remove_images = remove_images
|
|
self._encoding = encoding
|
|
self._autodetect_encoding = autodetect_encoding
|
|
|
|
def load(self) -> List[Document]:
|
|
tups = self.parse_tups(self._file_path)
|
|
documents = []
|
|
for header, value in tups:
|
|
value = value.strip()
|
|
if header is None:
|
|
documents.append(Document(page_content=value))
|
|
else:
|
|
documents.append(Document(page_content=f"\n\n{header}\n{value}"))
|
|
|
|
return documents
|
|
|
|
def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
|
|
"""Convert a markdown file to a dictionary.
|
|
|
|
The keys are the headers and the values are the text under each header.
|
|
|
|
"""
|
|
markdown_tups: List[Tuple[Optional[str], str]] = []
|
|
lines = markdown_text.split("\n")
|
|
|
|
current_header = None
|
|
current_text = ""
|
|
|
|
for line in lines:
|
|
header_match = re.match(r"^#+\s", line)
|
|
if header_match:
|
|
if current_header is not None:
|
|
markdown_tups.append((current_header, current_text))
|
|
|
|
current_header = line
|
|
current_text = ""
|
|
else:
|
|
current_text += line + "\n"
|
|
markdown_tups.append((current_header, current_text))
|
|
|
|
if current_header is not None:
|
|
# pass linting, assert keys are defined
|
|
markdown_tups = [
|
|
(re.sub(r"#", "", cast(str, key)).strip(), re.sub(r"<.*?>", "", value))
|
|
for key, value in markdown_tups
|
|
]
|
|
else:
|
|
markdown_tups = [
|
|
(key, re.sub("\n", "", value)) for key, value in markdown_tups
|
|
]
|
|
|
|
return markdown_tups
|
|
|
|
def remove_images(self, content: str) -> str:
|
|
"""Get a dictionary of a markdown file from its path."""
|
|
pattern = r"!{1}\[\[(.*)\]\]"
|
|
content = re.sub(pattern, "", content)
|
|
return content
|
|
|
|
def remove_hyperlinks(self, content: str) -> str:
|
|
"""Get a dictionary of a markdown file from its path."""
|
|
pattern = r"\[(.*?)\]\((.*?)\)"
|
|
content = re.sub(pattern, r"\1", content)
|
|
return content
|
|
|
|
def parse_tups(self, filepath: str) -> List[Tuple[Optional[str], str]]:
|
|
"""Parse file into tuples."""
|
|
content = ""
|
|
try:
|
|
with open(filepath, "r", encoding=self._encoding) as f:
|
|
content = f.read()
|
|
except UnicodeDecodeError as e:
|
|
if self._autodetect_encoding:
|
|
detected_encodings = detect_file_encodings(filepath)
|
|
for encoding in detected_encodings:
|
|
logger.debug("Trying encoding: ", encoding.encoding)
|
|
try:
|
|
with open(filepath, encoding=encoding.encoding) as f:
|
|
content = f.read()
|
|
break
|
|
except UnicodeDecodeError:
|
|
continue
|
|
else:
|
|
raise RuntimeError(f"Error loading {filepath}") from e
|
|
except Exception as e:
|
|
raise RuntimeError(f"Error loading {filepath}") from e
|
|
|
|
if self._remove_hyperlinks:
|
|
content = self.remove_hyperlinks(content)
|
|
|
|
if self._remove_images:
|
|
content = self.remove_images(content)
|
|
|
|
return self.markdown_to_tups(content)
|