Наши публикации

Живое знание как процессный движок для импорта данных Часть 2. How-to

2025-11-09 00:45

Часть 2. How-to: один проход от пробы до Postgres

Ниже — практический маршрут, который мы реально прогнали на демо. Он опирается на Онто как «мозг» процесса и на минимальный исполнительный контур (MCP → MinIO → Airflow → Postgres). Цель — предсказуемый импорт без «магии» нейросети и без лишнего трафика.

Архитектурная рамка

  • Онто — хранит знания: объекты DatasetSignature, DatasetClass, PipelineTemplate, RecognitionResult и правила сопоставления.
  • MCP — оркестратор шагов и API-слой (инструменты: preflight_plan, preflight_submit, upload_url, pipeline_import_pg).
  • MinIO — S3-совместимое «сырое» хранилище (presigned PUT/GET).
  • Airflow — один универсальный DAG csv_ingest_pg (profile → DDL → COPY → отчёт).
  • Postgres — целевая БД.

MCP — оркестратор шагов и API-слой

Роль MCP. Координирует четыре шага процесса между Онто (модель знаний), MinIO (сырьё), Airflow (исполнение) и Postgres (целевая БД). Принципы: storage-first, детерминизм вместо «магии», без проксирования тяжёлых данных через сервер.
Репозиторий на гитхаб

Поток из четырёх инструментов

  1. preflight_plan — подготовка пробы (локально)
  2. MCP возвращает план для клиента: одну команду, которая локально считает «сигнатуру» файла (заголовок, хэши, базовые типы), и затем отправку этой сигнатуры обратно в MCP.
  3. Смысл: ничего не гоняем по сети и не грузим нейросеть ради очевидного.
  4. preflight_submit — анализ и решение в Онто
  5. MCP сопоставляет сигнатуру с объектами в Онто: если профиль (DatasetClass) найден — фиксирует выбор; если нет — создаёт аккуратный черновик. На этом же шаге жёстко выбирается хранилище (StorageConfig) и рассчитывается s3Key для загрузки.
  6. Смысл: маршрут определён до любых перемещений данных; решения — по алгоритмам, LLM лишь объясняет.
  7. upload_url — загрузка в MinIO по presigned URL
  8. По signatureId MCP выдаёт presigned PUT (single или multipart). Клиент заливает напрямую в MinIO, MCP байты не трогает.
  9. Смысл: надёжная, дешёвая загрузка без посредников и лишнего трафика.
  10. pipeline_import_pg — запуск импорта в Postgres
  11. MCP создаёт/выбирает шаблон пайплайна (если нужно) и триггерит единый DAG csv_ingest_pg в Airflow с параметрами: источник (presigned GET или {endpoint,bucket,key}), sep/encoding, target {schema, table}. На выходе — runId, статус и артефакты (DDL/отчёт).
  12. Смысл: один универсальный DAG, предсказуемый контракт, воспроизводимый результат.

Как это выглядит end-to-end (сверху вниз)

  • Клиент запускает preflight_plan → локальная сигнатура готова.
  • preflight_submit фиксирует знание в Онто и сразу определяет StorageConfig + s3Key.
  • upload_url выдаёт подписи → файл попадает в MinIO
  • pipeline_import_pg запускает DAG → Postgres получает таблицу, Онто — артефакты и статус.

Почему это работает в корп-среде

  • Предсказуемо. Решения принимают правила (хэши, устойчивые совпадения), не «интуиция» модели.
  • Бережливо. Аналитика — в локальной пробе, сервер не возит гигабайты и не «жжёт» GPU.
  • Трассируемо. В Онто остаются сигнатуры, выбранные профили и шаблоны: легко объяснить «почему так».
  • Экономно. Один DAG и переиспользуемые классы/шаблоны вместо «зоопарка» MVP.

Что хранит Онто — детально: что это и зачем

