/*
 * Decompiled with CFR 0.152.
 */
package org.apache.calcite.adapter.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.stream.IntStream;
import org.apache.calcite.adapter.elasticsearch.ElasticsearchSchema;
import org.apache.calcite.adapter.elasticsearch.EmbeddedElasticsearchPolicy;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.test.CalciteAssert;
import org.apache.http.Header;
import org.elasticsearch.client.Response;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class ScrollingTest {
    @ClassRule
    public static final EmbeddedElasticsearchPolicy NODE = EmbeddedElasticsearchPolicy.create();
    private static final String NAME = "scroll";
    private static final int SIZE = 10;

    @BeforeClass
    public static void setupInstance() throws Exception {
        NODE.createIndex(NAME, Collections.singletonMap("value", "long"));
        ArrayList<ObjectNode> docs = new ArrayList<ObjectNode>();
        for (int i = 0; i < 10; ++i) {
            String json = String.format(Locale.ROOT, "{\"value\": %d}", i);
            docs.add((ObjectNode)NODE.mapper().readTree(json));
        }
        NODE.insertBulk(NAME, docs);
    }

    private CalciteAssert.ConnectionFactory newConnectionFactory(final int fetchSize) {
        return new CalciteAssert.ConnectionFactory(){

            public Connection createConnection() throws SQLException {
                Connection connection = DriverManager.getConnection("jdbc:calcite:");
                SchemaPlus root = connection.unwrap(CalciteConnection.class).getRootSchema();
                ElasticsearchSchema schema = new ElasticsearchSchema(NODE.restClient(), NODE.mapper(), ScrollingTest.NAME, null, fetchSize);
                root.add("elastic", (Schema)schema);
                return connection;
            }
        };
    }

    @Test
    public void scrolling() throws Exception {
        String[] expected = (String[])IntStream.range(0, 10).mapToObj(i -> "V=" + i).toArray(String[]::new);
        String query = String.format(Locale.ROOT, "select _MAP['value'] as v from \"elastic\".\"%s\"", NAME);
        for (int fetchSize : Arrays.asList(1, 2, 3, 5, 9, 10, 11, 20)) {
            CalciteAssert.that().with(this.newConnectionFactory(fetchSize)).query(query).returnsUnordered(expected);
            this.assertNoActiveScrolls();
        }
    }

    private void assertNoActiveScrolls() throws IOException {
        Response response = NODE.restClient().performRequest("GET", "/_nodes/stats/indices/search", new Header[0]);
        try (InputStream is = response.getEntity().getContent();){
            ObjectNode node = (ObjectNode)NODE.mapper().readValue(is, ObjectNode.class);
            String path = "/indices/search/scroll_current";
            JsonNode scrollCurrent = ((JsonNode)node.with("nodes").elements().next()).at("/indices/search/scroll_current");
            if (scrollCurrent.isMissingNode()) {
                throw new IllegalStateException("Couldn't find node at /indices/search/scroll_current");
            }
            if (scrollCurrent.asInt() != 0) {
                String message = String.format(Locale.ROOT, "Expected no active scrolls but got %d. Current index stats %s", scrollCurrent.asInt(), node);
                throw new AssertionError((Object)message);
            }
        }
    }
}

