Strony

poniedziałek, 25 stycznia 2010

SVN hooks once again, this time tags, branches, log messages etc.

I wrote this hook quite a long time ago. It enforces the following rules on a Subversion repository:

  • a log message must be provided on every commit (class logChecker)
  • the "/tags", "/branches" and "/trunk" directories cannot be removed from the repo (class pathChecker)
  • branch and tag names must follow the naming convention (classes branchChecker and tagChecker)
  • subdirectories under /tags are read-only (class tagChecker)


My company uses Subversion to store all of its software packages, and our repo doesn't have the default structure (/tags, /trunk and /branches as top-level directories) — instead, every package has its own /trunk, /tags and /branches subdirectories. So the repository structure looks something like this:

/Path/
PackageA/
trunk/
src/
doc/
tags/
PackageA-00-00-00/
src/
doc/
...
branches/
PackageA-00-00-00-branch/
src/
doc/

PackageB/
...
/OtherPath/
PackageC/
...


Looks rather complicated, but it isn't. ;)

We also have a naming convention for tags and branches. Every tag name consists of the package name followed by three (or four, for tags made on a branch) two-digit groups, e.g. PackageName-ii-jj-kk or PackageName-ii-jj-kk-ll.
A branch name looks just like a three-group tag name with an additional "-branch" suffix at the end, e.g. PackageName-ii-jj-kk-branch.

So here is the hook itself:
#!/usr/bin/env python
##
# @file svn-policy.py
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @date 06/05/2009
# @brief pre-commit hook
#
import os
import sys
import getopt
# Use getopt.gnu_getopt when this Python provides it, otherwise fall back
# to plain getopt.getopt.
my_getopt = getattr( getopt, "gnu_getopt", getopt.getopt )
import re
import svn
import svn.fs
import svn.repos
import svn.core
import StringIO
from subprocess import Popen, PIPE
## txnChecker
# @class txnChecker
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @brief common base class of the pre-commit checkers; keeps the transaction
#        and filesystem handles and supplies the allow/block verdict helpers
class txnChecker( object ):

    ## c'tor
    # @param self instance
    # @param txn transaction handle, later passed to svn.fs.svn_fs_txn_prop
    # @param fs filesystem handle (only stored by this class)
    def __init__( self, txn, fs ):
        self.fs = fs
        self.txn = txn

    ## read a single property of the stored transaction
    # @param self instance
    # @param prop_name property name, e.g. "svn:log" or "svn:author"
    # @return whatever svn.fs.svn_fs_txn_prop returns for that name
    def txnProp( self, prop_name ):
        value = svn.fs.svn_fs_txn_prop( self.txn, prop_name )
        return value

    ## negative verdict: print the reason on stderr
    # @param self instance
    # @param msg reason; converted with str() before writing
    # @return always False
    def block( self, msg ):
        reason = str( msg )
        sys.stderr.write( reason )
        return False

    ## positive verdict
    # @param self instance
    # @return always True
    def allow( self ):
        return True
## logChecker
# @class logChecker
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @brief rejects the commit when the "svn:log" property is missing or blank
class logChecker( txnChecker ):

    ## c'tor
    # @param self instance
    # @param txn transaction handle
    # @param fs filesystem handle
    def __init__( self, txn, fs ):
        txnChecker.__init__( self, txn, fs )

    ## verify that the commit carries a non-blank log message
    # @param self instance
    # @return True when a message is present, False (after writing the
    #         reason to stderr) otherwise
    def check( self ):
        message = self.txnProp( "svn:log" )
        # No property at all, or nothing but whitespace: refuse.
        if ( not message or message.strip() == "" ):
            return self.block( "\nSVN POLICY: Cannot enter in empty commit message.\n" )
        return self.allow()
