가장 쉬운 입자 군집 최적화 (PSO) 예제 [python 3] by 바죠

가장 쉬운 입자 군집 최적화 (PSO) 예제 [python 3]
particle swarm optimization algorithm



\[\vec{x}\]
\[f(\vec{x})\]
우리는 여러 가지 잠정적인 해들을 동시에 고려한다. 
\[ \{ \vec{x}\}\]
\[\{f(\vec{x})\}\]


파이썬 리스트를 이용한다. 
동일한 형식으로 정의된 객체를 다수 만들 것이다. 이들은 동일한 형식이지만 여러개가 존재한다. 
파이썬 리스트 항목으로 저장할 수 있다. 
그런데 객체가 클래스로 정의 된 것이 될 수도 있다. 
파이썬 리스트는 매우 포괄적인 것을 하나의 항목으로 인정한다.
먼저 PARTICLE이라는 객체를 정의한다. 
[입자 하나에 대한 많은 정보를 가지고 있다. 위치, 속도, 목적함수 값, 역대 최적값, 역대 최적 위치] 
[입자마다 고유의 특성이 다를 수 있다. 운동 특성이 다를 수 있다. ]
이것들로 구성된 또 다른 종류의 객체를 정의한다. 
[입자들로 구성된 무리를 정의한다.]


PSO 계산의 효율성을 높이기 위해서 국소최적화를 사용하였다.

------------------------------------------------------------------------------------------------------------------------
import random
import numpy as np
from scipy.optimize import minimize
def functuser(x, case=3):
    """Benchmark objective functions for the PSO driver.

    Parameters
    ----------
    x : sequence of float
        Point at which to evaluate the objective.
    case : int, optional
        Which benchmark to use: 1 = sphere, 2 = Rastrigin,
        3 = Rosenbrock (default, matching the original hard-coded choice),
        4 = Styblinski-Tang.

    Returns
    -------
    float
        The objective value at `x`.

    Raises
    ------
    ValueError
        If `case` is not 1-4 (the original silently fell through and
        returned an undefined `total`).
    """
    xa = np.asarray(x, dtype=float)
    if case == 1:
        # Sphere: sum of squares.
        return float(np.sum(xa ** 2))
    if case == 2:
        # Rastrigin
        return float(10. * xa.size + np.sum(xa ** 2 - 10. * np.cos(2. * np.pi * xa)))
    if case == 3:
        # Rosenbrock
        return float(np.sum(100.0 * (xa[1:] - xa[:-1] ** 2.0) ** 2.0 + (1 - xa[:-1]) ** 2.0))
    if case == 4:
        # Styblinski-Tang
        return float(np.sum((xa ** 4 - 16. * xa ** 2 + 5. * xa) / 2.))
    raise ValueError(f'unknown benchmark case: {case}')
