我使用 DBMS_SCHEDULER 和 PIPE 进行同步/IPC 解决了这个问题,它不依赖轮询并且不需要额外的表。不过,它仍然会在每个完成的工作中唤醒一次。
这是相当的努力,所以如果有人可以提出更简单的解决方案,请分享!
- 定义一个调用可以运行的实际过程的过程
来自程序/作业并处理 IPC(完成后将消息写入管道)。
- 定义一个调用此过程并定义要传递给该过程的参数的程序
- 定义从程序创建作业、将参数映射到作业参数并运行作业的过程
- 定义等待所有作业完成的逻辑:等待每个作业都在管道上发送消息。
--
-- Define stored procedures to be executed by job
--
/** Actual method that should be run in parallel*/
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS(
in_job IN VARCHAR2)
IS
BEGIN
-- Imagine a lot of IF, ELSEIF, ELSE statements here
DELETE FROM TEST_BONUS WHERE JOB=in_job;
END;
/
/** Stored procedure to be run from the job: Uses pipes for job synchronization, executes PROC_DELETE_TEST_BONUS. */
CREATE OR REPLACE PROCEDURE PROC_DELETE_TEST_BONUS_CONCUR(in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2)
IS
flag INTEGER;
BEGIN
-- Execute actual procedure
PROC_DELETE_TEST_BONUS(in_job);
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||': Success ' ||in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
EXCEPTION
WHEN OTHERS THEN
-- Signal completion
-- Use the procedure to put a message in the local buffer.
DBMS_PIPE.PACK_MESSAGE(SYSDATE ||':Failed ' || in_job);
-- Send message, success is a zero return value.
flag := DBMS_PIPE.SEND_MESSAGE(in_pipe_name);
RAISE;
END;
/
--
-- Run Jobs
--
DECLARE
timestart NUMBER;
duration_insert NUMBER;
jobs_amount NUMBER := 0;
retval INTEGER;
message VARCHAR2(4000);
rows_amount NUMBER;
/** Create and define a program that calls PROG_DELETE_TEST_BONUS_CONCUR to be run as job. */
PROCEDURE create_prog_delete_test_bonus
IS
BEGIN
-- define new in each run in order to ease development. TODO Once it works, no need to redefine for each run!
dbms_scheduler.drop_program(program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', force=> TRUE);
dbms_scheduler.create_program ( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR', program_action =>
'PROC_DELETE_TEST_BONUS_CONCUR', program_type => 'STORED_PROCEDURE', number_of_arguments => 2,
enabled => FALSE );
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name => 'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 1, argument_name => 'in_pipe_name', argument_type => 'VARCHAR2');
dbms_scheduler.DEFINE_PROGRAM_ARGUMENT( program_name=>'PROG_DELETE_TEST_BONUS_CONCUR',
argument_position => 2, argument_name => 'in_job', argument_type => 'VARCHAR2');
dbms_scheduler.enable('PROG_DELETE_TEST_BONUS_CONCUR');
END;
/** "Forks" a job that runs PROG_DELETE_TEST_BONUS_CONCUR */
PROCEDURE RUN_TEST_BONUS_JOB(
in_pipe_name IN VARCHAR2,
in_job IN VARCHAR2,
io_job_amount IN OUT NUMBER)
IS
jobname VARCHAR2(100);
BEGIN
jobname:=DBMS_SCHEDULER.GENERATE_JOB_NAME;
dbms_scheduler.create_job(job_name => jobname, program_name =>
'PROG_DELETE_TEST_BONUS_CONCUR');
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_pipe_name' , argument_value => in_pipe_name);
dbms_scheduler.set_job_argument_value(job_name => jobname, argument_name =>
'in_job' , argument_value => in_job);
dbms_output.put_line(SYSDATE || ': Running job: '|| jobname);
dbms_scheduler.RUN_JOB(jobname, false );
io_job_amount:= io_job_amount+1;
END;
-- Anonymous "Main" block
BEGIN
create_prog_delete_test_bonus;
-- Define private pipe
retval := DBMS_PIPE.CREATE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME, 100, FALSE);
dbms_output.put_line(SYSDATE || ': Created pipe: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned ' ||retval);
timestart := dbms_utility.get_time();
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'A' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'B' FROM DUAL CONNECT BY LEVEL <= 1000000;
INSERT into TEST_BONUS(ENAME, JOB) SELECT ROWNUM, 'C' FROM DUAL CONNECT BY LEVEL <= 1000000;
COMMIT;
duration_insert := dbms_utility.get_time() - timestart;
dbms_output.put_line(SYSDATE || ': Duration (1/100s): INSERT=' || duration_insert);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
timestart := dbms_utility.get_time();
-- -- Process sequentially
-- PROC_DELETE_TEST_BONUS('A');
-- PROC_DELETE_TEST_BONUS('B');
-- PROC_DELETE_TEST_BONUS('C');
-- start concurrent processing
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'A', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'B', jobs_amount);
RUN_TEST_BONUS_JOB(DBMS_PIPE.UNIQUE_SESSION_NAME, 'C', jobs_amount);
-- "Barrier": Wait for all jobs to finish
for i in 1 .. jobs_amount loop
-- Reset the local buffer.
DBMS_PIPE.RESET_BUFFER;
-- Wait and receive message. Timeout after an hour.
retval := SYS.DBMS_PIPE.RECEIVE_MESSAGE(SYS.DBMS_PIPE.UNIQUE_SESSION_NAME, 3600);
-- Handle errors: timeout, etc.
IF retval != 0 THEN
raise_application_error(-20000, 'Error: '||to_char(retval)||' receiving on pipe. See Job Log in table user_scheduler_job_run_details');
END IF;
-- Read message from local buffer.
DBMS_PIPE.UNPACK_MESSAGE(message);
dbms_output.put_line(SYSDATE || ': Received message on '''|| DBMS_PIPE.UNIQUE_SESSION_NAME ||''' (Status='|| retval ||'): ' || message);
end loop;
dbms_output.put(SYSDATE || ': Duration (1/100s): DELETE=');
dbms_output.put_line(dbms_utility.get_time() - timestart);
SELECT COUNT(*) INTO rows_amount FROM TEST_BONUS;
dbms_output.put_line(SYSDATE || ': COUNT(*) FROM TEST_BONUS: ' || rows_amount);
retval :=DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(systimestamp || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line(SYSDATE || SUBSTR(SQLERRM, 1, 1000) || ' ' ||
SUBSTR(DBMS_UTILITY.FORMAT_ERROR_BACKTRACE, 1, 1000));
retval := DBMS_PIPE.REMOVE_PIPE(DBMS_PIPE.UNIQUE_SESSION_NAME);
dbms_output.put_line(SYSDATE || ': REMOVE_PIPE: ''' || DBMS_PIPE.UNIQUE_SESSION_NAME || ''' returned: ' ||retval);
-- Clean up in case of error
PROC_DELETE_TEST_BONUS('A');
PROC_DELETE_TEST_BONUS('B');
PROC_DELETE_TEST_BONUS('C');
RAISE;
END;
/
您应该始终牢记,在作业中执行的更改是在单独的事务中提交的。
只是为了了解这种并发性实现了什么,这里有一些平均测量值:问题中的顺序代码大约需要 60 秒才能完成,并行代码大约需要 40 秒。
当并行运行的作业超过三个时,这将是一个有趣的进一步调查。
PS
以下是一个有用的查询,可帮助您了解作业的状态
SELECT job_name,
destination,
TO_CHAR(actual_start_date) AS actual_start_date,
run_duration,
TO_CHAR((ACTUAL_START_DATE+run_duration)) AS actual_end_date,
status,
error#,
ADDITIONAL_INFO
FROM user_scheduler_job_run_details
ORDER BY actual_start_date desc;