package com.sohu.jafka.console;

import com.sohu.jafka.api.FetchRequest;
import com.sohu.jafka.consumer.SimpleConsumer;
import com.sohu.jafka.message.MessageAndOffset;
import com.sohu.jafka.utils.Closer;
import com.sohu.jafka.utils.Utils;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

/* loaded from: classes2.dex */
public class SimipleConsoleConsumer {
    static void checkRequiredArgs(OptionParser optionParser, OptionSet optionSet, OptionSpec<?>... optionSpecArr) throws IOException {
        for (OptionSpec<?> optionSpec : optionSpecArr) {
            if (!optionSet.has(optionSpec)) {
                System.err.println("Missing required argument " + optionSpec);
                optionParser.printHelpOn(System.err);
                System.exit(1);
            }
        }
    }

    public static void main(String[] strArr) throws Exception {
        OptionParser optionParser = new OptionParser();
        OptionSpec ofType = optionParser.accepts("topic", "REQUIRED: The topic id to consumer on.").withRequiredArg().describedAs("topic").ofType(String.class);
        OptionSpec ofType2 = optionParser.accepts("server", "REQUIRED: The jafka server connection string.").withRequiredArg().describedAs("jafka://hostname:port").ofType(String.class);
        ArgumentAcceptingOptionSpec defaultsTo = optionParser.accepts("offset", "The offset to start consuming from.").withRequiredArg().describedAs("offset").ofType(Long.class).defaultsTo(0L, new Long[0]);
        OptionSet parse = optionParser.parse(strArr);
        checkRequiredArgs(optionParser, parse, ofType, ofType2);
        URI uri = new URI((String) parse.valueOf(ofType2));
        final String str = (String) parse.valueOf(ofType);
        final long longValue = ((Long) parse.valueOf(defaultsTo)).longValue();
        final SimpleConsumer simpleConsumer = new SimpleConsumer(uri.getHost(), uri.getPort(), 10000, 65536);
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.sohu.jafka.console.SimipleConsoleConsumer.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                Closer.closeQuietly(SimpleConsumer.this);
            }
        });
        Thread thread = new Thread() { // from class: com.sohu.jafka.console.SimipleConsoleConsumer.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                long j = longValue;
                int i = 0;
                while (true) {
                    try {
                        Iterator<MessageAndOffset> it = simpleConsumer.fetch(new FetchRequest(str, 0, j, 1000000)).iterator();
                        boolean z = true;
                        while (it.hasNext()) {
                            MessageAndOffset next = it.next();
                            i++;
                            long j2 = next.offset;
                            System.out.println(String.format("[%d] %d: %s", Integer.valueOf(i), Long.valueOf(j2), Utils.toString(next.message.payload(), "UTF-8")));
                            j = j2;
                            z = false;
                        }
                        if (z) {
                            Thread.sleep(1000L);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            }
        };
        thread.start();
        thread.join();
    }
}