class Particle:
    """One PSO particle: current position/velocity plus its personal best.

    Each particle draws slightly perturbed PSO coefficients (inertia `ww`,
    cognitive `c1`, social `c2`) so swarm members behave heterogeneously.
    """

    def __init__(self, startx0, ww, c1, c2, xbounds, lverbo):
        """Initialize at `startx0` with a random velocity in [-1, 1]^d.

        startx0 : sequence of float -- starting coordinates.
        ww, c1, c2 : base PSO coefficients; each is perturbed by +-0.1.
        xbounds : per-dimension (low, high) search bounds.
        lverbo : when True, print the perturbed coefficients.
        """
        self.bnds = [(xbounds[i][0], xbounds[i][1]) for i in range(len(startx0))]
        self.dimensions = len(startx0)
        self.obj_best_i = 9e99  # best objective value this particle has seen
        self.obj_i = 9e99       # most recent objective value
        self.ww = ww + (random.random() - 0.5) * 0.2
        self.c1 = c1 + (random.random() - 0.5) * 0.2
        self.c2 = c2 + (random.random() - 0.5) * 0.2
        if lverbo:
            print(self.ww, self.c1, self.c2)
        self.velocity_i = [random.uniform(-1, 1) for _ in range(self.dimensions)]
        # The original applied a no-op perturbation startx0[j]*(1 + 0*(r-0.5))
        # and carried a random-restart branch gated on random() < 1e-14 that
        # can never realistically fire; both removed as dead code.
        self.position_i = [float(startx0[j]) for j in range(self.dimensions)]
        # Any out-of-bounds coordinate is re-drawn uniformly inside its bounds.
        for j in range(self.dimensions):
            lo, hi = self.bnds[j]
            if self.position_i[j] > hi or self.position_i[j] < lo:
                self.position_i[j] = lo + (hi - lo) * random.random()
        self.position_best_i = self.position_i.copy()

    def evaluate(self, objfunct):
        """Locally minimize `objfunct` from the current position (TNC) and
        adopt the minimizer as the new position; update the personal best."""
        x0 = np.asarray(self.position_i, dtype=float)
        res = minimize(objfunct, x0, method='TNC', bounds=self.bnds,
                       jac='2-point',
                       options={'maxfun': 1000, 'xtol': 1e-6, 'disp': False})
        self.position_i = res.x.copy()
        self.obj_i = res.fun
        if self.obj_i < self.obj_best_i:
            self.position_best_i = self.position_i.copy()
            self.obj_best_i = self.obj_i

    def update_velocity(self, position_best_g):
        """Standard PSO velocity update toward the personal and global bests."""
        for j in range(self.dimensions):
            cognitive = self.c1 * (self.position_best_i[j] - self.position_i[j]) * random.random()
            social = self.c2 * (position_best_g[j] - self.position_i[j]) * random.random()
            self.velocity_i[j] = self.ww * self.velocity_i[j] + cognitive + social

    def update_position(self):
        """Advance one velocity step; re-draw coordinates that leave the box."""
        for j in range(self.dimensions):
            self.position_i[j] = self.position_i[j] + self.velocity_i[j]
            lo, hi = self.bnds[j]
            if self.position_i[j] > hi or self.position_i[j] < lo:
                self.position_i[j] = lo + (hi - lo) * random.random()
class PSO():
    """Particle swarm optimizer; the whole run happens in the constructor.

    Backward-compatible improvement: in addition to printing the results
    (as before), the best solution is now stored on the instance as
    `self.position_best_g` / `self.obj_best_g`, and the swarm as `self.swarm`.
    """

    def __init__(self, objfunct, startx0, xbounds, ww=0.5, c1=1.0, c2=2.0,
                 nparticles=50, maxiter=50000, verbose=False):
        """Run PSO on `objfunct` within `xbounds`, starting near `startx0`.

        Note: `startx0` is mutated in place (as in the original) — every
        particle after the first gets a fresh uniform-random start point.
        """
        obj_best_g = 9e99
        position_best_g = []
        swarm = []
        for i in range(nparticles):
            if i > 0:
                # Scatter the remaining particles uniformly over the box.
                for j in range(len(startx0)):
                    startx0[j] = xbounds[j][0] + (xbounds[j][1] - xbounds[j][0]) * np.random.random()
            swarm.append(Particle(startx0, ww, c1, c2, xbounds, verbose))
        for it in range(maxiter):
            if verbose:
                print(f'iter: {it:>6d} best solution: {obj_best_g:16.8e}')
            # Evaluate (with local minimization) and track the global best.
            for p in swarm:
                p.evaluate(objfunct)
                if p.obj_i < obj_best_g:
                    position_best_g = list(p.position_i)
                    obj_best_g = float(p.obj_i)
            # Move the swarm toward the personal/global bests.
            for p in swarm:
                p.update_velocity(position_best_g)
                p.update_position()
        print('\nfinal solution:')
        print(f'   > {position_best_g}')
        print(f'   > {obj_best_g}\n')
        # Expose results to callers instead of only printing them.
        self.swarm = swarm
        self.position_best_g = position_best_g
        self.obj_best_g = obj_best_g
        # Report every particle's personal best, ranked best-first.
        order = np.argsort([p.obj_best_i for p in swarm])
        for idx in order:
            print(swarm[idx].obj_best_i)
            print(np.asarray(swarm[idx].position_best_i))

