Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/com/zenith/command/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

@Getter
public class CommandManager {
private static final int MAX_QUEUED_ASYNC_COMMANDS = 100;
private final List<Command> commandsList = Lists.newArrayList(
new ActionLimiterCommand(),
new ActiveHoursCommand(),
Expand Down Expand Up @@ -120,6 +121,7 @@ public class CommandManager {
new WhitelistCommand()
);
private final CommandDispatcher<CommandContext> dispatcher;
private final CommandQueue queuedCommandExecutor = new CommandQueue("ZenithProxy Queued Command Executor - #%d", MAX_QUEUED_ASYNC_COMMANDS);
private @NonNull CommandNode[] mcplCommandNodes = new CommandNode[0];
private AtomicBoolean mcplCommandNodesStale = new AtomicBoolean(true);

Expand Down Expand Up @@ -265,4 +267,8 @@ public CompletableFuture<Suggestions> suggestions(final String input, PlayerComm
final ParseResults<CommandContext> parse = this.dispatcher.parse(stringReader, ctx);
return this.dispatcher.getCompletionSuggestions(parse);
}

public CommandQueue.Submission submitQueuedCommand(final Runnable runnable) {
return queuedCommandExecutor.submit(runnable);
}
}
59 changes: 59 additions & 0 deletions src/main/java/com/zenith/command/CommandQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.zenith.command;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import net.kyori.adventure.text.logger.slf4j.ComponentLogger;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CommandQueue implements AutoCloseable {
private static final ComponentLogger LOG = ComponentLogger.logger("CommandQueue");
private final ThreadPoolExecutor executor;
private final int maxPendingCommands;

public CommandQueue(final String threadNameFormat, final int maxPendingCommands) {
if (maxPendingCommands <= 0) {
throw new IllegalArgumentException("maxPendingCommands must be greater than 0");
}
this.maxPendingCommands = maxPendingCommands;
this.executor = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder()
.setNameFormat(threadNameFormat)
.setDaemon(true)
.setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception in queued command thread {}", thread, e))
.build());
}

public synchronized Submission submit(final Runnable runnable) {
final int commandsAhead = executor.getQueue().size() + executor.getActiveCount();
if (executor.getQueue().size() >= maxPendingCommands) {
return new Submission(false, commandsAhead);
}
try {
executor.execute(runnable);
return new Submission(true, commandsAhead);
} catch (final RejectedExecutionException e) {
return new Submission(false, commandsAhead);
}
}

public synchronized int clearPending() {
final int cleared = executor.getQueue().size();
executor.getQueue().clear();
return cleared;
}

@Override
public void close() {
executor.shutdownNow();
}

public record Submission(boolean accepted, int commandsAhead) { }
}
37 changes: 34 additions & 3 deletions src/main/java/com/zenith/discord/DiscordBot.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,23 +189,35 @@ private void onMessageReceived(MessageReceivedEvent event) {
}

private void executeDiscordCommand(final DiscordMainChannelCommandReceivedEvent event) {
var inputMessage = event.message().substring(CONFIG.discord.prefix.length()).trim();
var submission = COMMAND.submitQueuedCommand(() -> executeDiscordCommandNow(event, inputMessage));
if (!submission.accepted()) {
replyToDiscordCommand(event, "Discord command queue is full. Please try again in a moment.");
return;
}
if (submission.commandsAhead() > 0) {
DISCORD_LOG.info("Queued discord command: {} ({} command(s) ahead)", inputMessage, submission.commandsAhead());
replyToDiscordCommand(event, "Queued command. " + submission.commandsAhead() + " command(s) ahead of you.");
}
}

