KEMBAR78
Flag and maybe delete messages after messages have been copied by filiphr · Pull Request #9546 · spring-projects/spring-integration · GitHub
Skip to content

Conversation

@filiphr
Copy link
Contributor

@filiphr filiphr commented Oct 11, 2024

The reason for this pull request is due to the fact that we have noticed that sometimes the IMAP connection breaks just after the message flags have been set, but the message has not been copied yet. This then leads to the message never being received by (as it has been flagged and the next search will not return it).

e.g.

2024-08-08T13:16:02.888-04:00 ERROR 5820 --- [able-mail-30971] c.f.p.e.i.e.mail.MailMessageHandler : Unexpected error occurred in when handling messages for channel processinMailChannel

org.springframework.messaging.MessagingException: failure occurred while polling for mail
at org.springframework.integration.mail.MailReceivingMessageSource.doReceive(MailReceivingMessageSource.java:74) ~[spring-integration-mail-6.1.8.jar:6.1.8]
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142) ~[spring-integration-core-6.1.8.jar:6.1.8]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212) ~[spring-integration-core-6.1.8.jar:6.1.8]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443) ~[spring-integration-core-6.1.8.jar:6.1.8]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412) ~[spring-integration-core-6.1.8.jar:6.1.8]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348) ~[spring-integration-core-6.1.8.jar:6.1.8]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-6.1.8.jar:6.1.8]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: jakarta.mail.MessagingException: IOException while copying message
at jakarta.mail.internet.MimeMessage.<init>(MimeMessage.java:254) ~[jakarta.mail-api-2.1.3.jar:na]
at org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage.<init>(AbstractMailReceiver.java:658) ~[spring-integration-mail-6.1.8.jar:6.1.8]
at org.springframework.integration.mail.AbstractMailReceiver.postProcessFilteredMessages(AbstractMailReceiver.java:513) ~[spring-integration-mail-6.1.8.jar:6.1.8]
at org.springframework.integration.mail.AbstractMailReceiver.searchAndFilterMessages(AbstractMailReceiver.java:427) ~[spring-integration-mail-6.1.8.jar:6.1.8]
at org.springframework.integration.mail.AbstractMailReceiver.receive(AbstractMailReceiver.java:380) ~[spring-integration-mail-6.1.8.jar:6.1.8]
at org.springframework.integration.mail.MailReceivingMessageSource.doReceive(MailReceivingMessageSource.java:62) ~[spring-integration-mail-6.1.8.jar:6.1.8]
... 9 common frames omitted
Caused by: java.io.IOException: jakarta.mail.FolderClosedException
at com.sun.mail.imap.IMAPInputStream.fill(IMAPInputStream.java:141) ~[angus-mail-1.1.0.jar:na]
at com.sun.mail.imap.IMAPInputStream.read(IMAPInputStream.java:245) ~[angus-mail-1.1.0.jar:na]
at com.sun.mail.imap.IMAPInputStream.read(IMAPInputStream.java:272) ~[angus-mail-1.1.0.jar:na]
at com.sun.mail.imap.IMAPMessage.writeTo(IMAPMessage.java:892) ~[angus-mail-1.1.0.jar:na]
at jakarta.mail.internet.MimeMessage.<init>(MimeMessage.java:246) ~[jakarta.mail-api-2.1.3.jar:na]
... 14 common frames omitted
Caused by: jakarta.mail.FolderClosedException: null
... 19 common frames omitted

I could reproduce a similar exception while debuging in the IntegrationMimeMessage constructor. If I put a break point there, wait a minute and then continue I would get this exception.

While debugging I could find some things such as

java.io.IOException: Connection dropped by server?
	at org.eclipse.angus.mail.iap.ResponseInputStream.readResponse(ResponseInputStream.java:96)
	at org.eclipse.angus.mail.iap.Response.<init>(Response.java:113)

I know that it says Spring Integration 6.1.8 here. However, I could reproduce the same with 6.3.5 as well.

I've added a test trying to mimic the behaviour (an IOException) when copying the message contents.

@filiphr filiphr force-pushed the flag-after-copying branch from 87b78c6 to 81ee4c2 Compare October 11, 2024 14:28
Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, consider to run gradlew spring-integration-mail:check locally.
Otherwise we have those Checkstyle failures:

Error: eckstyle] [ERROR] /home/runner/work/spring-integration/spring-integration/spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java:516:17: '}' at column 3 should be alone on a line. [RightCurly]