# Build a random 10-dimensional start point in (-0.5, 0.5) with identical
# (-20, 20) bounds on every coordinate, then run the optimizer.
ndim = 10
startx0 = [np.random.random() - 0.5 for _ in range(ndim)]
xbounds = [(-20., 20.)] * ndim
PSO(functuser, startx0, xbounds, ww=0.5, c1=1.0, c2=2.0, nparticles=50, maxiter=500, verbose=True)

------------------------------------------------------------------------------------------------------------------------


------------------------------------------------------------------------------------------------------------------------
from pyswarms.single.global_best import GlobalBestPSO
import numpy as np
# create a parameterized version of the classic Rosenbrock unconstrained optimization function
def rosenbrock_with_args(x, a, b, c=0):
    """Vectorized 2-D Rosenbrock: one value per row of `x` (shape (n, 2))."""
    x0, x1 = x[:, 0], x[:, 1]
    return (a - x0) ** 2 + b * (x1 - x0 ** 2) ** 2 + c
# Instantiate the optimizer over the box [-10, 10]^2.
upper = 10 * np.ones(2)
lower = -upper
options = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
optimizer = GlobalBestPSO(n_particles=50, dimensions=2, options=options,
                          bounds=(lower, upper))
# Run 1000 iterations; a, b and c are forwarded to the objective function.
cost, pos = optimizer.optimize(rosenbrock_with_args, 1000, a=1, b=100, c=0)


------------------------------------------------------------------------------------------------

import numpy as np
import os
import os.path
from scipy.optimize import dual_annealing
from scipy.optimize import minimize
from scipy import optimize
def append_new_line(file_name, text_to_append):
    """Append `text_to_append` to `file_name` on its own line.

    A newline separator is written first only when the file already has
    content, so an empty/new file does not start with a blank line.
    """
    with open(file_name, "a+") as fh:
        fh.seek(0)
        # A non-empty prefix (first 100 chars suffice) means we need a separator.
        if fh.read(100):
            fh.write("\n")
        fh.write(text_to_append)
def append_multiple_lines(file_name, lines_to_append):
    """Append each string in `lines_to_append` to `file_name`, one per line.

    A leading newline is written before the first appended line only when
    the file already has content, so a new file does not start blank.
    (Cleaned up: replaced the `== True` comparison and flag juggling with a
    single boolean derived once from the file contents.)
    """
    with open(file_name, "a+") as file_object:
        file_object.seek(0)
        # Non-empty prefix (first 100 chars suffice) => separator needed.
        need_newline = bool(file_object.read(100))
        for line in lines_to_append:
            if need_newline:
                file_object.write("\n")
            need_newline = True
            file_object.write(line)

# Optimize a user-supplied objective `func` over [-15, 15]^10 and append
# the result to ./OUTPUT.
# NOTE(review): `func` is not defined anywhere in this snippet — it must be
# supplied by the surrounding code before this block runs, or a NameError
# is raised.
lw = [-15.0] * 10
up = [15.0] * 10
if True:
   ret = dual_annealing(func, bounds=list(zip(lw, up)), seed=1234, maxiter=20)
if False:
   ret=optimize.differential_evolution(func, bounds=list(zip(lw, up)), maxiter=10)
if False:
   x0=[ 0. for i in range(10)]
   ret = minimize(func, x0, method='Nelder-Mead', tol=1e-6)
# Record the best objective value followed by each coordinate of the
# minimizer, one value per line.
lines=[]
lines.append(str(ret.fun))
for i in range(len(ret.x)):
    lines.append(str(ret.x[i]))