DatasetSignature
  • Что это: «паспорт» конкретного файла — нормализованный заголовок, число колонок, хэши (обычный и отсортированный), разделитель, кодировка, имя/размер.
  • Зачем: служит основанием для сопоставления с профилями; фиксирует параметры, по которым будет выполняться загрузка; даёт воспроизводимость («вот именно этот файл мы импортировали вот так»).
DatasetClass
  • Что это: «профиль» типа датасета (не мета-шаблон, а объект): хэши и отсортированный список колонок, ключевые слова домена, флаги PII, статус draft/active.
  • Зачем: определяет маршрут импорта для всех файлов такого типа; позволяет автоматически выбирать шаблон пайплайна и не изобретать логику заново при каждом новом источнике.
PipelineTemplate
  • Что это: «маршрут» обработки для профиля: дефолты парсинга (sep/encoding/loadMode), целевой таргет (storage/schema/table/partitionBy), версия/статус.
  • Зачем: делает импорт параметризуемым и повторяемым: один универсальный DAG + набор шаблонов, вместо «зоопарка» одноразовых скриптов; снижает стоимость поддержки.
RecognitionResult
  • Что это: протокол решения сопоставления: чем именно совпало (matchedBy), с какой уверенностью (score), когда принято, краткие заметки.
  • Зачем: обеспечивает объяснимость и аудит — можно ответить «почему выбран именно этот профиль/маршрут» и воспроизвести решение при повторных запусках.
Выше приведены шаблоны для создания объектов в Онто.

Что такое шаблон в Онто

Шаблон — это не «одна конкретная запись», а тип объекта с фиксированным смыслом и набором полей. Он:
  • задаёт структуру (какие поля есть и как они называются);
  • делает данные однородными и сравнимыми между кейсами;
  • даёт точку входа для автоматизации (MCP знает, что создавать и чем заполнять);
  • обеспечивает проверяемость и воспроизводимость (одни и те же правила → одни и те же решения).

Почему это важно для MCP-driven процесса

  • MCP может детерминированно создавать/заполнять объекты по шаблонам (никакой импровизации в полях и названиях).
  • Вся логика «почему и что делать дальше» считывается из стандартных сущностей — решение предсказуемо и защищаемо перед архитекторами.
  • Шаблоны снижают стоимость поддержки: один раз договорились о форме — дальше масштабируем без переписывания «под каждый случай».

Проба: считаем сигнатуру локально (детально)

Наша цель — получить минимальный, но достаточный портрет файла без гонки трафика и без «магии». Всё делаем локально, быстрыми детерминированными правилами.

1) Что считаем (поля сигнатуры)

  • encoding — кодировка чтения.
  • sep — разделитель (,, ;, \t, реже |).
  • hasHeader — есть ли заголовок.
  • headers[] — нормализованные имена колонок.
  • numCols — количество колонок.
  • headerHash — sha256('h1;h2;…;hn').
  • headerSortedHash — sha256('sorted(h1…hn)').
  • headersSorted — строка с отсортированными именами через ;.
  • dtypeGuess (по колонкам) — bool|int|float|datetime|text.
  • piiFlags — phone|fio|inn|birthday|uuid (true/false).
  • (опц.) rowsScanned — сколько строк просмотрели (для статистики).

2) Нормализация заголовков (строго в таком порядке)

  1. lower()
  2. strip() (обрезать пробелы по краям)
  3. заменить [' ', '-'] → _
  4. удалить все, кроме [a-z0-9_а-яё]
  5. схлопнуть повторные _ → один _
  6. убрать _ в начале/конце
Пример: " Start Date-Time " → start_date_time

3) Кодировка и разделитель

  • Encoding: сначала пробуем utf-8 (срез первых 256–512 КБ). Если ошибка декодирования — пробуем cp1251. (Если хочется тонкости — локально подключаем детектор, но дефолт utf-8 чаще всего верен.)
  • Sep: считаем частоты , ; \t | в первых ~256 КБ, выбираем максимум (если \t в топе — ставим табуляцию). Для спорных случаев допускаем ручную подсказку --sep.