> Task :spring-integration-mail:checkstyleMain

> Task :spring-integration-mail:checkstyleMain FAILED
> Task :spring-integration-stream:compileTestJava
Error: eckstyle] [ERROR] /home/runner/work/spring-integration/spring-integration/spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java:1053:17: Redundant 'public' modifier. [RedundantModifier]

Also, ad your name to all the affected classes.

We will consider to back-port it, but that is not going to happen for 6.1.x.
That one is out of Open Source support already.

setMessageFlagsAndMaybeDeleteMessages(originalMessages);
}
else {
setMessageFlagsAndMaybeDeleteMessages(filteredMessages);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May we consider a local method variable for Message[] instead of else branch with the same method call?

I mean something like:

Message[] messages = filteredMessages;
if (this.headerMapper == null && (this.autoCloseFolder || this.simpleContent)) {
    messages = new Message[filteredMessages.length];
    ...
}
setMessageFlagsAndMaybeDeleteMessages(messages);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wanted to do that initially as well. However, in the if we are creating a new Message of the IntegrationMimeMessage. This means that when we call setMessageFlagsAndMaybeDeleteMessages and we invoke Message#setFlags the actual implementation differs. If we look into the IMAPMessage#setFlags the implementation looks like

synchronized (getMessageCacheLock()) {
    try {
        IMAPProtocol p = getProtocol();
        checkExpunged(); // Insure that this message is not expunged
        p.storeFlags(getSequenceNumber(), flag, set);
    } catch (ConnectionException cex) {
        throw new FolderClosedException(folder, cex.getMessage());
    } catch (ProtocolException pex) {
        throw new MessagingException(pex.getMessage(), pex);
    }
}

whereas the MimeMessage#setFlags looks like

if (set)
    flags.add(flag);
else
    flags.remove(flag);

I am going to add a comment there to explain why it is done in the way I did it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure why does that matter since you call setMessageFlagsAndMaybeDeleteMessages in both cases? So, one local variable for both cases feels right.
Ok, I’ll look into that locally on merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan it matters a lot because what will be invoked in the end is not the same. The behaviour of the setFlags will change due to the fact that there will be different implementations.

Prior to this PR the setFlags was invoked on the original messages received from the folder. The goal of the PR is to still invoke the setFlags on the original messages. However, instead of doing it before copying it, to do it after it has been copied.

I have an idea for a test case. I'll add it to illustrate the problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still failing to understand why this code is not OK with you:

	private void postProcessFilteredMessages(Message[] filteredMessages) throws MessagingException {
		Message[] messagesToProcess = filteredMessages;

		// Copy messages to cause an eager fetch
		if (this.headerMapper == null && (this.autoCloseFolder || this.simpleContent)) {
			messagesToProcess = new Message[filteredMessages.length];
			for (int i = 0; i < filteredMessages.length; i++) {
				Message originalMessage = filteredMessages[i];
				messagesToProcess[i] = originalMessage;
				MimeMessage mimeMessage = new IntegrationMimeMessage((MimeMessage) originalMessage);
				filteredMessages[i] = mimeMessage;
			}
		}

		setMessageFlagsAndMaybeDeleteMessages(messagesToProcess);
	}

You see, we still fulfill that messagesToProcess with original messages.
And then call setMessageFlagsAndMaybeDeleteMessages() only against a single source of truth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @artembilan, I misunderstood you. I know what you mean now. The messages will be the original messages populated in the loop.

I adjusted the PR. Please correct me if I am wrong.

@filiphr
Copy link
Contributor Author

filiphr commented Oct 11, 2024

Thanks for the quick review @artembilan.

Please, consider to run gradlew spring-integration-mail:check locally.

That was my mistake. The checkstyle errors are already fixed.

Also, add your name to all the affected classes.

Done

We will consider to back-port it, but that is not going to happen for 6.1.x.
That one is out of Open Source support already.

We are already upgraded to Spring Boot 3.3 and using Spring Integration 6.3, so back-porting it to at least 6.3 would be more than appreciated for us.


@Override
public void writeTo(OutputStream os) throws IOException, MessagingException {
if (this.exceptionsBeforeWrite.decrementAndGet() >= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like just AtomicBoolean will be enough without any extra ctor arg.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I've adjusted it

setMessageFlagsAndMaybeDeleteMessages(originalMessages);
}
else {
setMessageFlagsAndMaybeDeleteMessages(filteredMessages);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still failing to understand why this code is not OK with you:

	private void postProcessFilteredMessages(Message[] filteredMessages) throws MessagingException {
		Message[] messagesToProcess = filteredMessages;

		// Copy messages to cause an eager fetch
		if (this.headerMapper == null && (this.autoCloseFolder || this.simpleContent)) {
			messagesToProcess = new Message[filteredMessages.length];
			for (int i = 0; i < filteredMessages.length; i++) {
				Message originalMessage = filteredMessages[i];
				messagesToProcess[i] = originalMessage;
				MimeMessage mimeMessage = new IntegrationMimeMessage((MimeMessage) originalMessage);
				filteredMessages[i] = mimeMessage;
			}
		}

		setMessageFlagsAndMaybeDeleteMessages(messagesToProcess);
	}

You see, we still fulfill that messagesToProcess with original messages.
And then call setMessageFlagsAndMaybeDeleteMessages() only against a single source of truth.

}

@Override
public synchronized void setFlags(Flags flag, boolean set) throws MessagingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to override this if there is that Message.getFlags()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to show that we are calling the setFlags on the original message and not the copied one. The Flags can be shared. The MimeMessage has

flags = source.getFlags();
if (flags == null)    // make sure flags is always set
    flags = new Flags();

So if the source has Flags and we call the setFlags on the filteredMessages then the test will still pass. The goal of the change is to show that the setFlags is called on the original message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like you validate somehow in the test if message was original or IntegrationMimeMessage.
Therefore this extra precaution is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is

assertThat(msg2.getMyTestFlags().contains(Flag.SEEN)).isTrue();

In the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But looks like it is exactly the same as previous one:

assertThat(msg2.getFlags().contains(Flag.SEEN)).isTrue();

That's why I believe that is really enough.
Because you are not going to deal with mock messages in the real application.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to validate that the setFlags is called on the original message and not on the IntegrationMimeMessage. Currently, GreenMailUtil#newMimeMessage does not set the flags. However, if you want I can set it up in such case that if the setMessageFlagsAndMaybeDeleteMessages is called on the IntegrationMimeMessage the test would fail.

assertThat(msg2.getFlags().contains(Flag.SEEN)).isTrue();

The test above passes as the Flags are shared. But

assertThat(msg2.getMyTestFlags().contains(Flag.SEEN)).isTrue();

would fail as it has it's own flags. If you think it is too much, I can remove it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. You probably can spy() on the GreenMailUtil#newMimeMessage result to verfiy that exactly its setFlags() is called.
It is still look redundant to have our own property when it is there in the super class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point with the spy(). I'll add that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced that with a spy. Hope it is good now.

}

@Override
public synchronized void setFlags(Flags flag, boolean set) throws MessagingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like you validate somehow in the test if message was original or IntegrationMimeMessage.
Therefore this extra precaution is redundant.

}

