【问题标题】:Java threads:Comminucation between 3 java threadsJava线程:3个java线程之间的通信
【发布时间】:2018-04-29 20:31:40
【问题描述】:

所以我有 3 个线程,

从互联网下载音频和音频时间表对象的线程。 根据音频时间表播放音频的线程。 还有一个网络套接字“通知”侦听器,用于侦听说我们必须下载新音频并安排因为我们当前的音频已过时的消息。

程序流程如下: 在应用程序启动时:ScheduleDownloader 启动,下载音频和时间表文件。完成后,它需要告诉音频播放器“嘿,文件准备好了,这是时间表”,它现在不需要做任何事情

音频播放器启动并连续循环,没有退出条件。 web socket监听器启动,当它收到一条消息时,它应该告诉调度下载器“你需要重新开始,因为有新的文件需要下载”,它不需要向下载的调度发送任何数据,只需启动它再次启动。音乐应该继续播放。一旦完成,它现在应该使用新的时间表重新启动音频播放器线程。

这是我目前所拥有的,我不知道如何让 ScheduleDownloader 告诉 AudioPlayer“文件已准备好,您需要开始,这是时间表”或“您需要重新启动新的时间表,在这里它是”或如何让听众说“ScheduleDownloader 你需要重新开始”

public class ScheduleDownloader extends Thread {
private Thread t;
private String threadName;

   String username;
 String password;

    public ScheduleDownloader(String username,String password,String threadName){
        this.username = username;
        this.password = password;
        this.threadName= threadName;
    }
public void start () {
    System.out.println("Starting " +  threadName );
    if (t == null) {
        t = new Thread (this, threadName);
        t.start ();
    }}
public void run() {
    try {
        Schedule schedule= null;
        while(schedule == null){
            System.out.println("Searching for schedule");
           schedule= getTodaysSchedule();
        }
        System.out.println("Schedule Found");
        boolean result = false;
        while(result == false){
            result = downloadFiles(schedule);
        }
        System.out.println("Files Downloaded");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

public Schedule getTodaysSchedule() throws IOException {
        Schedule schedule = null;
        CredentialsProvider provider = new BasicCredentialsProvider();
        UsernamePasswordCredentials credentials
                = new UsernamePasswordCredentials(username,password);
        provider.setCredentials(AuthScope.ANY, credentials);


        String url = "http://localhost:5000/api/schedule/today";
        HttpClient httpClient = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); //Use this instead
        HttpGet request = new HttpGet(url);

        HttpResponse response = httpClient.execute(request);
  //read content response body
        if (response.getStatusLine().getStatusCode() != 200) {
            System.out.println("sorry error:" + response.getStatusLine().getStatusCode());
        } else {
            BufferedReader rd = new BufferedReader(
                    new InputStreamReader(response.getEntity().getContent()));

            StringBuffer result = new StringBuffer();
            String line = "";
            while ((line = rd.readLine()) != null) {
                result.append(line);
            }
            //change json response to java objects


            Gson gson = new Gson();
             schedule = gson.fromJson(String.valueOf(result),Schedule.class);

}
        return schedule;

}
public static boolean downloadFiles(Schedule schedule) {
 //get the music
    for(int i =0;i<schedule.getMusicScheduleItems().size();i++){
    downloadOneFile("shoutloudaudio","music/" +
            schedule.getMusicScheduleItems().get(i).getMusic().getId()+
            "-music.wav");
    }
    //get the advertisements
    for(int i =0;i<schedule.getAdvertisementScheduleItems().size();i++){
        downloadOneFile("shoutloudaudio","advertisements/" +
                schedule.getAdvertisementScheduleItems().get(i).getAdvertisement().getId()+
                "-advertisement.wav");
    }

    return true;


}

public static boolean downloadOneFile(String bucketName,String key) {
    if( new File(key.split("/")[1]).isFile()){
        //check if we have it already and dont need to download it

        System.out.println(key + " alraeady exits");
        return true;
    }
    AWSCredentials awsCredentials = new BasicAWSCredentials(
            "removed",
            "removed"
    );
    AmazonS3 s3client = AmazonS3ClientBuilder
            .standard()
            .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
            .withRegion(Regions.EU_WEST_1)
            .build();
    S3Object s3object = s3client.getObject(bucketName, key);
    S3ObjectInputStream inputStream = s3object.getObjectContent();
    InputStream reader = new BufferedInputStream(
            inputStream);
    File file = new File(key.split("/")[1]);//save the file as whats after the / in key
    OutputStream writer = null;
    try {
        writer = new BufferedOutputStream(new FileOutputStream(file));
    } catch (FileNotFoundException e) {
        e.printStackTrace();
        return false;
    }

    int read = -1;
try {
while ((read = reader.read()) != -1) {
    writer.write(read);
}

writer.flush();
writer.close();
}catch(IOException e){
e.printStackTrace();
return false;
}
    return true;
}
}

音频播放器

public class AudioPlayer extends Thread {
Long currentFrameMusic;
Long currentFrameAdvertisement;
Clip clipMusic;
Clip clipAdvertisement;
private Thread t;

