Project Loom
在观看了 2020 年末 YouTube.com 上 Oracle Project Loom 负责人 Ron Pressler 的一些视频后，我发现借助 Java 未来版本中将推出的虚拟线程（纤程）新功能，解决方案非常简单：
- 调用 Executors 上新增的一个方法，创建一个使用虚拟线程（纤程）而非平台/内核线程的执行器服务（executor service）。
- 将所有传入的报告处理任务提交给该执行器服务。
- 在每个任务中，尝试获取一个信号量（semaphore）；您的 1000 台设备中每台设备各对应一个信号量。
借助该信号量，每台设备同一时间只会处理一条输入，从而按来源设备串行化处理。如果代表某台设备的信号量不可用，只需阻塞——让报告处理线程等待，直到该信号量可用。
Project Loom 可以维护大量轻量级虚拟线程（纤程），甚至多达数百万个，它们运行在少数几个重量级平台/内核线程之上。这使得阻塞线程的代价很低。
内置 Project Loom 的 JDK 二进制早期构建版本（Early builds）现已推出，支持 macOS/Linux/Windows。
警告：我既不是并发方面的专家，也不是 Project Loom 方面的专家。但您的具体用例似乎与 Ron Pressler 在视频中给出的一些具体建议相吻合。
示例代码
下面是我尝试编写的一些示例代码。我不确定这是否是一个好例子。
我使用的是专门内置 Project Loom 技术构建的 Java 16 早期访问版本：Build 16-loom+9-316 (2020/11/30) for macOS Intel。
package work.basil.example;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
/**
 * An example of using Project Loom virtual threads to more simply process incoming data on background threads.
 * <p>
 * This code was built as a possible solution to this Question at StackOverflow.com: https://stackoverflow.com/q/65327325/642706
 * <p>
 * Posted in my Answer at StackOverflow.com: https://stackoverflow.com/a/65328799/642706
 * <p>
 * ©2020 Basil Bourque. 2020-12.
 * <p>
 * This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
 * <p>
 * Caveats:
 * - Project Loom was in early-release when this was written, available only as a special build of OpenJDK for Java 16.
 *   Virtual threads have since become a final feature in Java 21 (JEP 444), so this code now calls the final
 *   {@code Executors.newVirtualThreadPerTaskExecutor()} instead of the early-access-only
 *   {@code Executors.newVirtualThreadExecutor()}. It also relies on {@code ExecutorService} being
 *   {@code AutoCloseable} and on {@code Thread.sleep(Duration)}, both available since Java 19.
 * - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
 */
public class App
{
    // FYI, Project Loom links:
    // https://wiki.openjdk.java.net/display/loom/Main
    // http://jdk.java.net/loom/ (special early-access builds of Java 16 with Project Loom built-in)
    // https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
    // https://www.youtube.com/watch?v=23HjZBOIshY (Ron Pressler talk, 2020-07)
    // https://openjdk.org/jeps/444 (JEP 444: Virtual Threads, final in Java 21)

    /**
     * Entry point: prints the running Java version, verifies virtual-thread support, then runs the demo.
     *
     * @param args ignored
     * @throws IllegalStateException if this Java implementation lacks Project Loom technology
     */
    public static void main ( String[] args )
    {
        System.out.println( "java.version: " + System.getProperty( "java.version" ) );
        App app = new App();
        app.checkForProjectLoom();
        app.demo();
    }

    /**
     * Detects whether this JVM offers Project Loom virtual threads, by reflectively looking for
     * {@code Thread.startVirtualThread(Runnable)}.
     *
     * @return true if {@code Thread} declares {@code startVirtualThread(Runnable)}, false otherwise
     */
    public static boolean projectLoomIsPresent ( )
    {
        try
        {
            Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
            return true;
        }
        catch ( NoSuchMethodException e )
        {
            return false;
        }
    }

    /**
     * Prints a confirmation when virtual threads are available, otherwise fails fast.
     *
     * @throws IllegalStateException if this Java implementation lacks Project Loom technology
     */
    private void checkForProjectLoom ( )
    {
        if ( App.projectLoomIsPresent() )
        {
            System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
        } else
        {
            throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
        }
    }

    /**
     * Task that processes one report from one device, serialized per device by that device's semaphore.
     * <p>
     * While holding the device's permit it simulates lengthy work (a 100 ms sleep), then appends one row to the
     * faux database. The permit is released in a {@code finally}, so a report whose processing throws cannot
     * leave its device blocked forever.
     *
     * @param semaphore        single-permit semaphore dedicated to the device; must not be null
     * @param deviceIdentifier identifier of the device that sent the report
     * @param printToConsole   whether to echo the inserted row to {@code System.out}
     * @param fauxDatabase     queue standing in for a database table; must not be null, and must be thread-safe
     *                         because many tasks add to it concurrently
     */
    record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable
    {
        ReportProcessorRunnable
        {
            Objects.requireNonNull( semaphore , "semaphore" );
            Objects.requireNonNull( fauxDatabase , "fauxDatabase" );
        }

