IBMStreams/streamsx.hdfs

Support dynamic filename writing and automatic deletion of source files after transfer into HDFS

AayushaY opened this issue · 7 comments

  1. Dynamic filename - The HDFS2FileSink operator should use the original file name when it writes to HDFS. That is, abc.json and xyz.json at the source (in my case, where FTPReader reads the files) should become abc.json and xyz.json at the destination (HDFS). If there are multiple files at the source, each of them should be written separately under its original filename.

  2. Automatic deletion of files post transfer – An operator that deletes the files/contents at the source (where FTPReader reads the files) once they have reached the destination (HDFS).

I was wondering if these two functionalities could be incorporated into the HDFS2FileSink operator. I tried using fileAttributeName to implement the first feature, but it didn't work out for me. Copying my code below for reference:

use com.ibm.streamsx.inet.ftp::* ;
use com.ibm.streamsx.hdfs::HDFS2FileSink ;

composite SFTP2HDFS {

	graph
		// a trigger stream for the ftp directory scan
		stream<int32 count> TriggerStream = Beacon() {
			param
				initDelay : 2.0 ;
				iterations : 1 ;
				period : 10.0 ;
			output
				TriggerStream : count = (int32) IterationCount() ;
			config
				placement : partitionColocation("DIR") ;
		}

		stream<rstring line, rstring fileName, uint64 size, rstring date, rstring user, boolean isFile,
			uint32 transferCount, uint32 failureCount, uint64 bytesTransferred, float64 speed>
			SignalOutput as OUT = FTPReader(TriggerStream) {
			param
				protocol : sftp ;
				isDirReader : true ;
				host : "<host>" ;
				path : "Inbox/" ;
				username : "<uid>" ;
				password : "<pwd>" ;
			output OUT :
				line = Line(),
				fileName = FileName(),
				size = FileSize(),
				date = FileDate(),
				user = FileUser(),
				isFile = IsFile(),
				transferCount = TransferCount(),
				failureCount = TransferFailureCount(),
				bytesTransferred = BytesTransferred(),
				speed = TransferSpeed() ;
		}

		stream<rstring line> FileContents as OUT1 = FTPReader(SignalOutput as IN) {
			param
				protocol : sftp ;
				isDirReader : false ;
				host : "<host>" ;
				path : "Inbox/" ;
				filename : IN.fileName ;
				username : "<uid>" ;
				password : "<pwd>" ;
			output OUT1 :
				line = Line() ;
		}

		() as Writer = HDFS2FileSink(FileContents) {
			param
				authKeytab    : "/path/xyz.keytab" ;
				authPrincipal : "xyz@kerberosRealm" ;
				file          : "/path/file.json" ;
				vmArg         : "-Djava.security.krb5.conf=/path/krb5.conf" ;
		}
}

Let me know what you think.

We have improved some functionality in HDFS2FileSink in version 4.3.0:
https://github.com/IBMStreams/streamsx.hdfs/releases/tag/v4.3.0
Please check it.
You can now use the new parameter tempFile.
The tempFile parameter specifies the name of the file that the operator writes to. When the file is closed, it is renamed to the final filename defined by the file parameter.
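
For illustration, a minimal sketch of a sink using tempFile, reusing the FileContents stream from your code (the paths here are placeholders, not from your application):

		() as Writer = HDFS2FileSink(FileContents) {
			param
				file     : "/user/hdfs/out/data.json" ;
				// written as data.json.tmp first, renamed to data.json on close
				tempFile : "/user/hdfs/out/data.json.tmp" ;
		}

This way, downstream HDFS consumers never see a half-written file under its final name.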

I will check your SPL code.

We will have to wait for this updated toolkit until IBM adds it to the formally supported toolkit bundle that my team can install. Is there a way this could be added to that formally supported bundle? Also, do you have any thoughts on the automatic deletion of files at the source once the transfer to HDFS is complete?

These two functionalities can be achieved with the existing tooling:

  1. Use the fileAttributeName parameter of the HDFS2FileSink operator. If set, it points to an attribute containing the filename, and the operator closes a file whenever the value of this attribute changes. You can place a Custom operator between FTPReader and HDFS2FileSink with a little logic that translates the filename from the FTP site into the HDFS file name, i.e. sets up the correct path in the HDFS file system (see the sketch after this list).
  2. Use the FTPCommand operator to delete the original file: the HDFS2FileSink operator has an optional output port that emits the filename once the file has been written. Take this output and instruct the FTPCommand operator to delete the file. You may have a look at the RemoteDirScanAndGet sample of the Inet toolkit; there the FTPCommand operator is used to move the file after the transfer, but a delete command is very similar.
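
For item 1, a minimal sketch of such a Custom operator (stream and path names are placeholders; it assumes the upstream FTPReader output carries both line and fileName attributes):

		stream<rstring line, rstring fileName> HdfsNamed = Custom(FileContents as IN) {
			logic
				onTuple IN :
				{
					// prepend the target HDFS directory to the original FTP file name;
					// HDFS2FileSink with fileAttributeName : "fileName" then writes to that path
					submit({ line = IN.line, fileName = "/user/hdfs/testDirectory/" + IN.fileName }, HdfsNamed) ;
				}
		}

In the complete sample below, the same translation is done directly in the output clause of the second FTPReader, which saves the extra operator.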

Here is an SPL sample:

You have to adapt the values of the first parameters to your FTP and HDFS credentials.

/*
 * The following SPL application demonstrates how to connect to an FTP server and read files from a directory,
 * and how to insert these files into an HDFS file system via HDFS2FileSink.
 * 'SignalOutput' creates a connection to the FTP server and reads the file names from a directory.
 * 'FileContents' reads the file contents and forwards the lines to the HdfsWriter.
 * Check that the FTP server is running:
 * service vsftpd status
 * 'HdfsWriter' writes all incoming lines into a file on your HDFS server in your user directory.
 * Make sure that the directory defined in the "HDFSPath" parameter exists and that the user has permission to write to it.
 * After a file has been successfully copied to HDFS, 'DeleteFtpFile' deletes it with an FTP command.
 * 
 * When your Hadoop cluster is Kerberos-enabled, you can add the $authKeytab and $authPrincipal parameters
 * to the HDFS2FileSink.
 * Copy the 'core-site.xml' file and the authentication keytab from the Hadoop server into the etc directory of your project.
 * Before you submit your SPL application, make sure that your keytab and principal work,
 * for example:
 * kinit -k -t etc/hdfs.headless.keytab hdfs-hdp2@HDP2.COM
 *  
 * The print operators are only there to aid understanding of the whole stream flow.
 * They can be removed for a production application.
 * 
 * Adapt the first parameters in this SPL file.
 * Replace only the values of these parameters with your FTP and HDFS credentials.
 */

namespace application ;

use com.ibm.streamsx.inet.ftp::* ;
use com.ibm.streamsx.hdfs::HDFS2FileSink ;

composite SFTP2HDFS
{

param
	expression<Protocol> $protocol : (Protocol)getSubmissionTimeValue("protocol", "sftp");
	expression<rstring> $host : getSubmissionTimeValue("host", "myftphost.fyre.ibm.com");
	expression<rstring> $path : getSubmissionTimeValue("path", "/Inbox/");
	expression<rstring> $username : getSubmissionTimeValue("username", "ftpuser");
	expression<rstring> $password : getSubmissionTimeValue("password", "myftppassword");
	expression<rstring> $HDFSPath : getSubmissionTimeValue("HDFSPath", "/user/hdfs/testDirectory/");
	expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hdfs.headless.keytab") ;
	expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "hdfs-hdp2@HDP2.COM") ;
	expression<rstring> $configPath : getSubmissionTimeValue("configPath", "etc") ;

	graph
		// a trigger stream for the ftp directory scan
		stream<int32 count> TriggerStream = Beacon()
		{
			param
				initDelay : 2.0 ;
				iterations : 1 ;
				period : 10.0 ;
			output
				TriggerStream : count =(int32) IterationCount() ;
			config
				placement : partitionColocation("DIR") ;
		}

		stream<rstring line, rstring fileName, uint64 size, rstring date, rstring user, boolean isFile,
			uint32 transferCount, uint32 failureCount, uint64 bytesTransferred, float64 speed> SignalOutput
			as OUT = FTPReader(TriggerStream)
		{
			param
				protocol : $protocol  ;
				isDirReader : true ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
			output
				OUT : line = Line(), fileName = FileName(), size = FileSize(), date = FileDate(), user =
					FileUser(), isFile = IsFile(), transferCount = TransferCount(), failureCount =
					TransferFailureCount(), bytesTransferred = BytesTransferred(), speed = TransferSpeed() ;
		}

		stream<rstring line, rstring fileName> FileContents as OUT1 = FTPReader(SignalOutput as IN)
		{
			param
				protocol : $protocol  ;
				isDirReader : false ;
				filename : IN.fileName ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
			output
				OUT1 : line = Line(), fileName = $HDFSPath + IN.fileName ;
		}
		
		() as printFTPReader = Custom(FileContents)
		{
			logic
				onTuple FileContents :
				{
					printStringLn("printFTPReader line : " + line + " fileName : " + fileName) ;
				}
		}
  
		stream<rstring out_file_name, uint64 size> HdfsWriter = HDFS2FileSink(FileContents)
		{
			logic
				onTuple FileContents :
				{
					printStringLn("HdfsWriter line : " + line + " fileName : " + fileName) ;
				}

			param
				authKeytab : $authKeytab ;
				authPrincipal : $authPrincipal ;
				configPath : $configPath ;
				fileAttributeName : "fileName" ;
//				vmArg : "-Djava.security.krb5.conf=/path/krb5.conf" ;
		}

		//prepare command stream
		stream<rstring command, rstring file> CommandStream as OUT = Custom(HdfsWriter as IN)
		{
			logic
				state : 
				{
					mutable rstring fileName = "" ;
					mutable int32 position1 = 0 ;
					mutable int32 position2 = 0 ;
					mutable int32 leng = 0 ;
				}
				onTuple IN :
				{
					printStringLn("file name " + IN.out_file_name) ;
					fileName =  IN.out_file_name ;
					// extract only file name without path
					leng = length(fileName) ;
					position1 = findLast(fileName, "/", leng - 1) ;
					fileName = substring(fileName, position1 + 1, leng - position1 - 1) ;
					
					submit({ command = "rm", file = fileName }, OUT) ;
				}

			config
				placement : partitionColocation("DELETE") ;
		}

		() as printCommandStream = Custom(CommandStream)
		{
			logic
				onTuple CommandStream :
				{
					printStringLn("printCommandStream command : " + command + " fileName : " + file) ;
				}
		}
 	
		stream<boolean success, rstring fileName> DeleteFtpFile as OUT = FTPCommand(CommandStream as IN)
		{
			param
				protocol : $protocol  ;
				filename : IN.file ;
				command : IN.command ;
				host : $host ;
				path : $path  ;
				username : $username  ;
				password : $password  ;
				connectionCloseMode : never ;
				curlVerbose : false;
				
			output
				OUT : fileName = IN.file, success = Success() ;
			config
				placement : partitionColocation("DELETE") ;
		}
		
		() as printDeleteFtpFile = Custom(DeleteFtpFile)
		{
			logic
				onTuple DeleteFtpFile :
				{
					printStringLn("DeleteFtpFile success : " + (rstring) success + " fileName : " + fileName) ;
				}
		}

}
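
You can pass your FTP and HDFS credentials as submission-time values when you submit the job, for example (command shown for illustration, assuming the default sc output location; adapt the values to your environment):

streamtool submitjob output/application.SFTP2HDFS.sab -P host=myftphost.fyre.ibm.com -P username=ftpuser -P password=myftppassword -P HDFSPath=/user/hdfs/testDirectory/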

Thank you for making modifications to my existing code! I will implement it and document the progress in this comment thread.

Works like a charm. Thanks! I will test it with larger files and let you know.