Source code for gocd.api.pipeline

import time
from gocd.api.response import Response
from gocd.api.endpoint import Endpoint
from gocd.api.artifact import Artifact
from gocd.api.stage import Stage

__all__ = ['Pipeline']


[docs]class Pipeline(Endpoint): base_path = 'go/api/pipelines/{id}' id = 'name' #: The result of a job/stage has been finalised when these values are set final_results = ['Passed', 'Failed'] def __init__(self, server, name): """A wrapper for the `Go pipeline API`__ .. __: http://api.go.cd/current/#pipelines Args: server (Server): A configured instance of :class:gocd.server.Server name (str): The name of the pipeline we're working on """ self.server = server self.name = name
[docs] def history(self, offset=0): """Lists previous instances/runs of the pipeline See the `Go pipeline history documentation`__ for example responses. .. __: http://api.go.cd/current/#get-pipeline-history Args: offset (int, optional): How many instances to skip for this response. Returns: Response: :class:`gocd.api.response.Response` object """ return self._get('/history/{offset:d}'.format(offset=offset or 0))
[docs] def release(self): """Releases a previously locked pipeline See the `Go pipeline release lock documentation`__ for example responses. .. __: http://api.go.cd/current/#releasing-a-pipeline-lock Returns: Response: :class:`gocd.api.response.Response` object """ return self._post('/releaseLock', headers={"Confirm": True})
#: This is an alias for :meth:`release` unlock = release
[docs] def pause(self, reason=''): """Pauses the current pipeline See the `Go pipeline pause documentation`__ for example responses. .. __: http://api.go.cd/current/#pause-a-pipeline Args: reason (str, optional): The reason the pipeline is being paused. Returns: Response: :class:`gocd.api.response.Response` object """ return self._post('/pause', headers={"Confirm": True}, pauseCause=reason)
[docs] def unpause(self): """Unpauses the pipeline See the `Go pipeline unpause documentation`__ for example responses. .. __: http://api.go.cd/current/#unpause-a-pipeline Returns: Response: :class:`gocd.api.response.Response` object """ return self._post('/unpause', headers={"Confirm": True})
[docs] def status(self): """Returns the current status of this pipeline See the `Go pipeline status documentation`__ for example responses. .. __: http://api.go.cd/current/#get-pipeline-status Returns: Response: :class:`gocd.api.response.Response` object """ return self._get('/status')
[docs] def instance(self, counter=None): """Returns all the information regarding a specific pipeline run See the `Go pipeline instance documentation`__ for examples. .. __: http://api.go.cd/current/#get-pipeline-instance Args: counter (int): The pipeline instance to fetch. If falsey returns the latest pipeline instance from :meth:`history`. Returns: Response: :class:`gocd.api.response.Response` object """ if not counter: history = self.history() if not history: return history else: return Response._from_json(history['pipelines'][0]) return self._get('/instance/{counter:d}'.format(counter=counter))
[docs] def schedule(self, variables=None, secure_variables=None, materials=None, return_new_instance=False, backoff_time=1.0): """Schedule a pipeline run Aliased as :meth:`run`, :meth:`schedule`, and :meth:`trigger`. Args: variables (dict, optional): Variables to set/override secure_variables (dict, optional): Secure variables to set/override materials (dict, optional): Material revisions to be used for this pipeline run. The exact format for this is a bit iffy, have a look at the official `Go pipeline scheduling documentation`__ or inspect a call from triggering manually in the UI. return_new_instance (bool): Returns a :meth:`history` compatible response for the newly scheduled instance. This is primarily so users easily can get the new instance number. **Note:** This is done in a very naive way, it just checks that the instance number is higher than before the pipeline was triggered. backoff_time (float): How long between each check for :arg:`return_new_instance`. .. __: http://api.go.cd/current/#scheduling-pipelines Returns: Response: :class:`gocd.api.response.Response` object """ scheduling_args = dict( variables=variables, secure_variables=secure_variables, material_fingerprint=materials, headers={"Confirm": True}, ) scheduling_args = dict((k, v) for k, v in scheduling_args.items() if v is not None) # TODO: Replace this with whatever is the official way as soon as gocd#990 is fixed. # https://github.com/gocd/gocd/issues/990 if return_new_instance: pipelines = self.history()['pipelines'] if len(pipelines) == 0: last_run = None else: last_run = pipelines[0]['counter'] response = self._post('/schedule', ok_status=202, **scheduling_args) if not response: return response max_tries = 10 while max_tries > 0: current = self.instance() if not last_run and current: return current elif last_run and current['counter'] > last_run: return current else: time.sleep(backoff_time) max_tries -= 1 # I can't come up with a scenario in testing where this would happen, but it seems # better than returning None. return response else: return self._post('/schedule', ok_status=202, **scheduling_args)
#: This is an alias for :meth:`schedule` run = schedule #: This is an alias for :meth:`schedule` trigger = schedule
[docs] def artifact(self, counter, stage, job, stage_counter=1): """Helper to instantiate an :class:`gocd.api.artifact.Artifact` object Args: counter (int): The pipeline counter to get the artifact for stage: Stage name job: Job name stage_counter: Defaults to 1 Returns: Artifact: :class:`gocd.api.artifact.Artifact` object """ return Artifact(self.server, self.name, counter, stage, job, stage_counter)
# TODO: It would be nice if this could stream the output as it happens. # Currently it's built with the assumption that this is done after all output has finished.
[docs] def console_output(self, instance=None): """Yields the output and metadata from all jobs in the pipeline Args: instance: The result of a :meth:`instance` call, if not supplied the latest of the pipeline will be used. Yields: tuple: (metadata (dict), output (str)). metadata contains: - pipeline - pipeline_counter - stage - stage_counter - job - job_result """ if instance is None: instance = self.instance() for stage in instance['stages']: for job in stage['jobs']: if job['result'] not in self.final_results: continue artifact = self.artifact( instance['counter'], stage['name'], job['name'], stage['counter'] ) output = artifact.get('cruise-output/console.log') yield ( { 'pipeline': self.name, 'pipeline_counter': instance['counter'], 'stage': stage['name'], 'stage_counter': stage['counter'], 'job': job['name'], 'job_result': job['result'], }, output.body )
[docs] def stage(self, name, pipeline_counter=None): """Helper to instantiate a :class:`gocd.api.stage.Stage` object Args: name: The name of the stage pipeline_counter: Returns: """ return Stage( self.server, pipeline_name=self.name, stage_name=name, pipeline_counter=pipeline_counter, )