## pathChecker
# @class pathChecker
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @brief blocks the commit when an ordinary user changes a path outside
#        "trunk/tags/branches", or removes one of those directories
class pathChecker( txnChecker ):

    # A changed path is acceptable only if it contains one of these fragments.
    _allowed_path = re.compile( "(/tags)|(/branches)|(/trunk)" )

    # Per-package directories that only the librarian may delete.
    _required_dirs = ( "tags", "branches", "trunk" )

    ## c'tor
    # @param self instance
    # @param txn transaction handle
    # @param fs filesystem handle
    def __init__( self, txn, fs ):
        txnChecker.__init__( self, txn, fs )

    ## checker
    # @param self instance
    # @return True when every changed path is acceptable, False (after
    #         writing the reason to stderr) on the first violation
    def check( self ):
        # The librarian account is exempt from all path restrictions.
        if ( self.txnProp( "svn:author" ) == "librarian" ):
            return self.allow()

        self.txn_root = svn.fs.svn_fs_txn_root( self.txn )
        self.changed_paths = svn.fs.paths_changed( self.txn_root )

        # items() instead of iteritems(): same pairs, works on any dict.
        for path, change in self.changed_paths.items():
            if ( not self._allowed_path.search( path ) ):
                msg = "\nSVN POLICY: invalid path '" + path + "' in commit.\n\n"
                msg += "Only librarian could modify paths outside package's '/tags', '/branches' or '/trunk' subdirectories.\n\n"
                msg += "Contact librarian ('librarian@mail') for details.\n"
                return self.block( msg )

            if ( change.change_kind != svn.fs.path_change_delete ):
                continue

            # A deletion: refuse it when the path itself is one of the
            # required package directories (with or without trailing slash).
            for name in self._required_dirs:
                if ( path.endswith( ( "/%s/" % name, "/%s" % name ) ) ):
                    msg = "\nSVN POLICY: Only librarian is able to remove package's '/%s' directory!\n\n" % name
                    msg += "This directory is required for normal package operations.\n\n"
                    msg += "Contact librarian ('librarian@mail') for details.\n"
                    return self.block( msg )

        return self.allow()
## branchChecker
# @class branchChecker
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @brief blocks creation of wrongly named branches and deletion of branches
#        by anyone who is not an administrator
class branchChecker( txnChecker ):

    # Raw strings; "/" needs no escaping in a regular expression.
    path_is_in_branches = re.compile( r"(?i)/branches/" )
    path_is_a_branch = re.compile( r"(?i)/branches/[^/]+/?$" )
    # modify this RE to match your branch naming convention,
    # this one is PackageName-ii-jj-kk-branch
    is_branch_name_valid = re.compile( r"(^[A-Za-z0-9_]+-[0-9]{2}-[0-9]{2}-[0-9]{2}-branch$)" )

    # Accounts that are never restricted by this checker.
    _admins = ( "librarian", "root" )

    ## c'tor
    # @param self instance
    # @param txn transaction handle
    # @param fs filesystem handle
    # @param repos_path path to the repo in the server file system
    def __init__( self, txn, fs, repos_path ):
        txnChecker.__init__( self, txn, fs )
        self.repos_path = repos_path

    ## check
    # @param self instance
    # @return True when the commit respects the branch policy, False (after
    #         writing the reason to stderr) on the first violation
    def check( self ):
        # Every rule below applies to non-administrators only, so read the
        # author once and let administrators through straight away.
        if ( self.txnProp( "svn:author" ) in self._admins ):
            return self.allow()

        txn_root = svn.fs.svn_fs_txn_root( self.txn )
        changed_paths = svn.fs.paths_changed( txn_root )

        for path, change in changed_paths.items():
            # Only a branch directory itself is of interest; any path that
            # matches path_is_a_branch also matches path_is_in_branches.
            if ( not self.path_is_a_branch.search( path ) ):
                continue

            if ( change.change_kind == svn.fs.path_change_delete ):
                msg = "\nSVN POLICY: Only an administrator can delete a branch.\n\n"
                msg += "Contact librarian (librarian@mail) for details."
                return self.block( msg )

            if ( change.change_kind == svn.fs.path_change_add ):
                words = path.split( "/branches/" )
                # Exactly one "/branches/" separator is needed to tell the
                # package part from the branch name; otherwise skip the check.
                if ( len( words ) != 2 ):
                    continue
                new_branch = words[-1]
                package_name_from_path = words[0].split( "/" )[-1]
                package_name_from_branch = new_branch.split( "-" )[0]
                if ( package_name_from_branch != package_name_from_path or
                     not self.is_branch_name_valid.match( new_branch ) ):
                    msg = "\nSVN POLICY: invalid branch name '%s' for package %s\n\n" % ( new_branch, package_name_from_path )
                    msg += "The branch naming policy requires a branch of form "
                    msg += "'%s-ii-jj-kk-branch', where i,j and k are digits.\n" % package_name_from_path
                    return self.block( msg )

        return self.allow()