append_multiple_lines('./OUTPUT', lines)




--------------------------------------------------------------------------------------------------------------

mpi4py를 활용한 함수 최소화
단, 함수는 모든 cpu들이 동원되어 병렬적으로 계산함. 
최종적으로 변수들의 최적값들은 1개의 cpu에서만 이루어짐. 1개의 cpu가 scipy의  minimization을 수행한다. 

scipy.optimize

#from scipy.optimize import minimize
from scipy.optimize import fmin
from mpi4py import MPI
import numpy as np

# MPI communicator handles; every rank runs this same script.
comm = MPI.COMM_WORLD
size = comm.Get_size()  # total number of ranks
rank = comm.Get_rank()  # this process's rank id

# Problem size, split evenly across ranks.
N = 100            # for testing
step = N//size   # say that N is divisible by size

def parallel_function_caller(x,stopp):
    """Evaluate the test objective cooperatively across all MPI ranks.

    Rank 0 drives the optimizer; the other ranks call this function in a
    loop.  A broadcast stop flag (stopp[0] != 0) tells the workers to exit.
    Only rank 0 receives the reduced (summed) objective value; other ranks
    return 0 from the reduce.
    """
    # All ranks must enter the same sequence of collectives (bcast, then
    # optionally bcast/reduce), so the flag broadcast comes first.
    stopp[0]=comm.bcast(stopp[0], root=0)
    summ=0
    if stopp[0]==0:
        # Each rank evaluates its own contiguous slice of the sum in parallel.
        x=comm.bcast(x, root=0)
        array= np.arange(x[0]-N/2.+rank*step-42,x[0]-N/2.+(rank+1)*step-42,1.)
        summl=np.sum(np.square(array))
        summ=comm.reduce(summl,op=MPI.SUM, root=0)
        if rank==0:
            print ("value is "+str(summ))
    return summ

if rank == 0 :
   # Rank 0 runs the serial optimizer; each objective evaluation triggers
   # the parallel helper on every rank.
   stop=[0]
   x = np.zeros(1)
   x[0]=20
   #xs = minimize(parallel_function_caller, x, args=(stop))
   xs = fmin(parallel_function_caller,x0= x, args=(stop,))
   print( "the argmin is "+str(xs))
   # Raise the stop flag and make one final collective call so the worker
   # ranks can leave their loop.
   stop=[1]
   parallel_function_caller(x,stop)

else :
   # Worker ranks keep evaluating until rank 0 broadcasts the stop flag.
   stop=[0]
   x=np.zeros(1)
   while stop[0]==0:
      parallel_function_caller(x,stop)

--------------------------------------------------------------------------------------------------------------

import os
import sys
import numpy as np
from scipy.optimize import minimize
import scipy.optimize as optimize
from scipy.optimize import dual_annealing
from scipy.optimize import differential_evolution

def append_multiple_lines(file_name, lines_to_append):
    """Append each string in `lines_to_append` to `file_name`, one per line.

    A leading newline is written before the first appended line only when
    the file already has content, so a new file does not start blank.
    (Cleaned up: replaced the `== True` comparison and flag juggling with a
    single boolean derived once from the file contents.)
    """
    with open(file_name, "a+") as file_object:
        file_object.seek(0)
        # Non-empty prefix (first 100 chars suffice) => separator needed.
        need_newline = bool(file_object.read(100))
        for line in lines_to_append:
            if need_newline:
                file_object.write("\n")
            need_newline = True
            file_object.write(line)
def eggholder(x):
    """Eggholder benchmark function (2-D); global minimum near (512, 404.23)."""
    shifted = x[1] + 47
    term1 = -shifted * np.sin(np.sqrt(abs(x[0]/2 + shifted)))
    term2 = -x[0] * np.sin(np.sqrt(abs(x[0] - shifted)))
    return term1 + term2
