Package xmpp :: Module filetransfer
[hide private]
[frames | no frames]

Source Code for Module xmpp.filetransfer

  1  ##   filetransfer.py  
  2  ## 
  3  ##   Copyright (C) 2004 Alexey "Snake" Nezhdanov 
  4  ## 
  5  ##   This program is free software; you can redistribute it and/or modify 
  6  ##   it under the terms of the GNU General Public License as published by 
  7  ##   the Free Software Foundation; either version 2, or (at your option) 
  8  ##   any later version. 
  9  ## 
 10  ##   This program is distributed in the hope that it will be useful, 
 11  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  ##   GNU General Public License for more details. 
 14   
 15  # $Id: filetransfer.py,v 1.6 2004/12/25 20:06:59 snakeru Exp $ 
 16   
 17  """ 
 18  This module contains IBB class that is the simple implementation of JEP-0047. 
 19  Note that this is just a transport for data. You have to negotiate data transfer before 
 20  (via StreamInitiation most probably). Unfortunately SI is not implemented yet. 
 21  """ 
 22   
 23  from protocol import * 
 24  from dispatcher import PlugIn 
 25  import base64 
 26   
class IBB(PlugIn):
    """ IBB is used to transfer small-sized data chunks over an established xmpp connection.
        Data is split into small blocks (by default 3000 bytes each), encoded as base 64
        and sent to another entity that compiles these blocks back into the data chunk.
        This is very inefficient but should work under any circumstances. Note that
        using IBB normally should be the last resort.

        Every known stream is kept in self._streams, keyed by stream id. A stream is a
        dictionary with the following keys:
          'direction'  - '<jid' for an incoming stream, '|>jid' for an outgoing stream
                         that the remote side has not accepted yet, '>jid' for an
                         outgoing stream that is being sent;
          'block-size' - number of raw bytes carried by one data stanza;
          'fp'         - file object the data is read from (outgoing) or written to (incoming);
          'seq'        - sequence number of the next block, wraps to 0 after 65535;
          'syn_id'     - id of the iq stanza that opened the stream.
    """

    def __init__(self):
        """ Initialise internal variables. """
        PlugIn.__init__(self)
        self.DBG_LINE='ibb'
        self._exported_methods=[self.OpenStream]
        self._streams={}
        # AMP rules attached to every data message: ask for an error instead of
        # offline storage or delivery to a different resource.
        self._ampnode=Node(NS_AMP+' amp',payload=[
            Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),
            Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])

    def plugin(self,owner):
        """ Register handlers for receiving incoming datastreams. Used internally. """
        self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler) # Move to StreamOpen and specify stanza id
        self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
        self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)

    def IqHandler(self,conn,stanza):
        """ Handles streams state change. Used internally.
            Dispatches the iq stanza to the open/close/reply handler that matches its
            type and payload, answers with bad-request otherwise.
            Always raises NodeProcessed after the stanza was handled. """
        typ=stanza.getType()
        self.DEBUG('IqHandler called typ->%s'%typ,'info')
        if typ=='set' and stanza.getTag('open',namespace=NS_IBB): self.StreamOpenHandler(conn,stanza)
        elif typ=='set' and stanza.getTag('close',namespace=NS_IBB): self.StreamCloseHandler(conn,stanza)
        # StreamOpenReplyHandler deals with both kinds of replies. The class has no
        # StreamCommitHandler, so calling it here could only raise AttributeError.
        elif typ in ('result','error'): self.StreamOpenReplyHandler(conn,stanza)
        else: conn.send(Error(stanza,ERR_BAD_REQUEST))
        raise NodeProcessed

    def StreamOpenHandler(self,conn,stanza):
        """ Handles opening of new incoming stream. Used internally.
            Replies with an error stanza if the request is malformed (bad-request) or
            if a stream with the same sid already exists (unexpected-request).
            Otherwise registers the stream, opens '/tmp/xmpp_file_<sid>' for writing
            and replies with an iq result. """
        # Example of the stanza being handled:
        #   <iq type='set'
        #       from='romeo@montague.net/orchard'
        #       to='juliet@capulet.com/balcony'
        #       id='inband_1'>
        #     <open sid='mySID'
        #           block-size='4096'
        #           xmlns='http://jabber.org/protocol/ibb'/>
        #   </iq>
        err=None
        sid=stanza.getTagAttr('open','sid')
        blocksize=stanza.getTagAttr('open','block-size')
        self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
        try: blocksize=int(blocksize)
        except (TypeError,ValueError): blocksize=0   # missing or non-numeric attribute
        if not sid or blocksize<=0: err=ERR_BAD_REQUEST
        # The sid comes from the remote entity and becomes part of a file name:
        # refuse anything that could point outside of /tmp.
        elif '/' in sid or '\\' in sid or '\0' in sid: err=ERR_BAD_REQUEST
        elif sid in self._streams: err=ERR_UNEXPECTED_REQUEST
        if err: rep=Error(stanza,err)
        else:
            self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
            rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
            self._streams[sid]={
                'direction':'<'+str(stanza.getFrom()),
                'block-size':blocksize,
                # binary mode: the decoded base64 payload is raw data, not text
                'fp':open('/tmp/xmpp_file_'+sid,'wb'),
                'seq':0,
                'syn_id':stanza.getID()}
        conn.send(rep)

    def OpenStream(self,sid,to,fp,blocksize=3000):
        """ Start new stream. You should provide stream id 'sid', the endpoint jid 'to',
            the file object containing info for send 'fp'. Also the desired blocksize can be specified.
            Take into account that recommended stanza size is 4k and IBB uses base64 encoding
            that increases size of data by 1/3.
            Returns the stream description (dictionary) or None if the sid is already
            in use or the 'to' jid has no resource part. """
        if sid in self._streams: return
        if not JID(to).getResource(): return
        # '|>' marks the stream as outgoing but not yet accepted by the remote side.
        self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
        self._owner.RegisterCycleHandler(self.SendHandler)
        syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
        self._owner.send(syn)
        # remember the id of the request to match the reply in StreamOpenReplyHandler
        self._streams[sid]['syn_id']=syn.getID()
        return self._streams[sid]

    def _has_outgoing_streams(self):
        """ Tell if any outgoing stream (accepted or still pending) is left. Used internally. """
        for stream in self._streams.values():
            if stream['direction'][:1] in ('>','|'): return 1
        return 0

    def SendHandler(self,conn):
        """ Send next portion of data if it is time to do it. Used internally.
            Called on every dispatcher cycle: sends one block for each accepted outgoing
            stream. When the file is exhausted the stream is closed, the
            'SUCCESSFULL SEND' event is raised and the stream is forgotten. """
        # Example of the stanza being sent:
        #   <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
        #     <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
        #       qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
        #       ...
        #     </data>
        #     <amp xmlns='http://jabber.org/protocol/amp'>
        #       <rule condition='deliver-at' value='stored' action='error'/>
        #       <rule condition='match-resource' value='exact' action='error'/>
        #     </amp>
        #   </message>
        self.DEBUG('SendHandler called','info')
        # Walk over a snapshot of the ids: finished streams are deleted inside the loop.
        for sid in list(self._streams):
            stream=self._streams[sid]
            # skip incoming ('<') streams and outgoing streams awaiting acceptance ('|>')
            if stream['direction'][:1]!='>': continue
            peer=stream['direction'][1:]
            chunk=stream['fp'].read(stream['block-size'])
            if chunk:
                datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
                stream['seq']+=1
                if stream['seq']==65536: stream['seq']=0
                conn.send(Protocol('message',peer,payload=[datanode,self._ampnode]))
            else:
                # notify the other side about stream closing,
                # notify the local user about successful send,
                # delete the local stream
                conn.send(Protocol('iq',peer,'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
                conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
                del self._streams[sid]
                # Other outgoing streams still rely on this handler being called,
                # so only drop it after the last one is gone.
                if not self._has_outgoing_streams():
                    self._owner.UnregisterCycleHandler(self.SendHandler)

    def ReceiveHandler(self,conn,stanza):
        """ Receive next portion of incoming datastream and write it to the
            temporary file. Used internally.
            On any problem (unknown sid, undecodable or empty data, wrong sequence
            number, data for a stream that we are sending) an error stanza carrying
            a 'close' node is sent back. """
        sid=stanza.getTagAttr('data','sid')
        seq=stanza.getTagAttr('data','seq')
        data=stanza.getTagData('data')
        self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
        try:
            seq=int(seq)
            data=base64.decodestring(data)
        except Exception:
            # Missing/garbled seq or payload: handled as empty data (bad-request) below.
            seq=''; data=''
        err=None
        if sid not in self._streams: err=ERR_ITEM_NOT_FOUND
        else:
            stream=self._streams[sid]
            if not data: err=ERR_BAD_REQUEST
            # never write received data into the file object of a stream we are sending
            elif stream['direction'][:1]!='<': err=ERR_UNEXPECTED_REQUEST
            elif seq!=stream['seq']: err=ERR_UNEXPECTED_REQUEST
            else:
                self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
                stream['fp'].write(data)
                stream['seq']+=1
                # SendHandler wraps the counter after 65535, expect the same from the peer
                if stream['seq']==65536: stream['seq']=0
        if err:
            self.DEBUG('Error on receive: %s'%err,'error')
            conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))

    def StreamCloseHandler(self,conn,stanza):
        """ Handle stream closure due to all data transmitted.
            Raise xmpppy event specifying successful data receive.
            Replies with item-not-found error if the sid is unknown. """
        sid=stanza.getTagAttr('close','sid')
        self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
        if sid in self._streams:
            stream=self._streams[sid]
            # Push buffered data to disk so that the event consumer sees the whole
            # file. Only for incoming streams: their file object was opened by
            # StreamOpenHandler, outgoing ones belong to the OpenStream caller.
            if stream['direction'][:1]=='<': stream['fp'].flush()
            conn.send(stanza.buildReply('result'))
            conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',stream)
            del self._streams[sid]
        else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))

    def StreamBrokenHandler(self,conn,stanza):
        """ Handle stream closure due to some error while receiving data.
            Raise xmpppy event specifying unsuccessful data receive (or send)
            for every stream opened by the stanza with the same id and forget it. """
        syn_id=stanza.getID()
        self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
        # snapshot of the ids: matching streams are deleted inside the loop
        for sid in list(self._streams):
            stream=self._streams[sid]
            # 'syn_id' is absent until OpenStream has sent the request
            if 'syn_id' not in stream or stream['syn_id']!=syn_id: continue
            if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
            else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
            del self._streams[sid]

    def StreamOpenReplyHandler(self,conn,stanza):
        """ Handle remote side reply about whether it agrees or not to receive our datastream.
            Used internally. Raises xmpppy event specifying if the data transfer
            is agreed upon: 'STREAM COMMITTED' on acceptance, 'ERROR ON SEND' /
            'ERROR ON RECEIVE' on refusal (the stream is forgotten then). """
        syn_id=stanza.getID()
        self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
        typ=stanza.getType()
        # snapshot of the ids: refused streams are deleted inside the loop
        for sid in list(self._streams):
            stream=self._streams[sid]
            # 'syn_id' is absent until OpenStream has sent the request
            if 'syn_id' not in stream or stream['syn_id']!=syn_id: continue
            if typ=='error':
                if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
                else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
                del self._streams[sid]
            elif typ=='result':
                if stream['direction'][0]=='|':
                    # accepted: strip the "pending" mark so that SendHandler starts sending
                    stream['direction']=stream['direction'][1:]
                    conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
                # a result for a stream that is not waiting for acceptance
                else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))
200