private void executeDiscordCommandNow(final DiscordMainChannelCommandReceivedEvent event, final String inputMessage) {
try {
var jdaEvent = event.event();
var member = event.member();
var inputMessage = event.message().substring(CONFIG.discord.prefix.length());
var memberName = member.getUser().getName();
var memberId = member.getId();
DISCORD_LOG.info("{} ({}) executed discord command: {}", memberName, memberId, inputMessage);
final CommandContext context = DiscordCommandContext.create(inputMessage, jdaEvent);
COMMAND.execute(context);
final MessageCreateData request = commandEmbedOutputToMessage(context);
if (request != null) {
mainChannel.sendMessage(request).queue();
sendMessageToMainChannel(request);
CommandOutputHelper.logEmbedOutputToTerminal(context.getEmbed());
}
if (!context.getMultiLineOutput().isEmpty()) {
for (final String line : context.getMultiLineOutput()) {
mainChannel.sendMessage(line).queue();
sendMessageToMainChannel(line);
}
CommandOutputHelper.logMultiLineOutputToTerminal(context.getMultiLineOutput());
}
Expand All @@ -214,6 +226,25 @@ private void executeDiscordCommand(final DiscordMainChannelCommandReceivedEvent
}
}

private void replyToDiscordCommand(final DiscordMainChannelCommandReceivedEvent event, final String message) {
event.event().getMessage().reply(message).queue(
s -> {},
e -> DISCORD_LOG.debug("Failed sending discord command queue reply", e)
);
}

private void sendMessageToMainChannel(final MessageCreateData message) {
var channel = this.mainChannel;
if (channel == null || !isRunning()) return;
channel.sendMessage(message).queue();
}

private void sendMessageToMainChannel(final String message) {
var channel = this.mainChannel;
if (channel == null || !isRunning()) return;
channel.sendMessage(message).queue();
}

private MessageCreateData commandEmbedOutputToMessage(final CommandContext context) {
var embed = context.getEmbed();
if (embed.title() == null) return null;
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/zenith/feature/tasks/CommandAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@ public CommandAction(String command) {

@Override
public void execute() {
var submission = COMMAND.submitQueuedCommand(this::executeQueuedCommand);
if (!submission.accepted()) {
MODULE_LOG.error("Failed queueing scheduled command: {} ({} command(s) ahead)", command, submission.commandsAhead());
if (CONFIG.client.extra.tasks.logCommandActionOutput) {
CommandOutputHelper.logEmbedOutputToDiscord(Embed.builder()
.title("Tasks Error")
.addField("Error", "Command queue full")
.addField("Command", "`" + command + "`"));
}
return;
}
}

private void executeQueuedCommand() {
MODULE_LOG.info("Executing scheduled command: {}", command);
EVENT_BUS.postAsync(new TasksCommandExecutedEvent(command));
var ctx = CommandContext.create(command, SOURCE);
Expand Down
93 changes: 93 additions & 0 deletions src/test/java/com/zenith/command/CommandQueueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.zenith.command;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.*;

public class CommandQueueTest {

@Test
public void executesCommandsSequentially() throws Exception {
try (var queue = new CommandQueue("command-queue-test-%d", 4)) {
var order = Collections.synchronizedList(new ArrayList<String>());
var firstStarted = new CountDownLatch(1);
var releaseFirst = new CountDownLatch(1);
var secondFinished = new CountDownLatch(1);
var failure = new AtomicReference<Throwable>();

var firstSubmission = queue.submit(() -> {
order.add("first-start");
firstStarted.countDown();
try {
if (!releaseFirst.await(2, TimeUnit.SECONDS)) {
failure.compareAndSet(null, new AssertionError("Timed out waiting to release first command"));
return;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
failure.compareAndSet(null, e);
return;
}
order.add("first-end");
});
assertTrue(firstSubmission.accepted());
assertEquals(0, firstSubmission.commandsAhead());
assertTrue(firstStarted.await(2, TimeUnit.SECONDS));

var secondSubmission = queue.submit(() -> {
order.add("second");
secondFinished.countDown();
});
assertTrue(secondSubmission.accepted());
assertEquals(1, secondSubmission.commandsAhead());
assertFalse(secondFinished.await(200, TimeUnit.MILLISECONDS));

releaseFirst.countDown();
assertTrue(secondFinished.await(2, TimeUnit.SECONDS));
assertNull(failure.get());
assertEquals(List.of("first-start", "first-end", "second"), order);
}
}

@Test
public void rejectsCommandsWhenPendingQueueIsFull() throws Exception {
try (var queue = new CommandQueue("command-queue-test-%d", 1)) {
var releaseFirst = new CountDownLatch(1);
var firstStarted = new CountDownLatch(1);
var failure = new AtomicReference<Throwable>();

var firstSubmission = queue.submit(() -> {
firstStarted.countDown();
try {
if (!releaseFirst.await(2, TimeUnit.SECONDS)) {
failure.compareAndSet(null, new AssertionError("Timed out waiting to release first command"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
failure.compareAndSet(null, e);
}
});
assertTrue(firstSubmission.accepted());
assertTrue(firstStarted.await(2, TimeUnit.SECONDS));

var secondSubmission = queue.submit(() -> {});
assertTrue(secondSubmission.accepted());
assertEquals(1, secondSubmission.commandsAhead());

var thirdSubmission = queue.submit(() -> {});
assertFalse(thirdSubmission.accepted());
assertEquals(2, thirdSubmission.commandsAhead());

releaseFirst.countDown();
Thread.sleep(100);
assertNull(failure.get());
}
}
}