def rosen(x):
    """N-dimensional Rosenbrock function; expects a numpy array."""
    head = x[:-1]
    tail = x[1:]
    return sum(100.0*(tail-head**2.0)**2.0 + (1-head)**2.0)

# Disabled: dump the available Nelder-Mead solver options.
if False:
   optimize.show_options(solver='minimize',method='nelder-mead')
# Disabled: hard-coded start point and bounds (superseded by the input
# files read below).
if False:
    x0 = np.array([1.3, 0.7, 0.8, 1.9, 1.2])
    ndim=len(x0)
    bnds=[]
    for _ in range(ndim):
       bnds.append((-512., 512.))
# Abort unless the required input file exists.
fname='input.txt'
if not os.path.isfile(fname) :
    print('input.txt is not present')
    sys.exit()
# Parse input.txt: line 0 = dimension ndim, the next ndim lines = the
# starting point, line ndim+1 = an evaluation counter (ncal).
# NOTE(review): ncal stays undefined if the file has fewer than ndim+2
# lines — confirm the expected file format.
afile=open(fname,'r')
jline=0
for line in afile:
     if jline == 0:
          ndim=int(line.split()[0])
          x0=np.zeros(ndim)
     if jline > 0:
        if jline-1 < ndim:
            x0[jline-1]=float(line.split()[0])
            print(x0[jline-1])
     if jline == 1+ndim :
          ncal=int(line.split()[0])
     jline=jline+1
afile.close()
# Parse bnds.txt: the first line is skipped (header); each of the next
# ndim lines holds "low high" bounds for one coordinate.
fname='bnds.txt'
if not os.path.isfile(fname) :
    print('bnds.txt is not present')
    sys.exit()
afile=open(fname,'r')
jline=0
for line in afile:
     if jline == 0:
          bnds=[]
     if jline > 0:
        if jline-1 < ndim:
            print( (float(line.split()[0]),float(line.split()[1]))   )
            bnds.append( (float(line.split()[0]),float(line.split()[1]))   )
     jline=jline+1
afile.close()
bnds=np.array(bnds)
# Minimize the Rosenbrock function from the parsed start point; only the
# first branch is active.  NOTE(review): passing `bounds` to Nelder-Mead
# requires SciPy >= 1.7 — confirm the target SciPy version.
if True:
    res = minimize(rosen, x0, method='nelder-mead', bounds=bnds, options={'xatol': 1e-8, 'disp': True})
if False:
    res = dual_annealing(rosen, x0=x0, bounds=bnds)
if False:
   res=differential_evolution(rosen, bounds=bnds, maxiter=10)
print(res.x)
print(res.fun)

# Write output.txt (replacing any previous file): dimension, then the
# minimizer coordinates, the objective value, and the ncal counter
# echoed back from input.txt — one value per line.
lines_to_append=[]
lines_to_append.append(str(ndim))
for i in range(ndim):
    lines_to_append.append(str(res.x[i]))
lines_to_append.append(str(res.fun))
lines_to_append.append(str(ncal))
fname='output.txt'
if os.path.isfile(fname) :
    os.remove(fname)
append_multiple_lines(fname, lines_to_append)

--------------------------------------------------------------------------------------------------------------
각종 constraints를 활용하는 경우, 이 경우도 scipy 패키지가 지원한다.

from scipy.optimize import minimize
from scipy.optimize import fmin_slsqp
import numpy as np

def obj(x):
    """Scalar quadratic with its minimum at x = 1."""
    diff = x - 1
    return diff * diff
# Minimize the 1-D quadratic `obj` with SLSQP from x = 10; prints a
# solution near 1.
x0 = np.array([10])
res = minimize(obj,x0,method="SLSQP")
print(res.x)


def f1array(x):
    """Sum of squares of the two coordinates."""
    a, b = x[0], x[1]
    return a ** 2 + b ** 2
def eq_constraint(x):
    """Zero exactly when x0 + x1 == 1 (equality constraint for SLSQP)."""
    total = x[0] + x[1]
    return total - 1
