26
26
* Copyright 2017 RackTop Systems.
27
27
* Copyright (c) 2017 Open-E, Inc. All Rights Reserved.
28
28
* Copyright (c) 2019, 2020 by Christian Schwarz. All rights reserved.
29
+ * Copyright (c) 2021 Rich Ercolani.
29
30
*/
30
31
31
32
/*
@@ -96,6 +97,19 @@ static int g_fd = -1;
96
97
static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER ;
97
98
static int g_refcount ;
98
99
100
+ #ifdef __linux__
101
+ #ifndef F_SETPIPE_SZ
102
+ #define F_SETPIPE_SZ (F_SETLEASE + 7)
103
+ #endif /* F_SETPIPE_SZ */
104
+
105
+ #ifndef F_GETPIPE_SZ
106
+ #define F_GETPIPE_SZ (F_GETLEASE + 7)
107
+ #endif /* F_GETPIPE_SZ */
108
+ #endif
109
+
110
+ static unsigned long lzc_get_pipe_max (void );
111
+ static void lzc_set_pipe_max (int infd );
112
+
99
113
#ifdef ZFS_DEBUG
100
114
static zfs_ioc_t fail_ioc_cmd = ZFS_IOC_LAST ;
101
115
static zfs_errno_t fail_ioc_err ;
@@ -645,6 +659,92 @@ lzc_send_resume(const char *snapname, const char *from, int fd,
645
659
resumeoff , NULL ));
646
660
}
647
661
662
+ static unsigned long
663
+ lzc_get_pipe_max ()
664
+ {
665
+ /* FreeBSD automatically grows to 64k */
666
+ unsigned long max_psize = 65536 ;
667
+ #ifdef __linux__
668
+ FILE * procf = fopen ("/proc/sys/fs/pipe-max-size" , "re" );
669
+
670
+ if (procf != NULL ) {
671
+ if (fscanf (procf , "%lu" , & max_psize ) <= 0 ) {
672
+ max_psize = max_psize ;
673
+ }
674
+ fclose (procf );
675
+ }
676
+ #endif
677
+ return (max_psize );
678
+ }
679
+
680
+ static void
681
+ lzc_set_pipe_max (int infd )
682
+ {
683
+ #ifdef __linux__
684
+ unsigned long max_psize = lzc_get_pipe_max ();
685
+ long cur_psize ;
686
+ cur_psize = fcntl (infd , F_GETPIPE_SZ );
687
+ if (cur_psize > 0 &&
688
+ max_psize > (unsigned long ) cur_psize )
689
+ fcntl (infd , F_SETPIPE_SZ ,
690
+ max_psize );
691
+ #endif
692
+ }
693
+
694
+
695
+ struct sendargs {
696
+ int ioctlfd ;
697
+ int inputfd ;
698
+ int outputfd ;
699
+ };
700
+ typedef struct sendargs sendargs_t ;
701
+
702
+ static void *
703
+ do_send_output (void * voidargs )
704
+ {
705
+ sendargs_t * args = (sendargs_t * )voidargs ;
706
+ sigset_t sigs ;
707
+ int buflen = lzc_get_pipe_max ();
708
+
709
+ /*
710
+ * See the comment above the close() call for why
711
+ * we can't just die from SIGPIPE.
712
+ */
713
+ sigemptyset (& sigs );
714
+ sigaddset (& sigs , SIGPIPE );
715
+ pthread_sigmask (SIG_BLOCK , & sigs , NULL );
716
+
717
+
718
+ int err = 1 ;
719
+ #ifdef __linux__
720
+ while (err > 0 ) {
721
+ err = splice (args -> inputfd , NULL , args -> outputfd , NULL , buflen ,
722
+ SPLICE_F_MORE );
723
+ }
724
+ #else
725
+ void * buf = calloc (1 , buflen );
726
+ while (err > 0 ) {
727
+ err = read (args -> inputfd , buf , buflen );
728
+ if (err <= 0 ) {
729
+ break ;
730
+ }
731
+ err = write (args -> outputfd , buf , err );
732
+ }
733
+ free (buf );
734
+ #endif
735
+ if (err < 0 ) {
736
+ err = errno ;
737
+ }
738
+ /*
739
+ * If we just return here, the other thread often blocks
740
+ * indefinitely on the ioctl completing, which won't happen
741
+ * because we stopped consuming the data. So we close the pipe
742
+ * here, and the other thread exits in a timely fashion.
743
+ */
744
+ close (args -> inputfd );
745
+ return ((void * )(uintptr_t )err );
746
+ }
747
+
648
748
/*
649
749
* snapname: The name of the "tosnap", or the snapshot whose contents we are
650
750
* sending.
@@ -664,9 +764,18 @@ lzc_send_resume_redacted(const char *snapname, const char *from, int fd,
664
764
{
665
765
nvlist_t * args ;
666
766
int err ;
767
+ int pipefd [2 ];
768
+ pthread_t mythread ;
769
+ sendargs_t sendargs ;
770
+ int threadstatus ;
771
+
772
+
773
+ err = pipe2 (pipefd , O_CLOEXEC );
774
+
775
+ lzc_set_pipe_max (pipefd [0 ]);
667
776
668
777
args = fnvlist_alloc ();
669
- fnvlist_add_int32 (args , "fd" , fd );
778
+ fnvlist_add_int32 (args , "fd" , pipefd [ 1 ] );
670
779
if (from != NULL )
671
780
fnvlist_add_string (args , "fromsnap" , from );
672
781
if (flags & LZC_SEND_FLAG_LARGE_BLOCK )
@@ -686,8 +795,32 @@ lzc_send_resume_redacted(const char *snapname, const char *from, int fd,
686
795
if (redactbook != NULL )
687
796
fnvlist_add_string (args , "redactbook" , redactbook );
688
797
798
+ sendargs .inputfd = pipefd [0 ];
799
+ sendargs .outputfd = fd ;
800
+ sendargs .ioctlfd = pipefd [1 ];
801
+
802
+ pthread_create (& mythread , NULL , do_send_output , (void * )& sendargs );
803
+
689
804
err = lzc_ioctl (ZFS_IOC_SEND_NEW , snapname , args , NULL );
805
+
806
+ close (pipefd [1 ]);
807
+
808
+ pthread_join (mythread , (void * )& threadstatus );
809
+
690
810
nvlist_free (args );
811
+
812
+
813
+ if (threadstatus != 0 ) {
814
+ err = threadstatus ;
815
+ /*
816
+ * if we don't set errno here, there are some edge cases
817
+ * where we wind up dying unexpectedly with
818
+ * "internal error: [normal warning msg]: Success"
819
+ */
820
+ errno = threadstatus ;
821
+ }
822
+
823
+
691
824
return (err );
692
825
}
693
826
@@ -792,6 +925,7 @@ recv_impl(const char *snapname, nvlist_t *recvdprops, nvlist_t *localprops,
792
925
char * atp ;
793
926
int error ;
794
927
boolean_t payload = B_FALSE ;
928
+ struct stat sb ;
795
929
796
930
ASSERT3S (g_refcount , > , 0 );
797
931
VERIFY3S (g_fd , != , -1 );
@@ -811,6 +945,21 @@ recv_impl(const char *snapname, nvlist_t *recvdprops, nvlist_t *localprops,
811
945
* slashp = '\0' ;
812
946
}
813
947
948
+ /*
949
+ * The only way fstat can fail is if we do not have a valid file
950
+ * descriptor.
951
+ */
952
+ if (fstat (input_fd , & sb ) == -1 ) {
953
+ return (- errno );
954
+ }
955
+
956
+ /*
957
+ * It is not uncommon for gigabytes to be processed in zfs receive.
958
+ * Speculatively increase the buffer size if supported by the platform.
959
+ */
960
+ if (S_ISFIFO (sb .st_mode ))
961
+ lzc_set_pipe_max (input_fd );
962
+
814
963
/*
815
964
* The begin_record is normally a non-byteswapped BEGIN record.
816
965
* For resumable streams it may be set to any non-byteswapped
0 commit comments