当前位置: 首页 > PHP > PHP支持多进程的deamon服务程序

PHP支持多进程的deamon服务程序

  • 分类:PHP
  • 本文标签: daemon 多进程 php
  • 发布时间:2017-10-11 10:40:19
  • 作者:Ferman
  • 查看数: 19

    1️⃣自定义设置进程数量

    2️⃣自定义设置每个进程每次处理的消息数量

    3️⃣没有消息的时候,deamon会sleep,节省服务开销

    4️⃣支持后台监控

    5️⃣后台监控可查看历史处理消息数量和今日处理消息数量

    6️⃣后台可查看进程的运行状态和运行数量

    7️⃣后台可查看每个服务所在服务器和服务所在脚本


脚本是在yii框架,耦合性很低,可以方便的迁移

基础脚本:

<?php
/**
 * 多进程处理后台任务
 * @author hongbo
 * @date 20170913
 */

namespace songbook\daemons;

use common\models\SongbookMonitor;
use Yii;
use yii\console\Application;
use yii\helpers\ArrayHelper;

require(__DIR__.'/../../vendor/autoload.php');
importYiiFramework('prod');

class BaseDaemon {
  public $workers_max_num;
  protected $workers = [];
  protected $parent_pid = 0;
  protected $show_log = false;
  protected $config = [];



  public function __construct($config) {
    //设置并检查配置数据
    $this->setConfig($config);

  }

  /**
   * go start
   */
  public function run() {
    //安装信号
    $this->setSignal();

    //fork 进程
    $this->setProcess();

  }

  /**
   * 检查并设置配置文件
   * @param $config
   * @return bool
   */
  private function setConfig ($config) {
    $options = array(
      'job_name'     => '',      #当前处理的job名称
      'queue_name'    => '',      #要监听的消息队列名(key)
      'process_num'  => '',      #子进程数量
      'msg_num_atm'  => 10,      #每次处理的消息数,如果是多个会有合并处理
      'max_sleep'    => 30,      #没有消息的时候,deamon将sleep,如果队列消息不多,尽量设置大点,减少处理压力
      'admin_mail'   => '',      #接受监控报警的邮件地址,多个地址逗号分割
      'filepath'     => '',      #php文件地址
      'life'         => 0,       #程序的生命周期,如果0表示是一直循环的Deamon处理,如果设置了时间,必须采用crontab的形式
    );

    $options = array_merge($options, $config);

    if (empty($options['job_name'])) {
      echo '[ERROR]消息队列名称不可为空!'.PHP_EOL;
      exit();
    }

    $options['start_time'] = time();
    $options['server_ip'] = $this->getServerIp();

    $this->parseParameters();
    $this->config = array_merge($this->config, $options);
    unset($options);

    //用一个文件来确定 队列是否有数据
    $this->markFile('create');

    return true;

  }

  /**
   * @param string $type file_name/crate/unlink
   * @return string
   */
  protected function markFile($type = 'file_name') {
    $file_name = '/tmp/'.$this->config['queue_name'];
    switch ($type) {
      case 'file_name' :
        return $file_name;

      case 'create' :
        if (!file_exists($file_name)) {
          file_put_contents($file_name, '1');
        }
        break;

      case 'unlink' :
        //没有数据了,可以做个rand,防止同时unlink
        sleep(mt_rand(10, 20));
        file_exists($file_name) && unlink($file_name);
        break;
    }
    return true;
  }
  /**
   * Worker的参数语法
   * @return array [key => value]
   */
  private function parseParameters() {

    global $argv;

    if (empty($argv[0]) || empty($argv[1])) {
      echo 'ERROR: 参数有误'.PHP_EOL;
      exit();
    }

    $this->config['daemon'] = $argv[0];

    if ((isset($argv[2]) && $argv[2] === 'stop') || $argv[1] === 'stop') {
      $this->killProcess();
      exit();
    }

    if (false === strpos($argv[1], '--env=')) {
      echo 'ERROR: you shoud "php x.php --env=prod" to start '.PHP_EOL;
      exit();
    }

    $process_num = $this->getProcessNum();
    if ($process_num > 1) {
      echo 'ERROR: process has already running, please stop it first'.PHP_EOL;
      exit();
    }

    $this->config['env'] = substr($argv[1], strpos($argv[1], '=') + 1);

    return true;
  }