fmin_slsqp(f1array, np.array([1, 1]), eqcons=[eq_constraint])



def f2(x):
    """Euclidean distance from the point (4, 2)."""
    dx = x[0] - 4
    dy = x[1] - 2
    return np.sqrt(dx ** 2 + dy ** 2)
# L1-ball radius used by the inequality constraint below.
k = 1
def ieq_constraint(x):
    """Feasible (>= 0) when sum(|x_i|) <= k; SLSQP expects ieqcons >= 0."""
    return np.atleast_1d(k - np.sum(np.abs(x)))
# Minimize the distance to (4, 2) restricted to the L1 ball of radius k.
fmin_slsqp(f2, np.array([0, 0]), ieqcons=[ieq_constraint])


# Quadratic objective with three linear inequality constraints and
# nonnegativity bounds (the SciPy SLSQP documentation example; the
# constrained minimum is at (1.4, 1.7)).
fun = lambda x: (x[0] - 1)**2 + (x[1] - 2.5)**2
cons = ({'type': 'ineq', 'fun': lambda x:  x[0] - 2 * x[1] + 2},
        {'type': 'ineq', 'fun': lambda x: -x[0] - 2 * x[1] + 6},
        {'type': 'ineq', 'fun': lambda x: -x[0] + 2 * x[1] + 2})
bnds=((0, None),(0, None))
res=minimize(fun, (2,0), method='SLSQP', bounds=bnds, constraints=cons)
print(res.fun)
print(res.x)


def obj_fun(x):
    """Sum-of-squares objective."""
    return x[0] ** 2 + x[1] ** 2
def eq_const1(x):
    """Equality constraint: x0 + x1 = 1."""
    return x[0] + x[1] - 1
def ieq_const1(x): # returned value should be positive  constraint >=0
    """Inequality constraint: sum(|x_i|) <= 1."""
    return np.atleast_1d(1 - np.sum(np.abs(x)))

x0=np.zeros(2)
x0[0]=0.
x0[1]=0.
# Bug fix: the original passed `ieq_constraint` (a function from the
# previous example, which coincides only because its global k == 1)
# instead of the locally defined `ieq_const1`.
res = fmin_slsqp(obj_fun, x0, eqcons=[eq_const1], ieqcons=[ieq_const1])
print(res)


def obj_fun(x):
    """Sum-of-squares objective."""
    return x[0] ** 2 + x[1] ** 2
def eq_const1(x):
    """Equality constraint: x0 + x1 = 1."""
    return x[0] + x[1] - 1
def eq_const2(x):
    """Equality constraint: x0 + x1 = 2 — contradicts eq_const1, making the
    problem infeasible (presumably a deliberate failure demonstration)."""
    return x[0] + x[1] - 2
def ieq_const1(x): # returned value should be positive  constraint >=0
    """Inequality constraint: x0 >= 0."""
    return x[0]

# SLSQP cannot satisfy both equality constraints; expect a failure status.
res = fmin_slsqp(obj_fun, np.array([0, 0]), eqcons=[eq_const1, eq_const2], ieqcons=[ieq_const1])
print(res)

# NOTE(review): this block is an exact duplicate of the previous example
# (same infeasible pair of equality constraints).
def obj_fun(x):
    """Sum-of-squares objective."""
    return x[0] ** 2 + x[1] ** 2
def eq_const1(x):
    """Equality constraint: x0 + x1 = 1."""
    return x[0] + x[1] - 1
def eq_const2(x):
    """Equality constraint: x0 + x1 = 2 — contradicts eq_const1."""
    return x[0] + x[1] - 2
def ieq_const1(x): # returned value should be positive  constraint >=0
    """Inequality constraint: x0 >= 0."""
    return x[0]

res = fmin_slsqp(obj_fun, np.array([0, 0]), eqcons=[eq_const1, eq_const2], ieqcons=[ieq_const1])
print(res)


