Живое знание как процессный движок для импорта данных Часть 2. How-to
2025-11-09 00:45
Часть 2. How-to: один проход от пробы до Postgres
Ниже — практический маршрут, который мы реально прогнали на демо. Он опирается на Онто как «мозг» процесса и на минимальный исполнительный контур (MCP → MinIO → Airflow → Postgres). Цель — предсказуемый импорт без «магии» нейросети и без лишнего трафика.
Архитектурная рамка
Онто — хранит знания: объекты DatasetSignature, DatasetClass, PipelineTemplate, RecognitionResult и правила сопоставления.
Airflow — один универсальный DAG csv_ingest_pg (profile → DDL → COPY → отчёт).
Postgres — целевая БД.
MCP — оркестратор шагов и API-слой
Роль MCP. Координирует четыре шага процесса между Онто (модель знаний), MinIO (сырьё), Airflow (исполнение) и Postgres (целевая БД). Принципы: storage-first, детерминизм вместо «магии», без проксирования тяжёлых данных через сервер.
MCP возвращает план для клиента: одну команду, которая локально считает «сигнатуру» файла (заголовок, хэши, базовые типы), и затем отправку этой сигнатуры обратно в MCP.
Смысл: ничего не гоняем по сети и не грузим нейросеть ради очевидного.
preflight_submit — анализ и решение в Онто
MCP сопоставляет сигнатуру с объектами в Онто: если профиль (DatasetClass) найден — фиксирует выбор; если нет — создаёт аккуратный черновик. На этом же шаге жёстко выбирается хранилище (StorageConfig) и рассчитывается s3Key для загрузки.
Смысл: маршрут определён до любых перемещений данных; решения — по алгоритмам, LLM лишь объясняет.
upload_url — загрузка в MinIO по presigned URL
По signatureId MCP выдаёт presigned PUT (single или multipart). Клиент заливает напрямую в MinIO, MCP байты не трогает.
Смысл: надёжная, дешёвая загрузка без посредников и лишнего трафика.
pipeline_import_pg — запуск импорта в Postgres
MCP создаёт/выбирает шаблон пайплайна (если нужно) и триггерит единый DAG csv_ingest_pg в Airflow с параметрами: источник (presigned GET или {endpoint,bucket,key}), sep/encoding, target {schema, table}. На выходе — runId, статус и артефакты (DDL/отчёт).
Смысл: один универсальный DAG, предсказуемый контракт, воспроизводимый результат.
preflight_submit фиксирует знание в Онто и сразу определяет StorageConfig + s3Key.
upload_url выдаёт подписи → файл попадает в MinIO
pipeline_import_pg запускает DAG → Postgres получает таблицу, Онто — артефакты и статус.
Почему это работает в корп-среде
Предсказуемо. Решения принимают правила (хэши, устойчивые совпадения), не «интуиция» модели.
Бережливо. Аналитика — в локальной пробе, сервер не возит гигабайты и не «жжёт» GPU.
Трассируемо. В Онто остаются сигнатуры, выбранные профили и шаблоны: легко объяснить «почему так».
Экономно. Один DAG и переиспользуемые классы/шаблоны вместо «зоопарка» MVP.
Что хранит Онто — детально: что это и зачем
DatasetSignature
Что это: «паспорт» конкретного файла — нормализованный заголовок, число колонок, хэши (обычный и отсортированный), разделитель, кодировка, имя/размер.
Зачем: служит основанием для сопоставления с профилями; фиксирует параметры, по которым будет выполняться загрузка; даёт воспроизводимость («вот именно этот файл мы импортировали вот так»).
DatasetClass
Что это: «профиль» типа датасета (не мета-шаблон, а объект): хэши и отсортированный список колонок, ключевые слова домена, флаги PII, статус draft/active.
Зачем: определяет маршрут импорта для всех файлов такого типа; позволяет автоматически выбирать шаблон пайплайна и не изобретать логику заново при каждом новом источнике.
PipelineTemplate
Что это: «маршрут» обработки для профиля: дефолты парсинга (sep/encoding/loadMode), целевой таргет (storage/schema/table/partitionBy), версия/статус.
Зачем: делает импорт параметризуемым и повторяемым: один универсальный DAG + набор шаблонов, вместо «зоопарка» одноразовых скриптов; снижает стоимость поддержки.
RecognitionResult
Что это: протокол решения сопоставления: чем именно совпало (matchedBy), с какой уверенностью (score), когда принято, краткие заметки.
Зачем: обеспечивает объяснимость и аудит — можно ответить «почему выбран именно этот профиль/маршрут» и воспроизвести решение при повторных запусках.
Выше приведены шаблоны для создания объектов в Онто.
Что такое шаблон в Онто
Шаблон — это не «одна конкретная запись», а тип объекта с фиксированным смыслом и набором полей. Он:
задаёт структуру (какие поля есть и как они называются);
делает данные однородными и сравнимыми между кейсами;
даёт точку входа для автоматизации (MCP знает, что создавать и чем заполнять);
обеспечивает проверяемость и воспроизводимость (одни и те же правила → одни и те же решения).
Почему это важно для MCP-driven процесса
MCP может детерминированно создавать/заполнять объекты по шаблонам (никакой импровизации в полях и названиях).
Вся логика «почему и что делать дальше» считывается из стандартных сущностей — решение предсказуемо и защищаемо перед архитекторами.
Шаблоны снижают стоимость поддержки: один раз договорились о форме — дальше масштабируем без переписывания «под каждый случай».
Проба: считаем сигнатуру локально (детально)
Наша цель — получить минимальный, но достаточный портрет файла без гонки трафика и без «магии». Всё делаем локально, быстрыми детерминированными правилами.
1) Что считаем (поля сигнатуры)
encoding — кодировка чтения.
sep — разделитель (,, ;, \t, реже |).
hasHeader — есть ли заголовок.
headers[] — нормализованные имена колонок.
numCols — количество колонок.
headerHash — sha256('h1;h2;…;hn').
headerSortedHash — sha256('sorted(h1…hn)').
headersSorted — строка с отсортированными именами через ;.
dtypeGuess (по колонкам) — bool|int|float|datetime|text.
(опц.) rowsScanned — сколько строк просмотрели (для статистики).
2) Нормализация заголовков (строго в таком порядке)
lower()
strip() (обрезать пробелы по краям)
заменить [' ', '-'] → _
удалить все, кроме [a-z0-9_а-яё]
схлопнуть повторные _ → один _
убрать _ в начале/конце
Пример: " Start Date-Time " → start_date_time
3) Кодировка и разделитель
Encoding: сначала пробуем utf-8 (срез первых 256–512 КБ). Если ошибка декодирования — пробуем cp1251. (Если хочется тонкости — локально подключаем детектор, но дефолт utf-8 чаще всего верен.)
Sep: считаем частоты , ; \t | в первых ~256 КБ, выбираем максимум (если \t в топе — ставим табуляцию). Для спорных случаев допускаем ручную подсказку --sep.
4) Объём пробы
Читаем только заголовок + первые N строк (обычно N=1000).
Память не распухает, чтение идёт потоково.
Если строки разной длины — считаем «битые» и игнорируем их при типизации.
5) Типизация колонок (простые эвристики)
Для каждого значения v (обрезанного):
bool: true|false|0|1 (без регистра)
int: ^-?\d+$
float: ^-?\d+([.,]\d+)?$ (запятую не превращаем в точку — только детект)
Супердлинные поля — режем предпросмотр, но не весь файл; считаем только заголовок и первые N строк.
Много «битых» строк — фиксируем процент и предупреждаем; маршрут импорта не стартуем, пока не будет явной валидации.
Нет заголовка — hasHeader=false: на анализе вернём «нужно сопоставить имена колонок» (mapping), класс не создаём автоматически.
Эта проба даёт стабильную сигнатуру, по которой анализ в Онто либо находит готовый класс и маршрут, либо создаёт аккуратный черновик. Дальше — storage-first, presigned PUT и один универсальный DAG.
Сопоставление в Онто: класс найден/создан
Алгоритм.
Поиск по headerHash → если пусто, по headerSortedHash.
Если всё ещё пусто — кандидаты по numCols и Jaccard по множеству заголовков.
Порог уверенности (score) зафиксирован; если не дотягивает — создаём черновой объект DatasetClass(draft).
Что записываем.
DatasetSignature (факт пробы).
DatasetClass (найденный или черновой).
PipelineTemplate (если нет — создаём draft с дефолтами).
RecognitionResult (matchedBy, score, timestamp).
Важно. До любой загрузки мы определяем хранилище. MCP выбирает объект StorageConfig (или default) и рассчитывает s3Key по шаблону (raw/{dataset}/{yyyy}/{mm}/source-{uuid}.csv). Это исключает гонки и перезаписи.
Вот как действуем, если в Онто подходящий объект [DATA] DatasetClass не найден.
Алгоритм
Нормализация входа
headers → lower/trim, пробелы и - → _, убрать все кроме [a-z0-9_а-яё], схлопнуть _.
(Связи можно добавить позже, если в этом спринте выключены.)
Идемпотентность (важно!)
Перед созданием повторно пробуем find по:
A) headerHash; B) (headerSortedHash, numCols, headersSorted) — защита от гонок/повторных запусков.
Если за время операции кто-то уже создал класс с теми же ключами — используем его, наш черновик не плодим.
Требование ревью
draft=true означает: класс не участвует в автопринятии решений без ручного подтверждения.
В UI/Онто — тег/фильтр “нужен просмотр”; после ревью draft=false и (опц.) priority↑.
Переиспользование
Следующий файл с тем же headerHash/headerSortedHash попадёт в этот класс без создания новых объектов.
При незначительных расхождениях (добавилась колонка) можно:
а) обновить ColumnSignature + перевести класс из draft,
б) завести новый класс и поднять/понизить priority.
Что получает инженер на выходе
Новый класс-черновик с полным описанием полей и предложенным шаблоном пайплайна.
Объективная метка «почему создали класс» и «насколько он близок» к найденным.
Возможность одним кликом/скриптом: подтвердить, поправить столбцы, назначить целевую таблицу — и запустить импорт уже как «активный» класс.
Это держит процесс бережливым и управляемым: сначала фиксация знания в Онто, потом — воспроизводимый маршрут импорта.
Загрузка в MinIO: только presigned PUT
Инструмент. upload_url по signatureId.
Режимы.
single — файл до 5 GiB; возвращаем один putUrl.
multipart — крупные файлы; клиент грузит частями параллельно, затем CompleteMultipartUpload.
Правило. MCP никогда не проксирует содержимое — только выдаёт подписи. Если объект ещё не загружен, инструмент может вернуть putUrl без запуска DAG (режим ensureUploaded=true), чтобы не смешивать этапы.
Импорт: один DAG для всех
DAG csv_ingest_pg принимает либо presigned_get_url, либо трио {s3_endpoint, bucket, key} и работает по шагам:
download_csv — скачивает во временный файл (presigned GET или генерация presign внутри DAG).
generate_ddl — формирует DDL под Postgres (idempotent).
create_table — применяет DDL.
load_to_pg — COPY … FROM STDIN (CSV, HEADER, DELIMITER).
write_report — JSON-отчёт (профиль + DDL) — можно положить в reports/.
Контракты. В params передаём минимум: sep, encoding, target: {schema, table}, плюс ссылку на источник. Один DAG — меньше поддержки, понятный аудит.
Где здесь «живое знание»
Онто хранит не слайды, а рабочие сущности: сигнатуры, классы, шаблоны, результаты распознавания. Это позволяет:
воспроизводить маршрут для любого будущего файла того же типа;
объяснять почему был выбран именно такой путь (hash, пороги, кандидаты);
целостно связать инженера, пробы, LLM-подсказки и фактический импорт.
Почему это дешевле MVP-гоночки
Переиспользование. Один DatasetClass и один PipelineTemplate обслуживают множество файлов/источников.
Один DAG. Универсальный, параметризуемый, с контрактом — вместо «зоопарка» одноразовых скриптов.
Бережливость. Аналитика — в локальной пробе; сеть и LLM не расходуются «на всякий случай».
Меньше рисков. Хранилище и путь рассчитываются до загрузки; политика no-overwrite.
Чек-лист запуска
Создать в Онто шаблоны и (при необходимости) объект StorageConfig (bucket=raw, pathPatternRaw=raw/{dataset}/{yyyy}/{mm}/source-{uuid}.csv).
Запустить preflight_plan → получить payload.json.
Вызвать preflight_submit → получить signatureId, выбранный класс/шаблон и рассчитанный s3Key.
upload_url → загрузить файл по putUrl.
pipeline_import_pg (или dag_trigger) → дождаться runId и success.
Проверить таблицу в Postgres и отчёт (при желании — сложить в reports/).
Типичные ошибки и как их не допускать
Загрузка без выбора StorageConfig. Решение: storage-first.
Дублирующиеся файлы. Включить политику no-overwrite, добавлять суффикс к s3Key при коллизии.
Несогласованные sep/encoding. Передавать их из сигнатуры; не менять наугад в DAG.
«LLM всё решит». В нашем процессе LLM — только пояснитель; маршрут задают правила.
В финальной части — выводы: почему такой подход проходит у архитекторов и эксплуатации, чем «разработка на данных в Онто» выигрывает у серии одноразовых MVP и как бережливость вычислений превращается в реальную экономию.