4) Объём пробы

  • Читаем только заголовок + первые N строк (обычно N=1000).
  • Память не распухает, чтение идёт потоково.
  • Если строки разной длины — считаем «битые» и игнорируем их при типизации.

5) Типизация колонок (простые эвристики)

Для каждого значения v (обрезанного):
  • bool: true|false|0|1 (без регистра)
  • int: ^-?\d+$
  • float: ^-?\d+([.,]\d+)?$ (запятую не превращаем в точку — только детект)
  • datetime: ISO-подобные YYYY-MM-DD, YYYY-MM-DD[ T]HH:MM(:SS)?, YYYY-MM-DDTHH:MM:SS±ZZ:ZZ
  • иначе text
Выбор типа — мода по сэмплу (чаще всего встречающийся). При близких долях — text.

6) PII-эвристики (по сэмплу)

  • phone: ^(7|8)?\d{10}$
  • inn: ^\d{10}$
  • uuid: [0-9a-f]{8}-[0-9a-f]{4}-
  • birthday: ^\d{4}-\d{2}-\d{2}$ или ^\d{2}\.\d{2}\.\d{4}$
  • fio (капсом, 2–3 слова кириллицей): ^[А-ЯЁ]+(?: [А-ЯЁ]+){1,2}$
Флаги поднимаем, если найдено хотя бы одно соответствие в колонке/сэмпле (или несколько — по желанию).

7) Контроль качества пробы (самопроверки)

  • numCols заголовка и строк совпадают ≥ 95% в сэмпле — иначе предупреждение «пляшущие колонки».
  • Дубли заголовков → автоматически переписываем name__2, name__3 и фиксируем в заметках.
  • Слишком много пустых — помечаем колонку как nullableHint=true (если ведём такую метку).

8) Формат выходного JSON (payload сигнатуры)

{
"fileName": "tickets.csv",
"fileSize": 348243289,
"signature": {
"encoding": "utf-8",
"sep": ";",
"hasHeader": true,
"numCols": 27,
"headers": ["created","order_status","ticket_status","..."],
"headerHash": "sha256:…",
"headerSortedHash": "sha256:…",
"headersSorted": "birthday_date;client_name;…;visitor_category",
"stats": {
"rowsScanned": 1000,
"dtypeGuess": { "created":"datetime","ticket_price":"float","is_active":"bool","..." },
"piiFlags": { "phone": true, "fio": true, "inn": true, "birthday": true, "uuid": true }
}
}
}

9) CLI-процедура (быстрый сценарий)

  1. preflight_plan возвращает одну команду shell → запускаем → получаем payload.json.
  2. Сохраняем рядом с файлом (или в temp).
  3. Передаём в preflight_submit без изменений.

10) Граница ответственности (важно)

  • На пробе мы не двигаем байты в хранилище и не включаем LLM.
  • Все решения — детерминированные, воспроизводимые и дешёвые.
  • Если sep/encoding спорные — на этапе анализа возвращаем рекомендации и, при необходимости, запрашиваем ручную подсказку.

11) Краевые случаи и что делать

  • BOM/странный нулевой байт — отрезаем BOM, чистим управляющие.
  • Супердлинные поля — режем предпросмотр, но не весь файл; считаем только заголовок и первые N строк.
  • Много «битых» строк — фиксируем процент и предупреждаем; маршрут импорта не стартуем, пока не будет явной валидации.
  • Нет заголовка — hasHeader=false: на анализе вернём «нужно сопоставить имена колонок» (mapping), класс не создаём автоматически.
Эта проба даёт стабильную сигнатуру, по которой анализ в Онто либо находит готовый класс и маршрут, либо создаёт аккуратный черновик. Дальше — storage-first, presigned PUT и один универсальный DAG.