  private function setSignal() {
    $this->workers_max_num = $this->config['process_num'];
    $this->parent_pid = getmypid();

    //安装以下信号
    pcntl_signal(SIGCHLD, array(__CLASS__, 'signalHandler')); //子进程退出发给父进程的信号
    pcntl_signal(SIGINT, array(__CLASS__, 'signalHandler')); //ctrl+c
    pcntl_signal(SIGTERM, array(__CLASS__, 'signalHandler')); //kill/killall, SIGKILL(kill -9)无法捕获


    //父进程启动
    $log = 'start. parent pid:'. $this->parent_pid;
    $this->log($log);
  }

  /**
   * 调用运行
   *
   */
  public function setProcess() {
    //启动,fork出指定数量子进程
    $this->forkWorkers();

    $i = 0;
    while(1) {
      //有信号来时,触发信号处理函数
      pcntl_signal_dispatch();

      $i++;
      //挂起父进程等待有子进程退出时,回收子进程
      $pid = pcntl_waitpid(-1, $status);
      unset($this->workers[$pid]);

      //log
      $exit_code = pcntl_wexitstatus($status);
      $log = 'run times:'. $i. ' child:'. $pid. ' exit with status '. $exit_code;
      $this->log($log);
    }
  }

  /**
   * 启动所需的所有子进程
   *
   */
  public function forkWorkers() {

    if ($this->shouldStop()) {
      $this->killProcess();
    }
    while (count($this->workers) < $this->workers_max_num) {
      $this->monitor();
      $this->fork();
    }
  }

  /**
   * 创建一个子进程
   *
   */
  protected function fork() {
    $pid = pcntl_fork();

    if($pid == -1) {
      //fork faild
      $log = 'fork failed!'. getmypid();
      $this->log($log);

      return false;
    } elseif ($pid > 0) {
      // 父进程对子进程做登记
      $this->workers[$pid] = $this->parent_pid;

      $log = 'fork child:'. $pid;
      $this->log($log);

    } else {
      $log = 'i\'m children:'. getmypid();
      $this->log($log);

      //要执行的具体内容
      $this->doWork();
      exit();
    }

    return true;
  }

  /**
   * 要处理的任务,继承后重写
   */
  public function doWork() {
    $num = 1000;
    for($i=0;$i<$num;$i++) {
      echo $i. PHP_EOL;
      echo date('Y-m-d H:i:s'). "\t". 'do task, pid:'. getmypid(). PHP_EOL;
    }
  }

  /**
   * 信号处理
   *
   */
  public function signalHandler($signo) {
    switch($signo) {
      case SIGUSR1:
        //todo
        break;

      //子进程结束信号
      case SIGCHLD:
        $log = 'father pid:'. getmypid(). ' capture sigchld, continue fork.';
        $this->log($log);

        $this->forkWorkers();

        break;

      case SIGTERM:
        $log = 'father pid:'. getmypid(). ' capture kill, exit!';
        $this->log($log);
        exit;
        break;

      case SIGHUP:
      case SIGQUIT:
        $log = 'father pid:'. getmypid(). ' capture signo:'. $signo. ', exit!';
        $this->log($log);
        break;

      case SIGINT:
        $log = 'father pid:'. getmypid(). ' capture ctrl+c, exit!';
        $this->log($log);
        exit;
        break;

      default:
        return;
    }
  }
  private function getServerIp() {

    if(isset($_SERVER['SERVER_ADDR'])){

      $serverIp = $_SERVER['SERVER_ADDR']; //当前服务器的IP

    }else{

      if(isset($_SERVER['SSH_CONNECTION']))
        $serverIp = explode(" ", $_SERVER['SSH_CONNECTION']);

      $serverIp = isset($serverIp) ? $serverIp[2] : ''; //当前服务器的IP

    }

    $serverIp = $serverIp ? $serverIp : $this->getIpByShell();

    return $serverIp;

  }

  /**
   * 用shell 获取ip
   * @param $log
   */
  public function getIpByShell() {
    $cmd = "/sbin/ifconfig eth0|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d 'addr:'";
    exec($cmd, $out);
    if (isset($out[0])) {
      return $out[0];
    }
    return '';
  }

  public function log($log) {
    $log = date('H:i:s'). "\t". $log;

    if ($this->show_log) {
      echo $log. PHP_EOL;
    }
  }

  public function shouldStop() {

    if (
      $this->config['life'] > 0
      && time() - $this->config['start_time'] > $this->config['life']
    ) {
      return true;
    }

    return false;
  }

  /**
   * 这是一个通用的监控方法
   */
  protected $sleepSec = 0;
  protected $startTm = 0;
  protected $runCnt = 0;

  public function monitor() {

    $maxSleep = (int)$this->config['max_sleep'] > 10
      ? (int)$this->config['max_sleep']
      : 10;

    //获取队列中的数据数量

    if (file_exists($this->markFile())) {

      $this->runCnt += $this->config['msg_num_atm'];
      $this->sleepSec = 0;

    } else {

      $this->sleepSec = min($this->sleepSec + 2, $maxSleep); //递增,但限制最大sleep数字,不可等待过长
      if($this->sleepSec == 0) $this->sleepSec = 1;
      sleep($this->sleepSec);

    }

    //定时将自动运行的存活状态记录到数据库主中,便于监控和统计
    //用这种方法模拟心跳,让监控程序知道这循环还活着
    $tm = time();
    if ($tm - $this->startTm > 240 || $this->shouldStop()) {

      $this->startTm = $tm;// 每 240/60 分钟模拟心跳
      $nowD = date("d");

      $where = [

        'job_name' => $this->config['job_name'],
        'queue_name' => $this->config['queue_name'],
        'server_ip'  => $this->config['server_ip'],

      ];

      $res = SongbookMonitor::find()
        ->select(['msgcnt_day', 'msgcnt_date', 'dostop'])
        ->where($where)
        ->asArray()
        ->one();

      //正在进行的进程数量
      $num_running = $this->getProcessNum();

      //有监控,更新
      if ($res) {
        //获得当天处理的消息数量
        $todaycnt = $res['msgcnt_date'] != $nowD
          ? $this->runCnt
          : $res['msgcnt_day'] + $this->runCnt;

        $sql = "update queue_daemon_status
                set tm = {$tm},server_ip = '{$this->config['server_ip']}',
                filepath='{$this->config['filepath']}',
                admin='{$this->config['admin_mail']}', dostop=0,
                msgcnt_all = msgcnt_all + {$this->runCnt} , 
                msgcnt_day = {$todaycnt}, msgcnt_date='{$nowD}',
                process_num = {$this->config['process_num']},
                process_num_running = {$num_running}
                where job_name = '{$this->config['job_name']}'
                and queue_name = '{$this->config['queue_name']}' 
                and server_ip = '{$this->config['server_ip']}'";
        Yii::$app->songbook_monitor->createCommand($sql)->query();
        $this->runCnt = 0;
        if ((int)$res['dostop']) {
          $this->killProcess();
          exit(1);
        }

      } else {
        //无监控,添加
        $sql = "insert into queue_daemon_status
                (job_name, queue_name, tm, server_ip, filepath, admin, msgcnt_date, msgcnt_all, msgcnt_day, process_num, process_num_running)
                values('{$this->config['job_name']}', '{$this->config['queue_name']}', {$tm}, 
                '{$this->config['server_ip']}', '{$this->config['filepath']}', 
                '{$this->config['admin_mail']}', {$nowD},$this->runCnt,
                $this->runCnt, {$this->config['process_num']}, $num_running)";

        Yii::$app->songbook_monitor->createCommand($sql)->query();
      }

    }

    if ($this->shouldStop()) {
      $this->killProcess();
      exit(1);
    }
    //及时关闭数据库连接
    Yii::$app->songbook_monitor->close();
    return true;

  }

  /**
   * kill daemon
   */
  public function killProcess() {
    $num = $this->getProcessNum();
    if ($num < 1) {
      echo 'no process running'.PHP_EOL;
    } else {
      echo ' process num is '.$num.PHP_EOL;
      $cmd = "ps aux | grep {$this->config['daemon']} | grep -ivE 'vi|tail|more| stop|grep' | awk '{print $2}' | xargs kill -9";

      exec($cmd, $out);
      sleep(1);

      $num = $this->getProcessNum();
      if ($num > 0) {
        exec($cmd, $out);
      }


      if ($num < 1) {
        echo 'Kill process success'.PHP_EOL;
      } else {
        echo 'kill process failed'.PHP_EOL;
      }
    }
    exit();
  }

  /**
   * get process num
   */
  public function getProcessNum() {
    if (!$this->config['daemon']) {
      echo 'can not found which to get'.PHP_EOL;
      exit;
    }
    $cmd = "ps aux | grep {$this->config['daemon']} | grep -ivE 'vi|tail|more| stop|grep' ";
    //$cmd = "ps aux | grep {$this->config['daemon']} | grep -ivE 'vi|tail|more| stop|grep' | awk '{print $2}' | xargs kill -9";
    exec($cmd, $out);
    return count($out);
  }

}

/**
 * woker进程启动后的回调
 * @throws \Exception
 */
function importYiiFramework($env) {

  if (!in_array($env, ['prod', 'test', 'dev'])) {
    echo 'env error'.PHP_EOL;
    exit();
  }

  defined('YII_DEBUG') or define('YII_DEBUG', false);
  defined('YII_ENV') or define('YII_ENV', $env);


  require(__DIR__ . '/../../vendor/yiisoft/yii2/Yii.php');
  switch ($env) {
    case 'dev': {
      require(__DIR__ . "/../../common/config/bootstrap-dev.php");
      break;
    }
    case 'test': {
      require(__DIR__ . "/../../common/config/bootstrap.php");
      require(__DIR__ . "/../../common/config/bootstrap-test.php");
      break;
    }
    case 'prod': {
      require(__DIR__ . "/../../common/config/bootstrap.php");
      break;
    }
    default: {
      throw new \Exception('invalid env.');
    }
  }

  $config = ArrayHelper::merge(
    require(__DIR__ . '/../../common/config/main.php'),
    require(__DIR__ . '/../../console/config/main.php')
  );
  switch ($env) {
    case 'dev': {
      $config = ArrayHelper::merge($config, require(__DIR__ . '/../../console/config/main-dev.php'));
      break;
    }
    case 'test': {
      $config = ArrayHelper::merge($config, require(__DIR__ . '/../../console/config/main-test.php'));
      break;
    }
    case 'prod': {
      $config = ArrayHelper::merge($config, require(__DIR__ . '/../../console/config/main.php'));
      break;
    }
    default: {
      throw new \Exception('invalid env.');
    }
  }

  $application = new Application($config);
  $application->init();
}

业务脚本:

<?php
namespace songbook\daemons;

require('BaseDaemon.php');

use common\services\InstrumentalService;
use Yii;
use songbook\helper\RedisHelper;

/**
 * 伴奏带清洗
 * @author hongbo
 * @package songbook\daemons
 */
class InstrumentalToWashDaemon extends BaseDaemon {

  /**
   * 处理数据,只需要关心这个方法即可
   */
  public function dealData($json) {

    if (!$json || !($payload = json_decode($json, true))) {
      return;
    }

    InstrumentalService::wash($payload);

  }

  public function doWork() {

    $redis = Yii::$app->qukuCache->redis;

    $num = max($this->config['msg_num_atm'], 1);

    for ($i = 0; $i < $num; $i++) {

      //从队列获取数据
      $json = $redis->rpop($this->config['queue_name']);

      //如果队列里没有数据了,要将数据标识unlink
      if (!$json) {
        $this->markFile('unlink');
        continue;
      }

      //有数据,创建标识
      $this->markFile('create');

      //处理数据
      $this->dealData($json);
    }

  }
}

$config = [
  'queue_name'  => RedisHelper::INSTRUMENTAL_NEED_WASH,
  'job_name'    => 'Instrumental_wash', //给自己的job起个名字
  'process_num' => 3, //设置进程数量
  'msg_num_atm' => 2,  //每个进程每次处理的消息数,如果是多个会有合并处理
  'max_sleep'   => 5, //没有消息的时候,deamon将sleep,如果队列消息不多,尽量设置大点,减少处理压力20+
  'life'        => 0,  //程序的生命周期,如果0表示是一直循环的Deamon处理,如果设置了时间,必须采用crontab的形式
  'filepath'    => __FILE__,
  'admin_mail'  => 'hongbo.cui@ushow.media',
];

(new InstrumentalToWashDaemon($config))->run();


转载时请以 超链接的形式 注明:转自Ferman

                  

About me