## tagChecker
# @class tagChecker
# @author Krzysztof Daniel Ciba (Krzysztof.Ciba@NOSPAMgmail.com)
# @brief keeps existing tags read-only, enforces the tag naming convention
#        and lets only administrators delete tags
class tagChecker( txnChecker ):

    # Raw strings; "/" needs no escaping in a regular expression.
    path_is_in_tags = re.compile( r"(?i)/tags/" )
    path_is_a_tag = re.compile( r"(?i)/tags/[^/]+/?$" )
    # tag naming policy RE
    # modify it to match your own
    # this one is
    # PackageName-ii-jj-kk or PackageName-ii-jj-kk-ll
    is_tag_name_valid = re.compile( r"(^[A-Za-z0-9_]+-[0-9]{2}-[0-9]{2}-[0-9]{2}$)|(^[A-Za-z0-9_]+-[0-9]{2}-[0-9]{2}-[0-9]{2}-[0-9]{2}$)" )

    # Accounts that are never restricted by this checker. The same list is
    # used for every rule, including tag deletion.
    _admins = ( "librarian", "root" )

    ## c'tor
    # @param self instance
    # @param txn transaction handle
    # @param fs filesystem handle
    # @param repos_path path in the local file system to the repository
    def __init__( self, txn, fs, repos_path ):
        txnChecker.__init__( self, txn, fs )
        self.repos_path = repos_path

    ## open the repository at self.repos_path and return the root of its
    #  youngest revision; the handles are kept on the instance
    # @param self instance
    def _open_head_root( self ):
        self.pool = svn.core.svn_pool_create( None )
        self.repos_ptr = svn.repos.open( self.repos_path, self.pool )
        self.fs_ptr = svn.repos.fs( self.repos_ptr )
        youngest_rev = svn.fs.svn_fs_youngest_rev( self.fs_ptr )
        self.rev_root = svn.fs.revision_root( self.fs_ptr, youngest_rev, self.pool )
        return self.rev_root

    ## check tags
    # @param self instance
    # @return True when the commit respects the tag policy, False (after
    #         writing the reason to stderr) on the first violation
    def check( self ):
        # Every rule below applies to non-administrators only.
        if ( self.txnProp( "svn:author" ) in self._admins ):
            return self.allow()

        txn_root = svn.fs.svn_fs_txn_root( self.txn )
        changed_paths = svn.fs.paths_changed( txn_root )
        rev_root = None  # opened on first use, then shared by all paths

        for path, change in changed_paths.items():
            if ( not self.path_is_in_tags.search( path ) ):
                continue

            # Cut the path down to ".../tags/<tag name>". The regex above is
            # case-insensitive, so the component is located ignoring case
            # too; an exact index("tags") would raise on e.g. "/Tags/".
            parts = path.split( "/" )
            tags_at = [ part.lower() for part in parts ].index( "tags" )
            tagPath = "/".join( parts[ 0 : tags_at + 2 ] )

            if ( rev_root is None ):
                rev_root = self._open_head_root()
            kind = svn.fs.svn_fs_check_path( rev_root, tagPath )
            # Non-zero kind: the tag is already present in the youngest
            # revision (0 is presumably svn_node_none, confirm against
            # the bindings), so this commit would modify an existing tag.
            if ( kind != 0 ):
                msg = "\nSVN POLICY: Unable to modify %s path.\n\n" % tagPath
                msg += "It is within a '/tags' directory and tags are read-only.\n\n"
                msg += "Possible reasons:\n"
                msg += "[1] Maybe you forget to switch the context (svn switch) after check out of a tag?\n"
                msg += "[2] This tag already exists in repository and you have tried to create it once again?\n\n"
                msg += "Contact librarian ('librarian@mail') for details."
                return self.block( msg )

            if ( not self.path_is_a_tag.search( path ) ):
                continue

            if ( change.change_kind == svn.fs.path_change_add ):
                words = path.split( "/tags/" )
                # Exactly one "/tags/" separator is needed to tell the
                # package part from the tag name; otherwise skip the check.
                if ( len( words ) == 2 ):
                    new_tag = words[-1]
                    package_name_from_path = words[0].split( "/" )[-1]
                    package_name_from_tag = new_tag.split( "-" )[0]
                    if ( package_name_from_tag != package_name_from_path or
                         not self.is_tag_name_valid.match( new_tag ) ):
                        msg = "\nSVN POLICY: invalid tag name '%s' for package %s\n\n" % ( new_tag, package_name_from_path )
                        msg += "The tag naming policy requires a tag of form:\n"
                        msg += " [01] %s-ii-jj-kk (regular tags)\n" % package_name_from_path
                        msg += " [02] %s-ii-jj-kk-ll (tags created on branches)\n" % package_name_from_path
                        msg += "where i,j,k and l are digits.\n"
                        return self.block( msg )
            elif ( change.change_kind == svn.fs.path_change_delete ):
                # NOTE(review): deleting a tag that exists in the youngest
                # revision is already rejected by the read-only rule above.
                msg = "\nSVN POLICY: Only an administrator can delete a tag.\n\n"
                msg += "You have tried to remove " + tagPath + " tag from repository, if you need so\n"
                msg += "contact librarian ('librarian@mail') for details."
                return self.block( msg )

        return self.allow()
