【Question title】: How can the caller thread wait until a periodic task under ScheduledExecutorService has finished its job?
【Posted】: 2020-07-23 17:03:45
【Question】:

I have a requirement where a value must be refreshed every 45 minutes by making an HTTP call.

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class TokenManagerRunnable implements Runnable{
    
    String token;

    
    public String fetchToken() {
        return this.token;
    }
    

    @Override
    public void run() {
        
        String result = "";
        
        HttpPost post = new HttpPost("https://login.microsoftonline.com/{{TenentId}}/oauth2/token");
        List<NameValuePair> urlParameters = new ArrayList<>();
        urlParameters.add(new BasicNameValuePair("grant_type", "client_credentials"));
        urlParameters.add(new BasicNameValuePair("client_id", "some client id"));
        urlParameters.add(new BasicNameValuePair("client_secret", "some secret id"));
        urlParameters.add(new BasicNameValuePair("resource", "https://database.windows.net"));

        try {
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
        } catch (UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        try (CloseableHttpClient httpClient = HttpClients.createDefault();
             CloseableHttpResponse response = httpClient.execute(post)){

            result = EntityUtils.toString(response.getEntity());
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        
        try {
            ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);

            System.out.println(result);

            if (node.has("access_token")) {
                result = node.get("access_token").toString();
            }
            System.out.println(result);
            System.out.println(result.substring(1, result.length()-1));

            //updating the token
            this.token = result.substring(1, result.length()-1);

        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

This is my main function:

        SQLServerDataSource ds = new SQLServerDataSource();
        TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable();
        ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
        sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
        System.out.println("fetching the token ---- "+tokenManagerRunnable.fetchToken());
        ds.setAccessToken(tokenManagerRunnable.fetchToken());
        try {
            Connection connection = ds.getConnection();
            Statement stmt = connection.createStatement();
            ResultSet rs = stmt.executeQuery(" select * from [dbo].[CLIENT]");
            System.out.println("You have successfully logged in");

            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }

tokenManagerRunnable.fetchToken() returns null, because the TokenManagerRunnable task has not run yet.

How can we wait for the ScheduledExecutorService to finish the task, so that every 45 minutes we get the value from tokenManagerRunnable.fetchToken() and set the new value on the DataSource instead of null?

Any ideas?

【Comments】:

    Tags: java multithreading executorservice scheduledexecutorservice


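    【Solution 1】:

    If I understand your question correctly, you can use a CompletableFuture. You wrap token in a CompletableFuture and the scheduled thread completes it. Since a CompletableFuture is a Future, other threads can simply wait on it for the result.

    Here is a basic implementation that illustrates the mechanism.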

    import java.util.concurrent.CompletableFuture;
    
    class Main {
        
        static CompletableFuture<String> token = new CompletableFuture<>();
        
        
        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 100_000_000; i++) {
                    Math.log(Math.sqrt(i));
                    if (i % 10_000_000 == 0) {
                        System.out.println("doing stuff");
                    }
                }
                token.complete("result");
            }).start();
            
            String result = token.join(); // wait until token is set and get it
            System.out.println("got " + result);
        }
    }
    
    

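    Keep in mind that once you have obtained the result you have to assign a new CompletableFuture to token, because a CompletableFuture can only be completed once.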
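One possible way to handle that renewal for a periodic refresh, as a minimal sketch rather than a definitive implementation: keep the future in a volatile field, complete it on the first fetch, and swap in an already-completed future on every later fetch. The names RefreshingToken, publish and get are hypothetical, the Supplier stands in for the real HTTP call, and the sketch assumes a single scheduler thread is the only writer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder: readers block until the first token exists, then see the latest one. */
class RefreshingToken {
    // volatile so that readers always pick up the most recently published future
    private volatile CompletableFuture<String> current = new CompletableFuture<>();

    /** Called by the single scheduler thread after every successful fetch. */
    void publish(String token) {
        // complete() returns false if the future was already completed earlier
        if (!current.complete(token)) {
            // A future completes only once, so swap in a fresh, already-completed one.
            current = CompletableFuture.completedFuture(token);
        }
    }

    /** Blocks only while no token has been published yet. */
    String get() {
        return current.join();
    }

    public static void main(String[] args) {
        RefreshingToken holder = new RefreshingToken();
        AtomicInteger counter = new AtomicInteger();
        // Stand-in for the HTTP call that fetches the token (assumption).
        Supplier<String> fetcher = () -> "token-" + counter.incrementAndGet();

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(
                () -> holder.publish(fetcher.get()), 0, 100, TimeUnit.MILLISECONDS);

        System.out.println("got " + holder.get()); // waits for the first publish
        scheduler.shutdownNow();
    }
}
```

With this shape the caller never sees null: the first get() waits for the first fetch, and later calls return whatever the most recent refresh published.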

    【Comments】:

      【Solution 2】:

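      As you have realised, this is a synchronization problem. There are mainly two ways to synchronize threads:

      • synchronously, using a join,
      • asynchronously, using a callback.

      Given the asynchronous nature of a repeating task, I'd say your best bet is to use a callback. That way you can set the new token value every time it is retrieved.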
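To make the two styles concrete, here is a small generic sketch, not tied to the token code. SyncStyles, fetchAndWait and fetchAndNotify are hypothetical names, and fetchToken is a stand-in for the real request. The join style blocks the caller on Future.get(), while the callback style hands the value to a Consumer on the worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class SyncStyles {
    /** Stand-in for the real token request (assumption). */
    static String fetchToken() {
        return "dummy-token";
    }

    /** Join style: the caller blocks until the worker has produced the value. */
    static String fetchAndWait(ExecutorService pool) {
        Callable<String> task = SyncStyles::fetchToken;
        Future<String> future = pool.submit(task);
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Callback style: the worker passes the value to the consumer once it is ready. */
    static Future<?> fetchAndNotify(ExecutorService pool, Consumer<String> onToken) {
        return pool.submit(() -> onToken.accept(fetchToken()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println("joined: " + fetchAndWait(pool));
        fetchAndNotify(pool, token -> System.out.println("callback: " + token));
        pool.shutdown(); // tasks that were already submitted still run
    }
}
```

The join style fits a one-off wait, whereas the callback keeps working unchanged for every later run of a periodic task.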

      I have updated your code below:

      // Imports [...]
      
      public class TokenManagerRunnable implements Runnable {
          private final SQLServerDataSource ds;
      
          /**
           * New constructor taking a datasource to trigger the persistence of the token
           * on retrieval.
           * @param ds
           */
          public TokenManagerRunnable(SQLServerDataSource ds) {
              this.ds = ds;
          }
      
          /**
           * New method persisting the token on datasource when retrieved.
           * @param token
           */
          private void setToken(String token) {
              System.out.println("fetching the token ---- " + token);
              this.ds.setAccessToken(token);
          }
      
          @Override
          public void run() {
              // Retrieval [...]
              try {
                  // Parsing [...]
                  //updating the token
                  this.setToken(result.substring(1, result.length() - 1));
      
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }
      }
      

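      As you can see, you don't need any state on the runner, because it pushes the result straight to the datasource. You only have to pass this datasource to the runner at construction.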

      SQLServerDataSource ds = new SQLServerDataSource();
      TokenManagerRunnable tokenManagerRunnable = new TokenManagerRunnable(ds);
      
      // Run 1 time synchronously, at the end of this run call
      // the token will be set
      tokenManagerRunnable.run();
              
      // Schedule a token refresh every 45 minutes starting now
      ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
      sches.scheduleWithFixedDelay(tokenManagerRunnable, 0, 45, TimeUnit.MINUTES);
      
      // Use the token [...]
      

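      Edit

      As you said in the comments, the Runnable has to run before you can reach your database. You need to either do this synchronously or move the code that uses the token to a separate thread, depending on what you intend to do with your application. The underlying question is whether your application depends on this initialization.

      In fact, you can call the run() method without putting it in a thread, which simply runs your token update synchronously. This means you need to call tokenManagerRunnable.run(); synchronously before starting the automatic refresh in the scheduled thread.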
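As a sketch of that ordering, the helper below runs the task once on the calling thread and only then schedules the periodic refresh. RefreshBootstrap and runNowThenSchedule are hypothetical names. One deliberate variation, which is my assumption and not part of the answer's code: the first scheduled run is delayed by one full period, so the token is not fetched twice in a row at startup.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RefreshBootstrap {
    /**
     * Runs the task once on the calling thread, then schedules the periodic refreshes.
     * The initial delay equals the period because the first run has already happened.
     */
    static ScheduledFuture<?> runNowThenSchedule(ScheduledExecutorService scheduler,
                                                 Runnable task, long period, TimeUnit unit) {
        task.run(); // blocks the caller; an exception here propagates to the caller
        return scheduler.scheduleWithFixedDelay(task, period, period, unit);
    }

    public static void main(String[] args) {
        AtomicReference<String> token = new AtomicReference<>();
        Runnable refresh = () -> token.set("dummy-token"); // stand-in for the HTTP call

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        runNowThenSchedule(scheduler, refresh, 45, TimeUnit.MINUTES);

        // The token is already set here, before any database code would run.
        System.out.println("token available immediately: " + token.get());
        scheduler.shutdownNow();
    }
}
```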

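      【Comments】:

      • My caller thread keeps going even though the task has not finished, and I did apply your code changes. I have updated the question so that it gives you a clear picture of where I'm stuck.
      • So you need the first run to execute synchronously; I have updated the answer accordingly in the Edit section :)
      【Solution 3】:

      I was able to get it working with the following code: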

      import com.fasterxml.jackson.databind.ObjectMapper;
      import com.fasterxml.jackson.databind.node.ObjectNode;
      import com.microsoft.sqlserver.jdbc.SQLServerDataSource;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.apache.http.NameValuePair;
      import org.apache.http.client.entity.UrlEncodedFormEntity;
      import org.apache.http.client.methods.CloseableHttpResponse;
      import org.apache.http.client.methods.HttpPost;
      import org.apache.http.impl.client.CloseableHttpClient;
      import org.apache.http.impl.client.HttpClients;
      import org.apache.http.message.BasicNameValuePair;
      import org.apache.http.util.EntityUtils;
       
      import java.util.ArrayList;
      import java.util.List;
       
      public class AzureServicePrincipleTokenManager implements Runnable {
       
       
          SQLServerDataSource ds ;
          String secret;
          String clientId;
          String tenetId;
          static final String RESOURCE = "https://database.windows.net";
          static final String ENDPOINT = "https://login.microsoftonline.com/";
          static final String GRANT_TYPE = "client_credentials";
          // volatile: written by the scheduler thread, polled by the caller thread
          volatile boolean confirmTokenFlag = false;
          private static Log logger = LogFactory.getLog( "AzureServicePrincipleTokenManager" );
       
          public AzureServicePrincipleTokenManager(SQLServerDataSource ds, String tenetId, String clientId, String secret) {
              this.ds = ds;
              this.secret = secret;
              this.clientId = clientId;
              this.tenetId = tenetId;
          }
       
          public boolean getConfirmTokenFlag(){
              return this.confirmTokenFlag;
          }
       
          private void setAccessToken(String token){
              this.ds.setAccessToken(token);
          }
       
       
       
       
          @Override
          public void run() {
              logger.info("Fetching Service Principle accessToken");
              String result = "";
              HttpPost post = new HttpPost(ENDPOINT+this.tenetId+"/oauth2/token");
              List<NameValuePair> urlParameters = new ArrayList<>();
              urlParameters.add(new BasicNameValuePair("grant_type", GRANT_TYPE));
              urlParameters.add(new BasicNameValuePair("client_id", this.clientId));
              urlParameters.add(new BasicNameValuePair("client_secret", this.secret));
              urlParameters.add(new BasicNameValuePair("resource", RESOURCE));
       
              try {
                  post.setEntity(new UrlEncodedFormEntity(urlParameters));
                  // try-with-resources closes both the client and the response
                  try (CloseableHttpClient httpClient = HttpClients.createDefault();
                       CloseableHttpResponse response = httpClient.execute(post)) {
                      result = EntityUtils.toString(response.getEntity());
                  }
                  ObjectNode node = new ObjectMapper().readValue(result, ObjectNode.class);

                  if (node.has("access_token")) {
                      // asText() returns the string value without the surrounding JSON quotes
                      this.setAccessToken(node.get("access_token").asText());
                      // raise the flag only after a token has actually been set
                      confirmTokenFlag = true;
                      logger.info("set confirmTokenFlag true");
                  } else {
                      logger.error("Token response contained no access_token");
                  }
              } catch (Exception ex) {
                  logger.error(ex.getMessage(), ex);
              }
       
       
          }
      } 
      

      The caller would look like this:

              SQLServerDataSource ds = new SQLServerDataSource();
      
              AzureServicePrincipleTokenManager azureServicePrincipleTokenManager = new AzureServicePrincipleTokenManager(ds,"your tenentID","your clientID","your secret");
              ScheduledExecutorService sches = Executors.newScheduledThreadPool(1);
              sches.scheduleWithFixedDelay(azureServicePrincipleTokenManager, 0, 45, TimeUnit.MINUTES);
              logger.info("----ExecuterService started the Runnable task");
      
              // Busy-wait until the first token has been set.
              while (!azureServicePrincipleTokenManager.getConfirmTokenFlag()) {

                  ds.getAccessToken(); // With an empty loop body the flag never seemed to become true for me, so I intentionally call ds.getAccessToken() here.
              }
              logger.info("----get the token after setting up  " + ds.getAccessToken());
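A note on the polling loop above: under the Java memory model, a loop that reads a plain (non-volatile) field is not guaranteed to observe a write made by another thread, which may be why the empty loop appeared never to finish. An alternative that avoids spinning altogether is a CountDownLatch. The sketch below is only an illustration under assumptions: FirstRunSignal and awaitFirstRun are hypothetical names, and the wrapped Runnable stands in for the token manager.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that signals when the wrapped task has finished its first run. */
class FirstRunSignal implements Runnable {
    private final Runnable delegate;
    private final CountDownLatch firstRunDone = new CountDownLatch(1);

    FirstRunSignal(Runnable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void run() {
        delegate.run();
        // Reached only if the run did not throw; a no-op after the first call.
        firstRunDone.countDown();
    }

    /** Blocks until the first run has finished; returns false on timeout or interrupt. */
    boolean awaitFirstRun(long timeout, TimeUnit unit) {
        try {
            return firstRunDone.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for the token manager's run() (assumption).
        FirstRunSignal signal = new FirstRunSignal(() -> System.out.println("refreshing token"));
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(signal, 0, 45, TimeUnit.MINUTES);

        boolean ready = signal.awaitFirstRun(30, TimeUnit.SECONDS);
        System.out.println("first run finished: " + ready);
        scheduler.shutdownNow();
    }
}
```

The timeout matters here: if a run of a task scheduled with scheduleWithFixedDelay throws, later runs are suppressed, so an unbounded wait could block forever.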
      

      【Comments】:

      • If you have solved your own problem, you can mark your answer as the solution. That way the post won't stay in an unresolved state forever.