def objective(x):
    """4-variable objective (appears to be the classic HS71 benchmark —
    confirm against the original example)."""
    return x[0]*x[3]*(x[0]+x[1]+x[2])+x[2]

def constraint1(x):
    """Inequality: product of all variables must be at least 25."""
    return x[0]*x[1]*x[2]*x[3]-25.0

def constraint2(x):
    """Equality-style residual: 40 minus the sum of squares."""
    sum_eq = 40.0
    for i in range(4):
        sum_eq = sum_eq - x[i]**2
    return sum_eq

# initial guesses
n = 4
x0 = np.zeros(n)
x0[0] = 1.0
x0[1] = 5.0
x0[2] = 5.0
x0[3] = 1.0

# show initial objective
print('Initial SSE Objective: ' + str(objective(x0)))

# optimize
# NOTE(review): the example stops here — con1 (and constraint2) are never
# passed to minimize() in the visible code; the call seems to be missing.
b = (1.0,5.0)
bnds = (b, b, b, b)
con1 = {'type': 'ineq', 'fun': constraint1}


from scipy.optimize import (BFGS, SR1, Bounds, NonlinearConstraint, minimize)

class problem:
  """Toy linear program data: objective f and constraint g, parameterized
  by the class attributes arg1..arg3."""
  arg1 = 1
  arg2 = 2
  arg3 = 3

  def f(self, x):
    """Objective: -(arg1*x0 + arg2*x1) (maximization via negation)."""
    return -(self.arg1 * x[0] + self.arg2 * x[1])

  def g(self, x):
    """Constraint value x0 - arg3*x1 (the caller pins it to zero)."""
    return x[0] - self.arg3 * x[1]

p = problem()


# Box bounds: 0 <= x0 <= 2 and 0 <= x1 <= 3.
bounds = Bounds([0,0], [2,3])
# Equality g(x) = 0 expressed as identical lower/upper limits; constraint
# gradient by finite differences, constraint Hessian via BFGS updates.
nonlinear_constraint = NonlinearConstraint(p.g, 0.0, 0.0, jac='2-point', hess=BFGS())
# trust-constr with a finite-difference gradient and SR1 quasi-Newton Hessian.
res = minimize(p.f,
                x0=[0,0],
                method='trust-constr',
                jac="2-point",
                hess=SR1(),
                constraints=[nonlinear_constraint],
                options={'verbose': 1},
                bounds=bounds)
print(res )

from math import cos, atan
def f(x):
    """Bilinear objective 0.1 * x0 * x1 for the SLSQP example below."""
    scaled = 0.1 * x[0]
    return scaled * x[1]

def ineq_constraint(x):
    """Nonnegative (feasible) outside a flower-shaped boundary curve."""
    radius_sq = x[0]**2 + x[1]**2
    boundary = 5. + 2.2 * cos(10 * atan(x[0] / x[1]))
    return radius_sq - boundary**2


# Minimize f subject to staying on or outside the flower-shaped region.
con = {'type': 'ineq', 'fun': ineq_constraint}
x0 = [1, 1]
res = minimize(f, x0, method='SLSQP', constraints=con)
print(res)



--------------------------------------------------------------------------------------------------------------

Nonlinear constraints :






--------------------------------------------------------------------------------------------------------------


import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
 
def f(x,y):
    """Objective function: a shifted paraboloid plus sinusoidal ripples."""
    bowl = (x-3.14)**2 + (y-2.72)**2
    ripples = np.sin(3*x+1.41) + np.sin(4*y-1.73)
    return bowl + ripples
    
# Sample f on a 100x100 grid over [0,5]x[0,5] for plotting.
x, y = np.array(np.meshgrid(np.linspace(0,5,100), np.linspace(0,5,100)))
z = f(x, y)

# Grid location of the global minimum (used only for the reference marker).
x_min = x.ravel()[z.argmin()]
y_min = y.ravel()[z.argmin()]

