/*
 * Decompiled with CFR 0.152.
 */
package org.apache.calcite.adapter.kafka;

import java.time.Duration;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.calcite.adapter.kafka.KafkaRowConverter;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class KafkaMessageEnumerator<K, V>
implements Enumerator<Object[]> {
    final Consumer consumer;
    final KafkaRowConverter<K, V> rowConverter;
    private final AtomicBoolean cancelFlag;
    private final LinkedList<ConsumerRecord<K, V>> bufferedRecords = new LinkedList();
    private ConsumerRecord<K, V> curRecord;

    KafkaMessageEnumerator(Consumer consumer, KafkaRowConverter<K, V> rowConverter, AtomicBoolean cancelFlag) {
        this.consumer = consumer;
        this.rowConverter = rowConverter;
        this.cancelFlag = cancelFlag;
    }

    public Object[] current() {
        return this.rowConverter.toRow(this.curRecord);
    }

    public boolean moveNext() {
        if (this.cancelFlag.get()) {
            return false;
        }
        while (this.bufferedRecords.isEmpty()) {
            this.pullRecords();
        }
        this.curRecord = this.bufferedRecords.removeFirst();
        return true;
    }

    private void pullRecords() {
        ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord record : records) {
            this.bufferedRecords.add(record);
        }
    }

    public void reset() {
        this.bufferedRecords.clear();
        this.pullRecords();
    }

    public void close() {
        this.consumer.close();
    }
}

