
Commit 343c4ce: "Spack (Ruche) compatible"
Author: ksiero@man.poznan.pl
Parent: 712a58f

11 files changed: 562 additions & 4 deletions


CMakeLists.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -24,6 +24,7 @@
 cmake_minimum_required(VERSION 3.9)
 project(pdi_examples LANGUAGES C)
 
+find_package(spdlog)
 find_package(MPI REQUIRED COMPONENTS C)
 find_package(paraconf REQUIRED COMPONENTS C)
 find_package(PDI 0.7 REQUIRED COMPONENTS C)
```

README.md

Lines changed: 4 additions & 4 deletions
````diff
@@ -35,7 +35,7 @@ cd tutorial
 
 Before compilation, configure the tutorial by detecting all dependencies:
 ```bash
-pdirun cmake .
+cmake .
 ```
 
 \attention
@@ -44,7 +44,7 @@ If you installed PDI in a standard path, the `pdirun` prefix is never required.
 Once you have correctly modified each exercise according to instructions, you
 can compile it by running:
 ```bash
-pdirun make ex?
+make ex?
 ```
 Where `?` is the number of the exercise.
 
@@ -53,15 +53,15 @@ Where `?` is the number of the exercise.
 
 You can run each exercise with the following command:
 ```bash
-pdirun mpirun -n 4 ./ex?
+/usr/bin/srun -n 4 ./ex?
 ```
 Where `?` is the number of the exercise and 4 represents the number of MPI
 processes to use.
 
 To store the logs for later comparison, you can use the following command (for
 example for ex2.):
 ```bash
-pdirun mpirun -n 1 ./ex2 > ex2.result.log
+/usr/bin/srun -n 1 ./ex2 > ex2.result.log
 ```
 
 Now you're ready to work, **good luck**!
````

deisa_example/CMakeLists.txt

Lines changed: 12 additions & 0 deletions
New file:

```cmake
cmake_minimum_required(VERSION 3.9)
project(Coupling LANGUAGES C CXX)

find_package(MPI REQUIRED COMPONENTS CXX C)
find_package(paraconf REQUIRED COMPONENTS C)
find_package(PDI 1.4 REQUIRED COMPONENTS C)

set(CMAKE_C_STANDARD 99)

add_executable(simulation simulation.c)
target_link_libraries(simulation m MPI::MPI_C paraconf::paraconf PDI::pdi)
```

deisa_example/Launcher.sh

Lines changed: 36 additions & 0 deletions
New file:

```bash
#!/bin/bash

DIR=$PWD

### prescript.py is used to create the configuration file that is shared between the simulation and the Dask cluster
# sys.argv[1] : global_size.height
# sys.argv[2] : global_size.width
# sys.argv[3] : parallelism.height
# sys.argv[4] : parallelism.width
# sys.argv[5] : generation
# sys.argv[6] : nworkers

source $WORKDIR/spack/share/spack/setup-env.sh
spack load cmake@3.22.1
spack load pdiplugin-deisa
spack load /hbohtbo # pdiplugin-mpi

NWORKER=4

PARALLELISM1=2
PARALLELISM2=2

DATASIZE1=1024
DATASIZE2=1024

GENERATION=5

mkdir -p $WORKDIR/Deisa
WORKSPACE=$(mktemp -d -p $WORKDIR/Deisa/ Dask-run-XXX)
cd $WORKSPACE
cp $DIR/simulation.yml $DIR/*.py $DIR/Script.sh $DIR/Launcher.sh $DIR/*.c $DIR/CMakeLists.txt .
pdirun cmake .
make -B simulation
echo "Running $WORKSPACE"
$(which python) prescript.py $DATASIZE1 $DATASIZE2 $PARALLELISM1 $PARALLELISM2 $GENERATION $NWORKER
sbatch Script.sh
```

deisa_example/README.md

Lines changed: 24 additions & 0 deletions
New file:

```markdown
# Dask-Enabled In Situ Analytics

This repository contains an example of how Deisa can be used.

## Requirements
- [Deisa PDI plugin](https://github.com/GueroudjiAmal/deisa)
- [Dask Distributed Deisa version repo](https://github.com/GueroudjiAmal/distributed)

## How it works

A simulation can be instrumented with PDI to make its internal data available to **_DEISA_**. The plugin retrieves the data, creates the corresponding keys, and sends the data to Dask workers.

Internally, one **_DEISA Bridge_** is created per MPI process. Once a piece of data is shared with PDI, the Bridge sends it to a worker chosen in round-robin fashion.

The **_DEISA_** Python library implements a **_DEISA Adaptor_**. This component is used on the Dask client side to create Dask arrays describing the data generated by the simulation. The **_DEISA Adaptor_** waits for an array descriptor to be sent by the **_DEISA Bridge_** on MPI rank 0. This descriptor is a dictionary with data names as keys and, as values, a dictionary containing the sizes, dimensions, and chunk sizes.
The **_DEISA Adaptor_** uses this information to create Dask arrays, which can be retrieved by calling the `get_data()` method.

## Files
- simulation.c: a toy example of a C simulation code; here we use the 2D heat solver from [PDI examples](https://pdi.dev/master/PDI_example.html).
- simulation.yml: the PDI configuration.
- dask_interface.py: contains the **_DEISA_** Python library (Bridge and Adaptor classes).
- client.py: an example of a Python analytics script; it is run as the Dask client.
- prescript.py: creates a file config.yml that contains the simulation configuration, such as the size of the data, the number of timesteps, and the domain decomposition.
- Launcher.sh and Script.sh: can be used to launch the simulation and the Dask cluster on [Ruche](https://mesocentre.pages.centralesupelec.fr/user_doc/)
```
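The array descriptor exchanged between Bridge and Adaptor can be sketched as a plain Python dict. The field names (`sizes`, `subsizes`, `timedim`, `dtype`) follow `dask_interface.py`; the concrete values below are hypothetical, chosen to match the 1024x1024 domain over a 2x2 process grid used by `Launcher.sh`:

```python
# Hypothetical descriptor, as the DEISA Bridge on rank 0 would put it on
# the "Arrays" distributed queue. Field names follow dask_interface.py.
descriptor = {
    "global_t": {
        "sizes": [5, 1024, 1024],   # time steps, then global height x width
        "subsizes": [1, 512, 512],  # one chunk per MPI process per time step
        "timedim": 0,               # which dimension indexes time
        "dtype": "float64",
    }
}

# The Adaptor derives the number of chunks per dimension the same way
# create_array does: global size divided by chunk size along each dimension.
desc = descriptor["global_t"]
chunks_in_each_dim = [desc["sizes"][i] // desc["subsizes"][i]
                      for i in range(len(desc["sizes"]))]
print(chunks_in_each_dim)  # [5, 2, 2] -> 20 chunk keys in total
```

Each element of the cartesian product of these ranges becomes one chunk key of the resulting Dask array.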

deisa_example/Script.sh

Lines changed: 58 additions & 0 deletions
New file:

```bash
#!/bin/bash

#SBATCH -J dask-cluster
#SBATCH -A dask_coupling
#SBATCH --time=01:00:00
#SBATCH --nodes=1
#SBATCH --partition=cpu_med
#SBATCH --exclusive

NPROC=4        # Total number of processes
NPROCPNODE=4   # Number of processes per node
NWORKERPNODE=4 # Number of Dask workers per node

SCHEFILE=scheduler.json

source $WORKDIR/spack/share/spack/setup-env.sh
spack load pdiplugin-deisa
spack load /hbohtbo # pdiplugin-mpi
spack load py-bokeh

# Launch the Dask scheduler on one node and save the connection information in $SCHEFILE
echo launching Scheduler
srun --cpu-bind=verbose --ntasks=1 --nodes=1 -l \
     --output=scheduler.log \
     dask-scheduler \
     --interface ib0 \
     --scheduler-file=$SCHEFILE &

# Wait for the SCHEFILE to be created
while ! [ -f $SCHEFILE ]; do
    sleep 3
    echo -n .
done

# Connect the client to the Dask scheduler
echo Connect Master Client
$(which python) client.py &
client_pid=$!

# Launch Dask workers on the rest of the allocated nodes
echo Scheduler booted, Client connected, launching workers
srun --cpu-bind=verbose -l \
     --output=worker-%t.log \
     dask-worker \
     --interface ib0 \
     --local-directory /tmp \
     --nprocs $NWORKERPNODE \
     --scheduler-file=${SCHEFILE} &

# Launch the simulation code
echo Running Simulation
pdirun srun --ntasks=$NPROC --ntasks-per-node=$NPROCPNODE -l ./simulation &

# Wait for the client process to finish
wait $client_pid
```
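The scheduler, client, workers, and simulation in `Script.sh` rendezvous purely through the scheduler file: `dask-scheduler` writes `scheduler.json`, and the other actors read the connection address back out of it. A minimal stdlib-only sketch of that read, using a temporary file and a made-up address:

```python
import json
import os
import tempfile

# Write a stand-in scheduler file; the address here is hypothetical.
fd, path = tempfile.mkstemp(suffix=".json")
with os.fdopen(fd, "w") as f:
    json.dump({"address": "tcp://10.0.0.1:8786"}, f)

# This mirrors what the Adaptor does with scheduler_info: load the JSON
# file and pick out the "address" field to connect to.
with open(path) as f:
    address = json.load(f)["address"]
print(address)  # tcp://10.0.0.1:8786

os.remove(path)
```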

deisa_example/client.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import yaml
2+
from dask_interface import Initialization
3+
import dask
4+
import dask.array as da
5+
from dask.distributed import performance_report
6+
7+
# Get configuration
8+
with open(r'config.yml') as file:
9+
data = yaml.load(file, Loader=yaml.FullLoader)
10+
Sworkers = data["workers"]
11+
12+
# Scheduler file name
13+
scheduler_info = 'scheduler.json'
14+
15+
# Initialize the Deisa Adaptor
16+
Adaptor = Initialization(Sworkers, scheduler_info)
17+
18+
# Check if client version is compatible with scheduler version
19+
Adaptor.client.get_versions(check=True)
20+
21+
# Get data descriptor as a dict of Dask arrays
22+
arrays = Adaptor.get_data()
23+
24+
# py-bokeh is needed if you wanna see the perf report
25+
with performance_report(filename="dask-report.html"):
26+
# Get the Dask array global_t
27+
gt = arrays["global_t"]
28+
#gt = gt.rechunk({1: 'auto', 2: 'auto'})
29+
print(gt.chunks)
30+
# Construct a lazy task graph
31+
cpt = (gt.sum() - gt.mean())*5.99 / gt.mean()
32+
# Submit the task graph to the scheduler
33+
s = Adaptor.client.compute(cpt, release=True)
34+
# Print the result, note that "s" is a future object, to get the result of the computation, we call `s.result()` to retreive it.
35+
print(s.result())
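The task graph built in `client.py` reduces to a simple scalar formula over the array. A plain-Python sketch (no Dask cluster needed) of what `cpt` evaluates to for a small stand-in array:

```python
# Stand-in for the distributed global_t array; the values are arbitrary.
gt = [0.0, 1.0, 2.0, 3.0]

total = sum(gt)            # 6.0
mean = total / len(gt)     # 1.5

# Same formula as the lazy graph in client.py, evaluated eagerly.
cpt = (total - mean) * 5.99 / mean
print(cpt)  # (6.0 - 1.5) * 5.99 / 1.5 = 17.97
```

On the cluster, the same reduction runs chunk by chunk on the workers and only the final scalar travels back to the client.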

deisa_example/dask_interface.py

Lines changed: 138 additions & 0 deletions
New file:

```python
import sys
import dask
import numpy as np
import dask.array as da
from dask.distributed import Client, Event, get_client, comm, Queue, Future, Variable
from dask.delayed import Delayed
import time
import asyncio
import json
import itertools


class metadata:
    index = list()
    data = ""
    shap = None
    typ = ""

    def __init__(self, name):
        self.name = name


def connect(sched_file):
    # sched_file arrives as a sequence of character codes; decode it and
    # drop the trailing null character before opening the scheduler file
    sched = ''.join(chr(i) for i in sched_file)
    with open(sched[:-1]) as f:
        s = json.load(f)
    adr = s["address"]
    client = get_client(adr)
    return client


def init(sched_file, rank, size, arrays, deisa_arrays_dtype):
    client = connect(sched_file)
    return Bridge(client, size, rank, arrays, deisa_arrays_dtype)


class Bridge:
    workers = []

    def __init__(self, Client, Ssize, rank, arrays, deisa_arrays_dtype):
        self.client = Client
        self.rank = rank
        listw = Variable("workers").get()
        if Ssize > len(listw):  # more processes than workers
            self.workers = [listw[rank % len(listw)]]
        else:  # more workers than processes
            k = len(listw) // Ssize
            self.workers = listw[rank*k:rank*k + k]
        self.arrays = arrays
        for ele in self.arrays:
            self.arrays[ele]["dtype"] = str(deisa_arrays_dtype[ele])
            self.arrays[ele]["timedim"] = self.arrays[ele]["timedim"][0]
            self.position = [self.arrays[ele]["starts"][i] // self.arrays[ele]["subsizes"][i]
                             for i in range(len(np.array(self.arrays[ele]["sizes"])))]
        if rank == 0:
            # Valid if and only if the domain decomposition is perfect
            Queue("Arrays").put(self.arrays)

    def create_key(self, timestep, name):
        self.position[self.arrays[name]["timedim"]] = timestep
        position = tuple(self.position)
        return ("deisa-" + name, position)

    def publish_data(self, data, data_name, timestep):
        event = Event("Done")
        if timestep == 0:
            event.wait()
        key = self.create_key(timestep, data_name)
        shap = list(data.shape)
        new_shape = tuple(shap[:self.arrays[data_name]["timedim"]] + [1]
                          + shap[self.arrays[data_name]["timedim"]:])
        # Assigning to .shape reshapes without copying; if a copy would be
        # needed it raises an error, which should be handled
        data.shape = new_shape
        # The deisa keyword is an extension provided by the patched Dask
        # Distributed listed in the requirements
        f = self.client.scatter(data, direct=True, workers=self.workers, keys=[key], deisa=True)
        while f.status != 'finished':
            f = self.client.scatter(data, direct=True, workers=self.workers, keys=[key], deisa=True)
        data = None


class Adaptor:
    adr = ""
    client = None
    workers = []
    queues = []

    def __init__(self, Sworker, scheduler_info):
        with open(scheduler_info) as f:
            s = json.load(f)
        self.adr = s["address"]
        # msgpack does not serialize large messages, so restrict the serializers
        self.client = Client(self.adr, serializers=['dask', 'pickle'])
        dask.config.set({"distributed.deploy.lost-worker-timeout": 60,
                         "distributed.workers.memory.spill": 0.97,
                         "distributed.workers.memory.target": 0.95,
                         "distributed.workers.memory.terminate": 0.99})
        self.workers = [comm.get_address_host_port(i, strict=False)
                        for i in self.client.scheduler_info()["workers"].keys()]
        # Busy-wait until all expected workers have registered
        while len(self.workers) != Sworker:
            self.workers = [comm.get_address_host_port(i, strict=False)
                            for i in self.client.scheduler_info()["workers"].keys()]
        Variable("workers").set(self.workers)

    def create_array(self, name, shape, chunksize, dtype, timedim):
        chunks_in_each_dim = [shape[i] // chunksize[i] for i in range(len(shape))]
        l = list(itertools.product(*[range(i) for i in chunks_in_each_dim]))
        items = []
        for m in l:
            f = Future(key=("deisa-" + name, m), inform=True, deisa=True)
            d = da.from_delayed(dask.delayed(f), shape=chunksize, dtype=dtype)
            items.append([list(m), d])
        ll = self.array_sort(items)
        arrays = da.block(ll)
        return arrays

    def create_array_list(self, name, shape, chunksize, dtype, timedim):
        # Returns a list of arrays, one for each time step
        chunks_in_each_dim = [shape[i] // chunksize[i] for i in range(len(shape))]
        l = list(itertools.product(*[range(i) for i in chunks_in_each_dim]))
        items = []
        for m in l:
            f = Future(key=("deisa-" + name, m), inform=True, deisa=True)
            d = da.from_delayed(dask.delayed(f), shape=chunksize, dtype=dtype)
            items.append([list(m), d])
        ll = self.array_sort(items)
        arrays = []
        for i in ll:
            arrays.append(da.block(i))
        return arrays

    def array_sort(self, ListDs):
        if len(ListDs[0][0]) == 0:
            return ListDs[0][1]
        else:
            dico = dict()
            for e in ListDs:
                dico.setdefault(e[0][0], []).append([e[0][1:], e[1]])
            return [self.array_sort(dico[k]) for k in sorted(dico.keys())]

    def get_data(self, as_list=False):
        arrays = dict()
        self.arrays_desc = Queue("Arrays").get()
        for name in self.arrays_desc:
            if not as_list:
                arrays[name] = self.create_array(name, self.arrays_desc[name]["sizes"],
                                                 self.arrays_desc[name]["subsizes"],
                                                 self.arrays_desc[name]["dtype"],
                                                 self.arrays_desc[name]["timedim"])
            else:  # TODO: test this
                arrays[name] = self.create_array_list(name, self.arrays_desc[name]["sizes"],
                                                      self.arrays_desc[name]["subsizes"],
                                                      self.arrays_desc[name]["dtype"],
                                                      self.arrays_desc[name]["timedim"])
        # Barrier after the creation of all the Dask arrays
        e = Event("Done")
        e.set()
        return arrays


def Initialization(Sworker, scheduler_info):
    return Adaptor(Sworker, scheduler_info)
```
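The recursive `array_sort` above turns a flat list of (multi-index, chunk) pairs into the nested list-of-lists layout that `da.block` expects, one nesting level per dimension. A standalone sketch of the same recursion, using plain strings in place of Dask arrays so it can run without a cluster:

```python
def array_sort(list_ds):
    # Each entry is [multi_index_as_list, value]; recurse one dimension at a time.
    if len(list_ds[0][0]) == 0:
        return list_ds[0][1]
    grouped = {}
    for idx, value in list_ds:
        # Group by the leading index, keep the remaining indices for recursion.
        grouped.setdefault(idx[0], []).append([idx[1:], value])
    return [array_sort(grouped[k]) for k in sorted(grouped)]

# Four chunks of a 2x2 grid, given out of order, come back nested row-major.
flat = [[[1, 0], "c"], [[0, 1], "b"], [[1, 1], "d"], [[0, 0], "a"]]
print(array_sort(flat))  # [['a', 'b'], ['c', 'd']]
```

Sorting the keys at each level is what makes the ordering of the input irrelevant: chunks can arrive from the Bridge in any order.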

deisa_example/prescript.py

Lines changed: 18 additions & 0 deletions
New file:

```python
import yaml
import sys

# sys.argv[1] : global_size.height
# sys.argv[2] : global_size.width
# sys.argv[3] : parallelism.height
# sys.argv[4] : parallelism.width
# sys.argv[5] : generation
# sys.argv[6] : nworkers

data = {"global_size": {"height": int(sys.argv[1]), "width": int(sys.argv[2])},
        "parallelism": {"height": int(sys.argv[3]), "width": int(sys.argv[4])},
        "MaxtimeSteps": int(sys.argv[5]),
        "workers": int(sys.argv[6])}

with open('config.yml', 'w') as file:
    yaml.safe_dump(data, file)
```
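With the values `Launcher.sh` passes in (1024x1024 global size, 2x2 parallelism, 5 generations, 4 workers), the configuration written by this script fixes the chunk each MPI process owns. A small sketch of that arithmetic, using the same dict shape as the generated `config.yml`:

```python
# Same structure as the config.yml written by prescript.py, with the
# values Launcher.sh passes on the command line.
config = {"global_size": {"height": 1024, "width": 1024},
          "parallelism": {"height": 2, "width": 2},
          "MaxtimeSteps": 5,
          "workers": 4}

# Each MPI process owns one block of the domain decomposition.
chunk_height = config["global_size"]["height"] // config["parallelism"]["height"]
chunk_width = config["global_size"]["width"] // config["parallelism"]["width"]
nprocs = config["parallelism"]["height"] * config["parallelism"]["width"]

print(chunk_height, chunk_width, nprocs)  # 512 512 4
```

Note that the decomposition must be perfect (global sizes divisible by the parallelism in each dimension) for the Bridge's key computation to be valid, as the comment in `dask_interface.py` points out.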
