Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private case class KafkaBatchPartitionReader(
}

override def nextWithTimeout(
startTime: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
if (!iteratorForRealTimeMode.isDefined) {
logInfo(s"Getting a new kafka consuming iterator for ${offsetRange.topicPartition} " +
s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ public Optional<Long> recArrivalTime() {
* Alternative function to be called than next(), that proceed to the next record. The different
* from next() is that, if there is no more records, the call needs to keep waiting until
* the timeout.
* @param startTime the base time (milliseconds) the was used to calculate the timeout.
* @param startTimeMs the base time (milliseconds) the was used to calculate the timeout.
* Sources should use it as the reference time to start waiting for the next
* record instead of getting the latest time from LowLatencyClock.
* @param timeout if no result is available after this timeout (milliseconds), return
* @param timeoutMs if no result is available after this timeout (milliseconds), return
* @return {@link RecordStatus} describing whether a record is available and its arrival time
* @throws IOException
*/
RecordStatus nextWithTimeout(Long startTime, Long timeout) throws IOException;
RecordStatus nextWithTimeout(Long startTimeMs, Long timeoutMs) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,21 +278,20 @@ class LowLatencyMemoryStreamPartitionReader(
throw new IllegalStateException("Task context was not set!")
}
override def nextWithTimeout(
startTime: java.lang.Long, timeout: java.lang.Long): RecordStatus = {
startTimeMs: java.lang.Long, timeoutMs: java.lang.Long): RecordStatus = {
// SPARK-55699: Use the reference time passed in by the caller instead of getting the latest
// time from LowLatencyClock, to avoid inconsistent reading when LowLatencyClock is a
// manual clock.
val startReadTime = startTime
var elapsedTimeMs = 0L
current = getRecordWithTimestamp
while (current.isEmpty) {
val POLL_TIME = 10L
if (elapsedTimeMs >= timeout) {
if (elapsedTimeMs >= timeoutMs) {
return RecordStatus.newStatusWithoutArrivalTime(false)
}
Thread.sleep(POLL_TIME)
current = getRecordWithTimestamp
elapsedTimeMs = (clock.nanoTime() - startReadTime) / 1000 / 1000
elapsedTimeMs = clock.getTimeMillis() - startTimeMs
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HeartSaVioR This is the main change. Basically, in the last PR, we were substract a millisecond off a nano second which is wrong.

I am not sure why no unit test is failing. Maybe this unveils that we have a limited test coverage.

}
currentOffset += 1
RecordStatus.newStatusWithArrivalTimeMs(current.get._2)
Expand Down