BigData/Nifi

Apache Nifi flow.xml.gz 뜯어보기

맥모닝프로 2024. 1. 4. 18:41

Nifi를 운영하면서  flow.xml.gz으로 인한 재기동 이슈가 많이 발생하였다. 단일 노드에서 Nifi를 사용한다면 해당 이슈가 발생하지 않겠지만 클러스터 환경에서는 flow 변경 시 즉각 동기화가 이루어져야 하는데 여러 요인으로 인해 특정 노드가 동기화되지 못하는 이슈가 있을 수 있다. 문제 발생 시 주로 flow controller와 flow.xml.gz이 다르다는 이유로 재기동 이후에 강제로 셧다운 되는 경우가 많았다. 그래서 도대체 flow.xml.gz 안에 어떠한 내용들이 있길래 차이가 발생하는지 직접 확인해 보았다.

 

flow.xml.gz이란?

  • DFM(DataFlowManger)이 Nifi 사용자 인터페이스 컨버스에 모든 내용을 저장하기 위한 파일
  • 기본적으로 nifi/conf에 디렉터리에 저장
  • 사용자가 별도의 "저장" 할 필요 없이 캔버스에 적용된 모든 변경 사항을 자동으로 저장
  • flow.xml.gz이 업데이트되면 archive 디렉토리에 백업 복사본을 자동으로 생성
  • archive 된 파일을 통해서 flow 구성을 롤백 가능 → 이전 flow로 변경하고자 한다면은 이전 flow.xml.gz으로 변경 후 재기동

실제 flow.xml.gz 정보 확인

예시 Data Flow

 

테스트 환경에서 매우 간단한 flow를 생성하고 flow.xml.gz 내용을 확인하였다.

 

Processor

<processor>
    <id>d2b66543-018c-1000-0000-00004f2a1b2b</id>
    <name>GenerateFlowFile</name>
    <position x="584.0" y="85.0"/>
    <styles/>
    <comment/>
    <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
    <bundle>
        <group>org.apache.nifi</group>
        <artifact>nifi-standard-nar</artifact>
        <version>1.15.2</version>
    </bundle>
    <maxConcurrentTasks>1</maxConcurrentTasks>
    <schedulingPeriod>0 sec</schedulingPeriod>
    <penalizationPeriod>30 sec</penalizationPeriod>
    <yieldPeriod>1 sec</yieldPeriod>
    <bulletinLevel>WARN</bulletinLevel>
    <lossTolerant>false</lossTolerant>
    <scheduledState>STOPPED</scheduledState>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <executionNode>ALL</executionNode>
    <runDurationNanos>0</runDurationNanos>
    <property>
        <name>File Size</name>
        <value>0B</value>
    </property>
    <property>
        <name>Batch Size</name>
        <value>1</value>
    </property>
    <property>
        <name>Data Format</name>
        <value>Text</value>
    </property>
    <property>
        <name>Unique FlowFiles</name>
        <value>false</value>
    </property>
    <property>
        <name>generate-ff-custom-text</name>
    </property>
    <property>
        <name>character-set</name>
        <value>UTF-8</value>
    </property>
    <property>
        <name>mime-type</name>
    </property>
</processor>

먼저 flow.xml.gz에서 processor 내용을 확인하였다. 우리가 생성한 GenerateFlowFile 임을 알수 있고 프로세서의 위치 정보, 상태, 설정 내용이 담겨 있다.

 

Connection

<connection>
    <id>d2b81f94-018c-1000-0000-000021b08c80</id>
    <name/>
    <bendPoints/>
    <labelIndex>1</labelIndex>
    <zIndex>0</zIndex>
    <sourceId>d2b66543-018c-1000-0000-00004f2a1b2b</sourceId>
    <sourceGroupId>7c84501d-d10c-407c-b9f3-1d80e38fe36a</sourceGroupId>
    <sourceType>PROCESSOR</sourceType>
    <destinationId>d2b7e55c-018c-1000-0000-000012f51fb6</destinationId>
    <destinationGroupId>7c84501d-d10c-407c-b9f3-1d80e38fe36a</destinationGroupId>
    <destinationType>FUNNEL</destinationType>
    <relationship>success</relationship>
    <maxWorkQueueSize>10000</maxWorkQueueSize>
    <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
    <flowFileExpiration>0 sec</flowFileExpiration>
    <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy>
    <partitioningAttribute/>
    <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression>
</connection>

processor와 Funnel을 연결하는 connection 정보이다. 소스와 데스티네이션 정보가 있고 connection에서 개별적으로 설정 가능한 queue 설정, load balance 설정 정보를 포함하고 있다.

 

Funnel

<funnel>
    <id>d2b7e55c-018c-1000-0000-000012f51fb6</id>
    <position x="720.0" y="424.0"/>
</funnel>

Funnel은 간단하게 위치 정보만 존재하였다.

 

ReportingTask

<reportingTask>
    <id>3b80ba0f-a6c0-48db-b721-4dbc04cef28e</id>
    <name>AmbariReportingTask</name>
    <comment/>
    <class>org.apache.nifi.reporting.ambari.AmbariReportingTask</class>
    <bundle>
        <group>org.apache.nifi</group>
        <artifact>nifi-ambari-nar</artifact>
        <version>1.15.2</version>
    </bundle>
    <schedulingPeriod>1 mins</schedulingPeriod>
    <scheduledState>STOPPED</scheduledState>
    <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
    <property>
        <name>Metrics Collector URL</name>
        <value>${ambari.metrics.collector.url}</value>
    </property>
    <property>
        <name>Application ID</name>
        <value>${ambari.application.id}</value>
    </property>
    <property>
        <name>Hostname</name>
        <value>${hostname(true)}</value>
    </property>
    <property>
        <name>Process Group ID</name>
    </property>
</reportingTask>

Reporting Task 정보도 들어 있음을 확인하였다. 해당 정보는 Ambari와 Grafana에서 Nifi Metric을 모니터링하기 위한 설정이다. 

결론: 파헤치기 전에는 flow.xml.gz안에 flowfile 메타 정보도 포함이 되어 있을 것이라 생각했지만 flowfile에 대한 정보는 있지 않다. 즉, UI를 구성 및 동작에 필요한 설정 정보만을 포함하고 있다.

 

flow.xml.gz 동기화 시점

archive flow.xml.gz

위 파일은 coordinator 노드의 archive 폴더에 존재하는 flow.xml.gz이다. 파일이 생성된 시간을 확인하였을때 내가 flow에 변경 작업이 있을 때마다 flow.xml.gz이 새로 생성되었다. 

# Add Processor
2024-01-04 13:23:38,148 INFO [NiFi Web Server-32668] o.a.nifi.groups.StandardProcessGroup GenerateFlowFile[id=d2b66543-018c-1000-0000-00004f2a1b2b] added to StandardProcessGroup[identifier=7c84501d-d10c-407c-b9f3-1d80e38fe36a,name=NiFi Flow]
# Add Funnel
2024-01-04 13:25:16,414 INFO [NiFi Web Server-31034] o.a.nifi.groups.StandardProcessGroup StandardFunnel[id=d2b7e55c-018c-1000-0000-000012f51fb6] added to StandardProcessGroup[identifier=7c84501d-d10c-407c-b9f3-1d80e38fe36a,name=NiFi Flow]

위 로그는 coordinator가 아닌 다른 노드에서 발생한 로그이다. 해당 로그를 flow의 변화를 특정 주기 이후에 동기화하는 것이 아니라 변화가  발생한 직후 모든 노드의 flow.xml.gz을 동기화함을 알 수 있다.