Сопоставление в Онто: класс найден/создан

Алгоритм.
  1. Поиск по headerHash → если пусто, по headerSortedHash.
  2. Если всё ещё пусто — кандидаты по numCols и Jaccard по множеству заголовков.
  3. Порог уверенности (score) зафиксирован; если не дотягивает — создаём черновой объект DatasetClass(draft).
Что записываем.
  • DatasetSignature (факт пробы).
  • DatasetClass (найденный или черновой).
  • PipelineTemplate (если нет — создаём draft с дефолтами).
  • RecognitionResult (matchedBy, score, timestamp).
Важно. До любой загрузки мы определяем хранилище. MCP выбирает объект StorageConfig (или default) и рассчитывает s3Key по шаблону (raw/{dataset}/{yyyy}/{mm}/source-{uuid}.csv). Это исключает гонки и перезаписи.
Вот как действуем, если в Онто подходящий объект [DATA] DatasetClass не найден.

Алгоритм

  1. Нормализация входа
  • headers → lower/trim, пробелы и - → _, убрать все кроме [a-z0-9_а-яё], схлопнуть _.
  • Считаем headerHash, headerSortedHash, headersSorted=';'.join(sorted(headers)), numCols.
  1. Последняя попытка “похожести”
  • Берём кандидатов по numCols.
  • Для каждого считаем Jaccard по множествам имён колонок.
  • Если score = 0.4*Jaccard ≥ 0.7 — считаем найденным (иначе — создаём новый).
  1. Формирование черновика класса
  • Создаём DatasetClass(draft=true) с полями:
  • headerHash, headerSortedHash, headersSorted, numCols.
  • Генерируем keywords из заголовков (токены: ticket,event,museum,…) и сохраняем строкой.
  • Считаем профиль ПДн (PII): piiPhone, piiFio, piiInn, piiBirthday по простым регэкспам/эвристикам и записываем флаги.
  • priority=0, comment='created from signature <signatureId>'.
  1. Колонки класса
  • На каждую колонку создаём ColumnSignature:
  • name (нормализованное), originalName (если хочется), position (0..n-1),
  • dtypeGuess (эвристика: bool/int/float/datetime/text), examples (до 3 значений).
  • Это фиксирует «какой набор мы увидели» и пригодится для последующей точной подстройки.
  1. Шаблон пайплайна (draft)
  • Создаём PipelineTemplate(draft=true) с дефолтами:
  • defaults = {"sep": "<из сигнатуры>", "encoding": "<…>", "loadMode": "append", "createTable": true}
  • target = {"storage":"postgres","schema":"public","table":"<предложить по домену, напр. tickets_fact>"}
  • (Если есть активный StorageConfig) рассчитываем s3Key по pathPatternRaw и сохраняем его в сигнатуру.
  1. Фиксация факта распознавания
  • Создаём DatasetSignature (если ещё не создана) и RecognitionResult с:
  • matchedBy="created_draft_class", score=<ниже порога>, timestamp=now.
  • (Связи можно добавить позже, если в этом спринте выключены.)
  1. Идемпотентность (важно!)
  • Перед созданием повторно пробуем find по:
  • A) headerHash; B) (headerSortedHash, numCols, headersSorted) — защита от гонок/повторных запусков.
  • Если за время операции кто-то уже создал класс с теми же ключами — используем его, наш черновик не плодим.
  1. Требование ревью
  • draft=true означает: класс не участвует в автопринятии решений без ручного подтверждения.
  • В UI/Онто — тег/фильтр “нужен просмотр”; после ревью draft=false и (опц.) priority↑.
  1. Переиспользование
  • Следующий файл с тем же headerHash/headerSortedHash попадёт в этот класс без создания новых объектов.
  • При незначительных расхождениях (добавилась колонка) можно:
  • а) обновить ColumnSignature + перевести класс из draft,
  • б) завести новый класс и поднять/понизить priority.