 // current status of clip
 String statusMusic;
 String statusAdvertisement;

static AudioInputStream musicInputStream;
static AudioInputStream advertisementInputStream;
static String filePath;
Schedule schedule;

 // constructor to initialize streams and clip
public AudioPlayer(Schedule schedule)
        throws UnsupportedAudioFileException,
        IOException, LineUnavailableException
{
    //setup audio stream for music first
    // create AudioInputStream object
this.schedule = schedule;
    appendMusicFiles(schedule);

    // create clip reference
    clipMusic = AudioSystem.getClip();

    // open audioInputStream to the clip
    clipMusic.open(musicInputStream);

    clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
}

public void run(){
    playMusic();
    try {
        checkShouldWePlayAnAdvertisement();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (UnsupportedAudioFileException e) {
        e.printStackTrace();
    } catch (LineUnavailableException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
public void start(){
    t = new Thread (this, "AudioPlayerThread");
    t.start ();
}
public void start2() throws IOException, UnsupportedAudioFileException, LineUnavailableException, InterruptedException {
playMusic();
checkShouldWePlayAnAdvertisement();
}
public void playMusic()
{
    //start the clip
    clipMusic.start();

    statusMusic = "play";
}

// Method to pause the audio
public void pauseMusic()
{
    if (statusMusic.equals("paused"))
    {
        System.out.println("audio is already paused");
        return;
    }
    this.currentFrameMusic =
            this.clipMusic.getMicrosecondPosition();
    clipMusic.stop();
    statusMusic = "paused";
    System.out.println("pausing music");
}

// Method to resume the audio
public void resumeAudioMusic() throws UnsupportedAudioFileException,
        IOException, LineUnavailableException
{
    if (statusMusic.equals("play"))
    {
        System.out.println("Audio is already "+
                "being played");
        return;
    }
    clipMusic.close();
    resetAudioStreamMusic();
    clipMusic.setMicrosecondPosition(currentFrameMusic);
    System.out.println("resuming music");
    this.playMusic();
}

// Method to restart the audio
public void restartMusic() throws IOException, LineUnavailableException,
        UnsupportedAudioFileException
{
    clipMusic.stop();
    clipMusic.close();
    resetAudioStreamMusic();
    currentFrameMusic = 0L;
    clipMusic.setMicrosecondPosition(0);
    this.playMusic();
}

// Method to stop the audio
public void stopMusic() throws UnsupportedAudioFileException,
        IOException, LineUnavailableException
{
    currentFrameMusic = 0L;
    clipMusic.stop();
    clipMusic.close();
}
public void resetAudioStreamMusic() throws UnsupportedAudioFileException, IOException,
        LineUnavailableException
{
   clipMusic =  AudioSystem.getClip();
   appendMusicFiles(schedule);

    // open audioInputStream to the clip
    clipMusic.open(musicInputStream);

    clipMusic.loop(Clip.LOOP_CONTINUOUSLY);
}

public static void appendMusicFiles(Schedule schedule) throws IOException, UnsupportedAudioFileException {
    //add the first audio file to stream
    AudioInputStream appendedFiles = AudioSystem.getAudioInputStream(
            new File(schedule.getMusicScheduleItems().get(0).getMusic()
                    .getId() + "-music.wav"));
    //loop through an combine
    for(int i =1;i<schedule.getMusicScheduleItems().size();i++){

        File file=  new File(schedule.getMusicScheduleItems().get(i).getMusic()
                .getId() + "-music.wav");
        AudioInputStream toBeAppended = AudioSystem.getAudioInputStream(file);
        //append them
        appendedFiles =
                new AudioInputStream(
                        new SequenceInputStream(appendedFiles, toBeAppended),
                        appendedFiles.getFormat(),
                        appendedFiles.getFrameLength() + toBeAppended.getFrameLength());
    }
    musicInputStream = appendedFiles;

}

//advertisement methods

public void playAdvertisements() throws LineUnavailableException, IOException, InterruptedException {
    clipAdvertisement = AudioSystem.getClip();

    // open audioInputStream to the clip
    clipAdvertisement.open(advertisementInputStream);

    System.out.println(clipAdvertisement.getMicrosecondLength());
    //start the clip
    clipAdvertisement.start();
    Thread.sleep(clipAdvertisement.getMicrosecondLength() / 1000);
    statusAdvertisement = "play";

    System.out.println("playing advertisements");
}

// Method to pause the audio
public void pauseAdvertisements()
{
    if (statusAdvertisement.equals("paused"))
    {
        System.out.println("audio is already paused");
        return;
    }
    this.currentFrameAdvertisement =
            this.clipAdvertisement.getMicrosecondPosition();
    clipAdvertisement.stop();
    statusAdvertisement = "paused";
}

// Method to resume the audio
public void resumeAudioAdvertisement() throws UnsupportedAudioFileException,
        IOException, LineUnavailableException, InterruptedException {
    if (statusAdvertisement.equals("play"))
    {
        System.out.println("Audio is already "+
                "being played");
        return;
    }
    clipAdvertisement.close();
    resetAudioStreamAdvertisement();
    clipAdvertisement.setMicrosecondPosition(currentFrameMusic);
    this.playAdvertisements();
}

// Method to restart the audio
public void restartAdvertisement() throws IOException, LineUnavailableException,
        UnsupportedAudioFileException, InterruptedException {
    clipAdvertisement.stop();
    clipAdvertisement.close();
    resetAudioStreamAdvertisement();
    currentFrameAdvertisement = 0L;
    clipAdvertisement.setMicrosecondPosition(0);
    this.playAdvertisements();
}

// Method to stop the audio
public void stopAdvertisement() throws UnsupportedAudioFileException,
        IOException, LineUnavailableException, InterruptedException {
    currentFrameAdvertisement = 0L;
    clipAdvertisement.stop();
    clipAdvertisement.close();
    System.out.println("stopping advertisement");
}
public void resetAudioStreamAdvertisement() throws UnsupportedAudioFileException, IOException,
        LineUnavailableException
{
    advertisementInputStream = AudioSystem.getAudioInputStream(
            new File(filePath).getAbsoluteFile());
    clipAdvertisement.open(musicInputStream);
    clipAdvertisement.loop(Clip.LOOP_CONTINUOUSLY);
}

public static void appendAdvertisementFiles(List<Advertisement> advertisementItems) throws IOException, UnsupportedAudioFileException {
    //add the first audio file to stream
    AudioInputStream appendedFiles = AudioSystem.getAudioInputStream(
            new File(advertisementItems.get(0)
                    .getId() + "-advertisement.wav"));
    //loop through an combine
    for(int i =1;i<advertisementItems.size();i++){

        File file=  new File(advertisementItems.get(i)
                .getId() + "-advertisement.wav");
        AudioInputStream toBeAppended = AudioSystem.getAudioInputStream(file);
        //append them
        appendedFiles =
                new AudioInputStream(
                        new SequenceInputStream(appendedFiles, toBeAppended),
                        appendedFiles.getFormat(),
                        appendedFiles.getFrameLength() + toBeAppended.getFrameLength());
    }
    advertisementInputStream = appendedFiles;

}

     public void checkShouldWePlayAnAdvertisement() throws IOException, UnsupportedAudioFileException, LineUnavailableException, InterruptedException {
    ArrayList<String> playedAtTimes = new ArrayList<>();
    ArrayList<Advertisement> advertisementsToBePlayed = new ArrayList<>();
    boolean found;
    //played at times is used to keep track of what time we played advertisements
    //so when the loop reruns and the time hasnt changed it doesnt play it again
    while(true){
        found = false;
        ZonedDateTime zdt = ZonedDateTime.now();
      String timeHHMM =zdt.toString().substring(11,16);
     for(int i =0;i<schedule.getAdvertisementScheduleItems().size();i++)
  {







    if(schedule.getAdvertisementScheduleItems().get(i).getTimes()
    .contains(timeHHMM))
      {
//this item should be played now
if(playedAtTimes.contains(timeHHMM)){
    //we already played this,but the time hasnt changed when the loop ran again
}else{
advertisementsToBePlayed.add(schedule.getAdvertisementScheduleItems().get(i).getAdvertisement());
found = true;
}
}
}
 if(found== true){
playedAtTimes.add(timeHHMM);
appendAdvertisementFiles(advertisementsToBePlayed);
pauseMusic();
playAdvertisements();
stopAdvertisement();

resumeAudioMusic();
}

    }
 }
 }

IotClient(监听器的一部分)

public class IotClient extends Thread {
 Thread t;
 String username;
public IotClient(String username)  {
    this.username = username;
}
 public void run(){
 String clientEndpoint = "removve";       // replace <prefix> and <region> with your own
 String clientId = "1";                              // replace with your own client ID. Use unique client IDs for concurrent connections.

// AWS IAM credentials could be retrieved from AWS Cognito, STS, or other secure sources
AWSIotMqttClient client = new AWSIotMqttClient(clientEndpoint, clientId, "remove", "remove");

    // optional parameters can be set before connect()
try {
    client.connect();
} catch (AWSIotException e) {
    e.printStackTrace();
}
AWSIotQos qos = AWSIotQos.QOS0;
AWSIotTopic topic = new MyTopic("schedule/"+ username, qos);
try {
    client.subscribe(topic, true);
} catch (AWSIotException e) {
    e.printStackTrace();
}
while(true){

}
}
public void start(){
if (t == null) {
    t = new Thread (this, "IotClientThread");
    t.start ();
}
   }

MyTopic(听众的一部分)

public class MyTopic extends AWSIotTopic {
public MyTopic(String topic, AWSIotQos qos) {
    super(topic, qos);
}

@Override
public void onMessage(AWSIotMessage message) {
    System.out.println("Message recieved from topic: "+ message.getStringPayload());
}
}

【问题讨论】:

  • 附带问题:您几乎从不想要扩展 Thread,而是根据需要实现 RunnableCallable&lt;V&gt;
  • 是的,我意识到扩展线程会限制你继承其他东西,但对于我的用例来说,这很好
  • 不仅如此,它还限制了您可以对它们进行的操作。现在你不能使用 ExecutorService 或线程池,......只是不要这样做。
  • 此外,这种类型的问题通常是XY Problem 类型的问题,因为“线程”不通信 -- objects 可以。
  • 只是一个提示,对于这样一个通用问题来说,具体代码太多了,你的问题应该更早更清楚地说明。

标签: java multithreading


【解决方案1】:

线程通过对内存中消息“容器”对象的共享引用进行通信。这可能很简单,只是某个类的共享实例的可变字段,或者更典型的集合,如列表、地图,尤其是队列。

ArrayBlockingQueue 是一个很好的共享参考。从一个线程到另一个线程的每个消息方向都会有一个队列。如果您有 3 个线程可以真正相互交谈,那么您将有 3 对,因此有 6 个队列(每对 2 个)。但是,消息通常只在一个方向流动,因此您可以节省一些。

现在,这些通信的核心是等待某些消息(读者/消费者),并在消息被推送时通知(作者/生产者)的机制。

当然,你可以从底层学习(大量教程),从原始的等待/通知,或者你可以跳到像 ArrayBlockingQueue 这样的类,它将消息的等待/通知抽象到 take()/放()。我建议从底部开始,因为当您遇到 java.util.concurrent.* 中的其他类时,事情会变得更快。*

我不能给你代码,如果没有学习 ITC(线程间通信)的基础知识,在你的水平上基本上是无法理解的。

好好学习!

PS:路上有很多陷阱,比如线程安全、写入的原子性、无锁算法、死锁、活锁、饥饿。只是上面的多队列示例可能导致对消息到达的循环依赖,特别是当队列已满并阻塞时。这是一门科学!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2011-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-13
    • 2013-04-30
    • 1970-01-01
    • 2013-05-18
    相关资源
    最近更新 更多