        @Override
        public void run ( )
        {
            // Our goal is to serialize the report-processing per device.
            // Each device can have only one report being processed at a time.
            // In Project Loom this can be accomplished simply by spawning virtual threads for all such
            // reports but process them serially by synchronizing on a binary (single-permit) semaphore.
            // Each thread working on a report submitted for that device waits on semaphore assigned to that device.
            // Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
            // platform/kernel thread carrying this virtual thread will be assigned other work while this
            // virtual thread is parked.
            try
            {
                semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
            }
            catch ( InterruptedException e )
            {
                // Interrupted before obtaining the permit: there is nothing to release. Restore the interrupt
                // status so the interrupter (e.g. an executor shutting down) can observe it, and abandon this report.
                Thread.currentThread().interrupt();
                return;
            }
            try
            {
                // Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
                try
                {
                    Thread.sleep( Duration.ofMillis( 100 ) );
                }
                catch ( InterruptedException e )
                {
                    // Still record the report, but do not lose the interrupt request.
                    Thread.currentThread().interrupt();
                }
                String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
                fauxDatabase.add( fauxData );
                if ( this.printToConsole ) { System.out.println( fauxData ); }
            }
            finally
            {
                semaphore.release(); // For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
            }
        }
    }

    /**
     * Periodic task simulating one batch of incoming reports.
     * <p>
     * Each run picks {@code countOfReportsToGeneratePerBatch} devices at random (with repetition) from the map's
     * keys, and submits one {@link ReportProcessorRunnable} per pick to {@code reportProcessingExecutorService}.
     *
     * @param deviceToSemaphoreMap            device identifier → that device's single-permit semaphore; must not be null
     * @param reportProcessingExecutorService executor that processes each report; must not be null
     * @param countOfReportsToGeneratePerBatch number of reports submitted per run
     * @param printToConsole                  whether to print progress to {@code System.out}
     * @param fauxDatabase                    thread-safe queue standing in for a database table; must not be null
     */
    record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
                                            ExecutorService reportProcessingExecutorService ,
                                            int countOfReportsToGeneratePerBatch ,
                                            boolean printToConsole ,
                                            Queue < String > fauxDatabase)
            implements Runnable
    {
        IncomingReportsSimulatorRunnable
        {
            Objects.requireNonNull( deviceToSemaphoreMap , "deviceToSemaphoreMap" );
            Objects.requireNonNull( reportProcessingExecutorService , "reportProcessingExecutorService" );
            Objects.requireNonNull( fauxDatabase , "fauxDatabase" );
        }

        @Override
        public void run ( )
        {
            if ( printToConsole ) System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() );
            // Snapshot the device identifiers once per batch rather than once per report.
            final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
            // With no devices there is no report source to pick. Return instead of letting
            // ThreadLocalRandom.nextInt(0, 0) throw: an exception escaping a task scheduled with
            // scheduleAtFixedRate silently suppresses all of that task's later runs.
            if ( deviceIdentifiers.isEmpty() ) { return; }
            for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ )
            {
                // Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
                // To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
                int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
                Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
                Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
                Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
                reportProcessingExecutorService.submit( processReport );
            }
        }
    }

    /**
     * Runs the experiment.
     * <p>
     * For {@code durationOfExperiment}, a single scheduled thread generates a batch of reports every second from
     * randomly chosen devices. Each report is processed on its own virtual thread, serialized per device by a fair
     * single-permit semaphore. At the end, prints the collected faux-database rows.
     */
    private void demo ( )
    {
        // Configure experiment.
        Duration durationOfExperiment = Duration.ofSeconds( 20 );
        int countOfReportsToGeneratePerBatch = 7; // Would be 40 per the Stack Overflow Question.
        boolean printToConsole = true;

        // To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://stackoverflow.com/a/25630263/642706
        Queue < String > fauxDatabase = new ConcurrentLinkedQueue <>();

        // Represent each of the devices that are sending us report data to be processed.
        // We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
        final int firstDeviceNumber = 1_000;
        final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
        final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
        for ( int i = 0 ; i < countDevices ; i++ )
        {
            Integer deviceIdentifier = i + firstDeviceNumber; // Devices are numbered from 1,000 (1,000 to 1,999 for the thousand devices of the Question).
            Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
            deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
        }

        // Run experiment.
        // Since Java 19 the `ExecutorService` interface is `AutoCloseable`, for use in try-with-resources syntax.
        // Resources are closed in REVERSE order of declaration. Declaring the report-processing executor first
        // means the report-generating executor closes first: its periodic task is cancelled, and any batch already
        // running finishes submitting, before the processing executor stops accepting work. In the opposite order,
        // the generator could `submit` to an already shut-down executor and fail with RejectedExecutionException.
        try (
                ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadPerTaskExecutor() ;
                ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
        )
        {
            Runnable simulateIncomingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
            reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncomingReports , 0 , 1 , TimeUnit.SECONDS );
            try
            {
                Thread.sleep( durationOfExperiment );
            }
            catch ( InterruptedException e )
            {
                // Restore the interrupt status. ExecutorService.close() reacts to an interrupt by calling shutdownNow().
                Thread.currentThread().interrupt();
            }
        }
        // By the time we reach this point, both executors are closed: `close()` blocked until every submitted task
        // had finished, because that is the behavior of `ExecutorService` being `AutoCloseable`.
        System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );

        // Results of experiment
        System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
        System.out.println( "fauxDatabase = " + fauxDatabase );
    }
}