Что получает инженер на выходе

  • Новый класс-черновик с полным описанием полей и предложенным шаблоном пайплайна.
  • Объективная метка «почему создали класс» и «насколько он близок» к найденным.
  • Возможность одним кликом/скриптом: подтвердить, поправить столбцы, назначить целевую таблицу — и запустить импорт уже как «активный» класс.
Это держит процесс бережливым и управляемым: сначала фиксация знания в Онто, потом — воспроизводимый маршрут импорта.

Загрузка в MinIO: только presigned PUT

Инструмент. upload_url по signatureId.
Режимы.
  • single — файл до 5 GiB; возвращаем один putUrl.
  • multipart — крупные файлы; клиент грузит частями параллельно, затем CompleteMultipartUpload.
Правило. MCP никогда не проксирует содержимое — только выдаёт подписи. Если объект ещё не загружен, инструмент может вернуть putUrl без запуска DAG (режим ensureUploaded=true), чтобы не смешивать этапы.

Импорт: один DAG для всех

DAG csv_ingest_pg принимает либо presigned_get_url, либо трио {s3_endpoint, bucket, key} и работает по шагам:
  1. download_csv — скачивает во временный файл (presigned GET или генерация presign внутри DAG).
  2. sample_profile — быстрая проверка: строки, заголовок, типы-эвристики.
  3. generate_ddl — формирует DDL под Postgres (idempotent).
  4. create_table — применяет DDL.
  5. load_to_pg — COPY … FROM STDIN (CSV, HEADER, DELIMITER).
  6. write_report — JSON-отчёт (профиль + DDL) — можно положить в reports/.
Контракты. В params передаём минимум: sep, encoding, target: {schema, table}, плюс ссылку на источник. Один DAG — меньше поддержки, понятный аудит.

Где здесь «живое знание»

Онто хранит не слайды, а рабочие сущности: сигнатуры, классы, шаблоны, результаты распознавания. Это позволяет:
  • воспроизводить маршрут для любого будущего файла того же типа;
  • объяснять почему был выбран именно такой путь (hash, пороги, кандидаты);
  • целостно связать инженера, пробы, LLM-подсказки и фактический импорт.

Почему это дешевле MVP-гоночки

  • Переиспользование. Один DatasetClass и один PipelineTemplate обслуживают множество файлов/источников.
  • Один DAG. Универсальный, параметризуемый, с контрактом — вместо «зоопарка» одноразовых скриптов.
  • Бережливость. Аналитика — в локальной пробе; сеть и LLM не расходуются «на всякий случай».
  • Меньше рисков. Хранилище и путь рассчитываются до загрузки; политика no-overwrite.

Чек-лист запуска

  1. Создать в Онто шаблоны и (при необходимости) объект StorageConfig (bucket=raw, pathPatternRaw=raw/{dataset}/{yyyy}/{mm}/source-{uuid}.csv).
  2. Запустить preflight_plan → получить payload.json.
  3. Вызвать preflight_submit → получить signatureId, выбранный класс/шаблон и рассчитанный s3Key.
  4. upload_url → загрузить файл по putUrl.
  5. pipeline_import_pg (или dag_trigger) → дождаться runId и success.
  6. Проверить таблицу в Postgres и отчёт (при желании — сложить в reports/).

Типичные ошибки и как их не допускать

  • Загрузка без выбора StorageConfig. Решение: storage-first.
  • Дублирующиеся файлы. Включить политику no-overwrite, добавлять суффикс к s3Key при коллизии.
  • Несогласованные sep/encoding. Передавать их из сигнатуры; не менять наугад в DAG.
  • «LLM всё решит». В нашем процессе LLM — только пояснитель; маршрут задают правила.
В финальной частивыводы: почему такой подход проходит у архитекторов и эксплуатации, чем «разработка на данных в Онто» выигрывает у серии одноразовых MVP и как бережливость вычислений превращается в реальную экономию.