@Override
public synchronized void setFlags(Flags flag, boolean set) throws MessagingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But looks like it is exactly the same as previous one:

assertThat(msg2.getFlags().contains(Flag.SEEN)).isTrue();

That's why I believe that is really enough.
Because you are not going to deal with mock messages in the real application.

@artembilan artembilan merged commit 5589daa into spring-projects:main Oct 11, 2024
3 checks passed
@artembilan
Copy link
Member

@filiphr ,

thank you for contribution; looking forward for more!

artembilan pushed a commit that referenced this pull request Oct 11, 2024
Fixes: #9546
Issue link: #9546

Sometimes the IMAP connection breaks just after the message flags have been set, but the message has not been copied yet.
This then leads to the message never being received by (as it has been flagged and the next search will not return it).

* Flag and maybe delete messages after messages have been copied

**Auto-cherry-pick to `6.2.x`**
# Conflicts:
#	spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java
artembilan pushed a commit that referenced this pull request Oct 11, 2024
Fixes: #9546
Issue link: #9546

Sometimes the IMAP connection breaks just after the message flags have been set, but the message has not been copied yet.
This then leads to the message never being received by (as it has been flagged and the next search will not return it).

* Flag and maybe delete messages after messages have been copied

# Conflicts:
#	spring-integration-mail/src/main/java/org/springframework/integration/mail/AbstractMailReceiver.java

# Conflicts:
#	spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java
@filiphr filiphr deleted the flag-after-copying branch October 11, 2024 21:31
@filiphr
Copy link
Contributor Author

filiphr commented Oct 11, 2024

Thanks a lot for the speedy review and integration @artembilan

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants