文章出處

本篇源碼基于趙星對Spark 1.3.1解析進行整理。話說,我不認為我這下文源碼的排版很好,不能適應的還是看總結吧。

雖然1.3.1有點老了,但對于standalone模式下的Master、Worker和劃分stage的理解是很有幫助的。
=====================================================
總結:

master和worker都要創建ActorSystem來創建自身的Actor對象,master內部維護了一個保存workerinfo的hashSet和一個key為workerid,
value為workerInfo的HashMap。
master構造方法執行后會啟動一個定時器,定期檢查超時的worker。
worker構造方法執行后會嘗試與master建立連接并發送注冊消息,master收到消息后,封裝worker并持久化,再給worker反饋,
worker收到反饋后,啟動定時任務向master發送心跳,master收到心跳后更新心跳時間。

new SparkContext(),執行主構造器,創建SparkEnv,env里創建了ActorSystem用于通信,
然后創建TaskScheduler,創建DAGScheduler。TaskScheduler里創建了2個actor分別負責與master和executors進行通信。
ClientActor創建之前,會準備一大堆的參數,包括spark參數,java參數,executor的實現類等,
封裝進AppClient,然后創建ClientActor與Master建立連接發送注冊信息,Master收到后保存app的信息并反饋。
這時Master開始調度資源并啟動worker,有兩種調度方式:盡量打散,盡量集中,默認打散。
Master發消息給Worker,worker拼接Java命令,啟動子進程。
(TaskScheduler 里會創建一個backend,backend調用start方法后,會先調用父類的start方法,父類的start方法會創建DriverActor,再執行自己的start方法創建ClientActor)

執行到Action算子會執行sparkContext里的runJob(),再調用DAGScheduler的runJob(),通過2個HashSet和1個Stack劃分stage,然后提交stage。
將stage創建成多個Task,分為shuffleMapTask和ResultTask,組成taskSet,由taskScheduler通過DriverActor向Executor進行提交。

DAG的邏輯:
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new Stack[RDD[_]]
將最后一個rdd壓棧waitingForVisit,當waitingForVisit非空時while循環,waitingForVisit彈棧出的rdd判斷是否在visited中,
否,則rdd添加進visited,循環rdd的父rdd,如果不是shuffleMapStage,將rdd壓棧waitingForVisit,是shuffleMapStage,則再
求父stage加入parents,求父stage是調用本方法的遞歸過程。
=====================================================
object Master
  |--def main()
    |--加載配置文件并解析。
    |--//創建ActorSystem和Actor
    |--def startSyatemAndActor()
      |--//通過AkkaUtils工具類創建ActorSystem
      |--AkkaUtils.createActorSystem()
        |--//定義一個函數,創建ActorSystem
        |--val startService: Int => (返回值) = {doCreateActorSystem()}
          |--val (actorSystem, boundPort) = doCreateActorSystem()
            |--//創建ActorSystem
            |--//準備Akka參數
            |--val akkaConf = xxxx
            |--//創建ActorSystem
            |--val actorSystem = ActorSyatem(name, akkaConf)
            |--return (actorSystem, boundPort)
        |--//調用函數
        |--Utils.startServiceOnPort(startService())
          |--從一個沒有被占用的端口啟動服務,調用startService函數
    |--//通過ActorSystem創建Actor:master
    |--val actor(master) = actorSystem.actorOf(master)//創建master,master也是一個actor
  |--//成員變量:保存workerInfo
  |--val workers = new HashSet[WorkerInfo]
  |--//成員變量:保存(workerId,workInfo)
  |--val idToWorker = new HashMap[String, WorkerInfo]
  |--//構造方法之后,receive方法之前
  |--def preStart()
    |--//啟動一個定時器,定時檢測超時的worker
    |--context.system.scheduler.scheduler(self,checkxxx)//自己給自己發消息,發送到自己的recevice方法,啟動任務
  |--接收worker向master注冊的消息
  |--case RegisterWorker()
    |--//封裝worker信息
    |--val worker = new WorkerInfo()
    |--//持久化到zk
    |--persistenceEngine.addWorker(worker)
    |--//向worker反饋信息
    |--sender ! RegisteredWorker(masterUrl)
    |--//任務調度
    |--schedule()
  |--case Heartbeat(workerId)//worker發來的心跳
    |--//更新上一次心跳時間
    |--workerInfo.lastHeartbeat = Syatem.currentTimeMillis()
--------接SparkContext,Driver創建ClientActor向Master注冊應用信息-----------
  |--case RegisterApplication(description)
    |--//封裝消息
    |--val app = createApplication(description, sender)
    |--//注冊消息,即存入集合
    |--registerApplication(app) //方法內部就是把app放進map等
      |--HashMap waitingApps(appid, app)
    |--//持久化保存
    |--persistenceEngine.addApplication(app)
    |--//Master向ClientActor發送注冊成功的消息
    |--sender ! RegisterApplication(app.id, masterUrl)
    |--//Master開始調度資源,將任務啟動到worker上
    |--//兩種情況下會進行調度:
    |--//1、提交任務,殺死任務
    |--//2、worker新增或減少
    |--schedule()
--------Master進行資源的調度-------------
  |--//兩種調度方式:盡量打散,盡量集中
  |--def schedule()
    |--//盡量打散
      |--//進行一系列的判斷過濾,例如worker上剩余的核數或內存是否大于app所需資源
      |--//分核數的邏輯:
      |--//假設需要10個核心,現有4臺機器,各有8個核心
      |--//創建一個長度為4的數組,角標為0,角標=(角標+1)%4,那角標只會在0~3之間循環,
      |--//循環一次需要的核心-1,worker(角標)的核心+1
      |--//Master發信息讓worker啟動executor
      |--launchExecutor(usableWorkers(pos), exec)
    |--//盡量集中
      |--//一下子把worker剩余的資源全部分配完在分配下一個worker
      |--//Master發信息讓worker啟動executor
      |--launchExecutor(worker, exec)
--------Master發信息讓worker啟動executor-------------
  |--def launchExecutor(worker, exec)
    |--//記錄worker使用資源
    |--worker.addExecutor(exec)
    |--//master發消息給worker,將參數傳遞給worker,讓他啟動executor
    |--worker.actor ! LaunchExecutor(.....)
    |--//Master向ClientActor發消息,告訴他executor已經啟動了
    |--exec.application.driver ! ExecutorAdded(......)
-----------------------------------------------------
object Worker
  |--def main()
    |--//創建ActorSystem和Actor
    |--def startSyatemAndActor()
      |--與Master過程相同
    |--//通過ActorSystem創建Actor:worker
    |--val actor(worker) = actorSystem.actorOf(worker)
  |--//構造方法之后,receive方法之前
  |--def preStart()
    |--//與master建立連接,發送注冊消息
    |--registerWithMaster()
      |--//嘗試注冊,如果失敗嘗試多次
      |--tryRegidterAllMasters()
        |--//建立連接
        |--val actor(master) = context.actorSelection(masterAkkaUrl)
        |--//發送注冊消息
        |--actor ! RegisterWorker(workId, host, port, cores, memory...)
  |--//Master發給Worker注冊成功的消息
  |--case RegisteredWorker(masterUrl)
    |--//啟動定時器,定期發送心跳
    |--context.system.scheduler.scheduler(self,SendHeartbeat)//自己給自己發消息,發送到自己的recevice方法,啟動任務
  |--case SendHeartbeat
    |--//發送心跳
    |--master ! Heartbeat(workid)
-------------上接:Master發信息讓worker啟動executor-----------
  |--case LaunchExecutor(...)
    |--創建ExecutorRunner,將參數放入其中,然后再通過他啟動Executor
    |--val manager = new ExecutorRunner(...)
    |--//調用ExecutorRunner的start方法來啟動executor java子進程
    |--manager.start()
class ExecutorRunner
  |--def start()
    |--//創建線程,通過線程的start來啟動java子進程
    |--workerThread = new Thread(){def run(){fetchAndRunExecutor()}}
    |--workerThread.start()
  |--def fetchAndRunExecutor()
    |--//啟動子進程
    |--//有具體的類,拼接java命令啟動相應的類

總結:Master和Worker之間的通信:
master和worker都要創建ActorSystem來創建自身的Actor對象,master內部維護了一個保存workerinfo的hashSet和一個key為workerid,
value為workerInfo的HashMap。
master構造方法執行后會啟動一個定時器,定期檢查超時的worker。
worker構造方法執行后會嘗試與master建立連接并發送注冊消息,master收到消息后,封裝worker并持久化,再給worker反饋,
worker收到反饋后,啟動定時任務向master發送心跳,master收到心跳后更新心跳時間。
=====================================================
class SparkContext//即Driver端
  |--//主構造器
    |--def this()
    |--//創建SparkEnv,包含了一個ActorSyatem
    |--val env = createSparkEnv()
    |--//創建ActorSyatem的方法
    |--def createSparkEnv()
      |--//調用 SparkEnv 的靜態方法創建SparkEnv
      |--SparkEnv.createDriverEnv()
    |--//創建 TaskScheduler
    |--var taskScheduler(schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
    |--//創建 executors 和 DriverActor 的心跳Actor
    |--val heartbeatReceiver = env.actorSystem.actorOf(new HeartbeatReceiver(taskScheduler),...)
    |--//創建DAGScheduler
    |--var dagScheduler = new DAGScheduler(this)
    |--//啟動TaskSecheduler
    |--taskScheduler.start()
  |--//創建 TaskScheduler 方法,
  |--//根據提交任務時指定的url(本地/yarn/standalone)創建相應的 TaskScheduler
  |--def createTaskScheduler()
    |--//spark的standalone模式
    |--case SPARK_REGEX(sparkUrl)
      |--//創建 TaskSchedulerImpl
      |--val scheduler = new TaskSchedulerImpl(sc)
      |--//創建 SparkDeploySchedulerBackend
      |--val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      |--//調用 initialize 創建調度器,默認使用先進先出的調度器
      |--scheduler.initialize(backend)

class TaskSchedulerImpl
  |--def initialize(backend)
    |--val backend = backend
  |--def start()
    |--//首先調用 SparkDeploySchedulerBackend 的start()
    |--backend.start()
-----------★★★調用taskScheduler的submitTasks方法來提交TaskSet-------------
  |--def submitTasks(taskSet)
    |--//Driver發消息任務
    |--backend.reviveOffers()
class SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend
  |--def start()
    |--//調用父類的 start 來創建 DriverActor
    |--super.start() //CoarseGrainedSchedulerBackend 的 start 方法
    |--//準備一大堆的參數,例如spark的參數,java的參數,在Driver端都準備好,屆時直接發給master,master拿到后發給executor執行即可
    |--conf......
    |--//將參數封裝成Command,這是以后executor的實現類,類名也封裝好了,yarn中啟動的也是這個,所以不是yarnChild
    |--val command = Command("org.apache.executor.CoarseGrainedExecutorBackend",conf,...)
    |--//將參數封裝到ApplicationDescription
    |--val appDesc = new ApplicationDescription(sc.appName, command, ....)
    |--創建AppClient
    |--client = new AppClient(sc.actorSystem, masters, appDesc, ...)
    |--//調用AppClient的start方法,創建ClientActor用于與Master通信
    |--client.start()
class CoarseGrainedSchedulerBackend
  |--def start()
    |--//通過 actorSystem 創建 DriverActor
    |--driverActor = actorSystem.actorOf(new DriverActor(..)) //等待 executor 過來通信
----------上接:Executor向Driver注冊"|--//Driver建立連接,注冊exectuor"------------------------
  |--def receiveWithLogging
    |--//Driver收到executor發來的注冊消息
    |--case RegisterExecutor()
      |--//反饋注冊成功
      |--//★★★查看是否有任務需要提交
      |--makeOffers()//暫時沒有任務,還沒有構建DAG
-----------上接:提交前面的stage-------------------------
  |--def makeOffers()
    |--//調用launchTask向Executor提交Task
    |--launchTask(tasks)
  |--def launchTask(tasks)
    |--//序列化task
    |--val serializedTask = ser.serialize(task)
    |--//向Executor發送序列化好的Task
    |--executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
-----------上接:backend.reviveOffers()------------------
  |--def reviveOffers()
    |--driverActor ! ReviveOffers
class DriverActor
  |--★★★調用makeOffers向Executor提交Task
  |--case ReviveOffers => makeOffers()
class AppClient
  |--def start()
    |--//創建ClientActor用于與Master通信
    |--actor = actorSystem.actorOf(new ClientActor)
  |--//主構造器
    |--def preStart()
      |--//ClientActor向Master注冊
      |--registerWithMaster()
  |--def registerWithMaster()
    |--//向Master注冊
    |--tryRegidterAllMasters()
  |--def tryRegidterAllMasters()
    |--//循環所有Master,建立連接
    |--val actor = context.actorSelection(masterAkkaUrl)
    |--//拿到Master的引用,向master注冊,備用的master不給反饋,活躍的才給
    |--//參數都保存在appDescription中,例如核數,內存大小,java參數,executor實現類
    |--actor ! RegisterApplication(appDescription)
  |--def receiveWithLogging
    |--//ClientActor收到Master發來的注冊成功的消息
    |--case RegisterApplication
      |--//更新Master地址
      |--changeMaster(masterUrl)
object SparkEnv
  |--def createDriverEnv()
    |//調用 create 創建 Actor
    |--create
      |--//創建 ActorSystem
      |--val (actorSystem, boundPort) = AkkaUtils.createActorSystem()
總結:new SparkContext(),執行主構造器,創建SparkEnv,env里創建了ActorSystem用于通信,
然后創建TaskScheduler,創建AGScheduler。TaskScheduler里創建了2個actor分別負責與master
和executors進行通信。(TaskScheduler 里會創建一個backend,backend調用start方法后,會
先調用父類的start方法,父類的start方法會創建DriverActor,再執行自己的start方法創建ClientActor)
ClientActor創建之前,會準備一大堆的參數,包括spark參數,java參數,executor的實現類等,
封裝進AppClient,然后創建ClientActor與Master建立連接發送注冊信息,Master收到后保存app的信息并反饋。
這時Master開始調度資源并啟動worker,有兩種調度方式:盡量打散,盡量集中,默認打散。
Master發消息給Worker,worker拼接Java命令,啟動子進程。
=====================================================
spark-submit腳本提交流程源碼分析:
spark-submit腳本
|--/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class腳本
|--1.3.1 echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2 //org.apache.spark.deploy.SparkSubmit
|--1.6.1/2.0 "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
-----------------------------------------------------
object org.apache.spark.deploy.SparkSubmit
  |--def main()
    |--//進行匹配
    |--appArgs.action match{case SparkSubmitAction.SUBMIT => submit(appArgs)}
  |--def submit()
    |--def doRunMain()
      |--
    |--//調用doRunMain
    |--doRunMain()
      |--proxyUser.doAs(new xxxAction(){
        override def run():Unit = {
          runMain(...,childMainClass,...)
        }
      })
    |--def runMain(...,childMainClass,...)
      |--//反射自定義的spark程序 class
      |--mainClass = Class.forName(childMainClass,...)
      |--//調用main方法
      |--val mainMethod = mainClass.getMethod("main",...)
      |--mainMethod.invoke(null, childArgs.toArray)
總結: spark-submit啟動了一個spark自己的submit程序,通過反射調用我們自定義的spark程序
=====================================================
Executor跟Driver通信過程源碼分析
org.apache.executor.CoarseGrainedExecutorBackend
  |--def main()
    |--//解析一大堆參數
    |--//調用run方法
    |--run(....)
  |--def run()
    |--//在executor里創建ActorSystem
    |--val fetcher = AkkaUtils.createActorSystem(...)
    |--//跟Driver建立連接
    |--env.actorSystem.actorOf(new CoarseGrainedExecutorBackend)
  |--def preStart()
    |--//Driver建立連接,注冊exectuor
    |--.....
  |--def receiveWithLogging
    |--//Driver反饋注冊成功
    |--case RegisteredExecutor
      |--//創建Executor實例,執行業務邏輯
      |--executor = new Executor(....)
Executor
|--//初始化線程池
|--val threadPool = Utils.newDaemonCachedThreadPoll()
總結:Executor啟動后,創建actor向driver注冊,創建Executor實例執行業務邏輯
=====================================================
任務提交流源碼分析,DAScheduler執行過程
sc.textFile-->hadoopFile-->hadoopRDD-->MapParitionsRDD-->shuffleRDD
rdd.saveAsTextFile()-->MapPartitionsRDD
Driver端提交任務,執行self.context.runJob(....)

class SparkContext
  |--def runJob()
    |--//DAGScheduler切分Stage,轉成TaskSet給TaskScheduler再提交給Executor
    |--DAGScheduler.runJob(.....)
class DAGScheduler
  |--//runjob切分stage
  |--def runJob()
    |--//調用submitJob返回一個回調器
    |--val waiter = submitJob(rdd, ...)
    |--//進行模式匹配
    |--waiter.awaitResult() match
      |--case JobSuccesded
      |--case JobFailed
  |--def submitJob(rdd, ...)
    |--//將數據封裝到事件中放入eventProcessLoop的阻塞隊列中
    |--eventProcessLoop.post(JobSubmitted(...))
  |--val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
class DAGSchedulerEventProcessLoop extends EventLoop
  |--def onReceive()
    |--//提交計算任務
    |--case JobSubmitted(jobId,...)
      |--//調用DAGScheduler的handleJobSubmitted方法處理
      |--dagScheduler.handleJobSubmitted(jobId,...)
  |--//切分stage
  |--def handleJobSubmitted(jobId,...)
    |--★★★劃分stage
    |--finalStage = newStage(finalRDD, partitons.size, None, jobId, ...)
    |--//開始提交Stage
    |--submitStage(finalStage)
  |--def submitStage(finalStage)
    |--//獲取父stage
    |--val missing = getMissingParentStages(stage).sortBy(_.id)
    |--if(missing == null){
        //提交前面的stage
        submitMissingTasks(stage, jobId.get)
      }else{
        //有父stage,遞歸執行本方法
        for(parent <- missing){
          submitStage(parent)
        }
        |--//放進waitingStages
        |--waitingStages += stage
      }
  |--def submitMissingTasks(stage, jobId.get)
    |--//將stage創建成多個Task,分為shuffleMapTask和ResultTask
    |--new ShuffleMapTask(stage.id, taskBinary, part, locs)
    |--new ResultTask(stage.id, taskBinary, part, locs, id)
    |--//★★★調用taskScheduler的submitTasks方法來提交TaskSet
    |--taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, ..., stage.jobId, properties))
  |--def newStage
    |--//獲取父stage
    |--val parentStages = getParentStages(rdd, jobId)
    |--val stage = new Stage(...,parentStages,...)
  |--def getParentStages
    |--//使用了3個數據結構來處理父類stage
    |--val parents = new HashSet[Stage]
    |--val visited = new HashSet[RDD]
    |--val waitingForVisit = new Stack[RDD]
    |--//思路:通過遞歸,壓棧彈棧
    |--//見最后源碼
  |--def getMissingParentStages
    |--//與getParentStages一樣的數據結構找父stage
class EventLoop
  |--//阻塞隊列
  |--val eventQueue = new LinkedBlockingDeque()
  |--//不停的取事件
  |--val eventThread = new Thread(name){
      def run(){
        while(){
          val event = eventQueue.take()
          onReceive(event)
        }
      }
    }

總結:Action算子會執行sparkContext里的runJob(),再調用DAGScheduler的runJob(),
通過2個HashSet和1個Stack劃分stage,然后提交stage

=======================劃分stage源碼==============================
/**
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
* directly.
*/
//★★★用于創建Stage
private def newStage(
  rdd: RDD[_],
  numTasks: Int,
  shuffleDep: Option[ShuffleDependency[_, _, _]],
  jobId: Int,
  callSite: CallSite)
  : Stage =
{
  //★★★獲取他的父Stage
  val parentStages = getParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}
------------------------------------------------------
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
*/
//TODO 用戶獲取父Stage
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
  val parents = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(r: RDD[_]) {
    if (!visited(r)) {
      visited += r
      // Kind of ugly: need to register RDDs with the cache here since
      // we can't do it in its constructor because # of partitions is unknown
    for (dep <- r.dependencies) {
      dep match {
        case shufDep: ShuffleDependency[_, _, _] =>
          //★★★把寬依賴傳進去,獲得父Stage
          parents += getShuffleMapStage(shufDep, jobId)
        case _ =>
          waitingForVisit.push(dep.rdd)
        }
      }
    }
  }
  waitingForVisit.push(rdd)
  while (!waitingForVisit.isEmpty) {
    visit(waitingForVisit.pop())
  }
  parents.toList
}
------------------------------------------------------
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
*/
private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
  shuffleToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) => stage
    case None =>
      // We are going to register ancestor shuffle dependencies
      registerShuffleDependencies(shuffleDep, jobId)
      // Then register current shuffleDep

      val stage =
      //★★★創建服父Stage
        newOrUsedStage(
          shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
          shuffleDep.rdd.creationSite)
      shuffleToMapStage(shuffleDep.shuffleId) = stage

      stage
    }
  }
------------------------------------------------------
/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
* provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
private def newOrUsedStage(
  rdd: RDD[_],
  numTasks: Int,
  shuffleDep: ShuffleDependency[_, _, _],
  jobId: Int,
  callSite: CallSite)
  : Stage =
{
  //★★★遞歸
  val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
  if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
    val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    for (i <- 0 until locs.size) {
      stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
    }
    stage.numAvailableOutputs = locs.count(_ != null)
  } else {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
  }
  stage
}


文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 大師兄 的頭像
    大師兄

    IT工程師數位筆記本

    大師兄 發表在 痞客邦 留言(0) 人氣()