Ray distributed simple tutorial

This tutorial will introduce the core functions of Ray.

Ray provides Python and Java API s. To use ray in Python, first install ray using the following command: pip install ray. To use ray in Java, first add ray API and ray runtime dependencies to your project. Then we can use ray to parallelize your program. Only Python API s are used here.

1. Parallelize Python functions with Ray tasks

First, import ray and initialize the ray service. Then use @ ray Remote decorates your function and declares that you want to run the function remotely. Finally, use remote() calls this function instead of calling it normally. This remote call generates a future, or ObjectRef, and then you can use ray Get get it.

import ray

def f(x):
	return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futers))  # [0, 1, 4, 9]

2. Parallelize Python classes using Ray Actors

Ray provides actors that allow you to parallelize instances of a class in Python. When you instantiate a class belonging to Ray actor, ray will start a remote instance of the class in the cluster. The actor can then perform remote method calls and maintain its own internal state.

import ray
ray.init()  # Only call this once

class Counter(object):
	def __init__(self):
		self.n = 0
	def increment(self):
	def read(self):
		return self.n

int __name__ == "__main__":
	counters = [Counter.remote() for i in range(4)]
	[c.increment.remote() for c in counters]
	futures = [c.read.remote() for c in counters]
	print(ray.get(futures))  # [1, 1, 1, 1]

3. Overview of ray Library

Ray has a rich library and a framework ecosystem built on it. The main are:

  • Tune: scalable hyper parameter tuning
  • RLlib: industrial intensive learning
  • Ray Train: distributed deep learning
  • Serve: scalable and programmable services

3.1 Tune quick get start

Tune is a library for super parameter adjustment of any scale. With tune, you can start a multi node distributed superparameter scan in less than 10 lines of code. Tune supports any deep learning framework, including PyTorch, TensorFlow, and Keras.

To run this example, you need to install the following: pip install "ray[tune]"

# coding=utf-8
# /usr/bin/env python
from ray import tune

def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100) ** (-1) + beta * 0.1

def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be arbitrary training procedure
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back to Tune

if __name__ == '__main__':
    analysis = tune.run(
            "alpha": tune.grid_search([0.001, 0.01, 0.1]),
            "beta": tune.choice([1, 2, 3])
    print("Best config: ", analysis.get_best_config(
        metric="mean_loss", mode="min"
    # get a dataframe for analyzing trial results
    df = analysis.results_df

If TensorBoard is installed, all test results are automatically displayed:

tensorboard --logdir ~/ray_results

3.2 RLlib quick start

RLlib is an open source library for reinforcement learning based on Ray. It provides high scalability and unified API for various applications.
To run this sample, you need to install the following:
pip install tensorflow # or tensorflow-gpu
pip install "ray[rllib]"

import gym
from gym.spaces import Discrete, Box
from ray import tune

class SimpleCorridor(gym.Env):
    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1, ))

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        return [self.cur_pos], 1 if done else 0, done, {}

        "env": SimpleCorridor,
        "num_workers": 4,
        "env_config": {"corridor_length": 5}})

4. Simple example

This program can filter out the pictures with a specific person's face from a pile of pictures.

import os, json
import random
from glob import glob
from pprint import pprint

import cv2
import face_recognition
import numpy as np
from tqdm import tqdm
import ray

def get_file_paths(root='D:\PHOTOS\*'):
	"""obtain root All under JPG End image file"""
    queue = [root]
    files = []
    while len(queue) > 0:
        dir = queue.pop(0)
        for file in glob(dir):
            if os.path.isfile(file) and file.endswith('JPG'):
                queue.append(file + '\*')
    return files

def get_my_image(dir=os.path.abspath('database')):
	"""obtain database The standard image file name is david start, And encode the face detected from the standard image file"""
    db_imgs = []
    db_enc = []
    for file in glob(dir + '\*'):
        suff = os.path.split(file)[-1]
        if suff.startswith('david'):
            img = cv2.imread(file)
            encoding = face_recognition.face_encodings(img)[0]

    return db_imgs, np.array(db_enc)

def get_similar_imgs(files):
	"""Functions that need to be distributed"""
    my_imgs, my_enc = get_my_image()
    matches = []
    pbar = tqdm(total=len(files))
    tc = 1
    for file in files:
        # file = r'D:\PHOTOS\DAY 2\5 BARAT\_IMK5917.JPG'
        img = cv2.imread(file) # r'D:\PHOTOS\1 day 1 haldi mehandi\_IMK4982.JPG' r'D:\PHOTOS\DAY 2\5 BARAT\_IMK5917.JPG'
        img = cv2.resize(img, (0, 0), fx=0.6, fy=0.6)

        face_locations = face_recognition.face_locations(img)
        encoding = face_recognition.face_encodings(img)
        # print(face_locations)
        # print("==> Matching ", len(encoding), "Faces")
        # print(my_enc)

        idx2 = 0
        for enc in encoding:
            results = face_recognition.compare_faces(my_enc, enc, tolerance=0.3)
            # print("results=>", results)
            idx = 0
            for res in results:
                if res:
                    # print("Found match in", file)
                    y, xx, yy, x = face_locations[idx2]
                    # cv2.imshow(f'img1_{random.randint(1, 1000)}', my_imgs[idx])
                    # cv2.imshow(f'img2_{random.randint(1, 1000)}', img[y: yy, x: xx])
                    # cv2.imshow('img3', img)
                    # cv2.waitKey(1)
                idx += 1
            idx2 += 1
        if tc%10 == 0:
        tc += 1
        # break
    # print("Matched files=>", matches)
    print(f"[INFO] processed {len(matches)} images")
    return matches

def main():
    files = get_file_paths(root=r'G:\PHOTOS\*')
    print("files count: ", len(files))
    process_count = 8
    chunk = len(files)//process_count
    futures = []
    print("len->", len(files))
    for i in range(process_count):
        start, end = i*chunk, (i+1)*chunk
        fs = files[start: end]
    matches = ray.get(futures)
    json.dump(matches, open('matches_2.json', 'w'))

if __name__ == "__main__":