# PSO hyper-parameters: cognitive/social coefficients and inertia weight.
c1 = c2 = 0.1
w = 0.8

# Create particles: random positions in the box, small random velocities.
n_particles = 20
np.random.seed(100)
X = np.random.rand(2, n_particles) * 5
V = np.random.randn(2, n_particles) * 0.1

# Initial personal/global bests.
# NOTE(review): pbest initially aliases X; update() rebinds X (X = X + V)
# before mutating pbest, so the alias appears harmless — confirm.
pbest = X
pbest_obj = f(X[0], X[1])
gbest = pbest[:, pbest_obj.argmin()]
gbest_obj = pbest_obj.min()
 
def update():
    """One PSO iteration: update V, X and the best-known positions (globals)."""
    global V, X, pbest, pbest_obj, gbest, gbest_obj
    # One shared random pair (r1, r2) per iteration — not per particle.
    r1, r2 = np.random.rand(2)
    # Velocity: inertia + cognitive pull to pbest + social pull to gbest.
    V = w * V + c1*r1*(pbest - X) + c2*r2*(gbest.reshape(-1,1)-X)
    X = X + V
    obj = f(X[0], X[1])
    # Overwrite personal bests only in the columns where the move improved.
    pbest[:, (pbest_obj >= obj)] = X[:, (pbest_obj >= obj)]
    pbest_obj = np.array([pbest_obj, obj]).min(axis=0)
    gbest = pbest[:, pbest_obj.argmin()]
    gbest_obj = pbest_obj.min()
 
# Set up the base figure: contour map of f with the grid optimum marked.
fig, ax = plt.subplots(figsize=(8,6))
fig.set_tight_layout(True)
img = ax.imshow(z, extent=[0, 5, 0, 5], origin='lower', cmap='viridis', alpha=0.5)
fig.colorbar(img, ax=ax)
# White cross: global minimum found on the sampling grid.
ax.plot([x_min], [y_min], marker='x', markersize=5, color="white")
contours = ax.contour(x, y, z, 10, colors='black', alpha=0.4)
ax.clabel(contours, inline=True, fontsize=8, fmt="%.0f")
# Artists for personal bests, current positions, velocity arrows, and the
# global best; animate() updates these in place every frame.
pbest_plot = ax.scatter(pbest[0], pbest[1], marker='o', color='black', alpha=0.5)
p_plot = ax.scatter(X[0], X[1], marker='o', color='blue', alpha=0.5)
p_arrow = ax.quiver(X[0], X[1], V[0], V[1], color='blue', width=0.005, angles='xy', scale_units='xy', scale=1)
gbest_plot = plt.scatter([gbest[0]], [gbest[1]], marker='*', s=100, color='black', alpha=0.4)
ax.set_xlim([0,5])
ax.set_ylim([0,5])
 
def animate(i):
    """FuncAnimation callback: advance PSO one step and redraw the swarm."""
    title = 'Iteration {:02d}'.format(i)
    # Run one PSO update on the global state.
    update()
    # Refresh every artist from the updated globals.
    ax.set_title(title)
    pbest_plot.set_offsets(pbest.T)
    p_plot.set_offsets(X.T)
    p_arrow.set_offsets(X.T)
    p_arrow.set_UVC(V[0], V[1])
    gbest_plot.set_offsets(gbest.reshape(1,-1))
    return ax, pbest_plot, p_plot, p_arrow, gbest_plot
 
# Render 49 frames at 500 ms each; saving as GIF requires ImageMagick.
anim = FuncAnimation(fig, animate, frames=list(range(1,50)), interval=500, blit=False, repeat=True)
anim.save("PSO.gif", dpi=120, writer="imagemagick")

print("PSO found best solution at f({})={}".format(gbest, gbest_obj))
print("Global optimal at f({})={}".format([x_min,y_min], f(x_min,y_min)))



핑백

덧글

댓글 입력 영역

최근 포토로그