Binary file modified days/__pycache__/bert.cpython-38.pyc
Binary file not shown.
147 changes: 147 additions & 0 deletions days/dp_resnet.py
@@ -0,0 +1,147 @@
import os
import random
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
#import gin
import sys
import torch as t
import numpy as np
from sklearn.datasets import make_moons
from utils import *
import signal
from tqdm import tqdm


def load_data(train, bsz):
    data = torchvision.datasets.CIFAR10(
        "~/mlab/datasets/cifar10_" + ("train" if train else "test"),
        transform=torchvision.transforms.Compose([
            torchvision.transforms.PILToTensor(),
            torchvision.transforms.ConvertImageDtype(t.float),
            torchvision.transforms.Resize((64, 64)),
        ]),
        download=False,
        train=train,
    )
    data = [t.stack([p[0] for p in data]), t.tensor([p[1] for p in data])]
    return to_batches(data, bsz, trim=True)

def load_model():
t.random.manual_seed(0)
return torchvision.models.resnet18()

class DistributedDataLoader:
def __init__(
self,
rank,
size,
mini_batch_size=10000,
random_seed=0,
):
self.rank = rank
self.size = size
self.mini_batch_size = mini_batch_size
if rank == 0:
self.batches = load_data(train=True, bsz=mini_batch_size * size)
self.len = t.tensor(len(self.batches))
else:
self.len = t.tensor(-1)
self.batches = None

print("broadcast length from", self.rank)
dist.broadcast(self.len, src=0) #everyone gets 0's len

# Reason to do it this way: put as much data distribution as possibe as late as possible
# because we want to do as much training compute as possible
def __iter__(self):
for i in range(self.len):
x_mb = t.zeros((self.mini_batch_size, 3, 64, 64), dtype=t.float32)
y_mb = t.zeros((self.mini_batch_size), dtype=t.int64)
tensors = [x_mb, y_mb]
#scatter_lists = to_batches(self.batches[i], self.mini_batch_size) if self.batches is not None else [None, None]
if self.batches is not None:
scatter_lists = [list(rearrange(tensor, "(s m) ... -> s m ...", s=self.size)) for tensor in self.batches[i]]
else:
scatter_lists = [None, None]

#dist.scatter_object_list(tensors, scatter_lists, src = 0)
            dist.scatter(x_mb, scatter_list=scatter_lists[0], src=0, async_op=True).wait()
            dist.scatter(y_mb, scatter_list=scatter_lists[1], src=0, async_op=True).wait()
yield tensors

def alladd_grad(model):
    # Not empirically faster to do this asynchronously: all_reduce waits regardless
    # https://pytorch.org/docs/stable/_modules/torch/distributed/distributed_c10d.html#all_reduce
for param in model.parameters():
dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)

def init_process(rank, size, device, backend="gloo"):
"""Initialize the distributed environment."""
dist.init_process_group(backend, rank=rank, world_size=size)

    # Comment this out to run everything on a single GPU
if device == "cuda":
device += ":" + str(rank)

print("inited process group", rank, " on device ", device)

model = load_model()
model.train()
model.to(device)
optimizer = t.optim.Adam(model.parameters(), lr=0.005)
dataloader = DistributedDataLoader(rank=rank, size=size)
loss_fn = t.nn.CrossEntropyLoss(reduction="sum")

# train
for epoch in range(40):
for batch_num, (x, y) in enumerate(tqdm(dataloader)):
            # NOTE: watch out for reduction="mean" here, which seems wrong when gradients are summed across ranks
loss = loss_fn(model(x.to(device)), y.to(device))
#print(f"Loss before: {loss}, pid: {rank}")
optimizer.zero_grad()
loss.backward()
alladd_grad(model)
optimizer.step()
print(f"Epoch: {epoch}, Loss: {loss}")

# test
test_batches = load_data(train=False, bsz=200)
with t.no_grad():
model.eval()
total_loss = 0
total = 0
correct = 0
        for x, y in test_batches:
            x = x.to(device)
            y = y.to(device)
            logits = model(x)  # single forward pass, reused for the loss and the accuracy
            total_loss += loss_fn(logits, y)
            y_hat = t.argmax(logits, dim=1)
            total += y_hat.shape[0]
            correct += t.sum(y_hat == y)
print(f"Final Loss: {total_loss} and rank {rank} and prop correct {correct / total}")

    dist.all_reduce(t.zeros(2), op=dist.ReduceOp.SUM)  # acts as a barrier to sync processes before teardown

if rank == 0:
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)

if __name__ == "__main__":



    if len(sys.argv) < 4:
        local_parallelism = 2
        device = "cpu"
    else:
        local_parallelism = int(sys.argv[2])
        device = "cpu" if sys.argv[3] == "cpu" else "cuda"

os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "29500"

mp.set_start_method("spawn") #breaks if removed
processes = []
for rank in range(local_parallelism): # for each process index
p = mp.Process(target=init_process, args=(rank, local_parallelism, device))
p.start()
processes.append(p)
for p in processes:
p.join()
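A note on `alladd_grad` above: summing gradients with `all_reduce` makes the aggregated gradient scale with the number of workers. A common alternative is to average across ranks so the update matches single-process training on the mean loss. A minimal sketch, assuming the same `torch.distributed` process group is already initialized and that `world_size` is passed in by the caller:

```python
import torch.distributed as dist

def allreduce_grad_mean(model, world_size):
    # Sum each parameter's gradient across ranks, then divide by the
    # number of ranks so every worker holds the mean gradient.
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size
```

Whether sum or mean is the right convention depends on how the per-rank loss is reduced; these scripts keep the raw sum.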
169 changes: 169 additions & 0 deletions days/dp_simple.py
@@ -0,0 +1,169 @@
import os
import random
import torch.distributed as dist
import torch.multiprocessing as mp
#import gin
import sys
import torch as t
import numpy as np
from sklearn.datasets import make_moons
from utils import *
import signal

# class DistributedDataLoaderSlow:
# def __init__(
# self,
# rank,
# size,
# mini_batch_size=4,
# random_seed=0,
# ):
# self.rank = rank
# self.size = size
# if rank == 0:
# # load and shuffle x and y
# X, y = make_moons(n_samples=4 * 4 * 5, noise=0.1, random_state=354)
# self.data_tensors = [t.Tensor(X).float(), t.Tensor(y).float()]
# t.manual_seed(random_seed)
# perm = t.randperm(X.shape[0])
# for i, ten in enumerate(self.data_tensors): # shuffle
# self.data_tensors[i] = ten[perm]

# # batches are of size mbs * size, as mbs is the batch size for one process
# # minibatches are of size mbs. There are "size" of them in a list, forming a batch
# # so self.batches is a list of batches, which are each a list of minibatches, which are each
# # a list of parameters (like x and y), which are each a tensor of elements
# self.batches = [to_batches(batch, mini_batch_size, trim=True)
# for batch in to_batches(self.data_tensors, mini_batch_size * size)]
# self.len = t.tensor(len(self.batches))
# print("broadcast length from", self.rank)
# dist.broadcast(self.len, src=0) #everyone gets 0's len
# else:
# self.len = t.tensor(-1)
# print("broadcast length from", self.rank)
# dist.broadcast(self.len, src=0) #everyone gets 0's len
# self.batches = [[] for _ in range(self.len)]

# dist.broadcast_object_list(self.batches, src=0) #everyone gets 0's len
# self.mini_batches = map(lambda x : x[rank], self.batches)

# def __iter__(self):
# return self.mini_batches

class DistributedDataLoader:
def __init__(
self,
rank,
size,
mini_batch_size=4,
random_seed=0,
):
self.rank = rank
self.size = size
if rank == 0:
# load and shuffle x and y
X, y = make_moons(n_samples=4 * 4 * 5, noise=0.1, random_state=354)
self.data_tensors = [t.Tensor(X).float(), t.Tensor(y).float()]
t.manual_seed(random_seed)
perm = t.randperm(X.shape[0])
for i, ten in enumerate(self.data_tensors): # shuffle
self.data_tensors[i] = ten[perm]

# batches are of size mbs * size, as mbs is the batch size for one process
# minibatches are of size mbs. There are "size" of them in a list, forming a batch
# so self.batches is a list of batches, which are each a list of minibatches, which are each
# a list of parameters (like x and y), which are each a tensor of elements
self.batches = [to_batches(batch, mini_batch_size, trim=True) # chops the last batch if necessary
for batch in to_batches(self.data_tensors, mini_batch_size * size)]
self.len = t.tensor(len(self.batches))
self.batches = iter(self.batches)
else:
self.len = t.tensor(-1)
self.batches = None

print("broadcast length from", self.rank)
dist.broadcast(self.len, src=0) #everyone gets 0's len

# Reason to do it this way: put as much data distribution as possibe as late as possible
# because we want to do as much training compute as possible
def __iter__(self):
for _ in range(self.len):
if self.batches is not None:
mini_batches = next(self.batches)
else:
mini_batches = [None for _ in range(self.size)]
dist.broadcast_object_list(mini_batches, src=0)
my_batch = mini_batches[self.rank]
yield my_batch


def alladd_grad(model):
    # Run synchronously, these all_reduce calls would execute one after another.
    # With async_op=True they are all launched up front, and we then wait for
    # every handle to finish before continuing.
    reduce_ops = [
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM, async_op=True)
        for param in model.parameters()
    ]
    for op in reduce_ops:
        op.wait()


def init_process(rank, size, device, backend="gloo"):
"""Initialize the distributed environment."""
dist.init_process_group(backend, rank=rank, world_size=size)

print("test:", os.environ["test"])

print("inited process group", rank)

# init model, optim, data
t.random.manual_seed(0)
model = t.nn.Sequential(t.nn.Linear(2, 20), t.nn.Linear(20, 1))
model.train()
model.to(device)
optimizer = t.optim.Adam(model.parameters(), lr=1e-4)
dataloader = DistributedDataLoader(rank=rank, size=size)

# train
for batch_num, (x, y) in enumerate(dataloader):
# print("batch", batch)
        # the model output has shape (batch, 1); squeeze it so it lines up with y of shape (batch,)
        loss = t.sum((model(x.to(device)).squeeze(-1) - y.to(device)) ** 2)
#print(f"Loss before: {loss}, pid: {rank}")
optimizer.zero_grad()
loss.backward()
        alladd_grad(model)  # all-reduce (sum) the gradients across ranks
optimizer.step()

# print(rank, "loss", loss.cpu().detach().numpy())
#print(rank, batch_num)
# print(rank, "done training")

# total_loss = 0
# for x, y in dataloader:
# total_loss += t.sum((model(x.to(device)) - y.to(device)) ** 2)
# print(f"Final Loss: {total_loss} and pid {rank}")

    dist.all_reduce(t.zeros(2), op=dist.ReduceOp.SUM)  # acts as a barrier to sync processes before teardown

# ps -eg | test.txt

if rank == 0:
os.killpg(os.getpgid(os.getpid()), signal.SIGKILL)

if __name__ == "__main__":
#print(t.cuda.get_device_capability("cuda:7"))

    local_parallelism = 2 if len(sys.argv) < 3 else int(sys.argv[2])
    device = "cpu" if len(sys.argv) < 4 or sys.argv[3] == "cpu" else "cuda"
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "29500"
os.environ["test"] = "hello"

mp.set_start_method("spawn") #breaks if removed
    processes = []
    for rank in range(local_parallelism):  # for each process index
        p = mp.Process(target=init_process, args=(rank, local_parallelism, device))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

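Both new scripts launch workers by hand with `mp.Process`. As an aside, `torch.multiprocessing` also provides `mp.spawn`, which starts `nprocs` workers, passes each its rank as the first argument, joins them, and re-raises worker exceptions in the parent. A minimal sketch of launching dp_simple.py's `init_process` that way (a hypothetical replacement for the `__main__` block, assuming `init_process` is in scope):

```python
import os
import torch.multiprocessing as mp

if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    os.environ["test"] = "hello"  # init_process in dp_simple.py reads this

    local_parallelism = 2
    device = "cpu"
    # mp.spawn calls init_process(rank, local_parallelism, device) for each rank,
    # uses the "spawn" start method by default, and joins all workers.
    mp.spawn(init_process, args=(local_parallelism, device), nprocs=local_parallelism)
```

The manual `mp.Process` loop in the diff works as well; the main differences are the automatic joining and error propagation.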
2 changes: 1 addition & 1 deletion days/hpsearch.py
@@ -117,7 +117,7 @@ def train(optimizer, num_epochs, lr):
"Model.P": [30],
"data_train.image_name": ["image_match1.png"],
},
"local",
"remote",
)
elif sys.argv[1] == "work":
params = json.loads(os.environ["PARAMS"])