## write usage and exit
# @param error_msg optional error text; when given (and non-empty) it is
#        printed before the usage line, output goes to stderr and the exit
#        status is 1; otherwise output goes to stdout and the status is 0
def usage_and_exit(error_msg=None):
    import os.path
    script = os.path.basename(sys.argv[0])
    if error_msg:
        out = sys.stderr
        status = 1
        out.write("ERROR: %s\n\n" % error_msg)
    else:
        out = sys.stdout
        status = 0
    out.write("USAGE: %s -t TXN_NAME REPOS\n" % (script))
    sys.exit(status)
## start processing
# Command line: svn-policy.py -t TXN_NAME REPOS
# Opens the transaction TXN_NAME of the repository REPOS, runs every checker
# in turn and exits with status 1 as soon as one of them rejects the commit,
# or with status 0 when all of them accept it.
if __name__ == '__main__':
    repos_path = None
    txn_name = None

    try:
        opts, args = my_getopt( sys.argv[1:], 't:h?', ["help"] )
    except getopt.GetoptError:
        # Only option-parsing errors are turned into a usage message; a bare
        # "except:" would also hide unrelated failures and interrupts.
        usage_and_exit( "problem processing arguments / options." )

    for opt, value in opts:
        if opt in ( '--help', '-h', '-?' ):
            usage_and_exit()
        elif opt == '-t':
            txn_name = value
        else:
            usage_and_exit( "unknown option '%s'." % opt )

    if txn_name is None:
        usage_and_exit( "must provide -t argument" )
    if len( args ) != 1:
        usage_and_exit( "only one argument allowed (the repository)." )

    repos_path = svn.core.svn_path_canonicalize( args[0] )
    fs = svn.repos.svn_repos_fs( svn.repos.svn_repos_open( repos_path ) )
    txn = svn.fs.svn_fs_open_txn( fs, txn_name )

    # Checkers run in this order; the first rejection ends the run.
    checks = [ logChecker( txn, fs ),
               pathChecker( txn, fs ),
               branchChecker( txn, fs, repos_path ),
               tagChecker( txn, fs, repos_path ) ]
    for check in checks:
        if ( not check.check() ):
            sys.exit( 1 )
    sys.exit( 0 )
view raw svn-policy.py hosted with ❤ by GitHub


If you want to use this one, you have to modify it a little:

  • replace 'librarian' or/and 'root' account name with your own repository admin
  • put repository admin mail to block messages - replace this fake 'librarian@mail' address
  • change the regexps for tags and branches to match your criteria
  • put svn-policy.py script into your repository hook directory ($REPO/hooks)
  • modify "$REPO/hooks/pre-commit" script to switch this one on

    #!/bin/sh
    REPOS="$1"
    TXN="$2"
    SVNPOLICY=/path/to/repo/hooks/svn-policy.py # modify this line
    #SVN policy
    $SVNPOLICY -t "$TXN" "$REPOS" || exit 1
    # All checks passed, so allow the commit.
    exit 0



The code could probably use some cleaning and a better structure, but anyway — happy using! Comments and questions are welcome.

Cheers,
Krzysztof

Brak komentarzy:

Prześlij komentarz