This tutorial will introduce the core functions of Ray.
Ray provides Python and Java APIs. To use Ray in Python, first install it with the following command: pip install ray. To use Ray in Java, first add the Ray API and Ray runtime dependencies to your project. You can then use Ray to parallelize your program. Only the Python API is used here.
1. Parallelize Python functions with Ray tasks
First, import ray and initialize the Ray runtime with ray.init(). Then decorate your function with @ray.remote to declare that you want to run it remotely. Finally, call the function with .remote() instead of calling it normally. The remote call returns a future (an ObjectRef), whose value you can fetch with ray.get.
import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]
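For readers who know the standard library, the submit-then-collect pattern of Ray tasks resembles concurrent.futures. The sketch below is only an analogy and does not use Ray: executor.submit plays the role of f.remote, and Future.result() plays the role of ray.get. The helper name square_all and the use of threads in a single process as a stand-in for Ray workers are assumptions made for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def f(x):
    # Same toy function as in the Ray task example.
    return x * x


def square_all(values):
    """Submit one call per value, then block until every result is ready."""
    with ThreadPoolExecutor(max_workers=4) as executor:
        # submit() returns immediately with a Future, much like f.remote(i).
        futures = [executor.submit(f, v) for v in values]
        # result() blocks until the value is available, much like ray.get().
        return [fut.result() for fut in futures]


results = square_all(range(4))
print(results)
```

The results come back in submission order because the list of futures is read in that order, not because the calls finish in that order.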
2. Parallelize Python classes using Ray Actors
Ray provides actors, which let you parallelize instances of a Python class. When you instantiate a class that is a Ray actor, Ray starts a remote instance of that class in the cluster. The actor can then execute remote method calls and maintain its own internal state.
import ray
ray.init()  # Only call this once

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

if __name__ == "__main__":
    # Each Counter.remote() call starts a separate actor with its own state
    counters = [Counter.remote() for i in range(4)]
    [c.increment.remote() for c in counters]
    futures = [c.read.remote() for c in counters]
    print(ray.get(futures))  # [1, 1, 1, 1]
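To build intuition for how an actor keeps its own state while accepting asynchronous calls, here is a conceptual stand-in written with the standard library only. It is not how Ray implements actors (Ray runs each actor in its own worker process in the cluster); MiniActor and its call and stop methods are hypothetical names introduced for this sketch. The idea it shows is that calls are queued and executed one at a time against a single instance, so the state needs no locking.

```python
import queue
import threading
from concurrent.futures import Future


class Counter:
    """Plain class with the same methods as the Ray actor example."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n


class MiniActor:
    """Hypothetical helper: runs all calls on one instance in one worker thread."""

    def __init__(self, cls, *args):
        self._instance = cls(*args)
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Process calls one at a time, in arrival order.
        while True:
            item = self._mailbox.get()
            if item is None:  # shutdown signal sent by stop()
                break
            method, args, future = item
            try:
                future.set_result(getattr(self._instance, method)(*args))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, method, *args):
        """Enqueue a method call and return a Future immediately."""
        future = Future()
        self._mailbox.put((method, args, future))
        return future

    def stop(self):
        self._mailbox.put(None)
        self._thread.join()


actors = [MiniActor(Counter) for _ in range(4)]
for actor in actors:
    actor.call("increment")
counts = [actor.call("read").result() for actor in actors]
print(counts)
for actor in actors:
    actor.stop()
```

Because the mailbox is first-in, first-out, each read is executed after the increment that was queued before it.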
3. Overview of Ray libraries
Ray has a rich set of libraries and an ecosystem of frameworks built on top of it. The main ones are:
- Tune: scalable hyperparameter tuning
- RLlib: industrial-grade reinforcement learning
- Ray Train: distributed deep learning
- Serve: scalable and programmable serving
3.1 Tune quick start
Tune is a library for hyperparameter tuning at any scale. With Tune, you can launch a multi-node distributed hyperparameter sweep in fewer than 10 lines of code. Tune supports any deep learning framework, including PyTorch, TensorFlow, and Keras.
To run this example, you need to install the following: pip install "ray[tune]"
#!/usr/bin/env python
# coding=utf-8
from ray import tune


def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100) ** (-1) + beta * 0.1


def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be arbitrary training procedure
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back to Tune
        tune.report(mean_loss=intermediate_score)


if __name__ == '__main__':
    analysis = tune.run(
        training_function,
        config={
            "alpha": tune.grid_search([0.001, 0.01, 0.1]),
            "beta": tune.choice([1, 2, 3])
        }
    )

    print("Best config: ", analysis.get_best_config(
        metric="mean_loss", mode="min"
    ))

    # Get a dataframe for analyzing trial results
    df = analysis.results_df
If TensorBoard is installed, you can visualize all trial results with:
tensorboard --logdir ~/ray_results
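To see what the sweep is searching for, the same objective can be evaluated by hand in plain Python, without Tune. The sketch below is an illustration only: final_score and exhaustive_search are hypothetical helpers, and it assumes that every combination of alpha and beta is tried and that configurations are ranked by the score of the last step. In the Tune example, beta is sampled randomly rather than enumerated.

```python
from itertools import product


def objective(step, alpha, beta):
    # Same objective as in the Tune example.
    return (0.1 + alpha * step / 100) ** (-1) + beta * 0.1


def final_score(config, steps=10):
    """Run the toy training loop and return the score of the last step."""
    score = None
    for step in range(steps):
        score = objective(step, config["alpha"], config["beta"])
    return score


def exhaustive_search(alphas, betas):
    """Try every (alpha, beta) pair and return the one with the lowest final score."""
    configs = [{"alpha": a, "beta": b} for a, b in product(alphas, betas)]
    return min(configs, key=final_score)


best = exhaustive_search([0.001, 0.01, 0.1], [1, 2, 3])
print("Best config:", best)
```

Tune adds what this loop lacks: running trials in parallel across a cluster, sampling strategies such as tune.choice, and logging of intermediate results.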
3.2 RLlib quick start
RLlib is an open-source library for reinforcement learning built on Ray. It offers high scalability and a unified API for a variety of applications.
To run this sample, you need to install the following:
pip install tensorflow # or tensorflow-gpu
pip install "ray[rllib]"
import gym
from gym.spaces import Discrete, Box
from ray import tune


class SimpleCorridor(gym.Env):
    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1, ))

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        return [self.cur_pos], 1 if done else 0, done, {}


tune.run(
    "PPO",
    config={
        "env": SimpleCorridor,
        "num_workers": 4,
        "env_config": {"corridor_length": 5}})
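The dynamics of the corridor environment are easy to check by hand before handing it to PPO. The sketch below copies the reset and step logic into a dependency-free class and runs one episode with a fixed policy. PlainCorridor and rollout are hypothetical names introduced here, and the always-move-right policy is just an assumption for illustration; nothing in this block uses gym or RLlib.

```python
class PlainCorridor:
    """Dependency-free copy of the SimpleCorridor dynamics."""

    def __init__(self, corridor_length):
        self.end_pos = corridor_length
        self.cur_pos = 0

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        # Action 0 moves left (not past the start), action 1 moves right.
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        return [self.cur_pos], 1 if done else 0, done, {}


def rollout(env, policy, max_steps=100):
    """Run one episode and return (number of steps, total reward)."""
    obs = env.reset()
    total_reward, steps, done = 0, 0, False
    while not done and steps < max_steps:
        obs, reward, done, _ = env.step(policy(obs))
        total_reward += reward
        steps += 1
    return steps, total_reward


# A policy that always moves right reaches the end of a length-5 corridor.
steps, total_reward = rollout(PlainCorridor(5), lambda obs: 1)
print(steps, total_reward)
```

The only reward is given on the step that reaches the end of the corridor, so a policy that always moves left never collects it.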
4. Simple example
This program picks out, from a large collection of pictures, the ones that contain a specific person's face.
import json
import os
from glob import glob
from pprint import pprint

import cv2
import face_recognition
import numpy as np
from tqdm import tqdm
import ray

ray.init()


def get_file_paths(root=r'D:\PHOTOS\*'):
    """Collect every file under root whose name ends with 'JPG'."""
    queue = [root]
    files = []
    while len(queue) > 0:
        pattern = queue.pop(0)
        for path in glob(pattern):
            if os.path.isfile(path):
                if path.endswith('JPG'):
                    files.append(path)
            else:
                # Descend into subdirectories breadth-first
                queue.append(os.path.join(path, '*'))
    return files


def get_my_image(dir=os.path.abspath('database')):
    """Load the reference images in the database directory whose file names
    start with 'david', and encode the face detected in each one."""
    db_imgs = []
    db_enc = []
    for file in glob(os.path.join(dir, '*')):
        name = os.path.split(file)[-1]
        if name.startswith('david'):
            img = cv2.imread(file)
            # Assumes each reference image contains at least one detectable face
            encoding = face_recognition.face_encodings(img)[0]
            db_imgs.append(img)
            db_enc.append(encoding)
    return db_imgs, np.array(db_enc)


@ray.remote
def get_similar_imgs(files):
    """Task run on each chunk of files: return the files containing a matching face."""
    _my_imgs, my_enc = get_my_image()
    matches = []
    pbar = tqdm(total=len(files))
    for file in files:
        img = cv2.imread(file)
        img = cv2.resize(img, (0, 0), fx=0.6, fy=0.6)
        encodings = face_recognition.face_encodings(img)
        for enc in encodings:
            # Compare this face against every reference encoding
            results = face_recognition.compare_faces(my_enc, enc, tolerance=0.3)
            for res in results:
                if res:
                    matches.append(file)
        pbar.update(1)  # advance the progress bar by one file
    pbar.close()
    print(f"[INFO] found {len(matches)} matches")
    return matches


def main():
    files = get_file_paths(root=r'G:\PHOTOS\*')
    print("files count: ", len(files))
    pprint(files)
    process_count = 8
    chunk = len(files) // process_count
    futures = []
    for i in range(process_count):
        start = i * chunk
        # The last task also takes the remainder so that no file is skipped
        end = (i + 1) * chunk if i < process_count - 1 else len(files)
        print(start, end)
        futures.append(get_similar_imgs.remote(files[start:end]))
    # ray.get returns one list of matches per task
    matches = ray.get(futures)
    with open('matches_2.json', 'w') as fp:
        json.dump(matches, fp)


if __name__ == "__main__":
    main()
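The chunking step in main() can be factored into a small helper. The sketch below is one possible way to do it, not part of the referenced program: split_into_chunks is a hypothetical name, and it spreads any remainder over the first chunks so that every item is covered and chunk sizes differ by at most one.

```python
def split_into_chunks(items, n_chunks):
    """Split items into n_chunks contiguous lists whose sizes differ by at most one."""
    items = list(items)
    base, extra = divmod(len(items), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional item.
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


chunks = split_into_chunks(range(10), 4)
print(chunks)
```

Each chunk would then be passed to its own task, for example get_similar_imgs.remote(chunk). When there are fewer items than chunks, some chunks are empty, which the task handles by returning an empty list.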
References
https://github.com/Arham-Aalam/facematch/blob/main/main.py
https://docs.ray.io/en/latest/ray-overview/index.html