Commit 467a35db authored by Simon Brass's avatar Simon Brass
Browse files

Clean up and add sedable TIME signatures.

parent 0865bd8e
!> \file balancer_stub.f90 - Mimick a full-blown load balancer.
module balancer
use iso_fortran_env, only: ERROR_UNIT
use mpi_f08
implicit none
private
type :: balancer_t
!! Map local i_channel to globale i_channel index.
integer, dimension(:), allocatable :: map
contains
procedure :: init => balancer_init
procedure :: n_channels => balancer_n_channels
procedure :: i_channel => balancer_i_channel
procedure :: mapped => balancer_mapped
generic :: write(formatted) => print
procedure, private :: print => balancer_print
end type balancer_t
public :: balancer_t
contains
subroutine balancer_init (balancer, n_channels, comm)
class(balancer_t), intent(out) :: balancer
integer, intent(in) :: n_channels
type(MPI_COMM), intent(in) :: comm
integer :: i, n_channels_local, n_size, rank
call MPI_COMM_SIZE (comm, n_size)
call MPI_COMM_RANK (comm, rank)
if (rank == 0) then
balancer%map = [(i, i = 1, n_channels)]
else
!! Exclude master worker from share.
n_channels_local = n_channels / (n_size - 1)
!! Share remainding channels along workers (uniquely).
if (mod (n_channels, n_size - 1) >= rank) n_channels_local = n_channels_local + 1
allocate (balancer%map (n_channels_local), &
source = [(rank + (i - 1) * (n_size - 1), i = 1, n_channels_local)])
end if
end subroutine balancer_init
integer function balancer_n_channels (balancer) result (n_channels_local)
class(balancer_t), intent(in) :: balancer
n_channels_local = size (balancer%map)
end function balancer_n_channels
integer function balancer_i_channel (balancer, i_channel_local) result (i_channel)
class(balancer_t), intent(in) :: balancer
integer, intent(in) :: i_channel_local
i_channel = balancer%map (i_channel_local)
end function balancer_i_channel
logical function balancer_mapped (balancer, i_channel) result (flag)
class(balancer_t), intent(in) :: balancer
integer, intent(in) :: i_channel
flag = any (i_channel == balancer%map)
end function balancer_mapped
subroutine balancer_print (dtv, unit, iotype, vlist, iostat, iomsg)
class(balancer_t), intent(in) :: dtv
integer, intent(in) :: unit
character (len=*), intent(in) :: iotype
integer, dimension(:), intent(in) :: vlist
integer, intent(out) :: iostat
character (len=*), intent(inout) :: iomsg
write (unit, iostat=iostat) "N_CHANNELS: ", size (dtv%map)
write (unit, iostat=iostat) "> ", dtv%map
end subroutine balancer_print
end module balancer
program main
#include "debug.h"
use iso_fortran_env, only: r64 => REAL64, OUTPUT_UNIT
use iso_c_binding, only: c_int
use balancer
use request_handler
use result_handler
use mpi_f08
use utils, only: sleep
use utils, only: sleep, cpu_time_t
implicit none
......@@ -15,57 +15,59 @@ program main
type(result_t), dimension(n_channels), target :: result
integer :: i_channel, rank, n_size, source, n_handler, handler_id
real :: r
integer(c_int) :: secs, how_long
type(balancer_t) :: blc
type(request_handler_manager_t) :: rhm
type(cpu_time_t) :: timer
call timer%begin ()
write (ERROR_UNIT, *) "#START", timer%since_start ()
call MPI_INIT ()
call MPI_COMM_RANK (MPI_COMM_WORLD, rank)
call MPI_COMM_SIZE (MPI_COMM_WORLD, n_size)
if (rank == 0) then
n_handler = n_channels
call rhm%init (n_size = n_handler, comm = MPI_COMM_WORLD)
else
n_handler = n_channels / (n_size - 1)
write (ERROR_UNIT, *) "[MOD]", mod (n_channels, n_size - 1)
if (mod (n_channels, n_size - 1) >= rank) n_handler = n_handler + 1
call rhm%init (n_size = n_handler, comm = MPI_COMM_WORLD)
write (ERROR_UNIT, *) "[INIT]", rank, n_handler
end if
write (ERROR_UNIT, *) "#MPI_INIT", timer%since_start ()
!! Must be called after MPI_INIT ().
call blc%init (n_channels, MPI_COMM_WORLD)
n_handler = blc%n_channels ()
call rhm%init (n_size = n_handler, comm = MPI_COMM_WORLD)
handler_id = 0
do i_channel = 1, n_channels
write (ERROR_UNIT, *) "[", rank, "]", i_channel, n_size
source = mod (i_channel - 1, n_size - 1) + 1
write (ERROR_UNIT, *) "#INIT", timer%since_start ()
call MPI_BARRIER (MPI_COMM_WORLD)
do i_channel = 1, blc%n_channels ()
write (ERROR_UNIT, *) "#CHANNEL", timer%since_start ()
handler_id = i_channel
source = blc%i_channel (i_channel)
!! Master: Register only handlers.
if (rank == 0) then
handler_id = i_channel
call register_handle (rhm, handler_id, result(i_channel))
else if (rank == source) then
handler_id = handler_id + 1
! Slave: Computation.
call random_number (r)
secs = int (r * 4, c_int)
write (ERROR_UNIT, *) "COMPUTATION TIME: ", secs
how_long = sleep (secs)
result(i_channel)%samples = i_channel
result(i_channel)%sum_integral = i_channel
result(i_channel)%sum_integral_sq = i_channel**2
write (ERROR_UNIT, *) "#REGISTER", timer%since_start ()
call register_handle (rhm, handler_id, result(i_channel))
if (handler_id > 1) call rhm%handler_wait (handler_id - 1)
call rhm%request (i_channel, handler_id)
!! Slave: Compute and register handler.
else if (blc%mapped (source)) then
call slave_compute (result(source), source)
write (ERROR_UNIT, *) "#COMPUTE", timer%since_start ()
call register_handle (rhm, handler_id, result(source))
write (ERROR_UNIT, *) "#REGISTER", timer%since_start ()
call rhm%request (source, handler_id)
write (ERROR_UNIT, *) "#REQUEST", timer%since_start ()
!! Free request and status handler?
end if
end do
if (rank == 0) then
!! Handle requests, and finish all communication of the handlers.
write (ERROR_UNIT, *) "#HANDLE", timer%since_start ()
call rhm%handle_request ()
else
call rhm%handler_wait (handler_id)
!! Wait for last request to finish.
! call rhm%handler_wait (handler_id)
end if
write (ERROR_UNIT, *) "#FINISH", timer%since_start ()
call rhm%handler_waitall ()
write (ERROR_UNIT, *) "#WAIT", timer%since_start ()
if (rank == 0) then
print *, result
end if
......@@ -80,4 +82,17 @@ contains
call handler%init (result, result%get_n_requests ())
call rhm%register (id, handler)
end subroutine register_handle
subroutine slave_compute (result, val)
type(result_t), intent(inout) :: result
integer, intent(in) :: val
real :: r
integer(c_int) :: secs, how_long
call random_number (r)
secs = int (r * 10, c_int)
how_long = sleep (secs)
result%samples = val
result%sum_integral = val
result%sum_integral_sq = val**2
end subroutine slave_compute
end program main
......@@ -42,7 +42,6 @@ contains
class(result_handler_t), intent(inout) :: handler
integer, intent(in) :: source
type(MPI_COMM), intent(in) :: comm
if (associated (handler%obj)) write (ERROR_UNIT, *) "[ASSOCIATED]"
call handler%obj%receive (source, comm, handler%request)
handler%finished = .false.
end subroutine result_handler_handle
......@@ -51,9 +50,7 @@ contains
class(result_handler_t), intent(inout) :: handler
integer, intent(in) :: rank
type(MPI_COMM), intent(in) :: comm
if (associated (handler%obj)) write (ERROR_UNIT, *) "CLIENT: [ASSOCIATED]"
call handler%obj%send (rank, comm, handler%request)
!! \todo{sbrass} Communication has not finished... so, no success?
handler%finished = .false.
end subroutine result_handler_client_handle
......@@ -71,14 +68,12 @@ contains
integer, intent(in) :: receiver
type(MPI_COMM), intent(in) :: comm
type(MPI_REQUEST), dimension(3), intent(inout) :: reqs
write (ERROR_UNIT, *) "RESULT SEND TO: ", receiver
call MPI_ISEND (result%samples, 1, MPI_INTEGER, &
receiver, 1, comm, reqs(1))
call MPI_ISEND (result%sum_integral, 1, MPI_DOUBLE_PRECISION, &
receiver, 1, comm, reqs(2))
call MPI_ISEND (result%sum_integral_sq, 1, MPI_DOUBLE_PRECISION, &
receiver, 2, comm, reqs(3))
write (ERROR_UNIT, *) "... DISPATCHED"
end subroutine result_send
subroutine result_receive (result, source, comm, reqs)
......@@ -86,14 +81,12 @@ contains
integer, intent(in) :: source
type(MPI_COMM), intent(in) :: comm
type(MPI_REQUEST), dimension(3), intent(inout) :: reqs
write (ERROR_UNIT, *) "RESULT RECEIVE FROM: ", source
call MPI_IRECV (result%samples, 1, MPI_INTEGER, &
source, 1, comm, reqs(1))
call MPI_IRECV (result%sum_integral, 1, MPI_DOUBLE_PRECISION, &
source, 1, comm, reqs(2))
call MPI_IRECV (result%sum_integral_sq, 1, MPI_DOUBLE_PRECISION, &
source, 2, comm, reqs(3))
write (ERROR_UNIT, *) "... DISPATCHED"
end subroutine result_receive
integer function result_get_n_requests (result) result (n_requests)
......
......@@ -39,9 +39,6 @@ module request_handler
type(MPI_REQUEST), dimension(:), allocatable :: request
type(MPI_STATUS), dimension(:), allocatable :: status
logical :: finished = .false.
#ifdef TIME
type(cpu_time_t) :: reqT
#endif
contains
!! \todo{sbrass} implement initialization procedure.
procedure(request_handler_handle), deferred :: handle
......@@ -76,7 +73,7 @@ module request_handler
procedure :: handle => request_handler_manager_handle
procedure :: handle_request => request_handler_manager_handle_request
procedure :: handler_waitall => request_handler_manager_handler_waitall
procedure :: handler_testall => request_handler_manager_handler_testall
procedure :: handler_test => request_handler_manager_handler_test
procedure :: handler_wait => request_handler_manager_handler_wait
generic :: write(formatted) => print
procedure, private :: print => request_handler_manager_write
......@@ -189,25 +186,20 @@ contains
type(MPI_REQUEST) :: req
type(MPI_STATUS) :: stat
integer, parameter :: REQUEST_TAG = 1
write (ERROR_UNIT, *) rhm%comm_rank, "REQUESTS", server_id, "..."
!! Check whether previous request has already finished.
if (client_id > 1) then
!! Test locally the request, if not finished, wait until finished.
! if (.not. rhm%handler_test (client_id - 1)) call rhm%handler_wait (client_id - 1)
call rhm%handler_wait (client_id - 1)
end if
call MPI_ISEND (server_id, 1, MPI_INTEGER, &
0, REQUEST_TAG, rhm%comm, req)
call MPI_WAIT (req, stat)
write (ERROR_UNIT, *) rhm%comm_rank, "... FINISHED."
!! \todo{sbrass} Catch MPI error.
!! Client handle, which may differ from the server handle.
call rhm%handler(client_id)%ptr%client_handle (0, rhm%comm)
end subroutine request_handler_manager_request
!> Test if handler with id has finished.
!!
!! \param[in] id Index for the respective handler.
logical function request_handler_manager_handler_testall (rhm, id) result (flag)
class(request_handler_manager_t), intent(inout) :: rhm
integer, intent(in) :: id
flag = rhm%handler(id)%ptr%testall ()
end function request_handler_manager_handler_testall
!> The master worker listens to handle requests from the slave workers.
!!
!! \param[in]
......@@ -228,14 +220,9 @@ contains
call MPI_WAITSOME (rhm%n_comm_size - 1, reqs, n_done, &
indices, stats)
do i = 1, n_done
!! Do service.
write (ERROR_UNIT, *) "REQUEST FROM : ", stats(i)%MPI_SOURCE, stats(i)%MPI_TAG, stats(i)%MPI_ERROR
write (ERROR_UNIT, *) "REQUEST FOR ID : ", handler(indices(i))
call rhm%handle (handler(indices(i)), stats(i)%MPI_SOURCE)
count_handler = count_handler + 1
!! |todo{sbrass} Old handle has to finish at this point!!! Keep track
!! of that, somehow...
!! \note{}
!! \todo{Keep track of old requests.?}
call MPI_IRECV (handler(indices(i)), 1, MPI_INTEGER, &
indices(i), MPI_ANY_TAG, rhm%comm, reqs(indices(i)))
end do
......@@ -244,7 +231,7 @@ contains
do i = 1, rhm%n_comm_size - 1
call MPI_REQUEST_FREE (reqs(i))
end do
call rhm%handler_waitall ()
! call rhm%handler_waitall ()
end subroutine request_handler_manager_handle_request
subroutine request_handler_manager_handler_waitall (rhm)
......@@ -261,20 +248,26 @@ contains
subroutine request_handler_manager_handler_wait (rhm, id)
class(request_handler_manager_t), intent(in) :: rhm
integer, intent(in) :: id
write (ERROR_UNIT, *) "WAIT FOR HANDLER ", id
call rhm%handler(id)%ptr%waitall ()
end subroutine request_handler_manager_handler_wait
!> Test if handler with id has finished.
!!
!! \param[in] id Index for the respective handler.
logical function request_handler_manager_handler_test (rhm, id) result (flag)
class(request_handler_manager_t), intent(in) :: rhm
integer, intent(in) :: id
flag = rhm%handler(id)%ptr%testall ()
end function request_handler_manager_handler_test
!> Run handler with [[id]], and let the handler know the source of the request.
!!
!! \param[in] id Handler-id.
!! \param[in] request_id Index for the requested handler.
!! \param[in] request_source Source rank.
subroutine request_handler_manager_handle (rhm, request_id, request_source)
class(request_handler_manager_t), intent(inout) :: rhm
integer, intent(in) :: request_id
integer, intent(in) :: request_source
!! \note{sbrass} It's neither necessary nor adversible to just give a
!! "rank". What is the intention? It would be much more versatiler, to let
!! this be done by the extension of request_handler_t???
call rhm%handler(request_id)%ptr%handle (request_source, rhm%comm)
!! \todo{sbrass} Check on success of handle?
!! Is it already finished, or is the finalization postponed?
......
......@@ -24,9 +24,12 @@ module utils
procedure :: begin => cpu_time_begin
procedure :: end => cpu_time_end
procedure :: time => cpu_time_time
procedure :: since_start => cpu_time_since_start
generic :: write(formatted) => print
procedure :: print => cpu_time_print
end type cpu_time_t
public :: cpu_time_t
contains
subroutine cpu_time_print (dtv, unit, iotype, vlist, iostat, iomsg)
class(cpu_time_t), intent(in) :: dtv
......@@ -52,4 +55,10 @@ contains
class(cpu_time_t), intent(in) :: ct
delta = ct%finish - ct%start
end function cpu_time_time
real(r64) function cpu_time_since_start (ct) result (current_delta)
class(cpu_time_t), intent(in) :: ct
call cpu_time (current_delta)
current_delta = current_delta - ct%start
end function cpu_time_since_start
end module utils
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment